From 4adb9d4ec3c32211bf2fbcb27228c52034c61d9a Mon Sep 17 00:00:00 2001 From: Noa Aarts Date: Thu, 12 Dec 2024 15:11:59 +0100 Subject: [PATCH] add websocket stream for statistics --- src/webapi.rs | 33 ++++++++++++++++++++++++++------- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/src/webapi.rs b/src/webapi.rs index 6aef64c..1680b1b 100644 --- a/src/webapi.rs +++ b/src/webapi.rs @@ -1,10 +1,10 @@ -use std::{net::SocketAddr, process::exit, sync::Arc}; +use std::{net::SocketAddr, process::exit, sync::Arc, time::Duration}; use axum::{ - extract::{ConnectInfo, Query, State}, + extract::{ws::Message, ConnectInfo, Query, State, WebSocketUpgrade}, http::{self, HeaderMap, HeaderValue}, - response::IntoResponse, - routing::any, + response::{IntoResponse, Response}, + routing::get, Router, }; use axum_extra::TypedHeader; @@ -12,14 +12,14 @@ use axum_streams::StreamBodyAs; use futures::{never::Never, stream::repeat_with, Stream}; use rust_embed::RustEmbed; use serde::Deserialize; -use tokio::net::TcpListener; +use tokio::{net::TcpListener, time::interval}; use tower_http::trace::{DefaultMakeSpan, TraceLayer}; use crate::{ config::{WEB_HOST, WEB_UPDATE_INTERVAL}, grid, stream::Multipart, - AsyncResult, + AsyncResult, CLIENTS, COUNTER, }; #[derive(RustEmbed, Clone)] @@ -38,7 +38,8 @@ pub async fn serve(ctx: WebApiContext) -> AsyncResult { Some("index.html".to_string()), ); let app = Router::new() - .route("/imgstream", any(image_stream)) + .route("/imgstream", get(image_stream)) + .route("/stats", get(stats_stream)) .nest_service("/", assets) .with_state(ctx) // logging middleware @@ -85,6 +86,24 @@ fn make_image_stream( .throttle(WEB_UPDATE_INTERVAL) } +fn make_stats() -> Message { + let pixels: u64 = COUNTER.load(std::sync::atomic::Ordering::Relaxed); + let clients: u64 = CLIENTS.load(std::sync::atomic::Ordering::Relaxed); + format!("{{\"c\":{}, \"p\":{}}}", clients, pixels).into() +} + +async fn stats_stream(ws: WebSocketUpgrade) -> Response { + ws.on_upgrade(|mut c| async move { + let mut interval = interval(Duration::from_millis(100)); + loop { + interval.tick().await; + if let Err(e) = c.send(make_stats()).await { + tracing::warn!("websocket disconnected with {e:?}") + } + } + }) +} + async fn image_stream( user_agent: Option>, ConnectInfo(addr): ConnectInfo,