import signal import json import time import socket import logging import os from functools import partial from http.server import HTTPServer from utils.PostHandler import Handler from utils.ColoredFormatter import ColoredFormatter from bs4 import BeautifulSoup as bs from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from selenium.webdriver import ActionChains from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.support import expected_conditions as ec logger = logging.getLogger(__name__) def setupLogger(): logging_format = "[%(asctime)s] (%(levelname)s) %(module)s - %(funcName)s: %(message)s" logging.basicConfig(level=logging.INFO, format=logging_format) (logger := logging.getLogger(__name__)).setLevel(logging.INFO) logger.propagate = False (logger_handler := logging.StreamHandler()).setFormatter( ColoredFormatter(fmt=logging_format) ) logger.addHandler(logger_handler) def interrupt_handler(signum, driver: webdriver.Remote): logger.info(f'Handling signal {signum} ({signal.Signals(signum).name}).') driver.quit() raise SystemExit def setupChromeDriver(): logger.log(logging.INFO, 'Setting up Chrome driver.') chrome_options = Options() #chrome_options.add_argument("--headless") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--mute-audio") chrome_options.add_argument("--window-size=1280,720") #chrome_options.add_argument("--disable-dev-shm-usage") chrome_options.add_argument("--no-default-browser-check") chrome_options.add_argument("--disable-features=WebRtcHideLocalIpsWithMdns") #chrome_options.add_argument(f"--load-extension={os.path.abspath(os.path.join(os.path.dirname(__file__), 'webrtc-internals-exporter'))}") chrome_options.add_argument("--load-extension=/tmp/webrtc-internals-exporter") chrome_options.add_experimental_option('prefs', {'intl.accept_languages': 'en,en_US'}) #driver = webdriver.Chrome(options=chrome_options) driver = webdriver.Remote(command_executor='http://host.docker.internal:4444', options=chrome_options) logger.log(logging.INFO, 'Chrome driver setup complete.') return driver def saveStats(stats: list): try: sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) logger.log(logging.DEBUG, f'Saving stats: {json.dumps(stats, indent=4)}') sock.sendto(json.dumps(stats).encode(), ('telegraf', 8094)) sock.close() logger.log(logging.DEBUG, 'Sent stats to socket.') except socket.error as e: logger.error(f'Got socket error: {e}') def downloadStats(driver: webdriver.Chrome, peersDict: dict): html = driver.find_element(By.CLASS_NAME ,'vjs-stats-list').get_attribute('innerHTML') if html is not None: htmlBS = bs(html, 'html.parser') else: raise ValueError("html is None") stats = htmlBS.find_all('div', attrs={'style': 'display: block;'}) playerStats = { stat.div.text: stat.span.text.replace('\u21d3', 'down').replace('down/', 'down /').replace('\u21d1 ', 'up').replace('\u21d1', 'up').replace('\u00b7', '-').strip() for stat in stats } keys = list(playerStats.keys()) for stat in keys: if 'Viewport / Frames' == stat: viewport, frames = playerStats[stat].split(' / ') width, height = viewport.split('x') height, devicePixelRatio = height.split('*') dropped, total = frames.split(' of ')[0].split()[0], frames.split(' of ')[1].split()[0] playerStats[stat] = {'Width': int(width), 'Height': int(height), 'Pixel ratio': float(devicePixelRatio), 'Frames': {'Dropped': int(dropped), 'Total': int(total)}} if 'Codecs' == stat: video, audio = playerStats[stat].split(' / ') playerStats[stat] = {'Video': video, 'Audio': audio} if 'Volume' == stat: if ' (' in playerStats[stat]: volume, muted = playerStats[stat].split(' (') playerStats[stat] = {'Volume': int(volume), 'Muted': 'muted' in muted} else: playerStats[stat] = {'Volume': int(playerStats[stat]), 'Muted': False} if 'Connection Speed' == stat: speed, unit = playerStats[stat].split() speedBytes = float(speed) * (1024 ** {'B/s': 0, 'KB/s': 1, 'MB/s': 2, 'GB/s': 3}[unit]) playerStats[stat] = {'Speed': speedBytes, 'Granularity': 's'} if 'Network Activity' == stat: downString, upString = playerStats[stat].split(' / ') down, downUnit = downString.replace('down', '').strip().split() up, upUnit = upString.replace('up', '').strip().split() downBytes = convert_to_bytes(down, downUnit) upBytes = convert_to_bytes(up, upUnit) playerStats[stat] = {'Down': downBytes, 'Up': upBytes} if 'Total Transfered' == stat: downString, upString = playerStats[stat].split(' / ') down, downUnit = downString.replace('down', '').strip().split() up, upUnit = upString.replace('up', '').strip().split() downBytes = convert_to_bytes(down, downUnit) upBytes = convert_to_bytes(up, upUnit) playerStats[stat] = {'Down': downBytes, 'Up': upBytes} if 'Download Breakdown' == stat: server, peer = playerStats[stat].split(' - ') server, serverUnit = server.replace('from servers', '').strip().split() peer, peerUnit = peer.replace('from peers', '').strip().split() serverBytes = convert_to_bytes(server, serverUnit) peerBytes = convert_to_bytes(peer, peerUnit) playerStats[stat] = {'Server': serverBytes, 'Peers': peerBytes} if 'Buffer State' == stat: del(playerStats[stat]) if 'Live Latency' == stat: latency, edge = playerStats[stat].split(' (from edge: ') latency = sum(int(x) * 60 ** i for i, x in enumerate(reversed([part for part in latency.replace('s', '').split('m') if part]))) edge = sum(int(x) * 60 ** i for i, x in enumerate(reversed([part for part in edge.replace('s', '').replace(')', '').split('m') if part]))) playerStats[stat] = {'Latency': latency, 'Edge': edge} stats = { 'player': playerStats, 'peers': peersDict, 'url': driver.current_url, 'timestamp': int(time.time() * 1000), 'session': driver.session_id } saveStats([stats]) def convert_to_bytes(down, downUnit): return float(down) * (1024 ** {'B': 0, 'KB': 1, 'MB': 2, 'GB': 3}[downUnit]) def setupStats(driver: webdriver.Remote, url: str): logger.log(logging.INFO, 'Setting up stats.') actions = ActionChains(driver) wait = WebDriverWait(driver, 30, poll_frequency=0.2) driver.get(url) wait.until(ec.presence_of_element_located((By.CLASS_NAME, 'vjs-big-play-button'))) actions.click(driver.find_element(By.CLASS_NAME ,'video-js')).perform() wait.until(ec.visibility_of_element_located((By.CLASS_NAME, 'vjs-control-bar'))) actions.context_click(driver.find_element(By.CLASS_NAME ,'video-js')).perform() statsForNerds = driver.find_elements(By.CLASS_NAME ,'vjs-menu-item') actions.click(statsForNerds[-1]).perform() wait.until(ec.text_to_be_present_in_element((By.CLASS_NAME, 'vjs-stats-list'), 'Player')) actions.move_to_element(driver.find_element(By.CLASS_NAME ,'vjs-control-bar')).perform() logger.log(logging.INFO, 'Stats setup complete.') return driver if __name__ == '__main__': setupLogger() driver = setupChromeDriver() signal.signal(signal.SIGINT, lambda signum, frame: interrupt_handler(signum, driver)) url = os.getenv('VIDEO_URL') if url is None: logger.error('VIDEO_URL environment variable is not set.') raise SystemExit(1) setupStats(driver, url) logger.log(logging.INFO, 'Starting server collector.') httpd = HTTPServer(('', 9092), partial(Handler, downloadStats, driver, logger)) logger.info('Server collector started.') httpd.serve_forever()