Files
peertube-collector/utils/plugins_base.py
2025-02-23 22:37:21 +01:00

29 lines
1.0 KiB
Python

import abc
import json
import socket
import logging
from selenium import webdriver
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# Abstract Base Classes for Plugins
class StatsSetupPlugin(abc.ABC):
    """Abstract interface for stats-setup plugins.

    The class cannot be instantiated directly; concrete plugins must
    override :meth:`setup_stats`.
    """

    @abc.abstractmethod
    def setup_stats(
        self,
        driver: webdriver.Remote | webdriver.Chrome,
        url: str,
        retries: int = 5,
    ) -> webdriver.Remote | webdriver.Chrome:
        """Hook that concrete plugins implement; the base version does nothing.

        Args:
            driver: Selenium driver, either remote or local Chrome.
            url: Address the implementation works against. How it is used
                is up to the concrete plugin.
            retries: Defaults to 5. Presumably an attempt budget for
                implementations that retry -- confirm against concrete
                plugins; the base class attaches no behaviour to it.

        Returns:
            A Selenium driver, per the annotation. NOTE(review): presumably
            the driver after setup -- confirm against concrete plugins.
        """
class StatsDownloadPlugin(abc.ABC):
    """Abstract interface for stats-download plugins.

    Concrete plugins must override :meth:`download_stats`. The static
    helper :meth:`saveStats` sends a stats list to a UDP endpoint as JSON.
    """

    @abc.abstractmethod
    def download_stats(
        self,
        driver: webdriver.Remote | webdriver.Chrome,
        peersDict: dict,
        socket_url: str,
        socket_port: int,
    ):
        """Hook that concrete plugins implement; the base version does nothing.

        Args:
            driver: Selenium driver, either remote or local Chrome.
            peersDict: Dictionary supplied by the caller. Its schema is not
                visible here -- verify against concrete plugins.
            socket_url: Presumably the host later handed to
                :meth:`saveStats` -- confirm against concrete plugins.
            socket_port: Presumably the port later handed to
                :meth:`saveStats` -- confirm against concrete plugins.
        """

    @staticmethod
    def saveStats(stats: list, socket_url: str, socket_port: int):
        """Send ``stats`` to ``(socket_url, socket_port)`` as one UDP datagram.

        The list is serialised with ``json.dumps`` and encoded with
        ``str.encode()`` before being sent over an IPv4 datagram socket.

        Args:
            stats: JSON-serialisable list to transmit.
            socket_url: Destination host passed to ``sendto``.
            socket_port: Destination port passed to ``sendto``.

        Returns:
            None. Socket errors are logged at ERROR level and swallowed, so
            a failed send does not raise.

        Raises:
            TypeError: If ``stats`` is not JSON-serialisable. Only
                ``socket.error`` is caught here, so ``json.dumps`` failures
                propagate.
        """
        try:
            # The context manager closes the socket even when sendto() raises.
            # Previously the descriptor leaked on every failed send.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
                # Pretty-printing the payload is only worth doing when the
                # debug record will actually be emitted.
                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug(f'Saving stats: {json.dumps(stats, indent=4)}')
                sock.sendto(json.dumps(stats).encode(), (socket_url, socket_port))
            logger.debug('Sent stats to socket.')
        except socket.error as e:
            logger.error(f'Got socket error: {e}')