Python如何接入品易海外HTTP:demo小课堂

技术支持 | 发布日期:2021-07-14

代码参考:

import _thread

import time


import requests


# URL requested through the proxy by this demo.
mainUrl = 'https://api.myip.la/en?json'

# HTTP request headers for the demo request.
headers = {
    "Accept": (
        "text/html,application/xhtml+xml,application/xml;q=0.9,"
        "image/webp,image/apng,*/*;q=0.8"
    ),
    "User-Agent": (
        "Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_3 like Mac OS X) "
        "AppleWebKit/603.3.8 (KHTML, like Gecko) Mobile/14G60 "
        "MicroMessenger/6.5.19 NetType/4G Language/zh_TW"
    ),
}



def testUrl():
    """Fetch ``mainUrl`` once through an authenticated HTTP proxy and print the result.

    The account, password and the ``host:port`` part of the proxy URL are
    placeholders that must be replaced with real values before use.

    Prints the status code and response body on success, or the error on
    failure. Returns None and never raises.
    """
    # Imported locally so the function does not rely on the file's import section.
    from urllib.parse import quote

    username = '账户'
    password = '密码'

    # Percent-encode the credentials: characters such as "@", ":" or "/" in
    # an account name or password would otherwise corrupt the proxy URL.
    entry = 'http://{}:{}@host:port'.format(
        quote(username, safe=''),
        quote(password, safe=''),
    )

    # The same proxy endpoint is used for both http and https targets.
    proxy = {
        'http': entry,
        'https': entry,
    }

    try:
        res = requests.get(mainUrl, headers=headers, proxies=proxy, timeout=10)
        print(res.status_code, res.text)
    except Exception as e:
        # Report the failure instead of propagating it to the caller.
        print("访问失败", e)



# Start 10 concurrent test requests, staggered by 0.1 s, then wait for them.
import threading  # imported here so this script section is self-contained

workers = []
for _ in range(10):
    worker = threading.Thread(target=testUrl)
    worker.start()
    workers.append(worker)
    time.sleep(0.1)

# Join every worker instead of sleeping a fixed 10 s: a fixed sleep can end
# while requests are still in flight, and wastes time when they finish early.
for worker in workers:
    worker.join()


# coding=utf-8

# !/usr/bin/env python

import json

import threading

import time

import requests as rq


# URL requested through each extracted proxy.
targetUrl = "https://api.myip.la/en?json"

# Browser-like request headers.
# NOTE(review): testPost does not appear to pass these to the request - confirm
# whether that is intentional.
headers = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:85.0) "
        "Gecko/20100101 Firefox/85.0"
    ),
    "Accept": (
        "text/html,application/xhtml+xml,application/xml;q=0.9,"
        "image/webp,*/*;q=0.8"
    ),
    "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
    "Accept-Encoding": "gzip, deflate, br",
}



# Core logic
def testPost(host, port):
    """Request ``targetUrl`` once through the HTTP proxy at ``host:port``.

    Despite the name, this issues a GET request. The status code and
    response body are printed on success; on any failure the exception is
    printed instead.

    Args:
        host: Proxy host name or IP address.
        port: Proxy port.

    Returns:
        None.
    """
    proxy_url = 'http://{}:{}'.format(host, port)
    # The same proxy endpoint is used for both http and https targets.
    proxies = {
        'http': proxy_url,
        'https': proxy_url,
    }

    # Single attempt: failures are reported, not retried or raised.
    try:
        res = rq.get(targetUrl, proxies=proxies, timeout=5)
        print(res.status_code, "***", res.text)
    except Exception as e:
        print(e)



class ThreadFactory(threading.Thread):
    """Thread that runs ``testPost`` against one proxy endpoint."""

    def __init__(self, host, port):
        """Store the proxy ``host`` and ``port`` this thread will test."""
        super().__init__()
        self.host, self.port = host, port

    def run(self):
        """Thread body: test the stored proxy with ``testPost``."""
        testPost(self.host, self.port)



# URL of the proxy-extraction API; the response is JSON.
tiqu = 'http://tiqu.py.cn/getProxyIp?num=10&regions=kr&return_type=json'

# Runs until interrupted: each round fetches a batch of proxies (num=10 in
# the URL above) and tests every proxy in its own thread.
while True:
    try:
        # The request itself is inside the try so a timeout or connection
        # error triggers a retry instead of terminating the loop.
        resp = rq.get(url=tiqu, timeout=5)
        dataBean = json.loads(resp.text) if resp.status_code == 200 else None
    except (rq.RequestException, ValueError):
        # ValueError covers a response body that is not valid JSON.
        dataBean = None

    if not isinstance(dataBean, dict):
        print("获取失败")
        time.sleep(1)
        continue

    # Read the payload defensively: a missing "Code" or "Data" key skips
    # this round rather than raising KeyError.
    code = dataBean.get("Code")
    print("code=", code)
    if code == 0:
        threads = [
            ThreadFactory(proxy["ip"], proxy["port"])
            for proxy in dataBean.get("Data") or []
        ]
        for t in threads:  # start the workers, slightly staggered
            t.start()
            time.sleep(0.01)
        for t in threads:  # block until the whole batch has finished
            t.join()

    time.sleep(1)

import string

import time

import zipfile


from selenium import webdriver



def create_proxyauth_extension(proxy_host, proxy_port, proxy_username, proxy_password,
                               scheme='http', plugin_path=None):
    """Write a Chrome extension zip that configures an authenticated proxy.

    The archive contains ``manifest.json`` and ``background.js``. The script
    sets a fixed proxy server through ``chrome.proxy.settings`` and answers
    ``chrome.webRequest.onAuthRequired`` events with the given credentials.

    Args:
        proxy_host: Proxy host name or IP address.
        proxy_port: Proxy port (int or numeric string).
        proxy_username: Account supplied when authentication is requested.
        proxy_password: Password supplied when authentication is requested.
        scheme: Proxy scheme written into the proxy configuration.
        plugin_path: Where to write the zip; defaults to
            ``vimm_chrome_proxyauth_plugin.zip`` in the working directory.

    Returns:
        The path of the written extension archive.
    """
    # Imported locally so the function does not rely on the file's import section.
    import json

    if plugin_path is None:
        # Default extension archive location.
        plugin_path = 'vimm_chrome_proxyauth_plugin.zip'

    # NOTE(review): this is a Manifest V2 extension using webRequestBlocking;
    # confirm the target Chrome version still loads such extensions.
    manifest_json = json.dumps(
        {
            "version": "1.0.0",
            "manifest_version": 2,
            "name": "Chrome Proxy",
            "permissions": [
                "proxy",
                "tabs",
                "unlimitedStorage",
                "storage",
                "<all_urls>",
                "webRequest",
                "webRequestBlocking",
            ],
            "background": {
                "scripts": ["background.js"],
            },
            "minimum_chrome_version": "22.0.0",
        },
        indent=4,
    )

    def js_string(value):
        # Emit a quoted, fully escaped JavaScript string literal so quotes or
        # backslashes in a value cannot break the generated script.
        return json.dumps(str(value))

    # Placeholders carry no surrounding quotes: js_string() supplies them.
    background_js = string.Template(
        """
        var config = {
            mode: "fixed_servers",
            rules: {
                singleProxy: {
                    scheme: ${scheme},
                    host: ${host},
                    port: parseInt(${port}, 10)
                },
                bypassList: ["foobar.com"]
            }
        };

        chrome.proxy.settings.set({value: config, scope: "regular"}, function() {});

        function callbackFn(details) {
            return {
                authCredentials: {
                    username: ${username},
                    password: ${password}
                }
            };
        }

        chrome.webRequest.onAuthRequired.addListener(
            callbackFn,
            {urls: ["<all_urls>"]},
            ['blocking']
        );
        """
    ).substitute(
        host=js_string(proxy_host),
        port=js_string(proxy_port),
        username=js_string(proxy_username),
        password=js_string(proxy_password),
        scheme=js_string(scheme),
    )

    with zipfile.ZipFile(plugin_path, 'w') as zp:
        zp.writestr("manifest.json", manifest_json)
        zp.writestr("background.js", background_js)

    return plugin_path



#

# Build the proxy-auth extension. Host, port, account and password below are
# placeholders to be replaced with real values.
proxyauth_plugin_path = create_proxyauth_extension(
    proxy_host="host",
    proxy_port="port",
    proxy_username="账户",
    proxy_password="密码",
)

# Chrome options: load the generated extension and start maximized.
option = webdriver.ChromeOptions()
option.add_extension(proxyauth_plugin_path)
option.add_argument("--start-maximized")

# Hand the options to the browser driver.
browser = webdriver.Chrome(options=option)

# Short pause before navigating.
time.sleep(1)

# Open the target site.
browser.get('https://api.myip.la/en?json')



挂件 关闭
客服
二维码
客服二维码

加微信 领流量

大客户经理二维码

售前咨询,企业定制

专属客服竭诚为您服务