import signal import json import time import socket import logging import os import argparse from yaspin import yaspin from functools import partial from http.server import HTTPServer from utils.PostHandler import Handler from utils.ColoredFormatter import ColoredFormatter from utils.Convenience import * from bs4 import BeautifulSoup as bs from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from selenium.webdriver import ActionChains from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.support import expected_conditions as ec logger = logging.getLogger(__name__) args = None def setupLogger(): logging_format = "[%(asctime)s] (%(levelname)s) %(module)s - %(funcName)s: %(message)s" logging.basicConfig(level=firstValid(args.log_level, os.getenv('LOG_LEVEL'), default='INFO'), format=logging_format) # type: ignore (logger := logging.getLogger(__name__)).setLevel(logging.INFO) logger.propagate = False (logger_handler := logging.StreamHandler()).setFormatter( ColoredFormatter(fmt=logging_format) ) logger.addHandler(logger_handler) def setupArgParser(): parser = argparse.ArgumentParser(description='Collector for PeerTube stats.') parser.add_argument('-u', '--url', type=str, help='URL of the video to collect stats for.') parser.add_argument('--socket-url', type=str, help='URL of the socket to send the stats to. Default: localhost') parser.add_argument('--socket-port', type=int, help='Port of the socket to send the stats to. Default: 8094') parser.add_argument('--hub-url', type=str, help='URL of the Selenium hub to connect to. If not provided, local Chrome driver will be used.') parser.add_argument('--webrtc-internals-path', type=str, help='Path to the WebRTC internals extension.') parser.add_argument('--log-level', type=str, help='Log level to use. Default: INFO') return parser def interrupt_handler(signum, driver: webdriver.Remote): logger.info(f'Handling signal {signum} ({signal.Signals(signum).name}).') driver.quit() raise SystemExit @yaspin() def setupChromeDriver(command_executor: str | None, webrtc_internals_path: str) -> webdriver.Remote | webdriver.Chrome: logger.log(logging.INFO, 'Setting up Chrome driver.') chrome_options = Options() #chrome_options.add_argument("--headless") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--mute-audio") chrome_options.add_argument("--window-size=1280,720") chrome_options.add_argument("--no-default-browser-check") chrome_options.add_argument("--disable-features=WebRtcHideLocalIpsWithMdns") chrome_options.add_argument(f"--load-extension={webrtc_internals_path}") chrome_options.add_experimental_option('prefs', {'intl.accept_languages': 'en,en_US'}) if command_executor is not None: driver = webdriver.Remote(command_executor=command_executor, options=chrome_options) logger.warning(f'Using Selenium hub at {command_executor}.') else: driver = webdriver.Chrome(options=chrome_options) logger.warning('No Selenium hub URL provided, using local Chrome driver.') logger.log(logging.INFO, 'Chrome driver setup complete.') return driver def saveStats(stats: list, socket_url: str, socket_port: int): try: sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) logger.log(logging.DEBUG, f'Saving stats: {json.dumps(stats, indent=4)}') sock.sendto(json.dumps(stats).encode(), (socket_url, socket_port)) sock.close() logger.log(logging.DEBUG, 'Sent stats to socket.') except socket.error as e: logger.error(f'Got socket error: {e}') def downloadStats(driver: webdriver.Remote | webdriver.Chrome, peersDict: dict, socket_url: str, socket_port: int): html = driver.find_element(By.CLASS_NAME ,'vjs-stats-list').get_attribute('innerHTML') if html is not None: htmlBS = bs(html, 'html.parser') else: raise ValueError("html is None") stats = htmlBS.find_all('div', attrs={'style': 'display: block;'}) playerStats = { stat.div.text: stat.span.text.replace('\u21d3', 'down').replace('down/', 'down /').replace('\u21d1 ', 'up').replace('\u21d1', 'up').replace('\u00b7', '-').strip() for stat in stats } keys = list(playerStats.keys()) for stat in keys: if 'Viewport / Frames' == stat: viewport, frames = playerStats[stat].split(' / ') width, height = viewport.split('x') height, devicePixelRatio = height.split('*') dropped, total = frames.split(' of ')[0].split()[0], frames.split(' of ')[1].split()[0] playerStats[stat] = {'Width': int(width), 'Height': int(height), 'Pixel ratio': float(devicePixelRatio), 'Frames': {'Dropped': int(dropped), 'Total': int(total)}} if 'Codecs' == stat: video, audio = playerStats[stat].split(' / ') playerStats[stat] = {'Video': video, 'Audio': audio} if 'Volume' == stat: if ' (' in playerStats[stat]: volume, muted = playerStats[stat].split(' (') playerStats[stat] = {'Volume': int(volume), 'Muted': 'muted' in muted} else: playerStats[stat] = {'Volume': int(playerStats[stat]), 'Muted': False} if 'Connection Speed' == stat: speed, unit = playerStats[stat].split() speedBytes = float(speed) * (1024 ** {'B/s': 0, 'KB/s': 1, 'MB/s': 2, 'GB/s': 3}[unit]) playerStats[stat] = {'Speed': speedBytes, 'Granularity': 's'} if 'Network Activity' == stat: downString, upString = playerStats[stat].split(' / ') down, downUnit = downString.replace('down', '').strip().split() up, upUnit = upString.replace('up', '').strip().split() downBytes = convert_to_bytes(down, downUnit) upBytes = convert_to_bytes(up, upUnit) playerStats[stat] = {'Down': downBytes, 'Up': upBytes} if 'Total Transfered' == stat: downString, upString = playerStats[stat].split(' / ') down, downUnit = downString.replace('down', '').strip().split() up, upUnit = upString.replace('up', '').strip().split() downBytes = convert_to_bytes(down, downUnit) upBytes = convert_to_bytes(up, upUnit) playerStats[stat] = {'Down': downBytes, 'Up': upBytes} if 'Download Breakdown' == stat: server, peer = playerStats[stat].split(' - ') server, serverUnit = server.replace('from servers', '').strip().split() peer, peerUnit = peer.replace('from peers', '').strip().split() serverBytes = convert_to_bytes(server, serverUnit) peerBytes = convert_to_bytes(peer, peerUnit) playerStats[stat] = {'Server': serverBytes, 'Peers': peerBytes} if 'Buffer State' == stat: del(playerStats[stat]) if 'Live Latency' == stat: latency, edge = playerStats[stat].split(' (from edge: ') latency = sum(int(x) * 60 ** i for i, x in enumerate(reversed([part for part in latency.replace('s', '').split('m') if part]))) edge = sum(int(x) * 60 ** i for i, x in enumerate(reversed([part for part in edge.replace('s', '').replace(')', '').split('m') if part]))) playerStats[stat] = {'Latency': latency, 'Edge': edge} stats = { 'player': playerStats, 'peers': peersDict, 'url': driver.current_url, 'timestamp': int(time.time() * 1000), 'session': driver.session_id } saveStats([stats], socket_url, socket_port) def convert_to_bytes(down, downUnit): return float(down) * (1024 ** {'B': 0, 'KB': 1, 'MB': 2, 'GB': 3}[downUnit]) @yaspin() def setupStats(driver: webdriver.Remote, url: str): logger.log(logging.INFO, 'Setting up stats.') actions = ActionChains(driver) wait = WebDriverWait(driver, 30, poll_frequency=0.2) driver.get(url) try: wait.until(ec.presence_of_element_located((By.CLASS_NAME, 'vjs-big-play-button'))) except Exception: logger.error('Timeout while waiting for the big play button to be present.') driver.quit() raise SystemExit(1) actions.click(driver.find_element(By.CLASS_NAME ,'video-js')).perform() wait.until(ec.visibility_of_element_located((By.CLASS_NAME, 'vjs-control-bar'))) actions.context_click(driver.find_element(By.CLASS_NAME ,'video-js')).perform() statsForNerds = driver.find_elements(By.CLASS_NAME ,'vjs-menu-item') actions.click(statsForNerds[-1]).perform() wait.until(ec.text_to_be_present_in_element((By.CLASS_NAME, 'vjs-stats-list'), 'Player')) actions.move_to_element(driver.find_element(By.CLASS_NAME ,'vjs-control-bar')).perform() logger.log(logging.INFO, 'Stats setup complete.') return driver if __name__ == '__main__': args = setupArgParser().parse_args() setupLogger() command_executor = firstValid(args.hub_url, os.getenv('HUB_URL'), default=None) webrtc_internals_path = firstValid( args.webrtc_internals_path, os.getenv('WEBRTC_INTERNALS_PATH'), default=os.path.abspath(os.path.join(os.path.dirname(__file__), 'webrtc-internals-exporter')) ) driver = setupChromeDriver(command_executor, webrtc_internals_path) signal.signal(signal.SIGINT, lambda signum, frame: interrupt_handler(signum, driver)) url = firstValid(args.url, os.getenv('VIDEO_URL'), default=None) if url is None: logger.error('VIDEO_URL environment variable or --url argument is required.') raise SystemExit(1) setupStats(driver, url) socket_url = firstValid(args.socket_url, os.getenv('SOCKET_URL'), default='localhost') socket_port = firstValid(args.socket_port, os.getenv('SOCKET_PORT'), default=8094) logger.info('Starting server collector.') httpd = HTTPServer(('', 9092), partial(Handler, downloadStats, driver, logger, socket_url, socket_port)) httpd.serve_forever()