Files
peertube-collector/main.py
Mirko Milovanovic 5c92020169
All checks were successful
Build and Push Docker Image / build (push) Successful in 12m31s
Add convenience functions, update Docker setup, and integrate Webpack for building WebRTC internals exporter (#3)
Reviewed-on: #3
Co-authored-by: Mirko Milovanovic <mir_ko@me.com>
Co-committed-by: Mirko Milovanovic <mir_ko@me.com>
2025-02-21 11:21:14 +00:00

241 lines
10 KiB
Python

import signal
import json
import time
import socket
import logging
import os
import argparse
from yaspin import yaspin
from functools import partial
from http.server import HTTPServer
from utils.PostHandler import Handler
from utils.ColoredFormatter import ColoredFormatter
from utils.Convenience import *
from bs4 import BeautifulSoup as bs
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver import ActionChains
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as ec
# Module-level logger shared by every function in this file; setupLogger()
# later configures this same logger object (same name, hence same instance).
logger = logging.getLogger(__name__)
# Parsed command-line arguments. Stays None until the __main__ block assigns
# the result of setupArgParser().parse_args(); setupLogger() reads it.
args = None
def setupLogger():
    """Configure root logging and this module's coloured logger.

    The level is taken from ``args.log_level``, then the ``LOG_LEVEL``
    environment variable, then ``'INFO'`` (resolved through ``firstValid``,
    which presumably returns the first usable candidate -- confirm in
    ``utils.Convenience``).

    The resolved level is applied both to the root logger (via
    ``basicConfig``) and to this module's logger. The module logger does not
    propagate, so it gets its own ``StreamHandler`` with ``ColoredFormatter``.

    Must be called after the module-level ``args`` has been populated.
    """
    logging_format = "[%(asctime)s] (%(levelname)s) %(module)s - %(funcName)s: %(message)s"

    level = firstValid(args.log_level, os.getenv('LOG_LEVEL'), default='INFO')  # type: ignore
    if isinstance(level, str):
        # logging only knows upper-case level names; "debug" would raise ValueError.
        level = level.strip().upper()

    logging.basicConfig(level=level, format=logging_format)

    module_logger = logging.getLogger(__name__)
    # Honour the configured level: with a fixed INFO level the DEBUG messages
    # emitted elsewhere in this module could never be shown.
    module_logger.setLevel(level)
    module_logger.propagate = False

    # Attach the coloured handler only once so repeated calls do not
    # duplicate every log line.
    already_attached = any(
        isinstance(handler.formatter, ColoredFormatter)
        for handler in module_logger.handlers
    )
    if not already_attached:
        logger_handler = logging.StreamHandler()
        logger_handler.setFormatter(ColoredFormatter(fmt=logging_format))
        module_logger.addHandler(logger_handler)
def setupArgParser():
    """Build the command-line parser for the collector.

    Every option is optional and has no parser-level default, so an omitted
    option is ``None`` on the parsed namespace.

    Returns:
        argparse.ArgumentParser: parser exposing ``url``, ``socket_url``,
        ``socket_port``, ``hub_url``, ``webrtc_internals_path`` and
        ``log_level``.
    """
    # (flags, value type, help text) -- registered in this order.
    option_specs = (
        (('-u', '--url'), str,
         'URL of the video to collect stats for.'),
        (('--socket-url',), str,
         'URL of the socket to send the stats to. Default: localhost'),
        (('--socket-port',), int,
         'Port of the socket to send the stats to. Default: 8094'),
        (('--hub-url',), str,
         'URL of the Selenium hub to connect to. If not provided, local Chrome driver will be used.'),
        (('--webrtc-internals-path',), str,
         'Path to the WebRTC internals extension.'),
        (('--log-level',), str,
         'Log level to use. Default: INFO'),
    )

    parser = argparse.ArgumentParser(description='Collector for PeerTube stats.')
    for flags, value_type, help_text in option_specs:
        parser.add_argument(*flags, type=value_type, help=help_text)
    return parser
def interrupt_handler(signum, driver: webdriver.Remote):
    """Log the received signal, quit the browser driver and exit.

    Args:
        signum: Number of the signal being handled.
        driver: WebDriver whose session is closed before exiting.

    Raises:
        SystemExit: always, after ``driver.quit()`` has returned.
    """
    signal_name = signal.Signals(signum).name
    logger.info(f'Handling signal {signum} ({signal_name}).')
    driver.quit()
    raise SystemExit()
@yaspin()
def setupChromeDriver(command_executor: str | None, webrtc_internals_path: str) -> webdriver.Remote | webdriver.Chrome:
    """Create a Chrome WebDriver with the collector's browser options.

    Args:
        command_executor: URL of a Selenium hub. When ``None`` a local
            Chrome driver is started instead.
        webrtc_internals_path: Path passed to Chrome's ``--load-extension``.

    Returns:
        A ``webdriver.Remote`` when a hub URL is given, otherwise a
        ``webdriver.Chrome``.
    """
    logger.info('Setting up Chrome driver.')

    # NOTE: "--headless" is not passed; the browser runs with a window.
    browser_flags = (
        "--no-sandbox",
        "--mute-audio",
        "--window-size=1280,720",
        "--no-default-browser-check",
        "--disable-features=WebRtcHideLocalIpsWithMdns",
        f"--load-extension={webrtc_internals_path}",
    )

    chrome_options = Options()
    for flag in browser_flags:
        chrome_options.add_argument(flag)
    chrome_options.add_experimental_option('prefs', {'intl.accept_languages': 'en,en_US'})

    if command_executor is None:
        driver = webdriver.Chrome(options=chrome_options)
        logger.warning('No Selenium hub URL provided, using local Chrome driver.')
    else:
        driver = webdriver.Remote(command_executor=command_executor, options=chrome_options)
        logger.warning(f'Using Selenium hub at {command_executor}.')

    logger.info('Chrome driver setup complete.')
    return driver
def saveStats(stats: list, socket_url: str, socket_port: int):
    """Serialise ``stats`` to JSON and send it as a single UDP datagram.

    Socket errors are logged and swallowed, so a failed send does not stop
    the caller.

    Args:
        stats: JSON-serialisable list of stat records.
        socket_url: Host name or address of the receiving socket.
        socket_port: UDP port of the receiving socket.

    Raises:
        TypeError: if ``stats`` is not JSON-serialisable (raised by
            ``json.dumps``; not caught here).
    """
    # Build the indented dump only when it will actually be emitted.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug(f'Saving stats: {json.dumps(stats, indent=4)}')

    # Serialise before opening the socket so a serialisation failure cannot
    # leave an open socket behind.
    payload = json.dumps(stats).encode()

    try:
        # The context manager closes the socket even when sendto() raises.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.sendto(payload, (socket_url, socket_port))
        logger.debug('Sent stats to socket.')
    except OSError as e:  # socket.error is an alias of OSError
        logger.error(f'Got socket error: {e}')
# Power of 1024 for each unit accepted after a "Connection Speed" value.
_SPEED_UNIT_EXPONENTS = {'B/s': 0, 'KB/s': 1, 'MB/s': 2, 'GB/s': 3}


def _normalize_stat_text(text):
    """Replace arrow and middle-dot glyphs in a stat value with ASCII words."""
    return (
        text.replace('\u21d3', 'down')
        .replace('down/', 'down /')
        .replace('\u21d1 ', 'up')
        .replace('\u21d1', 'up')
        .replace('\u00b7', '-')
        .strip()
    )


def _parse_viewport_frames(value):
    """Parse ``'<W>x<H>*<ratio> / <dropped> ... of <total> ...'``."""
    viewport, frames = value.split(' / ')
    width, remainder = viewport.split('x')
    height, pixel_ratio = remainder.split('*')
    frame_parts = frames.split(' of ')
    dropped = frame_parts[0].split()[0]
    total = frame_parts[1].split()[0]
    return {
        'Width': int(width),
        'Height': int(height),
        'Pixel ratio': float(pixel_ratio),
        'Frames': {'Dropped': int(dropped), 'Total': int(total)},
    }


def _parse_codecs(value):
    """Split ``'<video> / <audio>'`` into its two codec names."""
    video, audio = value.split(' / ')
    return {'Video': video, 'Audio': audio}


def _parse_volume(value):
    """Parse ``'<n>'`` or ``'<n> (<state>)'`` into a volume and a muted flag."""
    if ' (' in value:
        volume, state = value.split(' (')
        return {'Volume': int(volume), 'Muted': 'muted' in state}
    return {'Volume': int(value), 'Muted': False}


def _parse_connection_speed(value):
    """Convert ``'<amount> <unit>/s'`` into bytes per second."""
    speed, unit = value.split()
    speed_bytes = float(speed) * (1024 ** _SPEED_UNIT_EXPONENTS[unit])
    return {'Speed': speed_bytes, 'Granularity': 's'}


def _parse_transfer(value):
    """Convert ``'<amount> <unit> down / <amount> <unit> up'`` into byte counts."""
    down_text, up_text = value.split(' / ')
    down, down_unit = down_text.replace('down', '').strip().split()
    up, up_unit = up_text.replace('up', '').strip().split()
    return {
        'Down': convert_to_bytes(down, down_unit),
        'Up': convert_to_bytes(up, up_unit),
    }


def _parse_download_breakdown(value):
    """Convert ``'<a> <u> from servers - <a> <u> from peers'`` into byte counts."""
    server_text, peer_text = value.split(' - ')
    server, server_unit = server_text.replace('from servers', '').strip().split()
    peer, peer_unit = peer_text.replace('from peers', '').strip().split()
    return {
        'Server': convert_to_bytes(server, server_unit),
        'Peers': convert_to_bytes(peer, peer_unit),
    }


def _parse_duration_seconds(text):
    """Convert a duration such as ``'5s'``, ``'2m'``, ``'1m 5s'`` or ``'1h2m3s'`` to seconds.

    Each number is weighted by its own unit suffix rather than by its
    position, so ``'2m'`` is 120 seconds and an hour component is accepted.
    A bare number counts as seconds and an empty string yields ``0``.

    Raises:
        ValueError: if ``text`` is not a recognisable duration.
    """
    import re  # local import: this file's import block does not provide ``re``

    cleaned = text.replace(')', '').strip()
    if not cleaned:
        return 0
    sign = -1 if cleaned.startswith('-') else 1
    match = re.fullmatch(
        r'-?\s*(?:(\d+)\s*h)?\s*(?:(\d+)\s*m)?\s*(?:(\d+)\s*s?)?',
        cleaned,
    )
    if match is None or not any(match.groups()):
        raise ValueError(f'Unrecognised duration: {text!r}')
    hours, minutes, seconds = (int(group or 0) for group in match.groups())
    return sign * (hours * 3600 + minutes * 60 + seconds)


def _parse_live_latency(value):
    """Parse ``'<latency> (from edge: <edge>)'`` into two durations in seconds."""
    latency, edge = value.split(' (from edge: ')
    return {
        'Latency': _parse_duration_seconds(latency),
        'Edge': _parse_duration_seconds(edge),
    }


# Stat label -> parser turning the displayed text into structured values.
# Labels are matched exactly as they appear in the stats list.
_PLAYER_STAT_PARSERS = {
    'Viewport / Frames': _parse_viewport_frames,
    'Codecs': _parse_codecs,
    'Volume': _parse_volume,
    'Connection Speed': _parse_connection_speed,
    'Network Activity': _parse_transfer,
    'Total Transfered': _parse_transfer,
    'Download Breakdown': _parse_download_breakdown,
    'Live Latency': _parse_live_latency,
}


def downloadStats(driver: webdriver.Remote | webdriver.Chrome, peersDict: dict, socket_url: str, socket_port: int):
    """Scrape the player's stats list, structure it and send it via ``saveStats``.

    Reads the ``innerHTML`` of the ``vjs-stats-list`` element, keeps the rows
    styled ``display: block;``, converts the known stats into numbers and
    dicts, and sends one record containing the player stats, ``peersDict``,
    the current URL, a millisecond timestamp and the driver session id.

    Args:
        driver: WebDriver showing the player page with the stats list open.
        peersDict: Peer statistics, forwarded unchanged under ``'peers'``.
        socket_url: Host passed through to ``saveStats``.
        socket_port: Port passed through to ``saveStats``.

    Raises:
        ValueError: if the stats list has no ``innerHTML`` or a stat value
            does not have the expected textual shape.
        KeyError: if a stat value uses an unsupported size or speed unit.
    """
    html = driver.find_element(By.CLASS_NAME, 'vjs-stats-list').get_attribute('innerHTML')
    if html is None:
        raise ValueError("html is None")

    rows = bs(html, 'html.parser').find_all('div', attrs={'style': 'display: block;'})
    playerStats = {row.div.text: _normalize_stat_text(row.span.text) for row in rows}

    # Iterate over a snapshot of the keys because entries are replaced and
    # removed while looping.
    for name in list(playerStats):
        if name == 'Buffer State':
            # This stat is dropped from the payload.
            del playerStats[name]
        elif name in _PLAYER_STAT_PARSERS:
            playerStats[name] = _PLAYER_STAT_PARSERS[name](playerStats[name])

    sample = {
        'player': playerStats,
        'peers': peersDict,
        'url': driver.current_url,
        'timestamp': int(time.time() * 1000),
        'session': driver.session_id
    }

    saveStats([sample], socket_url, socket_port)
# Power of 1024 represented by each supported size unit.
_BYTE_UNIT_EXPONENTS = {'B': 0, 'KB': 1, 'MB': 2, 'GB': 3, 'TB': 4, 'PB': 5}


def convert_to_bytes(down, downUnit):
    """Convert an amount expressed in a 1024-based size unit into bytes.

    Despite the parameter names, the function is direction-agnostic: it is
    used for download and upload amounts alike.

    Args:
        down: The amount, as a number or numeric string.
        downUnit: One of ``'B'``, ``'KB'``, ``'MB'``, ``'GB'``, ``'TB'`` or
            ``'PB'``; surrounding whitespace is ignored.

    Returns:
        float: The amount in bytes.

    Raises:
        ValueError: if ``down`` is not numeric.
        KeyError: if ``downUnit`` is not a supported unit.
    """
    amount = float(down)
    try:
        exponent = _BYTE_UNIT_EXPONENTS[downUnit.strip()]
    except KeyError:
        raise KeyError(f'Unsupported size unit: {downUnit!r}') from None
    return amount * (1024 ** exponent)
@yaspin()
def setupStats(driver: webdriver.Remote, url: str):
    """Open ``url`` and bring up the player's stats overlay.

    Loads the page, waits up to 30 seconds for the big play button, clicks
    the player, opens its context menu, clicks the last menu item and waits
    until the stats list contains the text ``'Player'``.

    Args:
        driver: WebDriver used to drive the page.
        url: Address of the video page to open.

    Returns:
        The same ``driver``, with the stats list visible.

    Raises:
        SystemExit: with code 1 (after quitting the driver) if waiting for
            the big play button fails.
    """
    logger.info('Setting up stats.')

    actions = ActionChains(driver)
    wait = WebDriverWait(driver, 30, poll_frequency=0.2)
    player_locator = (By.CLASS_NAME, 'video-js')
    control_bar_locator = (By.CLASS_NAME, 'vjs-control-bar')

    driver.get(url)

    try:
        wait.until(ec.presence_of_element_located((By.CLASS_NAME, 'vjs-big-play-button')))
    except Exception:
        logger.error('Timeout while waiting for the big play button to be present.')
        driver.quit()
        raise SystemExit(1)

    # Click the player, then wait for its control bar to become visible.
    actions.click(driver.find_element(*player_locator)).perform()
    wait.until(ec.visibility_of_element_located(control_bar_locator))

    # Open the context menu and click its last entry -- presumably the
    # stats entry; verify against the player's menu order.
    actions.context_click(driver.find_element(*player_locator)).perform()
    menu_items = driver.find_elements(By.CLASS_NAME, 'vjs-menu-item')
    actions.click(menu_items[-1]).perform()
    wait.until(ec.text_to_be_present_in_element((By.CLASS_NAME, 'vjs-stats-list'), 'Player'))

    # Finish with the pointer over the control bar.
    actions.move_to_element(driver.find_element(*control_bar_locator)).perform()

    logger.info('Stats setup complete.')
    return driver
if __name__ == '__main__':
    # Script entry point: read configuration, start the browser, open the
    # stats overlay, then serve HTTP on port 9092. Each setting comes from
    # the command line, then an environment variable, then a default
    # (resolved by firstValid -- see utils.Convenience).
    args = setupArgParser().parse_args()
    setupLogger()

    # Validate every setting BEFORE launching the browser, so a configuration
    # error cannot exit while leaving a driver / hub session running.
    url = firstValid(args.url, os.getenv('VIDEO_URL'), default=None)
    if url is None:
        logger.error('VIDEO_URL environment variable or --url argument is required.')
        raise SystemExit(1)

    socket_url = firstValid(args.socket_url, os.getenv('SOCKET_URL'), default='localhost')
    try:
        socket_port = int(firstValid(args.socket_port, os.getenv('SOCKET_PORT'), default=8094))
    except ValueError:
        logger.error('Invalid socket port provided. Exiting.')
        raise SystemExit(1)

    command_executor = firstValid(args.hub_url, os.getenv('HUB_URL'), default=None)
    webrtc_internals_path = firstValid(
        args.webrtc_internals_path,
        os.getenv('WEBRTC_INTERNALS_PATH'),
        default=os.path.abspath(os.path.join(os.path.dirname(__file__), 'webrtc-internals-exporter'))
    )

    driver = setupChromeDriver(command_executor, webrtc_internals_path)

    # Quit the driver on Ctrl-C and on SIGTERM (the signal a container
    # runtime sends on stop), so the browser session is not left behind.
    for handled_signal in (signal.SIGINT, signal.SIGTERM):
        signal.signal(handled_signal, lambda signum, frame: interrupt_handler(signum, driver))

    setupStats(driver, url)

    logger.info('Starting server collector.')
    # Handler is constructed per request with downloadStats, the driver, the
    # logger and the socket target bound in front of the usual arguments.
    httpd = HTTPServer(('', 9092), partial(Handler, downloadStats, driver, logger, socket_url, socket_port))
    httpd.serve_forever()