技术支持发布日期:2021-07-14
代码参考:
import _thread
import time
import requests
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_3 like Mac OS X) AppleWebKit/603.3.8 (KHTML, like Gecko) Mobile/14G60 MicroMessenger/6.5.19 NetType/4G Language/zh_TW",
}
mainUrl = 'https://api.myip.la/en?json'
def testUrl():
username = '账户'
password = '密码'
entry = 'http://{}:{}@host:port'.format(username, password)
proxy = {
'http': entry,
'https': entry,
}
try:
res = requests.get(mainUrl, headers=headers, proxies=proxy, timeout=10)
print(res.status_code, res.text)
except Exception as e:
print("访问失败", e)
pass
for port in range(0, 10):
_thread.start_new_thread(testUrl, ())
time.sleep(0.1)
time.sleep(10)
# coding=utf-8
# !/usr/bin/env python
import json
import threading
import time
import requests as rq
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:85.0) Gecko/20100101 Firefox/85.0",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
"Accept-Encoding": "gzip, deflate, br"
}
targetUrl = "https://api.myip.la/en?json"
# 核心业务
def testPost(host, port):
proxies = {
'http': 'http://{}:{}'.format(host, port),
'https': 'http://{}:{}'.format(host, port),
}
res = ""
while True:
try:
res = rq.get(targetUrl, proxies=proxies, timeout=5)
# print(res.status_code)
print(res.status_code, "***", res.text)
break
except Exception as e:
print(e)
break
return
class ThreadFactory(threading.Thread):
def __init__(self, host, port):
threading.Thread.__init__(self)
self.host = host
self.port = port
def run(self):
testPost(self.host, self.port)
# 提取代理的链接 json类型的返回值
tiqu = 'http://tiqu.py.cn/getProxyIp?num=10®ions=kr&return_type=json'
while 1 == 1:
# 每次提取10个,放入线程中
resp = rq.get(url=tiqu, timeout=5)
try:
if resp.status_code == 200:
dataBean = json.loads(resp.text)
else:
print("获取失败")
time.sleep(1)
continue
except ValueError:
print("获取失败")
time.sleep(1)
continue
else:
# 解析json数组
print("code=", dataBean["Code"])
code = dataBean["Code"]
if code == 0:
threads = []
for proxy in dataBean["Data"]:
threads.append(ThreadFactory(proxy["ip"], proxy["port"]))
for t in threads: # 开启线程
t.start()
time.sleep(0.01)
for t in threads: # 阻塞线程
t.join()
# break
time.sleep(1)
import string
import time
import zipfile
from selenium import webdriver
def create_proxyauth_extension(proxy_host, proxy_port, proxy_username, proxy_password,
scheme='http', plugin_path=None):
if plugin_path is None:
# 插件地址
plugin_path = 'vimm_chrome_proxyauth_plugin.zip'
manifest_json = """
{
"version": "1.0.0",
"manifest_version": 2,
"name": "Chrome Proxy",
"permissions": [
"proxy",
"tabs",
"unlimitedStorage",
"storage",
"<all_urls>",
"webRequest",
"webRequestBlocking"
],
"background": {
"scripts": ["background.js"]
},
"minimum_chrome_version":"22.0.0"
}
"""
background_js = string.Template(
"""
var config = {
mode: "fixed_servers",
rules: {
singleProxy: {
scheme: "${scheme}",
host: "${host}",
port: parseInt(${port})
},
bypassList: ["foobar.com"]
}
};
chrome.proxy.settings.set({value: config, scope: "regular"}, function() {});
function callbackFn(details) {
return {
authCredentials: {
username: "${username}",
password: "${password}"
}
};
}
chrome.webRequest.onAuthRequired.addListener(
callbackFn,
{urls: ["<all_urls>"]},
['blocking']
);
"""
).substitute(
host=proxy_host,
port=proxy_port,
username=proxy_username,
password=proxy_password,
scheme=scheme,
)
with zipfile.ZipFile(plugin_path, 'w') as zp:
zp.writestr("manifest.json", manifest_json)
zp.writestr("background.js", background_js)
return plugin_path
#
proxyauth_plugin_path = create_proxyauth_extension(
proxy_host="host",
proxy_port="port",
proxy_username="账户",
proxy_password="密码"
)
option = webdriver.ChromeOptions()
option.add_argument("--start-maximized")
option.add_extension(proxyauth_plugin_path)
# 将配置放入浏览器驱动
browser = webdriver.Chrome(options=option)
time.sleep(1)
# 设置访问的目标网站
browser.get('https://api.myip.la/en?json')
专属客服竭诚为您服务