fix: eof flushes buffer

This commit is contained in:
Noa Aarts 2024-07-12 17:18:09 +02:00
parent c8df06aa7e
commit 39e3ffbeb9
3 changed files with 90 additions and 53 deletions

1
.gitignore vendored
View file

@ -1 +1,2 @@
/target /target
.direnv/

View file

@ -19,6 +19,7 @@
pkgs.mkShell { pkgs.mkShell {
buildInputs = [ buildInputs = [
pkgs.rustup pkgs.rustup
pkgs.wgo
]; ];
}); });
}; };

View file

@ -4,13 +4,12 @@
use std::{ use std::{
cell::SyncUnsafeCell, cell::SyncUnsafeCell,
io::{self, Error, ErrorKind}, io::{self, Error, ErrorKind},
iter::{self, once}, iter::once,
sync::Arc, sync::Arc,
usize, vec,
}; };
use tokio::{ use tokio::{
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufStream}, io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter},
net::TcpListener, net::TcpListener,
}; };
@ -95,11 +94,17 @@ fn get_pixel(grids: &mut [FlutGrid<u32>], canvas: u8, x: u16, y: u16) -> Option<
} }
} }
async fn process_msg<T: AsyncReadExt + AsyncWriteExt + std::marker::Unpin>( async fn process_msg<
stream: &mut T, R: AsyncReadExt + std::marker::Unpin,
W: AsyncWriteExt + std::marker::Unpin,
>(
reader: &mut R,
writer: &mut W,
grids: &mut [FlutGrid<u32>; GRID_LENGTH], grids: &mut [FlutGrid<u32>; GRID_LENGTH],
) -> io::Result<()> { ) -> io::Result<()> {
match stream.read_u8().await { let fst = reader.read_u8().await;
println!("first byte is {:?}", fst);
match fst {
Ok(i) => match i { Ok(i) => match i {
HELP_TEXT => todo!("HELP command check and message"), HELP_TEXT => todo!("HELP command check and message"),
SIZE_TEXT => todo!("SIZE command check and message"), SIZE_TEXT => todo!("SIZE command check and message"),
@ -107,11 +112,11 @@ async fn process_msg<T: AsyncReadExt + AsyncWriteExt + std::marker::Unpin>(
HELP_BIN => todo!("HELP command message"), HELP_BIN => todo!("HELP command message"),
SIZE_BIN => todo!("SIZE command check and message"), SIZE_BIN => todo!("SIZE command check and message"),
LOCK => { LOCK => {
let amount = stream.read_u16_le().await?; let amount = reader.read_u16_le().await?;
let command = stream.read_u8().await?; let command = reader.read_u8().await?;
let lockmask = stream.read_u16().await?; let lockmask = reader.read_u16().await?;
let mut buf = vec![0; lockmask.count_ones() as usize]; let mut buf = vec![0; lockmask.count_ones() as usize];
let statics = stream.read_exact(&mut buf).await?; let statics = reader.read_exact(&mut buf).await?;
match command { match command {
GET_PX_BIN => todo!("GET pixel lock"), GET_PX_BIN => todo!("GET pixel lock"),
@ -132,9 +137,9 @@ async fn process_msg<T: AsyncReadExt + AsyncWriteExt + std::marker::Unpin>(
}) })
.collect(); .collect();
let mut mod_buf: Vec<u8> = vec![0; per]; let mut mod_buf: Vec<u8> = vec![0; per];
for z in 0..amount { for _ in 0..amount {
a = 0; a = 0;
let _ = stream.read_exact(&mut mod_buf).await?; let _ = reader.read_exact(&mut mod_buf).await?;
let aa = static_buf let aa = static_buf
.iter() .iter()
.map(|x| *match x { .map(|x| *match x {
@ -159,39 +164,40 @@ async fn process_msg<T: AsyncReadExt + AsyncWriteExt + std::marker::Unpin>(
} }
SET_PX_RGBA_BIN => todo!("Set rgba lock"), SET_PX_RGBA_BIN => todo!("Set rgba lock"),
SET_PX_W_BIN => todo!("set w lock"), SET_PX_W_BIN => todo!("set w lock"),
_ => return Err(Error::from(ErrorKind::InvalidInput)), _ => {
eprintln!("not a cmd");
return Err(Error::from(ErrorKind::InvalidInput));
}
} }
return Ok(()); return Ok(());
} }
GET_PX_BIN => { GET_PX_BIN => {
let canvas = stream.read_u8().await?; let canvas = reader.read_u8().await?;
let x = stream.read_u16_le().await?; let x = reader.read_u16_le().await?;
let y = stream.read_u16_le().await?; let y = reader.read_u16_le().await?;
match get_pixel(grids, canvas, x, y) { match get_pixel(grids, canvas, x, y) {
None => (), None => (),
Some(color) => { Some(color) => {
stream let towrite = &once(GET_PX_BIN)
.write_all( .chain(once(canvas))
&once(canvas) .chain(x.to_le_bytes())
.chain(x.to_le_bytes()) .chain(y.to_le_bytes())
.chain(y.to_le_bytes()) .chain(color.to_be_bytes().into_iter().skip(1))
.chain(color.to_be_bytes()) .collect::<Vec<_>>();
.collect::<Vec<_>>(), println!("to write {:?}", towrite);
) writer.write_all(towrite).await?;
.await?;
()
} }
} }
return Ok(()); return Ok(());
} }
SET_PX_RGB_BIN => { SET_PX_RGB_BIN => {
let canvas = stream.read_u8().await?; let canvas = reader.read_u8().await?;
let x = stream.read_u16_le().await?; let x = reader.read_u16_le().await?;
let y = stream.read_u16_le().await?; let y = reader.read_u16_le().await?;
let r = stream.read_u8().await?; let r = reader.read_u8().await?;
let g = stream.read_u8().await?; let g = reader.read_u8().await?;
let b = stream.read_u8().await?; let b = reader.read_u8().await?;
let rgb = (r as u32) << 24 | (g as u32) << 16 | (b as u32) << 8; let rgb = (r as u32) << 24 | (g as u32) << 16 | (b as u32) << 8;
set_pixel_rgba(grids, canvas, x, y, rgb); set_pixel_rgba(grids, canvas, x, y, rgb);
return Ok(()); return Ok(());
@ -210,15 +216,31 @@ async fn process_msg<T: AsyncReadExt + AsyncWriteExt + std::marker::Unpin>(
} }
} }
async fn process_socket<T: AsyncRead + AsyncWrite + std::marker::Unpin>( async fn process_socket<W, R>(
socket: T, reader: R,
writer: W,
grids: &mut [FlutGrid<u32>; GRID_LENGTH], grids: &mut [FlutGrid<u32>; GRID_LENGTH],
) -> io::Result<()> { ) -> io::Result<()>
let mut stream = BufStream::new(socket); where
W: AsyncWriteExt + std::marker::Unpin,
R: AsyncReadExt + std::marker::Unpin,
{
let mut reader = BufReader::new(reader);
let mut writer = BufWriter::new(writer);
loop { loop {
match process_msg(&mut stream, grids).await { println!("processing next...");
Ok(()) => (), match process_msg(&mut reader, &mut writer, grids).await {
Err(e) => return Err(e), Ok(()) => {
println!("msg was ok");
}
Err(err) if err.kind() == ErrorKind::UnexpectedEof => {
let _ = writer.flush().await;
return Ok(());
}
Err(e) => {
eprintln!("error with kind {}", e.kind());
return Err(e);
}
} }
} }
} }
@ -234,11 +256,12 @@ async fn main() -> io::Result<()> {
let asuc = Arc::new(SyncUnsafeCell::new(grids)); let asuc = Arc::new(SyncUnsafeCell::new(grids));
loop { loop {
let (socket, _) = flut_listener.accept().await?; let (mut socket, _) = flut_listener.accept().await?;
let asuc = asuc.clone(); let asuc = asuc.clone();
let _ = tokio::spawn(async move { let _ = tokio::spawn(async move {
let grids = unsafe { asuc.get().as_mut().unwrap() }; let grids = unsafe { asuc.get().as_mut().unwrap() };
match process_socket(socket, grids).await { let (reader, writer) = socket.split();
match process_socket(reader, writer, grids).await {
Ok(()) => return Ok(()), Ok(()) => return Ok(()),
Err(err) => return Err(err), Err(err) => return Err(err),
} }
@ -250,6 +273,7 @@ async fn main() -> io::Result<()> {
mod tests { mod tests {
use super::*; use super::*;
use test::Bencher; use test::Bencher;
use tokio_test::assert_ok;
#[tokio::test] #[tokio::test]
async fn test_grid_init_values() { async fn test_grid_init_values() {
@ -326,15 +350,12 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_set_rgb_bin() { async fn test_set_rgb_bin() {
let mut grids = [FlutGrid::init(800, 600, 0xFF00FF)]; let mut grids = [FlutGrid::init(800, 600, 0xFF00FF)];
let writer = tokio_test::io::Builder::new() let reader = tokio_test::io::Builder::new()
.read(&[SET_PX_RGB_BIN, 0, 16, 0, 32, 0, 0, 0, 0]) .read(&[SET_PX_RGB_BIN, 0, 16, 0, 32, 0, 0, 0, 0])
.read(&[SET_PX_RGB_BIN, 0, 16, 0, 33, 0, 2, 3, 5]) .read(&[SET_PX_RGB_BIN, 0, 16, 0, 33, 0, 2, 3, 5])
.build(); .build();
let res = process_socket(writer, &mut grids).await; let writer = tokio_test::io::Builder::new().build();
match res { assert_ok!(process_socket(reader, writer, &mut grids).await);
Ok(()) => (),
Err(err) => eprintln!("{}", err),
};
assert_eq!(grids[0].get(16, 32), Some(&0x00000000)); assert_eq!(grids[0].get(16, 32), Some(&0x00000000));
assert_eq!(grids[0].get(16, 33), Some(&0x02030500)); assert_eq!(grids[0].get(16, 33), Some(&0x02030500));
} }
@ -342,7 +363,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_set_rgb_lock() { async fn test_set_rgb_lock() {
let mut grids = [FlutGrid::init(800, 600, 0xFF00FF)]; let mut grids = [FlutGrid::init(800, 600, 0xFF00FF)];
let writer = tokio_test::io::Builder::new() let reader = tokio_test::io::Builder::new()
.read(&[ .read(&[
LOCK, LOCK,
3, 3,
@ -361,14 +382,28 @@ mod tests {
.read(&[101, 5]) .read(&[101, 5])
.read(&[102, 6]) .read(&[102, 6])
.build(); .build();
let res = process_socket(writer, &mut grids).await; let writer = tokio_test::io::Builder::new().build();
match res { assert_ok!(process_socket(reader, writer, &mut grids).await);
Ok(()) => (),
Err(err) => eprintln!("{}", err),
};
assert_eq!(grids[0].get(100, 0), Some(&0x02030400)); assert_eq!(grids[0].get(100, 0), Some(&0x02030400));
assert_eq!(grids[0].get(101, 0), Some(&0x02030500)); assert_eq!(grids[0].get(101, 0), Some(&0x02030500));
assert_eq!(grids[0].get(102, 0), Some(&0x02030600)); assert_eq!(grids[0].get(102, 0), Some(&0x02030600));
} }
#[tokio::test]
async fn test_get_rgb_bin() {
let mut grids = [FlutGrid::init(800, 600, 0xFF00F0)];
let reader = tokio_test::io::Builder::new()
.read(&[GET_PX_BIN, 0, 15, 0, 21, 0])
.read(&[GET_PX_BIN, 0, 16, 0, 21, 0])
.read(&[GET_PX_BIN, 0, 17, 0, 21, 0])
.build();
let writer = tokio_test::io::Builder::new()
.write(&[GET_PX_BIN, 0, 15, 0, 21, 0, 0xff, 0x00, 0xf0])
.write(&[GET_PX_BIN, 0, 16, 0, 21, 0, 0xff, 0x00, 0xf0])
.write(&[GET_PX_BIN, 0, 17, 0, 21, 0, 0xff, 0x00, 0xf0])
.build();
assert_ok!(process_socket(reader, writer, &mut grids).await);
assert_eq!(grids[0].get(15, 21), Some(&0xff00f0));
}
} }