From de3248c3c6114bfe2026c929cb79738c8747a808 Mon Sep 17 00:00:00 2001 From: Noa Aarts Date: Sat, 19 Oct 2024 14:05:37 +0200 Subject: [PATCH] fix memory leak in closing connections --- Cargo.lock | 9 +++++- Cargo.toml | 2 ++ src/color.rs | 32 +++++++++++++++++++ src/config.rs | 3 ++ src/flutclient.rs | 4 ++- src/lib.rs | 10 ++---- src/main.rs | 80 ++++++++++++++++++++++++++++------------------- 7 files changed, 98 insertions(+), 42 deletions(-) create mode 100644 src/color.rs diff --git a/Cargo.lock b/Cargo.lock index 58e5d8a..1e4ec79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -424,6 +424,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" +[[package]] +name = "debug_print" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f215f9b7224f49fb73256115331f677d868b34d18b65dbe4db392e6021eea90" + [[package]] name = "either" version = "1.13.0" @@ -505,6 +511,7 @@ dependencies = [ "bytes", "chrono", "criterion", + "debug_print", "image", "rand", "tempfile", @@ -1666,7 +1673,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index ea7aa2d..36b7ae3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,9 @@ async-trait = "0.1.83" atoi_radix10 = "0.0.1" bytes = "1.6.0" chrono = "0.4.38" +debug_print = "1.0.0" image = "0.25.2" +rand = "*" tokio = { version = "1.38", features = ["full"] } tokio-test = "*" diff --git a/src/color.rs b/src/color.rs new file mode 100644 index 0000000..b372a72 --- /dev/null +++ b/src/color.rs @@ -0,0 +1,32 @@ +use std::fmt::Display; + +use rand::{distributions::Standard, prelude::Distribution}; + +#[derive(Debug, PartialEq)] +pub enum Color { + RGB24(u8, u8, u8), + RGBA32(u8, u8, u8, u8), + W8(u8), +} + +impl Display for Color { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Color::RGB24(r, g, b) => write!(f, "#{:02X}{:02X}{:02X}FF", r, g, b), + Color::RGBA32(r, g, b, a) => write!(f, "#{:02X}{:02X}{:02X}{:02X}", r, g, b, a), + Color::W8(w) => write!(f, "#{:02X}{:02X}{:02X}FF", w, w, w), + } + } +} + +impl Distribution for Standard { + fn sample(&self, rng: &mut R) -> Color { + let index: u8 = rng.gen_range(0..3); + match index { + 0 => Color::W8(rng.gen()), + 1 => Color::RGB24(rng.gen(), rng.gen(), rng.gen()), + 2 => Color::RGBA32(rng.gen(), rng.gen(), rng.gen(), rng.gen()), + _ => unreachable!(), + } + } +} diff --git a/src/config.rs b/src/config.rs index 06185e4..366179f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,5 +1,8 @@ +use std::time::Duration; + pub const GRID_LENGTH: usize = 1; pub const HOST: &str = "0.0.0.0:7791"; +pub const IMAGE_SAVE_INTERVAL: Duration = Duration::from_secs(5); pub const HELP_TEXT: &[u8] = b"Flurry is a pixelflut implementation, this means you can use commands to get and set pixels in the canvas SIZE returns the size of the canvas diff --git a/src/flutclient.rs b/src/flutclient.rs index 202cc20..95eab47 100644 --- a/src/flutclient.rs +++ b/src/flutclient.rs @@ -139,7 +139,9 @@ where self.change_protocol(&protocol); break 'outer; } - Err(err) if err.kind() == ErrorKind::UnexpectedEof => return Ok(()), + Err(err) if err.kind() == ErrorKind::UnexpectedEof => { + increment_counter(self.counter); + return Ok(())}, Err(e) => return Err(e), } } diff --git a/src/lib.rs b/src/lib.rs index 7194a00..bcece04 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ use std::sync::atomic::AtomicU64; +pub use color::Color; use grid::Grid; pub mod config; @@ -12,6 +13,8 @@ pub mod grid; pub mod protocols; pub mod utils; +mod color; + pub type Canvas = u8; pub type Coordinate = u16; @@ -46,13 +49,6 @@ fn increment_counter(amount: u64) { COUNTER.fetch_add(amount, std::sync::atomic::Ordering::Relaxed); } -#[derive(Debug, PartialEq)] -pub enum Color { - RGB24(u8, u8, u8), - RGBA32(u8, u8, u8, u8), - W8(u8), -} - #[derive(Debug, PartialEq)] pub enum Protocol { Text, diff --git a/src/main.rs b/src/main.rs index 9c8ba37..74c036b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,5 @@ -#![feature(test)] -#![feature(sync_unsafe_cell)] -#![feature(if_let_guard)] - use std::{ + collections::VecDeque, fs::{create_dir_all, File}, io::{self, Error, ErrorKind}, path::Path, @@ -11,8 +8,9 @@ use std::{ }; use chrono::Local; +use debug_print::{debug_eprintln, debug_println}; use flurry::{ - config::{GRID_LENGTH, HOST}, + config::{GRID_LENGTH, HOST, IMAGE_SAVE_INTERVAL}, flutclient::FlutClient, grid::{self, Flut}, COUNTER, @@ -21,12 +19,11 @@ use image::{codecs::jpeg::JpegEncoder, GenericImageView, SubImage}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}, net::TcpListener, - time::{interval, Instant}, + time::{interval, sleep, timeout, Instant}, }; -extern crate test; - -async fn listen_handle() -> io::Result<()> { +/// This function logs the current amount of changed pixels to stdout every second +async fn pixel_change_stdout_log() -> () { let mut interval = tokio::time::interval(Duration::from_millis(1000)); loop { interval.tick().await; @@ -35,15 +32,21 @@ async fn listen_handle() -> io::Result<()> { } } -async fn save_image_frames(grids: Arc<[grid::Flut]>) -> io::Result<()> { +/// This function starts a timer that saves the current grid state every `duration`. +/// These images may then be used for moderation or timelapses +/// +/// # Errors +/// +/// This function will return an error if it is unable to create or write to the file for the image +async fn save_image_frames(grids: Arc<[grid::Flut]>, duration: Duration) -> io::Result<()> { let base_dir = Path::new("./recordings"); - let mut timer = interval(Duration::from_secs(5)); + let mut timer = interval(duration); create_dir_all(base_dir)?; loop { timer.tick().await; for grid in grids.as_ref() { let p = base_dir.join(format!("{}", Local::now().format("%Y-%m-%d %H:%M:%S"))); - println!("timer ticked, grid writing to {:?}", p); + debug_println!("timer ticked, grid writing to {:?}", p); let mut file_writer = File::create(p)?; let encoder = JpegEncoder::new_with_quality(&mut file_writer, 50); @@ -59,20 +62,37 @@ async fn save_image_frames(grids: Arc<[grid::Flut]>) -> io::Result<()> { } } -async fn handle_flut(flut_listener: TcpListener, grids: Arc<[grid::Flut]>) -> io::Result<()> { - let mut handles = Vec::new(); - loop { - let (mut socket, _) = flut_listener.accept().await?; - let grids = grids.clone(); - handles.push(tokio::spawn(async move { - let (reader, writer) = socket.split(); - let mut connection = FlutClient::new(reader, writer, grids); - let resp = connection.process_socket().await; - match resp { - Ok(()) => Ok(()), - Err(err) => Err(err), +/// Handle connections made to the socket, keeps a vec of the currently active connections, +/// uses timeout to loop through them and clean them up to stop a memory leak while not throwing +/// everything away +async fn handle_flut(flut_listener: TcpListener, grids: Arc<[grid::Flut]>) -> () { + let mut handles = VecDeque::new(); + tokio::spawn(async { + loop { + if let Some(handle) = handles.pop_front() { + match timeout(Duration::from_secs(10), handle).await { + Ok(Ok(())) => debug_println!("connection closed ok"), + Ok(Err(e)) => debug_eprintln!("connection error {:?}", e), + Err(_) => handles.push_back(handle), + } + } else { + sleep(Duration::from_secs(30)).await; } - })); + } + }); + loop { + if let Ok((mut socket, _)) = flut_listener.accept().await { + let grids = grids.clone(); + handles.push_back(tokio::spawn(async move { + let (reader, writer) = socket.split(); + let mut connection = FlutClient::new(reader, writer, grids); + let resp = connection.process_socket().await; + match resp { + Ok(()) => Ok(()), + Err(err) => Err(err), + } + })) + }; } } @@ -89,11 +109,8 @@ async fn main() { println!("bound flut listener"); let handles = vec![ - // log the amount of changed pixels each second - (tokio::spawn(listen_handle())), - // save frames every 5 seconds - (tokio::spawn(save_image_frames(grids.clone()))), - // accept and handle flut connections + (tokio::spawn(pixel_change_stdout_log())), + (tokio::spawn(save_image_frames(grids.clone(), IMAGE_SAVE_INTERVAL))), (tokio::spawn(handle_flut(flut_listener, grids.clone()))), ]; @@ -101,6 +118,3 @@ async fn main() { println!("joined handle had result {:?}", handle.await); } } - -#[cfg(test)] -mod tests {}