feat: enhance health checks in docker-compose and improve WebRTC stats collection

This commit is contained in:
2025-02-09 02:10:53 +01:00
parent be0e0f8153
commit 7b78f54510
6 changed files with 138 additions and 268 deletions

View File

@@ -2,38 +2,37 @@
peertube-collector is a project designed to collect and analyze WebRTC statistics from a Chromium browser and export them to a MongoDB service. This project includes a Docker setup for running the necessary services. peertube-collector is a project designed to collect and analyze WebRTC statistics from a Chromium browser and export them to a MongoDB service. This project includes a Docker setup for running the necessary services.
## Working Project Structure
```
peertube-collector/
├── requirements.txt
├── telegraf.conf
├── docker-compose.yml
├── Dockerfile
├── main.py
├── .env
└── utils/
└── webrtc-internals-exporter/
```
## Prerequisites ## Prerequisites
**Linux** based OS with the following: **Linux** based OS with the following:
### Software: ### Software:
- Docker (with compose support) - Docker and Docker Compose
### Open ports on localhost/loopback interface: ### Ports:
#### Localhost (REQUIRED):
- 4444 (Selenium) - 4444 (Selenium)
- 7900 (Selenium VNC - Optional)
If needed, you can open these ports in `ufw` by running the following commands: Ports can be opened in the host machine's firewall with:
```sh ```sh
ufw allow from 172.30.0.1 to 127.0.0.1 port 4444 ufw allow from 172.30.0.0/16 to any port 4444
ufw allow from 172.30.0.1 to 127.0.0.1 port 7900
``` ```
#### External (OPTIONAL):
These ports are actively used by selenium and the collector services. By defaut they should not be blocked by the firewall, but if so, they can be opened in the host machine's firewall.
- 50000:60000/udp (WebRTC)
- Needed for WebRTC NAT traversal, otherwise the browser will not connect to any peer.
The range needs to be fairly large since the port is chosen randomly by the STUN server.
- 27107/tcp (MongoDB)
Ports can be opened in the host machine's firewall with:
```sh
ufw allow 50000:60000/udp
ufw allow 27107/tcp
```
---
## Setup ## Setup
@@ -48,17 +47,23 @@ ufw allow from 172.30.0.1 to 127.0.0.1 port 7900
cp .env.example .env cp .env.example .env
``` ```
3. Ajust the firewall settings to allow the necessary ports if needed
3. Start the Docker containers: 4. Start the Docker containers:
```sh ```sh
docker compose up docker compose up
``` ```
or in detached mode: or in detached mode:
```sh ```sh
docker-compose up -d docker compose up -d
``` ```
The collector will start collecting WebRTC stats from the specified PeerTube instance. To stop the Docker containers run: `docker compose down -v`
The collector will start gathering WebRTC stats from the Selenium container and sending them to the Telegraf service.
### Monitoring
A noVNC server is available at [http://localhost:7900](http://localhost:7900/?autoconnect=1&resize=scale&password=secret) to monitor the Selenium container. The password is `secret`.
## Components ## Components
@@ -80,3 +85,17 @@ The `main.py` script sets up the Selenium WebDriver, collects WebRTC stats, and
### WebRTC Internals Exporter ### WebRTC Internals Exporter
The `webrtc-internals-exporter` directory contains a Chromium extension that collects WebRTC stats from the browser. The `webrtc-internals-exporter` directory contains a Chromium extension that collects WebRTC stats from the browser.
## Working Project Structure
```
peertube-collector/
├── requirements.txt
├── telegraf.conf
├── docker-compose.yml
├── Dockerfile
├── main.py
├── .env
└── utils/
└── webrtc-internals-exporter/
```

View File

@@ -6,6 +6,9 @@ services:
- ./webrtc-internals-exporter:/tmp/webrtc-internals-exporter:ro - ./webrtc-internals-exporter:/tmp/webrtc-internals-exporter:ro
shm_size: "2g" shm_size: "2g"
attach: false attach: false
depends_on:
telegraf:
condition: service_healthy
healthcheck: healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:4444/wd/hub/status"] test: ["CMD", "curl", "-f", "http://localhost:4444/wd/hub/status"]
interval: 5s interval: 5s
@@ -27,7 +30,6 @@ services:
interval: 5s interval: 5s
timeout: 10s timeout: 10s
retries: 5 retries: 5
restart: unless-stopped
networks: networks:
- backend - backend
@@ -44,7 +46,6 @@ services:
condition: service_healthy condition: service_healthy
environment: environment:
- VIDEO_URL=${VIDEO_URL:?"Video URL is required"} - VIDEO_URL=${VIDEO_URL:?"Video URL is required"}
restart: unless-stopped
ports: ports:
- "9092:9092" - "9092:9092"
extra_hosts: extra_hosts:

19
main.py
View File

@@ -101,9 +101,9 @@ def downloadStats(driver: webdriver.Chrome, peersDict: dict):
if 'Connection Speed' == stat: if 'Connection Speed' == stat:
speed, unit = playerStats[stat].split() speed, unit = playerStats[stat].split()
speedBytes = int(speed) * (1024 ** {'B/s': 0, 'KB/s': 1, 'MB/s': 2, 'GB/s': 3}[unit]) speedBytes = float(speed) * (1024 ** {'B/s': 0, 'KB/s': 1, 'MB/s': 2, 'GB/s': 3}[unit])
playerStats[stat] = {'Speed': int(speedBytes), 'Granularity': 's'} playerStats[stat] = {'Speed': speedBytes, 'Granularity': 's'}
if 'Network Activity' == stat: if 'Network Activity' == stat:
downString, upString = playerStats[stat].split(' / ') downString, upString = playerStats[stat].split(' / ')
@@ -111,8 +111,8 @@ def downloadStats(driver: webdriver.Chrome, peersDict: dict):
down, downUnit = downString.replace('down', '').strip().split() down, downUnit = downString.replace('down', '').strip().split()
up, upUnit = upString.replace('up', '').strip().split() up, upUnit = upString.replace('up', '').strip().split()
downBytes = int(down) * (1024 ** {'B': 0, 'KB': 1, 'MB': 2, 'GB': 3}[downUnit]) downBytes = convert_to_bytes(down, downUnit)
upBytes = int(up) * (1024 ** {'B': 0, 'KB': 1, 'MB': 2, 'GB': 3}[upUnit]) upBytes = convert_to_bytes(up, upUnit)
playerStats[stat] = {'Down': downBytes, 'Up': upBytes} playerStats[stat] = {'Down': downBytes, 'Up': upBytes}
@@ -122,8 +122,8 @@ def downloadStats(driver: webdriver.Chrome, peersDict: dict):
down, downUnit = downString.replace('down', '').strip().split() down, downUnit = downString.replace('down', '').strip().split()
up, upUnit = upString.replace('up', '').strip().split() up, upUnit = upString.replace('up', '').strip().split()
downBytes = int(down) * (1024 ** {'B': 0, 'KB': 1, 'MB': 2, 'GB': 3}[downUnit]) downBytes = convert_to_bytes(down, downUnit)
upBytes = int(up) * (1024 ** {'B': 0, 'KB': 1, 'MB': 2, 'GB': 3}[upUnit]) upBytes = convert_to_bytes(up, upUnit)
playerStats[stat] = {'Down': downBytes, 'Up': upBytes} playerStats[stat] = {'Down': downBytes, 'Up': upBytes}
@@ -133,8 +133,8 @@ def downloadStats(driver: webdriver.Chrome, peersDict: dict):
server, serverUnit = server.replace('from servers', '').strip().split() server, serverUnit = server.replace('from servers', '').strip().split()
peer, peerUnit = peer.replace('from peers', '').strip().split() peer, peerUnit = peer.replace('from peers', '').strip().split()
serverBytes = int(server) * (1024 ** {'B': 0, 'KB': 1, 'MB': 2, 'GB': 3}[serverUnit]) serverBytes = convert_to_bytes(server, serverUnit)
peerBytes = int(peer) * (1024 ** {'B': 0, 'KB': 1, 'MB': 2, 'GB': 3}[peerUnit]) peerBytes = convert_to_bytes(peer, peerUnit)
playerStats[stat] = {'Server': serverBytes, 'Peers': peerBytes} playerStats[stat] = {'Server': serverBytes, 'Peers': peerBytes}
@@ -157,6 +157,9 @@ def downloadStats(driver: webdriver.Chrome, peersDict: dict):
saveStats([stats]) saveStats([stats])
def convert_to_bytes(down, downUnit):
return float(down) * (1024 ** {'B': 0, 'KB': 1, 'MB': 2, 'GB': 3}[downUnit])
def setupStats(driver: webdriver.Remote, url: str): def setupStats(driver: webdriver.Remote, url: str):
logger.log(logging.INFO, 'Setting up stats.') logger.log(logging.INFO, 'Setting up stats.')
actions = ActionChains(driver) actions = ActionChains(driver)

View File

@@ -33,11 +33,6 @@ chrome.runtime.onInstalled.addListener(async ({ reason }) => {
...options, ...options,
}); });
} }
await chrome.alarms.create("webrtc-internals-exporter-alarm", {
delayInMinutes: 1,
periodInMinutes: 1,
});
}); });
async function updateTabInfo(tab) { async function updateTabInfo(tab) {
@@ -104,76 +99,8 @@ chrome.tabs.onUpdated.addListener(async (tabId, changeInfo) => {
await updateTabInfo({ id: tabId, url: changeInfo.url }); await updateTabInfo({ id: tabId, url: changeInfo.url });
}); });
chrome.alarms.onAlarm.addListener((alarm) => { // Send data to POST handler.
if (alarm.name === "webrtc-internals-exporter-alarm") { async function sendJsonData(method, data) {
cleanupPeerConnections().catch((err) => {
log(`cleanup peer connections error: ${err.message}`);
});
}
});
async function setPeerConnectionLastUpdate({ id, origin }, lastUpdate = 0) {
let { peerConnectionsLastUpdate } = await chrome.storage.local.get(
"peerConnectionsLastUpdate",
);
if (!peerConnectionsLastUpdate) {
peerConnectionsLastUpdate = {};
}
if (lastUpdate) {
peerConnectionsLastUpdate[id] = { origin, lastUpdate };
} else {
delete peerConnectionsLastUpdate[id];
}
await chrome.storage.local.set({ peerConnectionsLastUpdate });
const peerConnectionsPerOrigin = {};
Object.values(peerConnectionsLastUpdate).forEach(({ origin: o }) => {
if (!peerConnectionsPerOrigin[o]) {
peerConnectionsPerOrigin[o] = 0;
}
peerConnectionsPerOrigin[o]++;
});
await chrome.storage.local.set({ peerConnectionsPerOrigin });
await optionsUpdated();
}
async function cleanupPeerConnections() {
let { peerConnectionsLastUpdate } = await chrome.storage.local.get(
"peerConnectionsLastUpdate",
);
if (
!peerConnectionsLastUpdate ||
!Object.keys(peerConnectionsLastUpdate).length
) {
return;
}
log(
`checking stale peer connections (${
Object.keys(peerConnectionsLastUpdate).length
} total)`,
);
const now = Date.now();
await Promise.allSettled(
Object.entries(peerConnectionsLastUpdate)
.map(([id, { origin, lastUpdate }]) => {
if (
now - lastUpdate >
Math.max(2 * options.updateInterval, 30) * 1000
) {
return { id, origin };
}
})
.filter((ret) => !!ret?.id)
.map(({ id, origin }) => {
log(`removing stale peer connection metrics: ${id} ${origin}`);
return sendData("DELETE", { id, origin });
}),
);
}
// Send data to pushgateway.
async function sendData(method, { id, origin }, data) {
const { url, username, password, gzip, job } = options; const { url, username, password, gzip, job } = options;
const headers = { const headers = {
"Content-Type": "application/json", "Content-Type": "application/json",
@@ -185,59 +112,7 @@ async function sendData(method, { id, origin }, data) {
headers["Content-Encoding"] = "gzip"; headers["Content-Encoding"] = "gzip";
data = await pako.gzip(data); data = await pako.gzip(data);
} }
log(`sendData: ${data} \n ${data.length} bytes (gzip: ${gzip}) url: ${url} job: ${job}`); log(`sendJsonData: ${data} \n ${data.length} bytes (gzip: ${gzip}) url: ${url} job: ${job}`);
const start = Date.now();
const response = await fetch(
`${url}/metrics/job/${job}/peerConnectionId/${id}`,
{
method,
headers,
body: method === "POST" ? data : undefined,
},
);
const stats = await chrome.storage.local.get([
"messagesSent",
"bytesSent",
"totalTime",
"errors",
]);
if (data) {
stats.messagesSent = (stats.messagesSent || 0) + 1;
stats.bytesSent = (stats.bytesSent || 0) + data.length;
stats.totalTime = (stats.totalTime || 0) + Date.now() - start;
}
if (!response.ok) {
stats.errors = (stats.errors || 0) + 1;
}
await chrome.storage.local.set(stats);
if (!response.ok) {
const text = await response.text();
throw new Error(`Response status: ${response.status} error: ${text}`);
}
await setPeerConnectionLastUpdate(
{ id, origin },
method === "POST" ? start : undefined,
);
return response.text();
}
async function sendJsonData(method, { id, origin }, data) {
const { url, username, password, gzip, job } = options;
const headers = {
"Content-Type": "application/json",
};
if (username && password) {
headers.Authorization = "Basic " + btoa(`${username}:${password}`);
}
if (data && gzip) {
headers["Content-Encoding"] = "gzip";
data = await pako.gzip(data);
}
log(`sendData: ${data} \n ${data.length} bytes (gzip: ${gzip}) url: ${url} job: ${job}`);
const start = Date.now(); const start = Date.now();
const response = await fetch( const response = await fetch(
`${url}/${job}`, `${url}/${job}`,
@@ -269,90 +144,13 @@ async function sendJsonData(method, { id, origin }, data) {
throw new Error(`Response status: ${response.status} error: ${text}`); throw new Error(`Response status: ${response.status} error: ${text}`);
} }
await setPeerConnectionLastUpdate(
{ id, origin },
method === "POST" ? start : undefined,
);
return response.text(); return response.text();
} }
const QualityLimitationReasons = {
none: 0,
bandwidth: 1,
cpu: 2,
other: 3,
};
/**
* sendPeerConnectionStats
* @param {string} url
* @param {string} id
* @param {RTCPeerConnectionState} state
* @param {any} values
*/
async function sendPeerConnectionStats(url, id, state, values) {
const origin = new URL(url).origin;
if (state === "closed") {
return sendData("DELETE", { id, origin });
}
let data = "";
const sentTypes = new Set();
values.forEach((value) => {
const type = value.type.replace(/-/g, "_");
const labels = [`pageUrl="${url}"`];
const metrics = [];
if (value.type === "peer-connection") {
labels.push(`state="${state}"`);
}
Object.entries(value).forEach(([key, v]) => {
if (typeof v === "number") {
metrics.push([key, v]);
} else if (typeof v === "object") {
Object.entries(v).forEach(([subkey, subv]) => {
if (typeof subv === "number") {
metrics.push([`${key}_${subkey}`, subv]);
}
});
} else if (
key === "qualityLimitationReason" &&
QualityLimitationReasons[v] !== undefined
) {
metrics.push([key, QualityLimitationReasons[v]]);
} else if (key === "googTimingFrameInfo") {
// TODO
} else {
labels.push(`${key}="${v}"`);
}
});
metrics.forEach(([key, v]) => {
const name = `${type}_${key.replace(/-/g, "_")}`;
let typeDesc = "";
if (!sentTypes.has(name)) {
typeDesc = `# TYPE ${name} gauge\n`;
sentTypes.add(name);
}
data += `${typeDesc}${name}{${labels.join(",")}} ${v}\n`;
});
});
if (data.length > 0) {
return sendData("POST", { id, origin }, data + "\n");
}
}
chrome.runtime.onMessage.addListener((message, sender, sendResponse) => { chrome.runtime.onMessage.addListener((message, sender, sendResponse) => {
if (message.event === "peer-connection-stats") { if (message.event === "peer-connection-stats") {
const { url, id, state, values } = message.data;
sendData("POST", { id, origin: new URL(url).origin }, JSON.stringify(message.data)) sendJsonData("POST", JSON.stringify(message.data))
.then(() => { .then(() => {
sendResponse({}); sendResponse({});
}) })
@@ -360,9 +158,14 @@ chrome.runtime.onMessage.addListener((message, sender, sendResponse) => {
sendResponse({ error: err.message }); sendResponse({ error: err.message });
}); });
} else if (message.event === "peer-connections-stats") { } else if (message.event === "peer-connections-stats") {
const { stats } = message.data;
sendJsonData("POST", { id: "all", origin: "all" }, JSON.stringify(message.data)) sendJsonData("POST", JSON.stringify(message.data))
.then(() => {
sendResponse({});
})
.catch((err) => {
sendResponse({ error: err.message });
});
} else { } else {
sendResponse({ error: "unknown event" }); sendResponse({ error: "unknown event" });
} }

View File

@@ -79,19 +79,14 @@ if (window.location.protocol.startsWith("http")) {
// Handle stats messages. // Handle stats messages.
window.addEventListener("message", async (message) => { window.addEventListener("message", async (message) => {
const { event, url, id, state, values, stats } = message.data; const { event, data, stats } = message.data;
if (event === "webrtc-internal-exporter:ready") { if (event === "webrtc-internal-exporter:ready") {
sendOptions(); sendOptions();
} else if (event === "webrtc-internal-exporter:peer-connection-stats") { } else if (event === "webrtc-internal-exporter:peer-connection-stats") {
try { try {
const response = await chrome.runtime.sendMessage({ const response = await chrome.runtime.sendMessage({
event: "peer-connection-stats", event: "peer-connection-stats",
data: { data: stats,
url,
id,
state,
values,
},
}); });
if (response.error) { if (response.error) {
log(`error: ${response.error}`); log(`error: ${response.error}`);
@@ -103,7 +98,7 @@ if (window.location.protocol.startsWith("http")) {
try { try {
const response = await chrome.runtime.sendMessage({ const response = await chrome.runtime.sendMessage({
event: "peer-connections-stats", event: "peer-connections-stats",
data: stats, data: data,
}); });
if (response.error) { if (response.error) {
log(`error: ${response.error}`); log(`error: ${response.error}`);

View File

@@ -31,15 +31,55 @@ class WebrtcInternalExporter {
); );
} }
/**
* @param {RTCPeerConnection} pc
*/
add(pc) { add(pc) {
const id = this.randomId(); const id = this.randomId();
pc.iceCandidates = [];
pc.iceCandidateErrors = [];
this.peerConnections.set(id, pc); this.peerConnections.set(id, pc);
pc.addEventListener("connectionstatechange", () => { pc.addEventListener("connectionstatechange", () => {
if (pc.connectionState === "closed") { if (pc.connectionState === "closed") {
this.peerConnections.delete(id); this.peerConnections.delete(id);
} }
}); });
//this.collectAndPostStats(id);
/**
* @param {RTCPeerConnectionIceErrorEvent} event
*/
pc.addEventListener("icecandidateerror", (event) => {
this.peerConnections.get(id).iceCandidateErrors.push({
timestamp: Date.now(),
address: event.errorAddress,
errorCode: event.errorCode,
errorText: event.errorText,
port: event.errorPort,
url: event.url,
});
});
/**
* @param {RTCPeerConnectionIceEvent} event
*/
pc.addEventListener("icecandidate", (event) => {
this.peerConnections.get(id).iceCandidates.push({
timestamp: Date.now(),
candidate: event.candidate?.candidate,
component: event.candidate?.component,
foundation: event.candidate?.foundation,
port: event.candidate?.port,
priority: event.candidate?.priority,
protocol: event.candidate?.protocol,
relatedAddress: event.candidate?.relatedAddress,
relatedPort: event.candidate?.relatedPort,
sdpMLineIndex: event.candidate?.sdpMLineIndex,
sdpMid: event.candidate?.sdpMid,
tcpType: event.candidate?.tcpType,
type: event.candidate?.type,
usernameFragment: event.candidate?.usernameFragment,
});
});
} }
async collectAndPostSingleStat(id) { async collectAndPostSingleStat(id) {
@@ -49,9 +89,9 @@ class WebrtcInternalExporter {
window.postMessage( window.postMessage(
{ {
event: "webrtc-internal-exporter:peer-connection-stats", event: "webrtc-internal-exporter:peer-connection-stats",
...stats, stats
}, },
[stats], stats
); );
} }
@@ -59,26 +99,30 @@ class WebrtcInternalExporter {
const stats = []; const stats = [];
for (const [id, pc] of this.peerConnections) { for (const [id, pc] of this.peerConnections) {
if (this.url && this.enabled && pc.connectionState === "connected") { if (this.url && this.enabled) {
const pcStats = await this.collectStats(id, pc); const pcStats = await this.collectStats(id, pc);
stats.push(pcStats); stats.push(pcStats);
} }
} }
window.postMessage( window.postMessage(
{ {
event: "webrtc-internal-exporter:peer-connections-stats", event: "webrtc-internal-exporter:peer-connections-stats",
stats, data: JSON.parse(JSON.stringify(stats)),
}, },
[stats], );
);
log(`Stats collected:`, stats); log(`Stats collected:`, JSON.parse(JSON.stringify(stats)));
setTimeout(this.collectAllStats.bind(this), this.updateInterval); setTimeout(this.collectAllStats.bind(this), this.updateInterval);
return stats; return stats;
} }
/**
* @param {string} id
* @param {RTCPeerConnection} pc
* @param {Function} binding
*/
async collectStats(id, pc, binding) { async collectStats(id, pc, binding) {
var completeStats = {}; var completeStats = {};
@@ -87,7 +131,7 @@ class WebrtcInternalExporter {
if (!pc) return; if (!pc) return;
} }
if (this.url && this.enabled && pc.connectionState === "connected") { if (this.url && this.enabled) {
try { try {
const stats = await pc.getStats(); const stats = await pc.getStats();
const values = [...stats.values()].filter( const values = [...stats.values()].filter(
@@ -98,7 +142,12 @@ class WebrtcInternalExporter {
completeStats = { completeStats = {
url: window.location.href, url: window.location.href,
id, id,
state: pc.connectionState, connectionState: pc.connectionState,
iceConnectionState: pc.iceConnectionState,
iceGatheringState: pc.iceGatheringState,
signalingState: pc.signalingState,
iceCandidateErrors: pc.iceCandidateErrors,
iceCandidates: pc.iceCandidates,
values, values,
}; };
} catch (error) { } catch (error) {