wip: it should work, but doesn't yet

This commit is contained in:
Noa Aarts 2024-07-12 23:42:46 +02:00
parent 43f4147b2a
commit 8c3ff179b2
3 changed files with 203 additions and 26 deletions

View file

@ -3,14 +3,22 @@
use std::{
cell::SyncUnsafeCell,
io::{self, Error, ErrorKind},
hash::{DefaultHasher, Hash, Hasher},
io::{self, Cursor, Error, ErrorKind},
iter::once,
sync::Arc,
sync::{atomic::AtomicU64, Arc, RwLock},
time::Duration,
};
use axum::{routing::get, Router};
use image::{GenericImageView, Rgb};
use axum::{
body::{Body, BodyDataStream},
http::header,
response::IntoResponse,
routing::get,
Router,
};
use futures::Stream;
use image::{GenericImageView, ImageBuffer, Rgb};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter},
net::TcpListener,
@ -90,6 +98,92 @@ impl<T> Grid<u16, T> for FlutGrid<T> {
}
}
struct ImageStreamer {
v: Arc<RwLock<Cursor<Vec<u8>>>>,
last: Arc<RwLock<u64>>,
}
impl Clone for ImageStreamer {
fn clone(&self) -> Self {
return ImageStreamer {
v: self.v.clone(),
last: self.last.clone(),
};
}
}
impl ImageStreamer {
fn init() -> ImageStreamer {
let v = Vec::new();
let hasher = DefaultHasher::default();
return ImageStreamer {
v: Arc::new(RwLock::new(Cursor::new(v))),
last: Arc::new(RwLock::new(hasher.finish())),
};
}
async fn start(
&self,
img_grids: &[FlutGrid<u32>; GRID_LENGTH],
grid_id: usize,
) -> io::Result<()> {
println!("start called");
let mut interval = tokio::time::interval(Duration::from_millis(25));
loop {
interval.tick().await;
let mut hasher = DefaultHasher::default();
img_grids[grid_id].cells.hash(&mut hasher);
if hasher.finish() == *self.last.read().unwrap() {
continue;
} else {
*self.last.write().unwrap() = hasher.finish();
}
let img = img_grids[grid_id]
.view(
0,
0,
img_grids[grid_id].width(),
img_grids[grid_id].height(),
)
.to_image();
let mut new = self.v.write().unwrap();
let _ = img.write_to(&mut *new, image::ImageFormat::Png);
}
}
fn get_stream(&self) -> ImageStreamReader {
return ImageStreamReader {
v: self.v.clone(),
last: self.last.clone(),
last_attempt: 0,
};
}
}
struct ImageStreamReader {
v: Arc<RwLock<Cursor<Vec<u8>>>>,
last: Arc<RwLock<u64>>,
last_attempt: u64,
}
impl Stream for ImageStreamReader {
type Item = io::Result<Vec<u8>>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let now = *self.last.read().unwrap();
if now == self.last_attempt {
return std::task::Poll::Pending;
}
self.last_attempt = now;
let r = self.v.read().unwrap();
let v: Vec<u8> = (*r.clone().into_inner()).to_vec();
return std::task::Poll::Ready(Some(Ok(v)));
}
}
const HELP_TEXT: u8 = 72;
const SIZE_TEXT: u8 = 83;
const PX_TEXT: u8 = 80;
@ -104,6 +198,8 @@ const SET_PX_W_BIN: u8 = 130;
const SET_PX_RGB_BIN_LENGTH: usize = 8;
const GRID_LENGTH: usize = 1;
static COUNTER: AtomicU64 = AtomicU64::new(0);
fn set_pixel_rgba(grids: &mut [FlutGrid<u32>], canvas: u8, x: u16, y: u16, rgb: u32) {
match grids.get_mut(canvas as usize) {
Some(grid) => grid.set(x, y, rgb),
@ -118,6 +214,10 @@ fn get_pixel(grids: &mut [FlutGrid<u32>], canvas: u8, x: u16, y: u16) -> Option<
}
}
fn increment_counter() {
COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
async fn process_lock<
R: AsyncReadExt + std::marker::Unpin,
W: AsyncWriteExt + std::marker::Unpin,
@ -167,11 +267,14 @@ async fn process_lock<
.map(|z| z)
.collect::<Vec<_>>();
match grids.get_mut(aa[0] as usize) {
Some(grid) => grid.set(
u16::from_le_bytes([aa[1], aa[2]]),
u16::from_le_bytes([aa[3], aa[4]]),
u32::from_be_bytes([aa[5], aa[6], aa[7], 0]),
),
Some(grid) => {
grid.set(
u16::from_le_bytes([aa[1], aa[2]]),
u16::from_le_bytes([aa[3], aa[4]]),
u32::from_be_bytes([aa[5], aa[6], aa[7], 0]),
);
increment_counter()
}
None => (),
}
}
@ -231,6 +334,7 @@ async fn process_msg<
let b = reader.read_u8().await?;
let rgb = u32::from_be_bytes([r, g, b, 0xff]);
set_pixel_rgba(grids, canvas, x, y, rgb);
increment_counter();
return Ok(());
}
SET_PX_RGBA_BIN => todo!("SET rgba"),
@ -277,6 +381,12 @@ async fn root() -> &'static str {
return "hiii";
}
async fn image_stream(is: ImageStreamReader) -> impl IntoResponse {
let header = [(header::CONTENT_TYPE, "image/jpeg")];
(header, Body::from_stream(is))
}
#[tokio::main]
async fn main() -> io::Result<()> {
println!("Start initialisation");
@ -292,26 +402,21 @@ async fn main() -> io::Result<()> {
println!("bound web listener");
let img_asuc = asuc.clone();
let img_grids = unsafe { img_asuc.get().as_ref().unwrap() };
let streamer = ImageStreamer::init();
let cstrm = streamer.clone();
let _ = tokio::spawn(async move {
let img_grids = unsafe { img_asuc.get().as_ref().unwrap() };
loop {
let img = img_grids[0]
.view(0, 0, img_grids[0].width(), img_grids[0].height())
.to_image();
println!("starting save");
match img.save("test.jpg") {
Ok(()) => (),
Err(err) => {
eprintln!("{}", err);
return;
}
};
println!("finished save, sleeping");
tokio::time::sleep(Duration::from_secs(1)).await;
}
println!("staring streming");
let _ = cstrm.start(&img_grids, 0).await;
});
println!("streamer started");
let app = Router::new().route("/", get(root));
let app = Router::new().route("/", get(root)).route(
"/image",
get(move || {
return image_stream(streamer.clone().get_stream());
}),
);
tokio::spawn(async move { axum::serve(web_listener, app).await });
println!("web server started");