import os
import re
import socket
import threading
from contextlib import closing

import requests
import stem
import stem.process  # `import stem` alone does not load the `process` submodule
import undetected_chromedriver as uc

# Upper bound (seconds) for the public-IP lookup so a dead proxy or an
# unreachable echo service cannot block start_browser() forever.
_IP_CHECK_TIMEOUT = 60


def find_free_port() -> int:
    """Return a TCP port number that was free at the time of the call.

    A throwaway socket is bound to port 0 so the OS picks an unused port;
    the socket is closed again before returning, so another process could
    still grab the port before the caller uses it.
    """
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as probe:
        # Socket options must be set before bind() to influence the bind.
        probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        probe.bind(('', 0))
        return probe.getsockname()[1]


def _print_bootstrap_progress(line):
    """Echo only the Tor start-up lines that mention 'Bootstrapped'."""
    if re.search('Bootstrapped', line):
        print(line)


def create_tor_proxy(socks_port, control_port):
    """Launch a Tor process listening on the given SOCKS and control ports.

    The Tor executable is taken from the ``TOR_PATH`` environment variable
    and each calling thread gets its own data directory
    (``tor-data-dir-<native thread id>``, relative to the working directory).

    Args:
        socks_port: Port for Tor's SOCKS listener.
        control_port: Port for Tor's control listener.

    Returns:
        The process handle returned by ``stem.process.launch_tor_with_config``,
        or ``None`` if Tor could not be started (the error is printed, not
        raised).

    Raises:
        KeyError: If ``TOR_PATH`` is not set in the environment.
    """
    tor_path = os.environ['TOR_PATH']
    tor_config = {
        'SocksPort': str(socks_port),
        'ControlPort': str(control_port),
        'MaxCircuitDirtiness': '300',
        'DataDirectory': "tor-data-dir-" + str(threading.get_native_id()),
    }

    try:
        tor_process = stem.process.launch_tor_with_config(
            config=tor_config,
            init_msg_handler=_print_bootstrap_progress,
            tor_cmd=tor_path,
        )
    except Exception as e:
        # Best effort by design: callers receive None and carry on.
        tor_process = None
        print("[ERROR] Starting new TOR", e)
        print("[INFO] Using existing tor connection.")
    else:
        print("[INFO] Tor connection created.")

    return tor_process


def start_browser(use_tor=False):
    """Start an undetected-chromedriver Chrome session, optionally via Tor.

    When ``use_tor`` is true, two free local ports are picked, a Tor process
    is launched on them and both the IP check and Chrome are pointed at the
    resulting SOCKS5 proxy. The apparent public IP (as reported by
    httpbin.org) is printed before the browser is created.

    Args:
        use_tor: Route traffic through a freshly launched Tor proxy.

    Returns:
        A ``(driver, tor_process)`` tuple. ``tor_process`` is ``None`` when
        ``use_tor`` is false or when Tor failed to start.
    """
    options = uc.ChromeOptions()
    options.add_argument('--no-first-run')
    options.add_argument('--password-store=basic')

    tor_process = None
    proxies = None  # requests expects a mapping or None, not a list

    if use_tor:
        socks_port = find_free_port()
        control_port = find_free_port()
        # Both probes release their port before returning, so the OS may
        # hand out the same number twice; Tor needs two distinct ports.
        while control_port == socks_port:
            control_port = find_free_port()

        tor_process = create_tor_proxy(socks_port, control_port)

        # NOTE(review): with the `socks5://` scheme requests resolves DNS
        # locally; `socks5h://` would resolve through the proxy. Confirm
        # which behaviour is wanted before changing the scheme.
        proxy_url = f'socks5://localhost:{socks_port}'
        proxies = {'http': proxy_url, 'https': proxy_url}
        options.add_argument(f'--proxy-server=socks5://localhost:{socks_port}')

    response = requests.get(
        "http://httpbin.org/ip",
        proxies=proxies,
        timeout=_IP_CHECK_TIMEOUT,
    )
    ip = response.json()["origin"]
    print(f'IP is {ip}')

    driver = uc.Chrome(options=options)
    return driver, tor_process