Python如何接品易HTTP demo小课堂

使用教程发布日期:2021-01-26

Python客户使用代理IP时,可参考如下demo案例:


Python—API教程(下载)


Python—账密教程(下载)


Python—API


# coding=utf-8

# !/usr/bin/env python

import json

import threading

import time

import requests as rq


# Browser-like HTTP request headers (Firefox 85 on Windows 10 user agent).
# NOTE(review): the rq.get calls visible in this demo do not pass these
# headers -- confirm whether they were meant to be sent.
headers = {

    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:85.0) Gecko/20100101 Firefox/85.0",

    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",

    "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",

    "Accept-Encoding": "gzip, deflate, br"

}

# URL that testPost fetches through each proxy.
# Presumably it reports the caller's visible IP as JSON -- verify against the service.
testUrl = 'https://api.myip.la/en?json'



# 核心业务

def testPost(host, port):
    """Fetch ``testUrl`` once through the HTTP proxy at ``host:port``.

    Prints the response status code and body on success, or the raised
    exception on failure. Nothing is returned and nothing is re-raised.

    Args:
        host: Proxy host name or IP address.
        port: Proxy port.
    """
    # The same proxy endpoint is used for both plain and TLS traffic.
    endpoint = 'http://{}:{}'.format(host, port)
    proxies = {'http': endpoint, 'https': endpoint}

    try:
        response = rq.get(testUrl, proxies=proxies, timeout=5)
        print(response.status_code, "***", response.text)
    except Exception as e:
        # Thread entry boundary: report the failure instead of propagating.
        print(e)



class ThreadFactory(threading.Thread):
    """Worker thread that runs ``testPost`` for a single proxy.

    Attributes are set at construction time:
        host: Proxy host passed through to ``testPost``.
        port: Proxy port passed through to ``testPost``.
    """

    def __init__(self, host, port):
        super().__init__()
        self.host, self.port = host, port

    def run(self):
        """Thread body: test the proxy this worker was created for."""
        testPost(self.host, self.port)



# URL of the proxy-extraction API; its response body is parsed as JSON below.
tiqu = '提取链接'

while True:
    # Each round fetches one batch of proxies (the original note says 10 per
    # request) and tests every proxy in its own thread.
    dataBean = None
    try:
        # The request itself is inside the try so that a timeout or
        # connection error triggers a retry instead of ending the loop.
        resp = rq.get(url=tiqu, timeout=5)
        if resp.status_code == 200:
            dataBean = json.loads(resp.text)
    except (rq.RequestException, ValueError) as e:
        # ValueError covers malformed JSON (json.JSONDecodeError subclasses it).
        print("获取失败", e)
        time.sleep(1)
        continue

    if not isinstance(dataBean, dict):
        # Non-200 status, or a JSON payload that is not an object.
        print("获取失败")
        time.sleep(1)
        continue

    # Parse the JSON payload; code == 0 is treated as success.
    code = dataBean.get("code")
    print("code=", code)
    if code == 0:
        threads = [
            ThreadFactory(proxy["ip"], proxy["port"])
            for proxy in dataBean.get("data") or []
        ]
        for t in threads:  # start the workers, slightly staggered
            t.start()
            time.sleep(0.01)
        for t in threads:  # wait for the whole batch to finish
            t.join()

    time.sleep(1)


Python—账密


import _thread

import time


import requests


# Request headers sent by testUrl; the user agent string identifies an
# iPhone client (iOS 10.3.3, MicroMessenger 6.5.19).
headers = {

    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",

    "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_3 like Mac OS X) AppleWebKit/603.3.8 (KHTML, like Gecko) Mobile/14G60 MicroMessenger/6.5.19 NetType/4G Language/zh_TW",

}


# URL that testUrl requests through the proxy.
mainUrl = 'https://api.myip.la/en?json'



def testUrl():
    """Request ``mainUrl`` once through the username/password proxy.

    Prints the status code and response body on success, or a failure
    message followed by the exception on error. Nothing is returned and
    nothing is re-raised.
    """
    # Username/password proxy entry. The credentials and "ip:port" look like
    # placeholders to be replaced with real values -- TODO confirm.
    entry = 'http://{}:{}@ip:port'.format("账户", "密码")

    # The same authenticated proxy is used for both plain and TLS traffic.
    proxy = {
        'http': entry,
        'https': entry,
    }

    try:
        res = requests.get(mainUrl, headers=headers, proxies=proxy, timeout=10)
        print(res.status_code, res.text)
    except Exception as e:
        # Thread entry boundary: report the failure instead of propagating.
        print("访问失败", e)



import threading

# Launch ten concurrent test requests, staggered by 0.1 s.
workers = []
for _ in range(10):
    worker = threading.Thread(target=testUrl)
    worker.start()
    workers.append(worker)
    time.sleep(0.1)

# Wait for every request to finish. A fixed sleep could end the program
# while slow requests are still running, or wait longer than needed.
for worker in workers:
    worker.join()


挂件 关闭
客服
二维码
客服二维码

加微信 领流量

大客户经理二维码

售前咨询,企业定制

专属客服竭诚为您服务