import signal import time import logging import os import argparse from time import sleep from functools import partial from http.server import HTTPServer from utils.PostHandler import Handler from utils.ColoredFormatter import ColoredFormatter from utils.Convenience import * from bs4 import BeautifulSoup as bs from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from selenium.webdriver import ActionChains from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.support import expected_conditions as ec # Plugin system imports import importlib import importlib.util import inspect import glob import sys # Import the sys module from utils.plugins_base import StatsSetupPlugin, StatsDownloadPlugin logger = logging.getLogger(__name__) args = None def setupLogger(): logging_format = "[%(asctime)s] (%(levelname)s) %(module)s - %(funcName)s: %(message)s" logging.basicConfig(level=firstValid(args.log_level, os.getenv('LOG_LEVEL'), default='INFO'), format=logging_format) # type: ignore (logger := logging.getLogger(__name__)).setLevel(logging.INFO) logger.propagate = False (logger_handler := logging.StreamHandler()).setFormatter( ColoredFormatter(fmt=logging_format) ) logger.addHandler(logger_handler) def setupArgParser(): parser = argparse.ArgumentParser(description='Collector for PeerTube stats.') parser.add_argument('-u', '--url', type=str, help='URL of the video to collect stats for.') parser.add_argument('--socket-url', type=str, help='URL of the socket to send the stats to. Default: localhost') parser.add_argument('--socket-port', type=int, help='Port of the socket to send the stats to. Default: 8094') parser.add_argument('--hub-url', type=str, help='URL of the Selenium hub to connect to. If not provided, local Chrome driver will be used.') parser.add_argument('--webrtc-internals-path', type=str, help='Path to the WebRTC internals extension.') parser.add_argument('--log-level', type=str, help='Log level to use. Default: INFO') parser.add_argument('--plugin-dir', type=str, help='Path to the plugin directory.') return parser def interrupt_handler(signum, driver: webdriver.Remote): logger.info(f'Handling signal {signum} ({signal.Signals(signum).name}).') driver.quit() raise SystemExit def setupChromeDriver(command_executor: str | None, webrtc_internals_path: str) -> webdriver.Remote | webdriver.Chrome: logger.log(logging.INFO, 'Setting up Chrome driver.') chrome_options = Options() #chrome_options.add_argument("--headless") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--mute-audio") chrome_options.add_argument("--window-size=1280,720") chrome_options.add_argument("--no-default-browser-check") chrome_options.add_argument("--disable-features=WebRtcHideLocalIpsWithMdns") chrome_options.add_argument(f"--load-extension={webrtc_internals_path}") chrome_options.add_experimental_option('prefs', {'intl.accept_languages': 'en,en_US'}) if command_executor is not None: driver = webdriver.Remote(command_executor=command_executor, options=chrome_options) logger.warning(f'Using Selenium hub at {command_executor}.') else: driver = webdriver.Chrome(options=chrome_options) logger.warning('No Selenium hub URL provided, using local Chrome driver.') logger.log(logging.INFO, 'Chrome driver setup complete.') return driver def convert_to_bytes(down, downUnit): return float(down) * (1024 ** {'B': 0, 'KB': 1, 'MB': 2, 'GB': 3}[downUnit]) # Default Plugin Implementations class DefaultStatsSetupPlugin(StatsSetupPlugin): def setup_stats(self, driver: webdriver.Remote, url: str, retries: int = 5) -> webdriver.Remote: logger.log(logging.INFO, 'Setting up stats.') actions = ActionChains(driver) wait = WebDriverWait(driver, 30, poll_frequency=0.2) sleep(2) for attempt in range(retries): driver.get(url) try: wait.until(ec.presence_of_element_located((By.CLASS_NAME, 'vjs-big-play-button'))) break except Exception: logger.error(f'Timeout while waiting for the big play button to be present. Attempt {attempt + 1} of {retries}') if attempt == retries - 1: logger.error('Timeout limit reached. Exiting.') driver.quit() raise SystemExit(1) actions.click(driver.find_element(By.CLASS_NAME ,'video-js')).perform() wait.until(ec.visibility_of_element_located((By.CLASS_NAME, 'vjs-control-bar'))) actions.context_click(driver.find_element(By.CLASS_NAME ,'video-js')).perform() statsForNerds = driver.find_elements(By.CLASS_NAME ,'vjs-menu-item') actions.click(statsForNerds[-1]).perform() wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, 'div.vjs-stats-content[style="display: block;"]'))) actions.move_to_element(driver.find_element(By.CLASS_NAME ,'vjs-control-bar')).perform() logger.log(logging.INFO, 'Stats setup complete.') return driver class DefaultStatsDownloadPlugin(StatsDownloadPlugin): def download_stats(self, driver: webdriver.Remote, peersDict: dict, socket_url: str, socket_port: int): html = driver.find_element(By.CLASS_NAME ,'vjs-stats-list').get_attribute('innerHTML') if html is not None: htmlBS = bs(html, 'html.parser') else: raise ValueError("html is None") stats = htmlBS.find_all('div', attrs={'style': 'display: block;'}) playerStats = { stat.div.text: stat.span.text.replace('\u21d3', 'down').replace('down/', 'down /').replace('\u21d1 ', 'up').replace('\u21d1', 'up').replace('\u00b7', '-').strip() # type: ignore for stat in stats } keys = list(playerStats.keys()) for stat in keys: if 'Viewport / Frames' == stat: viewport, frames = playerStats[stat].split(' / ') width, height = viewport.split('x') height, devicePixelRatio = height.split('*') dropped, total = frames.split(' of ')[0].split()[0], frames.split(' of ')[1].split()[0] playerStats[stat] = {'Width': int(width), 'Height': int(height), 'Pixel ratio': float(devicePixelRatio), 'Frames': {'Dropped': int(dropped), 'Total': int(total)}} if 'Codecs' == stat: video, audio = playerStats[stat].split(' / ') playerStats[stat] = {'Video': video, 'Audio': audio} if 'Volume' == stat: if ' (' in playerStats[stat]: volume, muted = playerStats[stat].split(' (') playerStats[stat] = {'Volume': int(volume), 'Muted': 'muted' in muted} else: playerStats[stat] = {'Volume': int(playerStats[stat]), 'Muted': False} if 'Connection Speed' == stat: speed, unit = playerStats[stat].split() speedBytes = float(speed) * (1024 ** {'B/s': 0, 'KB/s': 1, 'MB/s': 2, 'GB/s': 3}[unit]) playerStats[stat] = {'Speed': speedBytes, 'Granularity': 's'} if 'Network Activity' == stat: downString, upString = playerStats[stat].split(' / ') down, downUnit = downString.replace('down', '').strip().split() up, upUnit = upString.replace('up', '').strip().split() downBytes = convert_to_bytes(down, downUnit) upBytes = convert_to_bytes(up, upUnit) playerStats[stat] = {'Down': downBytes, 'Up': upBytes} if 'Total Transfered' == stat: downString, upString = playerStats[stat].split(' / ') down, downUnit = downString.replace('down', '').strip().split() up, upUnit = upString.replace('up', '').strip().split() downBytes = convert_to_bytes(down, downUnit) upBytes = convert_to_bytes(up, upUnit) playerStats[stat] = {'Down': downBytes, 'Up': upBytes} if 'Download Breakdown' == stat: server, peer = playerStats[stat].split(' - ') server, serverUnit = server.replace('from servers', '').strip().split() peer, peerUnit = peer.replace('from peers', '').strip().split() serverBytes = convert_to_bytes(server, serverUnit) peerBytes = convert_to_bytes(peer, peerUnit) playerStats[stat] = {'Server': serverBytes, 'Peers': peerBytes} if 'Buffer State' == stat: del(playerStats[stat]) if 'Live Latency' == stat: latency, edge = playerStats[stat].split(' (from edge: ') latency = sum(int(x) * 60 ** i for i, x in enumerate(reversed([part for part in latency.replace('s', '').split('m') if part]))) edge = sum(int(x) * 60 ** i for i, x in enumerate(reversed([part for part in edge.replace('s', '').replace(')', '').split('m') if part]))) playerStats[stat] = {'Latency': latency, 'Edge': edge} stats = { 'player': playerStats, 'peers': peersDict, 'url': driver.current_url, 'timestamp': int(time.time() * 1000), 'session': driver.session_id } super().saveStats([stats], socket_url, socket_port) # Plugin loading mechanism def load_plugins(plugin_dir: str) -> tuple[StatsSetupPlugin | None, StatsDownloadPlugin | None]: """ Loads plugins from the specified directory. Args: plugin_dir: The directory to search for plugins. Returns: A tuple containing the loaded StatsSetupPlugin and StatsDownloadPlugin, or (None, None) if no plugins were found. """ logger.info(f"Loading plugins from {plugin_dir}") setup_plugin = None download_plugin = None plugin_files = glob.glob(os.path.join(plugin_dir, "*.py")) # Log the contents of the plugin directory logger.debug(f"Plugin directory contents: {os.listdir(plugin_dir)}") for plugin_file in plugin_files: module_name = os.path.basename(plugin_file)[:-3] # Remove .py extension logger.debug(f"Loading plugin file {plugin_file}") try: spec = importlib.util.spec_from_file_location(module_name, plugin_file) logger.debug(f"Spec: {spec}") if spec is None: logger.warning(f"Can't load plugin file {plugin_file}") continue module = importlib.util.module_from_spec(spec) logger.debug(f"Module: {module}") if spec.loader is not None: spec.loader.exec_module(module) else: logger.warning(f"Can't load module {module_name} from {plugin_file}") for name, obj in inspect.getmembers(module): logger.debug(f"Found member: {name} in module {module_name}") if inspect.isclass(obj): if issubclass(obj, StatsSetupPlugin) and obj is not StatsSetupPlugin: logger.info(f"Found StatsSetupPlugin: {obj.__name__}") setup_plugin = obj() logger.debug(f"Loaded StatsSetupPlugin: {obj.__name__} from {plugin_file}") elif issubclass(obj, StatsDownloadPlugin) and obj is not StatsDownloadPlugin: logger.info(f"Found StatsDownloadPlugin: {obj.__name__}") download_plugin = obj() logger.debug(f"Loaded StatsDownloadPlugin: {obj.__name__} from {plugin_file}") else: logger.debug(f"Class {obj.__name__} is not a subclass of StatsSetupPlugin or StatsDownloadPlugin") else: logger.debug(f"{name} is not a class") except Exception as e: logger.warning(f"Error loading plugin {plugin_file}: {e}") return setup_plugin, download_plugin if __name__ == '__main__': args = setupArgParser().parse_args() setupLogger() # Load plugins plugin_dir = firstValid(args.plugin_dir, os.getenv('PLUGIN_DIR'), default=None) if plugin_dir is None: logger.info("No plugin directory provided. Using default plugins.") setup_plugin = None download_plugin = None else: setup_plugin, download_plugin = load_plugins(plugin_dir) # Use default plugins if none are loaded if setup_plugin is None: setup_plugin = DefaultStatsSetupPlugin() logger.info("Using default StatsSetupPlugin.") if download_plugin is None: download_plugin = DefaultStatsDownloadPlugin() logger.info("Using default StatsDownloadPlugin.") command_executor = firstValid(args.hub_url, os.getenv('HUB_URL'), default=None) webrtc_internals_path = firstValid( args.webrtc_internals_path, os.getenv('WEBRTC_INTERNALS_PATH'), default=os.path.abspath(os.path.join(os.path.dirname(__file__), 'webrtc-internals-exporter')) ) driver = setupChromeDriver(command_executor, webrtc_internals_path) signal.signal(signal.SIGINT, lambda signum, frame: interrupt_handler(signum, driver)) url = firstValid(args.url, os.getenv('VIDEO_URL'), default=None) if url is None: logger.error('VIDEO_URL environment variable or --url argument is required.') raise SystemExit(1) # Use the loaded plugin driver = setup_plugin.setup_stats(driver, url) socket_url = firstValid(args.socket_url, os.getenv('SOCKET_URL'), default='localhost') try: socket_port = int(firstValid(args.socket_port, os.getenv('SOCKET_PORT'), default=8094)) except ValueError: logger.error('Invalid socket port provided. Exiting.') raise SystemExit(1) logger.info('Starting server collector.') httpd = HTTPServer(('', 9092), partial(Handler, download_plugin.download_stats, driver, logger, socket_url, socket_port)) httpd.serve_forever()