feat: Website (#15)

Add a basic website that shows the current flutgrid state
Co-authored-by: Noa Aarts <noa@voorwaarts.nl>
This commit is contained in:
peppidesu 2024-10-22 21:58:25 +02:00 committed by GitHub
parent 6c9de45c6a
commit 7f04b39a15
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 1095 additions and 75 deletions

815
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -6,13 +6,24 @@ edition = "2021"
[dependencies]
async-trait = "0.1.83"
atoi_radix10 = "0.0.1"
axum = { version = "0.7.7", features = ["ws"] }
axum-extra = { version = "0.9.4", features = ["typed-header"] }
axum-streams = "0.19.0"
bytes = "1.6.0"
chrono = "0.4.38"
debug_print = "1.0.0"
futures = "0.3.31"
futures-util = { version = "0.3.31", features = ["sink", "std"] }
headers = "0.4.0"
image = "0.25.2"
rand = "*"
serde = { version = "1.0.210", features = ["derive"] }
tokio = { version = "1.38", features = ["full"] }
tokio-stream = "0.1.16"
tokio-test = "*"
tower-http = { version = "0.6.1", features = ["fs", "trace"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
[dev-dependencies]
tempfile = "*"

View file

@ -1,9 +1,11 @@
use std::time::Duration;
pub const GRID_LENGTH: usize = 1;
pub const HOST: &str = "0.0.0.0:7791";
pub const HOST: &str = "127.0.0.1:7791";
pub const WEB_HOST: &str = "127.0.0.1:3000";
pub const IMAGE_SAVE_INTERVAL: Duration = Duration::from_secs(5);
pub const JPEG_UPDATE_INTERVAL: Duration = Duration::from_millis(20);
pub const JPEG_UPDATE_INTERVAL: Duration = Duration::from_millis(17);
pub const WEB_UPDATE_INTERVAL: Duration = Duration::from_millis(50);
pub const HELP_TEXT: &[u8] = b"Flurry is a pixelflut implementation, this means you can use commands to get and set pixels in the canvas
SIZE returns the size of the canvas

View file

@ -56,7 +56,6 @@ where
W: AsyncWriteExt + std::marker::Unpin,
{
async fn help_command(&mut self) -> io::Result<()> {
println!("HELP wanted");
match_parser!(parser: self.parser => parser.unparse(Response::Help, &mut self.writer).await?);
self.writer.flush().await?;

View file

@ -1,7 +1,9 @@
use std::cell::SyncUnsafeCell;
use std::{
cell::SyncUnsafeCell,
sync::{RwLock, RwLockReadGuard},
};
use image::{GenericImageView, Rgb};
use tokio::sync::{RwLock, RwLockReadGuard};
use crate::Coordinate;
@ -16,7 +18,7 @@ pub struct Flut<T> {
size_x: usize,
size_y: usize,
cells: SyncUnsafeCell<Vec<T>>,
jpgbuf: RwLock<Vec<u8>>
jpgbuf: RwLock<Vec<u8>>,
}
impl<T: Clone> Flut<T> {
@ -29,7 +31,7 @@ impl<T: Clone> Flut<T> {
size_x,
size_y,
cells: vec.into(),
jpgbuf: RwLock::new(Vec::new())
jpgbuf: RwLock::new(Vec::new()),
}
}
@ -47,8 +49,8 @@ impl<T> Flut<T> {
}
Some((y * self.size_x) + x)
}
pub async fn read_jpg_buffer(&self) -> RwLockReadGuard<'_, Vec<u8>> {
self.jpgbuf.read().await
pub fn read_jpg_buffer(&self) -> RwLockReadGuard<'_, Vec<u8>> {
self.jpgbuf.read().expect("RWlock didn't exit nicely")
}
}
@ -84,19 +86,18 @@ impl GenericImageView for Flut<u32> {
let [r, g, b, _a] = pixel.to_be_bytes();
Rgb::from([r, g, b])
}
}
impl Flut<u32> {
pub async fn update_jpg_buffer(&self) {
let mut jpgbuf = self.jpgbuf.write().await;
pub fn update_jpg_buffer(&self) {
let mut jpgbuf = self.jpgbuf.write().expect("Could not get write RWlock");
jpgbuf.clear();
let encoder = image::codecs::jpeg::JpegEncoder::new_with_quality(&mut *jpgbuf, 50);
let subimage = self.view(0, 0, self.width(), self.height()).to_image();
match subimage.write_with_encoder(encoder) {
Ok(_) => {}
Err(err) => eprintln!("{}", err),
}
Err(err) => tracing::error!("Error writing jpeg buffer: {:?}", err),
}
}
}

View file

@ -11,7 +11,9 @@ pub mod config;
pub mod flutclient;
pub mod grid;
pub mod protocols;
pub(crate) mod stream;
pub mod utils;
pub mod webapi;
mod color;
@ -20,6 +22,8 @@ pub type Coordinate = u16;
pub static COUNTER: AtomicU64 = AtomicU64::new(0);
pub type AsyncResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;
fn set_pixel_rgba(
grids: &[grid::Flut<u32>],
canvas: Canvas,

View file

@ -1,8 +1,8 @@
use std::{
convert::Infallible,
fs::{create_dir_all, File},
io::{self, Write as _},
io::Write as _,
path::Path,
process::exit,
sync::Arc,
time::Duration,
};
@ -12,21 +12,20 @@ use flurry::{
config::{GRID_LENGTH, HOST, IMAGE_SAVE_INTERVAL, JPEG_UPDATE_INTERVAL},
flutclient::FlutClient,
grid::{self, Flut},
COUNTER,
webapi::WebApiContext,
AsyncResult, COUNTER,
};
use tokio::{
net::TcpListener,
time::interval
};
type Never = Infallible;
use futures::never::Never;
use tokio::{net::TcpListener, time::interval, try_join};
use tracing_subscriber::{layer::SubscriberExt as _, util::SubscriberInitExt as _};
/// This function logs the current amount of changed pixels to stdout every second
async fn pixel_change_stdout_log() -> io::Result<Never> {
async fn pixel_change_stdout_log() -> AsyncResult<Never> {
let mut interval = tokio::time::interval(Duration::from_millis(1000));
loop {
interval.tick().await;
let cnt = COUNTER.load(std::sync::atomic::Ordering::Relaxed);
println!("{cnt} pixels were changed");
tracing::info!("{cnt} pixels changed");
}
}
@ -36,7 +35,10 @@ async fn pixel_change_stdout_log() -> io::Result<Never> {
/// # Errors
///
/// This function will return an error if it is unable to create or write to the file for the image
async fn save_image_frames(grids: Arc<[grid::Flut<u32>]>, duration: Duration) -> io::Result<Never> {
async fn save_image_frames(
grids: Arc<[grid::Flut<u32>; GRID_LENGTH]>,
duration: Duration,
) -> AsyncResult<Never> {
let base_dir = Path::new("./recordings");
let mut timer = interval(duration);
create_dir_all(base_dir)?;
@ -46,7 +48,7 @@ async fn save_image_frames(grids: Arc<[grid::Flut<u32>]>, duration: Duration) ->
let p = base_dir.join(format!("{}", Local::now().format("%Y-%m-%d_%H-%M-%S.jpg")));
let mut file_writer = File::create(p)?;
file_writer.write_all(&grid.read_jpg_buffer().await)?;
file_writer.write_all(&grid.read_jpg_buffer())?;
}
}
}
@ -54,7 +56,10 @@ async fn save_image_frames(grids: Arc<[grid::Flut<u32>]>, duration: Duration) ->
/// Handle connections made to the socket, keeps a vec of the currently active connections,
/// uses timeout to loop through them and clean them up to stop a memory leak while not throwing
/// everything away
async fn handle_flut(flut_listener: TcpListener, grids: Arc<[grid::Flut<u32>]>) -> io::Result<Never> {
async fn handle_flut(
flut_listener: TcpListener,
grids: Arc<[grid::Flut<u32>]>,
) -> AsyncResult<Never> {
let mut handles = Vec::new();
loop {
let (mut socket, _) = flut_listener.accept().await?;
@ -71,12 +76,12 @@ async fn handle_flut(flut_listener: TcpListener, grids: Arc<[grid::Flut<u32>]>)
}
}
async fn jpeg_update_loop(grids: Arc<[Flut<u32>]>) -> io::Result<Never> {
async fn jpeg_update_loop(grids: Arc<[Flut<u32>]>) -> AsyncResult<Never> {
let mut interval = interval(JPEG_UPDATE_INTERVAL);
loop {
interval.tick().await;
for grid in grids.as_ref() {
grid.update_jpg_buffer().await;
grid.update_jpg_buffer();
}
}
}
@ -84,23 +89,41 @@ async fn jpeg_update_loop(grids: Arc<[Flut<u32>]>) -> io::Result<Never> {
#[tokio::main]
#[allow(clippy::needless_return)]
async fn main() {
// diagnostics
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
format!("{}=debug,tower_http=debug", env!("CARGO_CRATE_NAME")).into()
}),
)
.with(tracing_subscriber::fmt::layer())
.init();
let grids: Arc<[Flut<u32>; GRID_LENGTH]> = [grid::Flut::init(800, 600, 0xff_00_ff_ff)].into();
println!("created grids");
tracing::trace!("created grids");
let Ok(flut_listener) = TcpListener::bind(HOST).await else {
eprintln!("Was unable to bind to {HOST}, please check if a different process is bound");
return;
tracing::error!(
"Was unable to bind to {HOST}, please check if a different process is bound"
);
exit(1);
};
println!("bound flut listener");
tracing::info!("Started TCP listener on {HOST}");
let handles = vec![
(tokio::spawn(pixel_change_stdout_log())),
(tokio::spawn(save_image_frames(grids.clone(), IMAGE_SAVE_INTERVAL))),
(tokio::spawn(handle_flut(flut_listener, grids.clone()))),
(tokio::spawn(jpeg_update_loop(grids.clone())))
];
let pixel_logger = tokio::spawn(pixel_change_stdout_log());
let snapshots = tokio::spawn(save_image_frames(grids.clone(), IMAGE_SAVE_INTERVAL));
let pixelflut_server = tokio::spawn(handle_flut(flut_listener, grids.clone()));
let jpeg_update_loop = tokio::spawn(jpeg_update_loop(grids.clone()));
let website = tokio::spawn(flurry::webapi::serve(WebApiContext {
grids: grids.clone(),
}));
for handle in handles {
println!("joined handle had result {:?}", handle.await);
}
let res = try_join! {
pixel_logger,
snapshots,
pixelflut_server,
jpeg_update_loop,
website,
};
tracing::error!("something went wrong {:?}", res);
}

View file

@ -74,12 +74,12 @@ impl<R: AsyncBufRead + AsyncBufReadExt + std::marker::Unpin> Parser<R> for Binar
))
}
_ => {
eprintln!("received illegal command: {command}");
tracing::error!("received illegal command: {command}");
Err(Error::from(ErrorKind::InvalidInput))
}
},
Err(err) => {
eprintln!("{err}");
tracing::error!("{err}");
Err(err)
}
}

106
src/stream.rs Normal file
View file

@ -0,0 +1,106 @@
use axum::http::{self, HeaderMap, HeaderValue};
use axum_streams::StreamingFormat;
use futures::StreamExt;
use rand::{distributions::Standard, thread_rng, Rng};
pub(crate) struct Multipart {
first: bool,
boundary: Vec<u8>,
headers: HeaderMap,
}
impl Multipart {
pub(crate) fn new(boundary_length: usize, headers: HeaderMap) -> Self {
let boundary = thread_rng()
.sample_iter(Standard)
.filter(|c| match c {
32..127 | 128..=255 => true,
0..32 | 127 => false,
})
.take(boundary_length)
.collect();
Multipart {
first: false,
boundary,
headers,
}
}
}
impl<T> StreamingFormat<T> for Multipart
where
T: Send + Sync + IntoIterator<Item = u8> + 'static,
{
fn to_bytes_stream<'a, 'b>(
&'a self,
stream: futures::stream::BoxStream<'b, Result<T, axum::Error>>,
_options: &'a axum_streams::StreamBodyAsOptions,
) -> futures::stream::BoxStream<'b, Result<axum::body::Bytes, axum::Error>> {
fn write_multipart_frame<T>(
obj: T,
boundary: Vec<u8>,
headers: HeaderMap,
first: bool,
) -> Result<Vec<u8>, axum::Error>
where
T: IntoIterator<Item = u8>,
{
let mut frame_vec = Vec::new();
if first {
frame_vec.extend_from_slice(b"--");
} else {
frame_vec.extend_from_slice(b"\r\n--");
}
frame_vec.extend(boundary);
frame_vec.extend_from_slice(b"\r\n");
for (header_name, header_value) in headers {
match header_name {
Some(header) => {
frame_vec.extend_from_slice(header.as_str().as_bytes());
frame_vec.extend_from_slice(b": ");
frame_vec.extend_from_slice(header_value.as_bytes());
frame_vec.extend_from_slice(b"\r\n");
}
None => todo!(),
}
}
frame_vec.extend_from_slice(b"\r\n");
frame_vec.extend(obj);
Ok(frame_vec)
}
let boundary = self.boundary.clone();
let headers = self.headers.clone();
let first = self.first;
Box::pin({
stream.map(move |obj_res| match obj_res {
Err(e) => Err(e),
Ok(obj) => {
let picture_framed =
write_multipart_frame(obj, boundary.clone(), headers.clone(), first);
picture_framed.map(axum::body::Bytes::from)
}
})
})
}
fn http_response_headers(
&self,
_options: &axum_streams::StreamBodyAsOptions,
) -> Option<axum::http::HeaderMap> {
let mut header_map = HeaderMap::new();
let mut multipart: Vec<u8> = "multipart/x-mixed-replace; boundary=".into();
multipart.extend_from_slice(&self.boundary);
header_map.insert(
http::header::CONTENT_TYPE,
HeaderValue::from_bytes(multipart.as_slice()).unwrap(),
);
Some(header_map)
}
}

96
src/webapi.rs Normal file
View file

@ -0,0 +1,96 @@
use std::{net::SocketAddr, process::exit, sync::Arc};
use axum::{
extract::{ConnectInfo, Query, State},
http::{self, HeaderMap, HeaderValue},
response::IntoResponse,
routing::any,
Router,
};
use axum_extra::TypedHeader;
use axum_streams::StreamBodyAs;
use futures::{never::Never, stream::repeat_with, Stream};
use serde::Deserialize;
use tokio::net::TcpListener;
use tower_http::trace::{DefaultMakeSpan, TraceLayer};
use crate::{
config::{WEB_HOST, WEB_UPDATE_INTERVAL},
grid,
stream::Multipart,
AsyncResult,
};
#[derive(Clone)]
pub struct WebApiContext {
pub grids: Arc<[grid::Flut<u32>]>,
}
pub async fn serve(ctx: WebApiContext) -> AsyncResult<Never> {
let app = Router::new()
.route("/imgstream", any(image_stream))
.with_state(ctx)
// logging middleware
.layer(
TraceLayer::new_for_http()
.make_span_with(DefaultMakeSpan::default().include_headers(true)),
);
// run it with hyper
let Ok(listener) = TcpListener::bind(WEB_HOST).await else {
tracing::error!(
"Was unable to bind to {WEB_HOST}, please check if a different process is bound"
);
exit(1);
};
tracing::debug!("listening on {}", listener.local_addr()?);
axum::serve(
listener,
app.into_make_service_with_connect_info::<SocketAddr>(),
)
.await?;
Err("Web api exited".into())
}
#[derive(Debug, Deserialize)]
struct CanvasQuery {
canvas: u8,
}
fn make_image_stream(
ctx: WebApiContext,
canvas: u8,
) -> impl Stream<Item = Result<Vec<u8>, axum::Error>> {
use tokio_stream::StreamExt;
let mut buf = Vec::new();
repeat_with(move || {
buf.clear();
buf.extend_from_slice(&ctx.grids[canvas as usize].read_jpg_buffer());
Ok(buf.clone())
})
.throttle(WEB_UPDATE_INTERVAL)
}
async fn image_stream(
user_agent: Option<TypedHeader<headers::UserAgent>>,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
State(ctx): State<WebApiContext>,
Query(CanvasQuery { canvas }): Query<CanvasQuery>,
) -> impl IntoResponse {
let user_agent = if let Some(TypedHeader(user_agent)) = user_agent {
user_agent.to_string()
} else {
String::from("Unknown browser")
};
tracing::info!("`{user_agent}` at {addr} connected.");
let mut headers = HeaderMap::new();
headers.insert(
http::header::CONTENT_TYPE,
HeaderValue::from_static("image/jpeg"),
);
StreamBodyAs::new(Multipart::new(10, headers), make_image_stream(ctx, canvas))
}

12
test.html Normal file
View file

@ -0,0 +1,12 @@
<!DOCTYPE html>
<html>
<head>
<title>Test</title>
<script src="test.js"></script>
</head>
<body>
<h1>Test</h1>
<p>Test</p>
<img id="image"></img>
</body>
</html>

13
test.js Normal file
View file

@ -0,0 +1,13 @@
const ws = new WebSocket("ws://127.0.0.1:3000/imgstream?canvas=0");
console.log("Connecting");
ws.onopen = () => {
console.log("Connected");
}
ws.onmessage = (msg) => {
var reader = new FileReader();
reader.readAsDataURL(msg.data);
reader.onloadend = function() {
var base64data = reader.result;
document.getElementById("image").src = base64data;
}
}