Files
Tesi/peertube/statnerd/main.py

189 lines
8.1 KiB
Python

import signal
import json
import time
import socket
import logging
import os
from functools import partial
from http.server import HTTPServer
from utils.PostHandler import Handler
from utils.ColoredFormatter import ColoredFormatter
from bs4 import BeautifulSoup as bs
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver import ActionChains
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as ec
# Module-wide logger. setupLogger() looks up this same named logger to set its
# level and attach the colored stream handler.
logger = logging.getLogger(__name__)
def setupLogger():
    """Configure root logging and give this module's logger a colored handler.

    The root logger receives a basic INFO configuration. The module logger is
    set to INFO, stops propagating to its parent, and writes through its own
    stream handler whose records are formatted by ``ColoredFormatter``.
    """
    record_layout = "[%(asctime)s] (%(levelname)s) %(module)s - %(funcName)s: %(message)s"
    logging.basicConfig(level=logging.INFO, format=record_layout)

    module_logger = logging.getLogger(__name__)
    module_logger.setLevel(logging.INFO)
    # Records are handled only by the handler added below, not passed up to
    # the root logger configured by basicConfig.
    module_logger.propagate = False

    colored_stream = logging.StreamHandler()
    colored_stream.setFormatter(ColoredFormatter(fmt=record_layout))
    module_logger.addHandler(colored_stream)
def interrupt_handler(signum, driver: webdriver.Chrome):
    """Log the received signal, quit the browser and terminate the process.

    Args:
        signum: Number of the signal being handled.
        driver: Chrome driver whose ``quit()`` is called before exiting.

    Raises:
        SystemExit: Always, once the driver has been quit.
    """
    signal_name = signal.Signals(signum).name
    logger.info(f'Handling signal {signum} ({signal_name}).')
    driver.quit()
    raise SystemExit
def setupChromeDriver():
    """Create the local Chrome WebDriver used to watch the video.

    Returns:
        The ``webdriver.Chrome`` instance built from the options below.
    """
    logger.info('Setting up Chrome driver.')

    # The unpacked extension lives in a folder next to this file.
    extension_dir = os.path.abspath(
        os.path.join(os.path.dirname(__file__), 'webrtc-internals-exporter')
    )

    # NOTE: "--headless" is not passed, so Chrome opens a visible window.
    switches = (
        "--no-sandbox",
        "--mute-audio",
        "--window-size=1280,720",
        "--disable-dev-shm-usage",
        "--no-default-browser-check",
        "--disable-features=WebRtcHideLocalIpsWithMdns",
        f"--load-extension={extension_dir}",
    )

    browser_options = Options()
    for switch in switches:
        browser_options.add_argument(switch)
    browser_options.add_experimental_option('prefs', {'intl.accept_languages': 'en,en_US'})

    # Alternative kept for reference: a remote session via
    # webdriver.Remote(command_executor='http://localhost:4444', options=...).
    browser = webdriver.Chrome(options=browser_options)
    logger.info('Chrome driver setup complete.')
    return browser
def saveStats(stats: list):
    """Send the collected stats as one JSON datagram to UDP port 8094 on localhost.

    Args:
        stats: JSON-serialisable list of stats dictionaries.

    Socket errors are logged and swallowed, so a failed send does not
    interrupt the caller. Serialisation errors from ``json.dumps`` are not
    caught and propagate.
    """
    # Build the pretty-printed dump only when it will actually be emitted.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug(f'Saving stats: {json.dumps(stats, indent=4)}')

    payload = json.dumps(stats).encode()
    try:
        # The context manager closes the socket even if sendto() raises.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.sendto(payload, ('localhost', 8094))
    except socket.error as e:
        logger.error(f'Got socket error: {e}')
    else:
        logger.debug('Sent stats to socket.')
# Powers of 1024 for the size units that appear in the stats text.
_SIZE_UNIT_EXPONENTS = {'B': 0, 'KB': 1, 'MB': 2, 'GB': 3}
# Same scale for the per-second units used by "Connection Speed".
_RATE_UNIT_EXPONENTS = {'B/s': 0, 'KB/s': 1, 'MB/s': 2, 'GB/s': 3}


def _normalize_stat_text(text: str) -> str:
    """Replace the arrow and middle-dot glyphs with ASCII words and trim."""
    return (
        text.replace('\u21d3', 'down')
        .replace('down/', 'down /')
        .replace('\u21d1 ', 'up')
        .replace('\u21d1', 'up')
        .replace('\u00b7', '-')
        .strip()
    )


def _to_bytes(amount: str, unit: str, exponents: dict) -> int:
    """Convert a displayed amount and its unit into a whole number of bytes."""
    try:
        number = int(amount)
    except ValueError:
        # int() rejects fractional text such as "1.5"; the player may show
        # decimals for larger units, so fall back to float parsing.
        number = float(amount)
    return int(number * 1024 ** exponents[unit])


def _parse_duration_seconds(text: str) -> int:
    """Turn text like '1m 30s' or '12s' into a number of seconds."""
    parts = [part for part in text.replace('s', '').split('m') if part]
    # Rightmost part is seconds, the one before it minutes.
    return sum(int(part) * 60 ** power for power, part in enumerate(reversed(parts)))


def _parse_viewport_frames(value: str) -> dict:
    """Parse '<width>x<height>*<ratio> / <dropped> ... of <total> ...'."""
    viewport, frames = value.split(' / ')
    width, rest = viewport.split('x')
    height, pixel_ratio = rest.split('*')
    frame_parts = frames.split(' of ')
    dropped = frame_parts[0].split()[0]
    total = frame_parts[1].split()[0]
    return {
        'Width': int(width),
        'Height': int(height),
        'Pixel ratio': float(pixel_ratio),
        'Frames': {'Dropped': int(dropped), 'Total': int(total)},
    }


def _parse_codecs(value: str) -> dict:
    """Split '<video> / <audio>' into its two codec names."""
    video, audio = value.split(' / ')
    return {'Video': video, 'Audio': audio}


def _parse_volume(value: str) -> dict:
    """Parse '<volume>' or '<volume> (<note>)'; muted if the note says so."""
    if ' (' in value:
        volume, note = value.split(' (')
        return {'Volume': int(volume), 'Muted': 'muted' in note}
    return {'Volume': int(value), 'Muted': False}


def _parse_connection_speed(value: str) -> dict:
    """Parse '<amount> <unit>/s' into bytes per second."""
    speed, unit = value.split()
    return {'Speed': _to_bytes(speed, unit, _RATE_UNIT_EXPONENTS), 'Granularity': 's'}


def _parse_byte_pair(value: str, separator: str, first_marker: str,
                     second_marker: str, first_key: str, second_key: str) -> dict:
    """Parse two '<amount> <unit>' readings separated by *separator*."""
    first_text, second_text = value.split(separator)
    first, first_unit = first_text.replace(first_marker, '').strip().split()
    second, second_unit = second_text.replace(second_marker, '').strip().split()
    return {
        first_key: _to_bytes(first, first_unit, _SIZE_UNIT_EXPONENTS),
        second_key: _to_bytes(second, second_unit, _SIZE_UNIT_EXPONENTS),
    }


def _parse_down_up(value: str) -> dict:
    """Parse 'down<amount> <unit> / up<amount> <unit>' into byte counts."""
    return _parse_byte_pair(value, ' / ', 'down', 'up', 'Down', 'Up')


def _parse_download_breakdown(value: str) -> dict:
    """Parse '<n> <unit> from servers - <n> <unit> from peers'."""
    return _parse_byte_pair(value, ' - ', 'from servers', 'from peers', 'Server', 'Peers')


def _parse_live_latency(value: str) -> dict:
    """Parse '<latency> (from edge: <edge>)' into seconds."""
    latency, edge = value.split(' (from edge: ')
    return {
        'Latency': _parse_duration_seconds(latency),
        'Edge': _parse_duration_seconds(edge.replace(')', '')),
    }


# Stat label (as shown in the overlay) -> parser for its normalized text.
# Labels without an entry are passed through as plain strings.
_STAT_PARSERS = {
    'Viewport / Frames': _parse_viewport_frames,
    'Codecs': _parse_codecs,
    'Volume': _parse_volume,
    'Connection Speed': _parse_connection_speed,
    'Network Activity': _parse_down_up,
    'Total Transfered': _parse_down_up,
    'Download Breakdown': _parse_download_breakdown,
    'Live Latency': _parse_live_latency,
}


def downloadStats(driver: webdriver.Chrome, peersDict: dict):
    """Scrape the player's stats list, structure it and hand it to saveStats.

    Args:
        driver: Browser session showing the player with its stats list open.
        peersDict: Peer data supplied by the caller; forwarded unchanged under
            the 'peers' key.

    Raises:
        ValueError: If the stats list has no inner HTML, or a stat's text does
            not have the expected layout.
        KeyError: If a byte amount uses a unit other than B/KB/MB/GB.
    """
    html = driver.find_element(By.CLASS_NAME, 'vjs-stats-list').get_attribute('innerHTML')
    if html is None:
        raise ValueError("html is None")

    # Only rows explicitly styled as visible are read.
    rows = bs(html, 'html.parser').find_all('div', attrs={'style': 'display: block;'})
    rawStats = {row.div.text: _normalize_stat_text(row.span.text) for row in rows}

    playerStats = {}
    for label, value in rawStats.items():
        if label == 'Buffer State':
            # This entry is dropped from the exported stats.
            continue
        parser = _STAT_PARSERS.get(label)
        playerStats[label] = value if parser is None else parser(value)

    payload = {
        'player': playerStats,
        'peers': peersDict,
        'url': driver.current_url,
        'timestamp': int(time.time() * 1000),  # milliseconds since the epoch
        'session': driver.session_id
    }
    saveStats([payload])
def setupStats(driver: webdriver.Chrome, url: str):
    """Load *url* and open the player's stats list.

    The page is loaded, the player is clicked once the big play button is
    present, and after the control bar becomes visible the player is
    right-clicked and the last context-menu item is clicked. The function
    returns once the stats list contains the text 'Player'.

    Args:
        driver: Browser session to drive.
        url: Address of the video page.

    Returns:
        The same ``driver`` that was passed in.
    """
    logger.info('Setting up stats.')
    pointer = ActionChains(driver)
    waiter = WebDriverWait(driver, 30, poll_frequency=0.2)

    driver.get(url)
    waiter.until(ec.presence_of_element_located((By.CLASS_NAME, 'vjs-big-play-button')))

    # Left-click the player, then wait for its control bar to show up.
    player = driver.find_element(By.CLASS_NAME, 'video-js')
    pointer.click(player).perform()
    waiter.until(ec.visibility_of_element_located((By.CLASS_NAME, 'vjs-control-bar')))

    # The player element is looked up again before the right-click.
    player = driver.find_element(By.CLASS_NAME, 'video-js')
    pointer.context_click(player).perform()

    # The last menu item is the one clicked; presumably the
    # "stats for nerds" entry -- verify against the player's menu.
    menu_items = driver.find_elements(By.CLASS_NAME, 'vjs-menu-item')
    pointer.click(menu_items[-1]).perform()

    waiter.until(ec.text_to_be_present_in_element((By.CLASS_NAME, 'vjs-stats-list'), 'Player'))
    logger.info('Stats setup complete.')
    return driver
if __name__ == '__main__':
    # Script entry point: start the browser, open the stats list, then serve
    # HTTP requests whose handler is built with downloadStats, the driver and
    # the logger.
    setupLogger()
    driver = setupChromeDriver()

    def _on_sigint(signum, frame):
        # On SIGINT, let interrupt_handler quit the browser and exit.
        interrupt_handler(signum, driver)

    signal.signal(signal.SIGINT, _on_sigint)
    setupStats(driver, "https://tube.kobim.cloud/w/9hAbiwai4rsbw9QnPpPkCd")

    logger.info('Starting server collector.')
    handler_factory = partial(Handler, downloadStats, driver, logger)
    httpd = HTTPServer(('localhost', 9092), handler_factory)
    logger.info('Server collector started.')
    httpd.serve_forever()