feat: implement abstract base classes for stats plugins and add example plugin for stats setup and download
All checks were successful
Build and Push Docker Image / build (push) Successful in 12m3s

This commit is contained in:
2025-02-23 22:05:24 +01:00
parent 83480ed3a8
commit 87e1d24a86
3 changed files with 269 additions and 133 deletions

160
main.py
View File

@@ -1,7 +1,5 @@
import signal import signal
import json
import time import time
import socket
import logging import logging
import os import os
import argparse import argparse
@@ -19,6 +17,14 @@ from selenium.webdriver import ActionChains
from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as ec from selenium.webdriver.support import expected_conditions as ec
# Plugin system imports
import importlib
import importlib.util
import inspect
import glob
import sys # Import the sys module
from utils.plugins_base import StatsSetupPlugin, StatsDownloadPlugin
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
args = None args = None
@@ -41,6 +47,7 @@ def setupArgParser():
parser.add_argument('--hub-url', type=str, help='URL of the Selenium hub to connect to. If not provided, local Chrome driver will be used.') parser.add_argument('--hub-url', type=str, help='URL of the Selenium hub to connect to. If not provided, local Chrome driver will be used.')
parser.add_argument('--webrtc-internals-path', type=str, help='Path to the WebRTC internals extension.') parser.add_argument('--webrtc-internals-path', type=str, help='Path to the WebRTC internals extension.')
parser.add_argument('--log-level', type=str, help='Log level to use. Default: INFO') parser.add_argument('--log-level', type=str, help='Log level to use. Default: INFO')
parser.add_argument('--plugin-dir', type=str, help='Path to the plugin directory.')
return parser return parser
@@ -73,17 +80,43 @@ def setupChromeDriver(command_executor: str | None, webrtc_internals_path: str)
return driver return driver
def saveStats(stats: list, socket_url: str, socket_port: int): def convert_to_bytes(down, downUnit):
try: return float(down) * (1024 ** {'B': 0, 'KB': 1, 'MB': 2, 'GB': 3}[downUnit])
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
logger.log(logging.DEBUG, f'Saving stats: {json.dumps(stats, indent=4)}')
sock.sendto(json.dumps(stats).encode(), (socket_url, socket_port))
sock.close()
logger.log(logging.DEBUG, 'Sent stats to socket.')
except socket.error as e:
logger.error(f'Got socket error: {e}')
def downloadStats(driver: webdriver.Remote | webdriver.Chrome, peersDict: dict, socket_url: str, socket_port: int): # Default Plugin Implementations
class DefaultStatsSetupPlugin(StatsSetupPlugin):
def setup_stats(self, driver: webdriver.Remote, url: str, retries: int = 5) -> webdriver.Remote:
logger.log(logging.INFO, 'Setting up stats.')
actions = ActionChains(driver)
wait = WebDriverWait(driver, 30, poll_frequency=0.2)
sleep(2)
for attempt in range(retries):
driver.get(url)
try:
wait.until(ec.presence_of_element_located((By.CLASS_NAME, 'vjs-big-play-button')))
break
except Exception:
logger.error(f'Timeout while waiting for the big play button to be present. Attempt {attempt + 1} of {retries}')
if attempt == retries - 1:
logger.error('Timeout limit reached. Exiting.')
driver.quit()
raise SystemExit(1)
actions.click(driver.find_element(By.CLASS_NAME ,'video-js')).perform()
wait.until(ec.visibility_of_element_located((By.CLASS_NAME, 'vjs-control-bar')))
actions.context_click(driver.find_element(By.CLASS_NAME ,'video-js')).perform()
statsForNerds = driver.find_elements(By.CLASS_NAME ,'vjs-menu-item')
actions.click(statsForNerds[-1]).perform()
wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, 'div.vjs-stats-content[style="display: block;"]')))
actions.move_to_element(driver.find_element(By.CLASS_NAME ,'vjs-control-bar')).perform()
logger.log(logging.INFO, 'Stats setup complete.')
return driver
class DefaultStatsDownloadPlugin(StatsDownloadPlugin):
def download_stats(self, driver: webdriver.Remote, peersDict: dict, socket_url: str, socket_port: int):
html = driver.find_element(By.CLASS_NAME ,'vjs-stats-list').get_attribute('innerHTML') html = driver.find_element(By.CLASS_NAME ,'vjs-stats-list').get_attribute('innerHTML')
if html is not None: if html is not None:
htmlBS = bs(html, 'html.parser') htmlBS = bs(html, 'html.parser')
@@ -93,7 +126,7 @@ def downloadStats(driver: webdriver.Remote | webdriver.Chrome, peersDict: dict,
stats = htmlBS.find_all('div', attrs={'style': 'display: block;'}) stats = htmlBS.find_all('div', attrs={'style': 'display: block;'})
playerStats = { playerStats = {
stat.div.text: stat.span.text.replace('\u21d3', 'down').replace('down/', 'down /').replace('\u21d1 ', 'up').replace('\u21d1', 'up').replace('\u00b7', '-').strip() stat.div.text: stat.span.text.replace('\u21d3', 'down').replace('down/', 'down /').replace('\u21d1 ', 'up').replace('\u21d1', 'up').replace('\u00b7', '-').strip() # type: ignore
for stat in stats for stat in stats
} }
@@ -174,46 +207,88 @@ def downloadStats(driver: webdriver.Remote | webdriver.Chrome, peersDict: dict,
'session': driver.session_id 'session': driver.session_id
} }
saveStats([stats], socket_url, socket_port) super().saveStats([stats], socket_url, socket_port)
def convert_to_bytes(down, downUnit): # Plugin loading mechanism
return float(down) * (1024 ** {'B': 0, 'KB': 1, 'MB': 2, 'GB': 3}[downUnit]) def load_plugins(plugin_dir: str) -> tuple[StatsSetupPlugin | None, StatsDownloadPlugin | None]:
"""
Loads plugins from the specified directory.
def setupStats(driver: webdriver.Remote, url: str, retries: int = 5) -> webdriver.Remote: Args:
logger.log(logging.INFO, 'Setting up stats.') plugin_dir: The directory to search for plugins.
actions = ActionChains(driver)
wait = WebDriverWait(driver, 30, poll_frequency=0.2)
sleep(2) Returns:
A tuple containing the loaded StatsSetupPlugin and StatsDownloadPlugin, or (None, None) if no plugins were found.
"""
for attempt in range(retries): logger.info(f"Loading plugins from {plugin_dir}")
driver.get(url)
setup_plugin = None
download_plugin = None
plugin_files = glob.glob(os.path.join(plugin_dir, "*.py"))
# Log the contents of the plugin directory
logger.debug(f"Plugin directory contents: {os.listdir(plugin_dir)}")
for plugin_file in plugin_files:
module_name = os.path.basename(plugin_file)[:-3] # Remove .py extension
logger.debug(f"Loading plugin file {plugin_file}")
try: try:
wait.until(ec.presence_of_element_located((By.CLASS_NAME, 'vjs-big-play-button'))) spec = importlib.util.spec_from_file_location(module_name, plugin_file)
break logger.debug(f"Spec: {spec}")
except Exception: if spec is None:
logger.error(f'Timeout while waiting for the big play button to be present. Attempt {attempt + 1} of {retries}') logger.warning(f"Can't load plugin file {plugin_file}")
if attempt == retries - 1: continue
logger.error('Timeout limit reached. Exiting.') module = importlib.util.module_from_spec(spec)
driver.quit() logger.debug(f"Module: {module}")
raise SystemExit(1) if spec.loader is not None:
spec.loader.exec_module(module)
else:
logger.warning(f"Can't load module {module_name} from {plugin_file}")
actions.click(driver.find_element(By.CLASS_NAME ,'video-js')).perform() for name, obj in inspect.getmembers(module):
wait.until(ec.visibility_of_element_located((By.CLASS_NAME, 'vjs-control-bar'))) logger.debug(f"Found member: {name} in module {module_name}")
actions.context_click(driver.find_element(By.CLASS_NAME ,'video-js')).perform() if inspect.isclass(obj):
statsForNerds = driver.find_elements(By.CLASS_NAME ,'vjs-menu-item') if issubclass(obj, StatsSetupPlugin) and obj is not StatsSetupPlugin:
actions.click(statsForNerds[-1]).perform() logger.info(f"Found StatsSetupPlugin: {obj.__name__}")
wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, 'div.vjs-stats-content[style="display: block;"]'))) setup_plugin = obj()
actions.move_to_element(driver.find_element(By.CLASS_NAME ,'vjs-control-bar')).perform() logger.debug(f"Loaded StatsSetupPlugin: {obj.__name__} from {plugin_file}")
logger.log(logging.INFO, 'Stats setup complete.') elif issubclass(obj, StatsDownloadPlugin) and obj is not StatsDownloadPlugin:
logger.info(f"Found StatsDownloadPlugin: {obj.__name__}")
download_plugin = obj()
logger.debug(f"Loaded StatsDownloadPlugin: {obj.__name__} from {plugin_file}")
else:
logger.debug(f"Class {obj.__name__} is not a subclass of StatsSetupPlugin or StatsDownloadPlugin")
else:
logger.debug(f"{name} is not a class")
except Exception as e:
logger.warning(f"Error loading plugin {plugin_file}: {e}")
return driver return setup_plugin, download_plugin
if __name__ == '__main__': if __name__ == '__main__':
args = setupArgParser().parse_args() args = setupArgParser().parse_args()
setupLogger() setupLogger()
# Load plugins
plugin_dir = firstValid(args.plugin_dir, os.getenv('PLUGIN_DIR'), default=None)
if plugin_dir is None:
logger.info("No plugin directory provided. Using default plugins.")
setup_plugin = None
download_plugin = None
else:
setup_plugin, download_plugin = load_plugins(plugin_dir)
# Use default plugins if none are loaded
if setup_plugin is None:
setup_plugin = DefaultStatsSetupPlugin()
logger.info("Using default StatsSetupPlugin.")
if download_plugin is None:
download_plugin = DefaultStatsDownloadPlugin()
logger.info("Using default StatsDownloadPlugin.")
command_executor = firstValid(args.hub_url, os.getenv('HUB_URL'), default=None) command_executor = firstValid(args.hub_url, os.getenv('HUB_URL'), default=None)
webrtc_internals_path = firstValid( webrtc_internals_path = firstValid(
args.webrtc_internals_path, args.webrtc_internals_path,
@@ -230,7 +305,8 @@ if __name__ == '__main__':
logger.error('VIDEO_URL environment variable or --url argument is required.') logger.error('VIDEO_URL environment variable or --url argument is required.')
raise SystemExit(1) raise SystemExit(1)
setupStats(driver, url) # Use the loaded plugin
driver = setup_plugin.setup_stats(driver, url)
socket_url = firstValid(args.socket_url, os.getenv('SOCKET_URL'), default='localhost') socket_url = firstValid(args.socket_url, os.getenv('SOCKET_URL'), default='localhost')
try: try:
@@ -240,5 +316,5 @@ if __name__ == '__main__':
raise SystemExit(1) raise SystemExit(1)
logger.info('Starting server collector.') logger.info('Starting server collector.')
httpd = HTTPServer(('', 9092), partial(Handler, downloadStats, driver, logger, socket_url, socket_port)) httpd = HTTPServer(('', 9092), partial(Handler, download_plugin.download_stats, driver, logger, socket_url, socket_port))
httpd.serve_forever() httpd.serve_forever()

31
plugins/example_plugin.py Normal file
View File

@@ -0,0 +1,31 @@
import logging
from selenium import webdriver
from selenium.webdriver.remote.webdriver import WebDriver as Remote
from utils.plugins_base import StatsSetupPlugin, StatsDownloadPlugin
logger = logging.getLogger(__name__)
class ExampleStatsSetupPlugin(StatsSetupPlugin):
def setup_stats(self, driver: webdriver.Chrome, url: str, retries: int = 5) -> webdriver.Chrome:
logger.info("Running ExampleStatsSetupPlugin...")
# Here you would implement the custom logic to setup stats
# For example, you could click on a button to display stats.
# You could also wait for an element to appear before continuing.
# This is just an example
driver.get(url)
return driver
class ExampleStatsDownloadPlugin(StatsDownloadPlugin):
def download_stats(self, driver: webdriver.Chrome, peersDict: dict, socket_url: str, socket_port: int):
logger.info("Running ExampleStatsDownloadPlugin...")
stats = {'message': 'Hello from ExampleStatsDownloadPlugin'}
# Here you would implement the custom logic to download stats
# and send them to the socket.
# This is just an example
print(f"Sending stats: {stats} to {socket_url}:{socket_port}")
# Remember to call the saveStats method to send the stats to the socket
super().saveStats([stats], socket_url, socket_port)

29
utils/plugins_base.py Normal file
View File

@@ -0,0 +1,29 @@
import abc
import json
import socket
import logging
from selenium import webdriver
logger = logging.getLogger(__name__)
# Abstract Base Classes for Plugins
class StatsSetupPlugin(abc.ABC):
@abc.abstractmethod
def setup_stats(self, driver: webdriver.Remote | webdriver.Chrome, url: str, retries: int = 5) -> webdriver.Remote | webdriver.Chrome:
pass
class StatsDownloadPlugin(abc.ABC):
@abc.abstractmethod
def download_stats(self, driver: webdriver.Remote | webdriver.Chrome, peersDict: dict, socket_url: str, socket_port: int):
pass
@staticmethod
def saveStats(stats: list, socket_url: str, socket_port: int):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
logger.debug(f'Saving stats: {json.dumps(stats, indent=4)}')
sock.sendto(json.dumps(stats).encode(), (socket_url, socket_port))
sock.close()
logger.debug('Sent stats to socket.')
except socket.error as e:
logger.error(f'Got socket error: {e}')