# Statistiche per Nerd homebrew

In [None]:
import schedule
import signal
import json
import time
import socket
from bs4 import BeautifulSoup as bs
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver import ActionChains
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as ec
from IPython.display import display, DisplayHandle, Image

In [None]:
def interrupt_handler(signum, driver: webdriver.Chrome):
    """Clean up and stop execution after a signal has been received.

    Prints the signal number and name, clears the scheduled jobs, quits the
    browser driver, removes the ``driver`` variable from the interactive
    namespace and then raises ``SystemExit``.

    Args:
        signum: Number of the received signal.
        driver: The Chrome driver to quit.

    Raises:
        SystemExit: Always, once the cleanup steps have run.
    """
    print(f'Handling signal {signum} ({signal.Signals(signum).name}).')

    # Remove the jobs registered on the default scheduler.
    schedule.clear()
    driver.quit()
    # IPython line magic (not plain Python): -f skips the confirmation prompt.
    # NOTE(review): the argument is treated as a regex, so other names
    # containing "driver" may be removed as well - confirm this is intended.
    %reset_selective -f driver
    raise SystemExit

In [None]:
def setupChromeDriver():
    """Create the Chrome WebDriver used by the stats scraper.

    Returns:
        A ``webdriver.Chrome`` instance started with the switches and
        preferences configured below.
    """
    # Command-line switches handed to Chrome, in the order they are added.
    switches = (
        "--headless",
        "--no-sandbox",
        "--mute-audio",
        "--window-size=1280,720",
        "--disable-dev-shm-usage",
        "--disable-features=WebRtcHideLocalIpsWithMdns",
    )

    opts = Options()
    for switch in switches:
        opts.add_argument(switch)
    opts.add_experimental_option('prefs', {'intl.accept_languages': 'en,en_US'})
    # Optional extension, currently disabled:
    # opts.add_extension('./qryn-webrtc-exporter.crx')

    # Alternative, currently disabled: use a remote WebDriver server instead.
    # webdriver.Remote(command_executor='http://localhost:4444', options=opts)
    return webdriver.Chrome(options=opts)

In [None]:
def saveStats(stats: dict, host: str = 'localhost', port: int = 8094):
    """Send *stats* as a single JSON-encoded UDP datagram.

    Socket errors are reported on stdout and otherwise ignored, so a failed
    delivery does not abort the caller.

    Args:
        stats: JSON-serialisable mapping to transmit.
        host: Destination host name or IPv4 address.
        port: Destination UDP port.
    """
    # Serialise before opening the socket so a serialisation failure cannot
    # leave an open socket behind.
    payload = json.dumps(stats).encode()
    try:
        # The context manager closes the socket even when sendto() raises.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.sendto(payload, (host, port))
    except socket.error as e:
        print(f'Got socket error: {e}')

def _newStatDiv(soup, title: str, value: str):
    """Build a stat entry: a ``display: block;`` div holding a title div and a value span."""
    entry = soup.new_tag('div', attrs={'style': 'display: block;'})
    titleDiv = soup.new_tag('div')
    titleDiv.string = title
    entry.append(titleDiv)
    valueSpan = soup.new_tag('span')
    valueSpan.string = value
    entry.append(valueSpan)
    return entry


def _cleanStatValue(text: str) -> str:
    """Replace the arrow and middle-dot symbols with ASCII text and trim whitespace."""
    return (
        text.replace('\u21d3', 'down')
        .replace('down/', 'down /')
        .replace('\u21d1 ', 'up')
        .replace('\u21d1', 'up')
        .replace('\u00b7', '-')
        .strip()
    )


def downloadStats(driver: webdriver.Chrome, display_handle: DisplayHandle):
    """Scrape the stats overlay once, then display and send the result.

    Reads the inner HTML of the ``vjs-stats-list`` element, adds a
    ``Timestamp`` and a ``Peers`` entry, converts every ``display: block;``
    div into a ``{title: value}`` pair, shows the JSON through
    *display_handle* and forwards the dict to ``saveStats``.

    Args:
        driver: Driver whose current page contains the stats overlay.
        display_handle: Notebook display handle updated with the JSON text.
    """
    statsList = driver.find_element(By.CLASS_NAME, 'vjs-stats-list')
    htmlBS = bs(statsList.get_attribute('innerHTML'), 'html.parser')

    # Extra entries are placed before the first div so that the find_all()
    # below picks them up together with the scraped ones.
    htmlBS.div.insert_before(
        _newStatDiv(htmlBS, 'Timestamp', time.strftime('%Y-%m-%dT%H:%M:%S%z'))
    )

    peers = driver.find_element(By.CLASS_NAME, 'peers-number').text
    htmlBS.div.insert_before(_newStatDiv(htmlBS, 'Peers', peers))

    stats = htmlBS.find_all('div', attrs={'style': 'display: block;'})
    statsDict = {stat.div.text: _cleanStatValue(stat.span.text) for stat in stats}

    # Drop the first and last character and split the rest into a list
    # (presumably the value is a bracketed, comma-separated list - confirm).
    if 'Buffer State' in statsDict:
        statsDict['Buffer State'] = statsDict['Buffer State'][1:-1].split(', ')

    # Refresh the timestamp right before publishing.
    statsDict['Timestamp'] = time.strftime('%Y-%m-%dT%H:%M:%S%z')
    statsDict['userName'] = 'user'

    display_handle.update(json.dumps(statsDict))

    saveStats(statsDict)

In [None]:
def setupStats(driver: webdriver.Chrome, url: str):
    """Open *url* and bring up the player's stats overlay.

    Loads the page, waits for the big play button, left- and right-clicks the
    player, clicks the last context-menu item, waits until the stats list
    contains the text ``Player``, moves the pointer onto the
    ``vjs-peertube`` element and displays a screenshot.

    Args:
        driver: Driver used to load and control the page.
        url: Address of the video page.

    Returns:
        The same ``driver`` that was passed in.
    """
    chain = ActionChains(driver)
    waiter = WebDriverWait(driver, 30, poll_frequency=0.2)

    driver.get(url)

    # Block until the big play button is present in the DOM.
    playButton = (By.CLASS_NAME, 'vjs-big-play-button')
    waiter.until(ec.presence_of_element_located(playButton))

    # Left-click the player, then right-click it; the element is looked up
    # again for the second action.
    player = driver.find_element(By.CLASS_NAME, 'video-js')
    chain.click(player)
    chain.perform()
    player = driver.find_element(By.CLASS_NAME, 'video-js')
    chain.context_click(player)
    chain.perform()

    # Click the last menu item after a 2 s pause (presumably the
    # "Stats for nerds" entry - confirm against the player's menu).
    menuItems = driver.find_elements(By.CLASS_NAME, 'vjs-menu-item')
    chain.pause(2).click(menuItems[-1]).perform()

    statsList = (By.CLASS_NAME, 'vjs-stats-list')
    waiter.until(ec.text_to_be_present_in_element(statsList, 'Player'))

    peertube = driver.find_element(By.CLASS_NAME, 'vjs-peertube')
    chain.move_to_element(peertube).perform()

    screenshot = driver.get_screenshot_as_png()
    display(Image(screenshot))

    return driver

In [None]:
if __name__ == '__main__':
    driver = setupChromeDriver()

    # On SIGINT, clean up through interrupt_handler, which raises SystemExit.
    signal.signal(signal.SIGINT, lambda signum, frame: interrupt_handler(signum, driver))

    setupStats(driver, "https://tube.kobim.cloud/w/9hAbiwai4rsbw9QnPpPkCd")

    display_handle = display("Loading...", display_id=True)

    # Scrape the stats once per second until interrupted.
    schedule.every(1).seconds.do(downloadStats, driver, display_handle)
    while True:
        schedule.run_pending()
        # Sleep briefly between polls instead of spinning on run_pending(),
        # which would keep a CPU core fully busy.
        time.sleep(0.1)