96 lines
2.8 KiB
Python
96 lines
2.8 KiB
Python
import os
|
|
import re
|
|
import shutil
|
|
import socket
|
|
import requests
|
|
from contextlib import closing
|
|
import undetected_chromedriver as uc
|
|
|
|
import threading
|
|
|
|
def find_free_port():
    """Return a TCP port number that was free at the time of the call.

    A throwaway IPv4 socket is bound to port 0 so the operating system picks
    an unused port; the socket is closed again before returning, so the port
    is only known to have been free at that moment.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with closing(probe):
        probe.bind(('', 0))
        probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        _host, port = probe.getsockname()
        return port
|
|
|
|
def get_tor_data_dir():
    """Return the relative path of the Tor data directory for this thread.

    The path embeds the calling thread's native id, so different threads
    get different directories.
    """
    thread_id = threading.get_native_id()
    return f'./tor-data-dir-{thread_id}'
|
|
|
|
def get_chrome_data_dir():
    """Return the relative path of the Chrome working directory for this thread.

    The path embeds the calling thread's native id, so different threads
    get different directories.
    """
    thread_id = threading.get_native_id()
    return f'./chrome-data-dir-{thread_id}'
|
|
|
|
def create_tor_proxy(socks_port, control_port):
    """Launch a Tor process listening on the given SOCKS and control ports.

    The Tor binary is taken from the ``TOR_PATH`` environment variable and
    the data directory from ``get_tor_data_dir()``.

    Args:
        socks_port: Port number for Tor's SOCKS listener.
        control_port: Port number for Tor's control listener.

    Returns:
        The process handle returned by ``stem.process.launch_tor_with_config``,
        or ``None`` if launching failed (the error is printed, not raised).

    Raises:
        KeyError: If the ``TOR_PATH`` environment variable is not set.
    """
    # stem is imported here rather than at module level, so it is only
    # required when a Tor proxy is actually requested.
    import stem.process as process

    tor_path = os.environ['TOR_PATH']

    def _print_bootstrap_progress(line):
        # Only echo Tor's bootstrap progress lines; drop other startup output.
        if 'Bootstrapped' in line:
            print(line)

    try:
        tor_process = process.launch_tor_with_config(
            config={
                'SocksPort': str(socks_port),
                'ControlPort': str(control_port),
                'MaxCircuitDirtiness': '300',
                'DataDirectory': get_tor_data_dir(),
            },
            init_msg_handler=_print_bootstrap_progress,
            tor_cmd=tor_path,
        )
    except Exception as e:
        # Best-effort: report the failure and let the caller continue
        # without a process handle.
        tor_process = None
        print("[ERROR] Starting new TOR", e)
        print("[INFO] Using existing tor connection.")
    else:
        print("[INFO] Tor connection created.")

    return tor_process
|
|
|
|
def start_browser(use_tor=False, headless=False):
    """Start an undetected-chromedriver Chrome instance, optionally via Tor.

    A per-thread working directory is created and the system chromedriver
    binary is copied into it, so the driver uses its own copy of the binary.
    After the browser is up, the public IP seen through the same proxy
    settings is looked up and printed.

    Args:
        use_tor: When true, launch a Tor process on two free ports and route
            both the browser and the IP lookup through its SOCKS port.
        headless: When true, run Chrome headless.

    Returns:
        A ``(driver, tor_process)`` tuple. ``tor_process`` is ``None`` when
        Tor is not used or could not be launched.

    Raises:
        Exception: Whatever driver creation or the IP lookup raises. The
            browser and Tor process started by this call are shut down
            before the exception propagates.
    """
    chrome_dir = get_chrome_data_dir()
    # exist_ok: a directory left behind by an earlier run must not abort startup.
    os.makedirs(chrome_dir, exist_ok=True)
    shutil.copy('/usr/bin/chromedriver', chrome_dir)

    options = uc.ChromeOptions()

    if headless:
        options.add_argument('--disable-gpu')
        options.add_argument('--headless')

    options.add_argument('--no-first-run')
    options.add_argument('--password-store=basic')
    options.add_argument('--start-maximized')
    # NOTE(review): user-data-dir is currently disabled, so the per-thread
    # directory only holds the chromedriver copy.
    # options.add_argument('--user-data-dir='+get_chrome_data_dir())

    tor_process = None
    proxies = None  # requests treats None as "no explicit proxies"

    if use_tor:
        socks_port = find_free_port()
        control_port = find_free_port()
        # The two probes are independent, so the same port could be handed
        # out twice; Tor needs distinct ports.
        while control_port == socks_port:
            control_port = find_free_port()

        tor_process = create_tor_proxy(socks_port, control_port)

        proxy_url = f'socks5://localhost:{socks_port}'
        proxies = {'http': proxy_url, 'https': proxy_url}
        options.add_argument(f'--proxy-server={proxy_url}')

    driver = None
    try:
        driver = uc.Chrome(
            options=options,
            chromedriver_executable_path=os.path.join(chrome_dir, 'chromedriver'),
        )
        # A timeout keeps a dead proxy or unreachable host from hanging forever.
        response = requests.get("http://httpbin.org/ip", proxies=proxies, timeout=30)
        ip = response.json()["origin"]
    except Exception:
        # Nothing is returned on failure, so the caller cannot clean up:
        # stop whatever this call started, then re-raise the original error.
        if driver is not None:
            try:
                driver.quit()
            except Exception as quit_error:
                print("[INFO] Could not close browser after failed start:", quit_error)
        if tor_process:
            tor_process.kill()
        raise

    print(f'IP is {ip}')

    return driver, tor_process
|
|
|
|
def close_browser(driver, tor_process):
    """Shut down the browser and Tor process and remove their directories.

    Args:
        driver: The browser driver to quit. Errors raised while quitting are
            printed and otherwise ignored.
        tor_process: The Tor process handle, or a falsy value when no Tor
            process was started; when falsy, the Tor cleanup is skipped.

    The browser cleanup always runs, even if stopping Tor raises. Missing or
    partially removed directories are tolerated.
    """
    try:
        if tor_process:
            tor_process.kill()
            # ignore_errors: the directory may already be gone.
            shutil.rmtree(get_tor_data_dir(), ignore_errors=True)
    finally:
        # Runs even when the Tor cleanup failed, so the browser is not leaked.
        try:
            driver.quit()
        except Exception as e:
            print("[INFO] Undetected chromedriver threw exception while closing, exiting. Exception:", e)

        shutil.rmtree(get_chrome_data_dir(), ignore_errors=True)