diff --git a/Cargo.lock b/Cargo.lock index 067fc05..3f910b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -122,7 +122,7 @@ checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" dependencies = [ "async-trait", "axum-core", - "bytes", + "bytes 1.6.0", "futures-util", "http", "http-body", @@ -155,7 +155,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3" dependencies = [ "async-trait", - "bytes", + "bytes 1.6.0", "futures-util", "http", "http-body", @@ -238,6 +238,16 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495" +[[package]] +name = "bytes" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c" +dependencies = [ + "byteorder", + "iovec", +] + [[package]] name = "bytes" version = "1.6.0" @@ -378,11 +388,15 @@ name = "flurry" version = "0.1.0" dependencies = [ "axum", - "futures", + "bytes 1.6.0", + "futures 0.3.30", + "futures-util", "http-body-util", "image", "tokio", + "tokio-io", "tokio-test", + "tokio-util", ] [[package]] @@ -400,6 +414,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678" + [[package]] name = "futures" version = "0.3.30" @@ -550,7 +570,7 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" dependencies = [ - "bytes", + "bytes 1.6.0", "fnv", "itoa", ] @@ -561,7 +581,7 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" dependencies = [ - "bytes", + "bytes 1.6.0", "http", ] @@ -571,7 +591,7 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" dependencies = [ - "bytes", + "bytes 1.6.0", "futures-util", "http", "http-body", @@ -596,7 +616,7 @@ version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" dependencies = [ - "bytes", + "bytes 1.6.0", "futures-channel", "futures-util", "http", @@ -615,7 +635,7 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3ab92f4f49ee4fb4f997c784b7a2e0fa70050211e0b6a287f898c3c9785ca956" dependencies = [ - "bytes", + "bytes 1.6.0", "futures-util", "http", "http-body", @@ -684,6 +704,15 @@ dependencies = [ "syn", ] +[[package]] +name = "iovec" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" +dependencies = [ + "libc", +] + [[package]] name = "itertools" version = "0.12.1" @@ -1400,7 +1429,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a" dependencies = [ "backtrace", - "bytes", + "bytes 1.6.0", "libc", "mio", "num_cpus", @@ -1412,6 +1441,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "tokio-io" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57fc868aae093479e3131e3d165c93b1c7474109d13c90ec0dda2a1bbfff0674" +dependencies = [ + "bytes 0.4.12", + "futures 0.1.31", + "log", +] + [[package]] name = "tokio-macros" version = "2.3.0" @@ -1441,12 +1481,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7" dependencies = [ "async-stream", - "bytes", + "bytes 1.6.0", "futures-core", "tokio", "tokio-stream", ] +[[package]] +name = "tokio-util" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" +dependencies = [ + "bytes 1.6.0", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "toml" version = "0.8.14" diff --git a/Cargo.toml b/Cargo.toml index afd5e8c..d6cdf8e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,11 +5,15 @@ edition = "2021" [dependencies] axum = { version = "0.7.5" } +bytes = "1.6.0" futures = "0.3.30" +futures-util = "0.3.30" http-body-util = "0.1.2" image = "0.25.1" tokio = { version = "1.38", features = ["full"] } +tokio-io = "0.1.13" tokio-test = "*" +tokio-util = { version = "0.7.11", features = ["codec"] } [profile.dev] opt-level = 1 diff --git a/src/main.rs b/src/main.rs index 98a5914..4dfb507 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,19 +6,15 @@ use std::{ hash::{DefaultHasher, Hash, Hasher}, io::{self, Cursor, Error, ErrorKind}, iter::once, + pin::Pin, sync::{atomic::AtomicU64, Arc, RwLock}, + task::Poll, time::Duration, }; -use axum::{ - body::{Body, BodyDataStream}, - http::header, - response::IntoResponse, - routing::get, - Router, -}; use futures::Stream; -use image::{GenericImageView, ImageBuffer, Rgb}; +use hyper::body::Frame; +use image::{GenericImageView, Rgb}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}, net::TcpListener, @@ -98,18 +94,11 @@ impl Grid for FlutGrid { } } +#[derive(Clone)] struct ImageStreamer { v: Arc>>>, last: Arc>, -} - -impl Clone for ImageStreamer { - fn clone(&self) -> Self { - return ImageStreamer { - v: self.v.clone(), - last: self.last.clone(), - }; - } + last_attempt: u64, } impl ImageStreamer { @@ -119,6 +108,7 @@ impl ImageStreamer { return ImageStreamer { v: Arc::new(RwLock::new(Cursor::new(v))), last: Arc::new(RwLock::new(hasher.finish())), + last_attempt: hasher.finish(), }; } @@ -138,6 +128,7 @@ impl ImageStreamer { } else { *self.last.write().unwrap() = hasher.finish(); } + println!("ticking..."); let img = img_grids[grid_id] .view( 0, @@ -150,37 +141,40 @@ impl ImageStreamer { let _ = img.write_to(&mut *new, image::ImageFormat::Png); } } +} - fn get_stream(&self) -> ImageStreamReader { - return ImageStreamReader { - v: self.v.clone(), - last: self.last.clone(), - last_attempt: 0, - }; +impl ImageStreamer { + fn has_next(&self) -> Poll { + let now = *self.last.read().unwrap(); + if now == self.last_attempt { + return Poll::Pending; + } + return Poll::Ready(now); } } -struct ImageStreamReader { - v: Arc>>>, - last: Arc>, - last_attempt: u64, -} - -impl Stream for ImageStreamReader { - type Item = io::Result>; +impl Stream for ImageStreamer { + type Item = io::Result>; fn poll_next( mut self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - let now = *self.last.read().unwrap(); - if now == self.last_attempt { - return std::task::Poll::Pending; + loop { + let new_hash = match Pin::new(&mut self).has_next() { + Poll::Ready(i) => Some(i), + Poll::Pending => None, + }; + if new_hash.is_none() { + return Poll::Pending; + } + println!("got one"); + self.last_attempt = new_hash.unwrap(); + let r = self.v.read().unwrap(); + let v: Vec = (*r.clone().into_inner()).to_vec(); + + return Poll::Ready(Some(Ok(Frame::data(v.into())))); } - self.last_attempt = now; - let r = self.v.read().unwrap(); - let v: Vec = (*r.clone().into_inner()).to_vec(); - return std::task::Poll::Ready(Some(Ok(v))); } } @@ -377,14 +371,10 @@ where } } -async fn root() -> &'static str { - return "hiii"; -} - -async fn image_stream(is: ImageStreamReader) -> impl IntoResponse { - let header = [(header::CONTENT_TYPE, "image/jpeg")]; - - (header, Body::from_stream(is)) +async fn web(listener: TcpListener, image_streamer: ImageStreamer) -> io::Result<()> { + loop { + todo!("idk yet"); + } } #[tokio::main] @@ -398,9 +388,6 @@ async fn main() -> io::Result<()> { let flut_listener = TcpListener::bind("0.0.0.0:7791").await?; println!("bound flut listener"); - let web_listener = TcpListener::bind("0.0.0.0:7792").await?; - println!("bound web listener"); - let img_asuc = asuc.clone(); let img_grids = unsafe { img_asuc.get().as_ref().unwrap() }; let streamer = ImageStreamer::init(); @@ -411,13 +398,10 @@ async fn main() -> io::Result<()> { }); println!("streamer started"); - let app = Router::new().route("/", get(root)).route( - "/image", - get(move || { - return image_stream(streamer.clone().get_stream()); - }), - ); - tokio::spawn(async move { axum::serve(web_listener, app).await }); + let web_listener = TcpListener::bind("0.0.0.0:7792").await?; + println!("bound web listener"); + + let _ = tokio::spawn(async move { web(web_listener, streamer) }); println!("web server started"); loop {