Files
peertube-collector/main.py
Mirko Milovanovic 0518d1ba48
All checks were successful
Build and Push Docker Image / build (push) Successful in 1m5s
feat: integrate yaspin for improved logging and error handling in Chrome driver setup
2025-02-14 15:32:14 +01:00

207 lines
8.5 KiB
Python

import signal
import json
import time
import socket
import logging
import os
from yaspin import yaspin
from functools import partial
from http.server import HTTPServer
from utils.PostHandler import Handler
from utils.ColoredFormatter import ColoredFormatter
from bs4 import BeautifulSoup as bs
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver import ActionChains
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as ec
logger = logging.getLogger(__name__)
def setupLogger():
logging_format = "[%(asctime)s] (%(levelname)s) %(module)s - %(funcName)s: %(message)s"
logging.basicConfig(level=logging.INFO, format=logging_format)
(logger := logging.getLogger(__name__)).setLevel(logging.INFO)
logger.propagate = False
(logger_handler := logging.StreamHandler()).setFormatter(
ColoredFormatter(fmt=logging_format)
)
logger.addHandler(logger_handler)
def interrupt_handler(signum, driver: webdriver.Remote):
logger.info(f'Handling signal {signum} ({signal.Signals(signum).name}).')
driver.quit()
raise SystemExit
@yaspin()
def setupChromeDriver():
logger.log(logging.INFO, 'Setting up Chrome driver.')
chrome_options = Options()
#chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--mute-audio")
chrome_options.add_argument("--window-size=1280,720")
#chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--no-default-browser-check")
chrome_options.add_argument("--disable-features=WebRtcHideLocalIpsWithMdns")
#chrome_options.add_argument(f"--load-extension={os.path.abspath(os.path.join(os.path.dirname(__file__), 'webrtc-internals-exporter'))}")
chrome_options.add_argument("--load-extension=/tmp/webrtc-internals-exporter")
chrome_options.add_experimental_option('prefs', {'intl.accept_languages': 'en,en_US'})
#driver = webdriver.Chrome(options=chrome_options)
driver = webdriver.Remote(command_executor='http://host.docker.internal:4444', options=chrome_options)
logger.log(logging.INFO, 'Chrome driver setup complete.')
return driver
def saveStats(stats: list):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
logger.log(logging.DEBUG, f'Saving stats: {json.dumps(stats, indent=4)}')
sock.sendto(json.dumps(stats).encode(), ('telegraf', 8094))
sock.close()
logger.log(logging.DEBUG, 'Sent stats to socket.')
except socket.error as e:
logger.error(f'Got socket error: {e}')
def downloadStats(driver: webdriver.Chrome, peersDict: dict):
html = driver.find_element(By.CLASS_NAME ,'vjs-stats-list').get_attribute('innerHTML')
if html is not None:
htmlBS = bs(html, 'html.parser')
else:
raise ValueError("html is None")
stats = htmlBS.find_all('div', attrs={'style': 'display: block;'})
playerStats = {
stat.div.text: stat.span.text.replace('\u21d3', 'down').replace('down/', 'down /').replace('\u21d1 ', 'up').replace('\u21d1', 'up').replace('\u00b7', '-').strip()
for stat in stats
}
keys = list(playerStats.keys())
for stat in keys:
if 'Viewport / Frames' == stat:
viewport, frames = playerStats[stat].split(' / ')
width, height = viewport.split('x')
height, devicePixelRatio = height.split('*')
dropped, total = frames.split(' of ')[0].split()[0], frames.split(' of ')[1].split()[0]
playerStats[stat] = {'Width': int(width), 'Height': int(height), 'Pixel ratio': float(devicePixelRatio), 'Frames': {'Dropped': int(dropped), 'Total': int(total)}}
if 'Codecs' == stat:
video, audio = playerStats[stat].split(' / ')
playerStats[stat] = {'Video': video, 'Audio': audio}
if 'Volume' == stat:
if ' (' in playerStats[stat]:
volume, muted = playerStats[stat].split(' (')
playerStats[stat] = {'Volume': int(volume), 'Muted': 'muted' in muted}
else:
playerStats[stat] = {'Volume': int(playerStats[stat]), 'Muted': False}
if 'Connection Speed' == stat:
speed, unit = playerStats[stat].split()
speedBytes = float(speed) * (1024 ** {'B/s': 0, 'KB/s': 1, 'MB/s': 2, 'GB/s': 3}[unit])
playerStats[stat] = {'Speed': speedBytes, 'Granularity': 's'}
if 'Network Activity' == stat:
downString, upString = playerStats[stat].split(' / ')
down, downUnit = downString.replace('down', '').strip().split()
up, upUnit = upString.replace('up', '').strip().split()
downBytes = convert_to_bytes(down, downUnit)
upBytes = convert_to_bytes(up, upUnit)
playerStats[stat] = {'Down': downBytes, 'Up': upBytes}
if 'Total Transfered' == stat:
downString, upString = playerStats[stat].split(' / ')
down, downUnit = downString.replace('down', '').strip().split()
up, upUnit = upString.replace('up', '').strip().split()
downBytes = convert_to_bytes(down, downUnit)
upBytes = convert_to_bytes(up, upUnit)
playerStats[stat] = {'Down': downBytes, 'Up': upBytes}
if 'Download Breakdown' == stat:
server, peer = playerStats[stat].split(' - ')
server, serverUnit = server.replace('from servers', '').strip().split()
peer, peerUnit = peer.replace('from peers', '').strip().split()
serverBytes = convert_to_bytes(server, serverUnit)
peerBytes = convert_to_bytes(peer, peerUnit)
playerStats[stat] = {'Server': serverBytes, 'Peers': peerBytes}
if 'Buffer State' == stat:
del(playerStats[stat])
if 'Live Latency' == stat:
latency, edge = playerStats[stat].split(' (from edge: ')
latency = sum(int(x) * 60 ** i for i, x in enumerate(reversed([part for part in latency.replace('s', '').split('m') if part])))
edge = sum(int(x) * 60 ** i for i, x in enumerate(reversed([part for part in edge.replace('s', '').replace(')', '').split('m') if part])))
playerStats[stat] = {'Latency': latency, 'Edge': edge}
stats = {
'player': playerStats,
'peers': peersDict,
'url': driver.current_url,
'timestamp': int(time.time() * 1000),
'session': driver.session_id
}
saveStats([stats])
def convert_to_bytes(down, downUnit):
return float(down) * (1024 ** {'B': 0, 'KB': 1, 'MB': 2, 'GB': 3}[downUnit])
@yaspin()
def setupStats(driver: webdriver.Remote, url: str):
logger.log(logging.INFO, 'Setting up stats.')
actions = ActionChains(driver)
wait = WebDriverWait(driver, 30, poll_frequency=0.2)
driver.get(url)
try:
wait.until(ec.presence_of_element_located((By.CLASS_NAME, 'vjs-big-play-button')))
except Exception:
logger.error('Timeout while waiting for the big play button to be present.')
driver.quit()
raise SystemExit(1)
actions.click(driver.find_element(By.CLASS_NAME ,'video-js')).perform()
wait.until(ec.visibility_of_element_located((By.CLASS_NAME, 'vjs-control-bar')))
actions.context_click(driver.find_element(By.CLASS_NAME ,'video-js')).perform()
statsForNerds = driver.find_elements(By.CLASS_NAME ,'vjs-menu-item')
actions.click(statsForNerds[-1]).perform()
wait.until(ec.text_to_be_present_in_element((By.CLASS_NAME, 'vjs-stats-list'), 'Player'))
actions.move_to_element(driver.find_element(By.CLASS_NAME ,'vjs-control-bar')).perform()
logger.log(logging.INFO, 'Stats setup complete.')
return driver
if __name__ == '__main__':
setupLogger()
driver = setupChromeDriver()
signal.signal(signal.SIGINT, lambda signum, frame: interrupt_handler(signum, driver))
url = os.getenv('VIDEO_URL')
if url is None:
logger.error('VIDEO_URL environment variable is not set.')
raise SystemExit(1)
setupStats(driver, url)
logger.info('Starting server collector.')
httpd = HTTPServer(('', 9092), partial(Handler, downloadStats, driver, logger))
httpd.serve_forever()