add websocket stream for statistics

This commit is contained in:
Noa Aarts 2024-12-12 15:11:59 +01:00
parent 3520118336
commit 4adb9d4ec3
Signed by: noa
GPG key ID: 1850932741EFF672

View file

@ -1,10 +1,10 @@
use std::{net::SocketAddr, process::exit, sync::Arc}; use std::{net::SocketAddr, process::exit, sync::Arc, time::Duration};
use axum::{ use axum::{
extract::{ConnectInfo, Query, State}, extract::{ws::Message, ConnectInfo, Query, State, WebSocketUpgrade},
http::{self, HeaderMap, HeaderValue}, http::{self, HeaderMap, HeaderValue},
response::IntoResponse, response::{IntoResponse, Response},
routing::any, routing::get,
Router, Router,
}; };
use axum_extra::TypedHeader; use axum_extra::TypedHeader;
@ -12,14 +12,14 @@ use axum_streams::StreamBodyAs;
use futures::{never::Never, stream::repeat_with, Stream}; use futures::{never::Never, stream::repeat_with, Stream};
use rust_embed::RustEmbed; use rust_embed::RustEmbed;
use serde::Deserialize; use serde::Deserialize;
use tokio::net::TcpListener; use tokio::{net::TcpListener, time::interval};
use tower_http::trace::{DefaultMakeSpan, TraceLayer}; use tower_http::trace::{DefaultMakeSpan, TraceLayer};
use crate::{ use crate::{
config::{WEB_HOST, WEB_UPDATE_INTERVAL}, config::{WEB_HOST, WEB_UPDATE_INTERVAL},
grid, grid,
stream::Multipart, stream::Multipart,
AsyncResult, AsyncResult, CLIENTS, COUNTER,
}; };
#[derive(RustEmbed, Clone)] #[derive(RustEmbed, Clone)]
@ -38,7 +38,8 @@ pub async fn serve(ctx: WebApiContext) -> AsyncResult<Never> {
Some("index.html".to_string()), Some("index.html".to_string()),
); );
let app = Router::new() let app = Router::new()
.route("/imgstream", any(image_stream)) .route("/imgstream", get(image_stream))
.route("/stats", get(stats_stream))
.nest_service("/", assets) .nest_service("/", assets)
.with_state(ctx) .with_state(ctx)
// logging middleware // logging middleware
@ -85,6 +86,24 @@ fn make_image_stream(
.throttle(WEB_UPDATE_INTERVAL) .throttle(WEB_UPDATE_INTERVAL)
} }
fn make_stats() -> Message {
let pixels: u64 = COUNTER.load(std::sync::atomic::Ordering::Relaxed);
let clients: u64 = CLIENTS.load(std::sync::atomic::Ordering::Relaxed);
format!("{{\"c\":{}, \"p\":{}}}", clients, pixels).into()
}
async fn stats_stream(ws: WebSocketUpgrade) -> Response {
ws.on_upgrade(|mut c| async move {
let mut interval = interval(Duration::from_millis(100));
loop {
interval.tick().await;
if let Err(e) = c.send(make_stats()).await {
tracing::warn!("websocket disconnected with {e:?}")
}
}
})
}
async fn image_stream( async fn image_stream(
user_agent: Option<TypedHeader<headers::UserAgent>>, user_agent: Option<TypedHeader<headers::UserAgent>>,
ConnectInfo(addr): ConnectInfo<SocketAddr>, ConnectInfo(addr): ConnectInfo<SocketAddr>,