From 8c3ff179b27c20658a7d202786c799f77000885e Mon Sep 17 00:00:00 2001 From: Noa Aarts Date: Fri, 12 Jul 2024 23:42:46 +0200 Subject: [PATCH] wip: it should work, but doesn't yet --- Cargo.lock | 67 ++++++++++++++++++++++ Cargo.toml | 5 ++ src/main.rs | 157 +++++++++++++++++++++++++++++++++++++++++++--------- 3 files changed, 203 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eb77d2b..067fc05 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -378,6 +378,8 @@ name = "flurry" version = "0.1.0" dependencies = [ "axum", + "futures", + "http-body-util", "image", "tokio", "tokio-test", @@ -398,6 +400,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.30" @@ -405,6 +422,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -413,6 +431,40 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" + [[package]] name = "futures-task" version = "0.3.30" @@ -425,10 +477,16 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -1228,6 +1286,15 @@ dependencies = [ "quote", ] +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + [[package]] name = "smallvec" version = "1.13.2" diff --git a/Cargo.toml b/Cargo.toml index 33bc2af..afd5e8c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,11 @@ edition = "2021" [dependencies] axum = { version = "0.7.5" } +futures = "0.3.30" +http-body-util = "0.1.2" image = "0.25.1" tokio = { version = "1.38", features = ["full"] } tokio-test = "*" + +[profile.dev] +opt-level = 1 diff --git a/src/main.rs b/src/main.rs index 400abfe..98a5914 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,14 +3,22 @@ use std::{ cell::SyncUnsafeCell, - io::{self, Error, ErrorKind}, + hash::{DefaultHasher, Hash, Hasher}, + io::{self, Cursor, Error, ErrorKind}, iter::once, - sync::Arc, + sync::{atomic::AtomicU64, Arc, RwLock}, time::Duration, }; -use axum::{routing::get, Router}; -use image::{GenericImageView, Rgb}; +use axum::{ + body::{Body, BodyDataStream}, + http::header, + response::IntoResponse, + routing::get, + Router, +}; +use futures::Stream; +use image::{GenericImageView, ImageBuffer, Rgb}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}, net::TcpListener, @@ -90,6 +98,92 @@ impl Grid for FlutGrid { } } +struct ImageStreamer { + v: Arc>>>, + last: Arc>, +} + +impl Clone for ImageStreamer { + fn clone(&self) -> Self { + return ImageStreamer { + v: self.v.clone(), + last: self.last.clone(), + }; + } +} + +impl ImageStreamer { + fn init() -> ImageStreamer { + let v = Vec::new(); + let hasher = DefaultHasher::default(); + return ImageStreamer { + v: Arc::new(RwLock::new(Cursor::new(v))), + last: Arc::new(RwLock::new(hasher.finish())), + }; + } + + async fn start( + &self, + img_grids: &[FlutGrid; GRID_LENGTH], + grid_id: usize, + ) -> io::Result<()> { + println!("start called"); + let mut interval = tokio::time::interval(Duration::from_millis(25)); + loop { + interval.tick().await; + let mut hasher = DefaultHasher::default(); + img_grids[grid_id].cells.hash(&mut hasher); + if hasher.finish() == *self.last.read().unwrap() { + continue; + } else { + *self.last.write().unwrap() = hasher.finish(); + } + let img = img_grids[grid_id] + .view( + 0, + 0, + img_grids[grid_id].width(), + img_grids[grid_id].height(), + ) + .to_image(); + let mut new = self.v.write().unwrap(); + let _ = img.write_to(&mut *new, image::ImageFormat::Png); + } + } + + fn get_stream(&self) -> ImageStreamReader { + return ImageStreamReader { + v: self.v.clone(), + last: self.last.clone(), + last_attempt: 0, + }; + } +} + +struct ImageStreamReader { + v: Arc>>>, + last: Arc>, + last_attempt: u64, +} + +impl Stream for ImageStreamReader { + type Item = io::Result>; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let now = *self.last.read().unwrap(); + if now == self.last_attempt { + return std::task::Poll::Pending; + } + self.last_attempt = now; + let r = self.v.read().unwrap(); + let v: Vec = (*r.clone().into_inner()).to_vec(); + return std::task::Poll::Ready(Some(Ok(v))); + } +} + const HELP_TEXT: u8 = 72; const SIZE_TEXT: u8 = 83; const PX_TEXT: u8 = 80; @@ -104,6 +198,8 @@ const SET_PX_W_BIN: u8 = 130; const SET_PX_RGB_BIN_LENGTH: usize = 8; const GRID_LENGTH: usize = 1; +static COUNTER: AtomicU64 = AtomicU64::new(0); + fn set_pixel_rgba(grids: &mut [FlutGrid], canvas: u8, x: u16, y: u16, rgb: u32) { match grids.get_mut(canvas as usize) { Some(grid) => grid.set(x, y, rgb), @@ -118,6 +214,10 @@ fn get_pixel(grids: &mut [FlutGrid], canvas: u8, x: u16, y: u16) -> Option< } } +fn increment_counter() { + COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed); +} + async fn process_lock< R: AsyncReadExt + std::marker::Unpin, W: AsyncWriteExt + std::marker::Unpin, @@ -167,11 +267,14 @@ async fn process_lock< .map(|z| z) .collect::>(); match grids.get_mut(aa[0] as usize) { - Some(grid) => grid.set( - u16::from_le_bytes([aa[1], aa[2]]), - u16::from_le_bytes([aa[3], aa[4]]), - u32::from_be_bytes([aa[5], aa[6], aa[7], 0]), - ), + Some(grid) => { + grid.set( + u16::from_le_bytes([aa[1], aa[2]]), + u16::from_le_bytes([aa[3], aa[4]]), + u32::from_be_bytes([aa[5], aa[6], aa[7], 0]), + ); + increment_counter() + } None => (), } } @@ -231,6 +334,7 @@ async fn process_msg< let b = reader.read_u8().await?; let rgb = u32::from_be_bytes([r, g, b, 0xff]); set_pixel_rgba(grids, canvas, x, y, rgb); + increment_counter(); return Ok(()); } SET_PX_RGBA_BIN => todo!("SET rgba"), @@ -277,6 +381,12 @@ async fn root() -> &'static str { return "hiii"; } +async fn image_stream(is: ImageStreamReader) -> impl IntoResponse { + let header = [(header::CONTENT_TYPE, "image/jpeg")]; + + (header, Body::from_stream(is)) +} + #[tokio::main] async fn main() -> io::Result<()> { println!("Start initialisation"); @@ -292,26 +402,21 @@ async fn main() -> io::Result<()> { println!("bound web listener"); let img_asuc = asuc.clone(); + let img_grids = unsafe { img_asuc.get().as_ref().unwrap() }; + let streamer = ImageStreamer::init(); + let cstrm = streamer.clone(); let _ = tokio::spawn(async move { - let img_grids = unsafe { img_asuc.get().as_ref().unwrap() }; - loop { - let img = img_grids[0] - .view(0, 0, img_grids[0].width(), img_grids[0].height()) - .to_image(); - println!("starting save"); - match img.save("test.jpg") { - Ok(()) => (), - Err(err) => { - eprintln!("{}", err); - return; - } - }; - println!("finished save, sleeping"); - tokio::time::sleep(Duration::from_secs(1)).await; - } + println!("staring streming"); + let _ = cstrm.start(&img_grids, 0).await; }); + println!("streamer started"); - let app = Router::new().route("/", get(root)); + let app = Router::new().route("/", get(root)).route( + "/image", + get(move || { + return image_stream(streamer.clone().get_stream()); + }), + ); tokio::spawn(async move { axum::serve(web_listener, app).await }); println!("web server started");