Spaces:
Running
Running
File size: 3,286 Bytes
70422d8 5684fd5 70422d8 5684fd5 70422d8 79bf545 70422d8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
import random
import requests
import time
import threading
def fetch_proxies():
    """Fetch a list of HTTP proxy servers from proxyscrape.com.

    Network errors and non-200 responses are reported on stdout and produce an
    empty list instead of propagating to the caller.

    Returns:
        list: Proxy servers in the format "IP:Port"; empty on any failure.
    """
    url = "https://api.proxyscrape.com/v2/?request=displayproxies&protocol=http&timeout=10000&country=all&ssl=all&anonymity=all"
    request_timeout = 10  # seconds; requests never times out unless told to
    try:
        response = requests.get(url, timeout=request_timeout)
    except requests.RequestException as exc:
        print(f"Error fetching proxies: {exc}")
        return []
    if response.status_code != 200:
        print(f"Error fetching proxies: {response.status_code}")
        return []
    # splitlines() accepts any newline convention and, unlike
    # split("\r\n")[:-1], keeps the final entry when the body has no trailing
    # newline; blank lines are filtered out explicitly.
    entries = (line.strip() for line in response.text.splitlines())
    return [entry for entry in entries if entry]
def test_proxy(proxy, prompt, timeout):
    """Time a probe through the given proxy and record the proxy if it is fast enough.

    A proxy whose probe finishes in under ``timeout`` seconds is printed and
    passed to ``add_working_proxy``. Any exception is reported on stdout and
    otherwise ignored, so one bad proxy never aborts the calling thread.

    Args:
        proxy (str): The proxy server in the format "IP:Port".
        prompt (str): The test prompt intended for the probe request. It is
            currently unused because the probe call below is disabled.
        timeout (int): Maximum elapsed time in seconds for the proxy to count
            as working.
    """
    try:
        # monotonic() is immune to system clock adjustments, unlike time.time().
        started = time.monotonic()
        # NOTE: the real probe is disabled, so the measured time is ~0 and
        # every proxy is currently accepted.
        # res = gpt3.Completion.create(prompt=prompt, proxy=proxy)
        response_time = time.monotonic() - started
        if response_time < timeout:
            response_time = int(response_time * 1000)
            print(f'proxy: {proxy} [{response_time}ms] ✅')
            add_working_proxy(proxy)
    except Exception as exc:
        # Best effort: report the failure instead of hiding it completely.
        print(f'proxy: {proxy} failed: {exc!r}')
def add_working_proxy(proxy):
    """Append a proxy server to the global working_proxies list.

    The list is created on first use, so this function is safe to call even
    when nothing has assigned ``working_proxies`` yet.

    Args:
        proxy (str): The proxy server in the format "IP:Port".
    """
    global working_proxies
    try:
        working_proxies.append(proxy)
    except NameError:
        # The global has not been bound yet; start the list with this proxy.
        working_proxies = [proxy]
def remove_proxy(proxy):
    """Remove a proxy server from the global working_proxies list.

    Only the first occurrence is removed. The call is a no-op when the proxy
    is not in the list or when the list has not been created yet.

    Args:
        proxy (str): The proxy server in the format "IP:Port".
    """
    try:
        # Remove directly instead of checking membership first: one scan of
        # the list, and no gap in which another thread could remove the entry.
        working_proxies.remove(proxy)
    except (NameError, ValueError):
        # NameError: the global list does not exist yet.
        # ValueError: the proxy is not in the list.
        pass
def get_working_proxies(prompt, timeout=5):
    """Fetch proxy servers and test each one on its own thread.

    Each proxy from ``fetch_proxies`` is handed to ``test_proxy`` on a
    separate thread. This function then waits at most ``timeout`` seconds in
    total for the threads; any still running after that are left to finish in
    the background.

    Args:
        prompt (str): The test prompt to be used for testing.
        timeout (int, optional): The maximum time in seconds allowed for
            testing. Defaults to 5.
    """
    proxy_list = fetch_proxies()
    threads = [
        threading.Thread(target=test_proxy, args=(proxy, prompt, timeout))
        for proxy in proxy_list
    ]
    for thread in threads:
        thread.start()
    # Share one deadline across all joins. Joining each thread with the full
    # timeout could block for len(threads) * timeout seconds in the worst case.
    deadline = time.monotonic() + timeout
    for thread in threads:
        thread.join(max(0.0, deadline - time.monotonic()))
def update_working_proxies(interval=1800):
    """Continuously rebuild the global working_proxies list.

    Runs forever: the global list is replaced with an empty one, repopulated
    through ``get_working_proxies``, and then the function sleeps for
    ``interval`` seconds. The list is therefore empty or partial while a
    refresh is in progress. A failed refresh is reported on stdout and retried
    on the next cycle rather than ending the loop.

    Args:
        interval (int | float, optional): Seconds to sleep between refreshes.
            Defaults to 1800 (30 minutes).
    """
    global working_proxies
    test_prompt = "What is the capital of France?"
    while True:
        working_proxies = []  # Clear the list before updating
        try:
            get_working_proxies(test_prompt)
        except Exception as exc:
            # Boundary of a long-running loop: report and retry next cycle.
            print(f'proxy update failed: {exc!r}')
        else:
            print('proxies updated')
        time.sleep(interval)
def get_random_proxy():
    """Get a random working proxy server from the global working_proxies list.

    Returns:
        str: A random working proxy server in the format "IP:Port".

    Raises:
        IndexError: If the list is empty or has not been created yet.
    """
    try:
        candidates = working_proxies
    except NameError:
        # The global list has not been created yet; treat it as empty.
        candidates = ()
    if not candidates:
        # Same exception type random.choice() raises for an empty sequence,
        # but with a message that names the actual problem.
        raise IndexError("no working proxies available")
    return random.choice(candidates)
|