import os import re import stem import shutil import socket import requests from contextlib import closing import undetected_chromedriver as uc import threading data_directory = "tor-data-dir" def find_free_port(): with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: s.bind(('', 0)) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) return s.getsockname()[1] def create_tor_proxy(socks_port, control_port): global data_directory TOR_PATH = os.environ['TOR_PATH'] data_directory += (str(threading.get_native_id())) try: tor_process = stem.process.launch_tor_with_config( config = { 'SocksPort': str(socks_port), 'ControlPort' : str(control_port), 'MaxCircuitDirtiness' : '300', 'DataDirectory' : data_directory }, init_msg_handler = lambda line: print(line) if re.search('Bootstrapped', line) else False, tor_cmd = TOR_PATH ) print("[INFO] Tor connection created.") except Exception as e : tor_process = None print("[ERROR] Starting new TOR", e) print("[INFO] Using existing tor connection.") return tor_process def start_browser(use_tor=False, headless=False): options = uc.ChromeOptions() if headless: options.add_argument('--disable-gpu') options.add_argument('--no-first-run') options.add_argument('--password-store=basic') options.add_argument('--start-maximized') tor_process = None if use_tor: SOCKS_PORT = find_free_port() CONTROL_PORT = find_free_port() tor_process = create_tor_proxy(SOCKS_PORT, CONTROL_PORT) proxies = {'http' : f'socks5://localhost:{SOCKS_PORT}','https' : f'socks5://localhost:{SOCKS_PORT}'} options.add_argument(f'--proxy-server=socks5://localhost:{SOCKS_PORT}') else: proxies = [] ip = requests.get("http://httpbin.org/ip", proxies=proxies).json()["origin"] print (f'IP is {ip}') driver = uc.Chrome(options=options) return driver, tor_process def close_browser(driver, tor_process): if tor_process: tor_process.kill() shutil.rmtree(data_directory) try: driver.close() except: print("[INFO] Undetected chromedriver threw the usual exception while closing, exiting")