fix memory leak in closing connections

This commit is contained in:
Noa Aarts 2024-10-19 14:05:37 +02:00
parent b0d74880dd
commit de3248c3c6
Signed by: noa
GPG key ID: 1850932741EFF672
7 changed files with 98 additions and 42 deletions

9
Cargo.lock generated
View file

@ -424,6 +424,12 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7"
[[package]]
name = "debug_print"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f215f9b7224f49fb73256115331f677d868b34d18b65dbe4db392e6021eea90"
[[package]]
name = "either"
version = "1.13.0"
@ -505,6 +511,7 @@ dependencies = [
"bytes",
"chrono",
"criterion",
"debug_print",
"image",
"rand",
"tempfile",
@ -1666,7 +1673,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [
"windows-sys 0.52.0",
"windows-sys 0.59.0",
]
[[package]]

View file

@ -8,7 +8,9 @@ async-trait = "0.1.83"
atoi_radix10 = "0.0.1"
bytes = "1.6.0"
chrono = "0.4.38"
debug_print = "1.0.0"
image = "0.25.2"
rand = "*"
tokio = { version = "1.38", features = ["full"] }
tokio-test = "*"

32
src/color.rs Normal file
View file

@ -0,0 +1,32 @@
use std::fmt::Display;
use rand::{distributions::Standard, prelude::Distribution};
#[derive(Debug, PartialEq)]
pub enum Color {
RGB24(u8, u8, u8),
RGBA32(u8, u8, u8, u8),
W8(u8),
}
impl Display for Color {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Color::RGB24(r, g, b) => write!(f, "#{:02X}{:02X}{:02X}FF", r, g, b),
Color::RGBA32(r, g, b, a) => write!(f, "#{:02X}{:02X}{:02X}{:02X}", r, g, b, a),
Color::W8(w) => write!(f, "#{:02X}{:02X}{:02X}FF", w, w, w),
}
}
}
impl Distribution<Color> for Standard {
fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> Color {
let index: u8 = rng.gen_range(0..3);
match index {
0 => Color::W8(rng.gen()),
1 => Color::RGB24(rng.gen(), rng.gen(), rng.gen()),
2 => Color::RGBA32(rng.gen(), rng.gen(), rng.gen(), rng.gen()),
_ => unreachable!(),
}
}
}

View file

@ -1,5 +1,8 @@
use std::time::Duration;
pub const GRID_LENGTH: usize = 1;
pub const HOST: &str = "0.0.0.0:7791";
pub const IMAGE_SAVE_INTERVAL: Duration = Duration::from_secs(5);
pub const HELP_TEXT: &[u8] = b"Flurry is a pixelflut implementation, this means you can use commands to get and set pixels in the canvas
SIZE returns the size of the canvas

View file

@ -139,7 +139,9 @@ where
self.change_protocol(&protocol);
break 'outer;
}
Err(err) if err.kind() == ErrorKind::UnexpectedEof => return Ok(()),
Err(err) if err.kind() == ErrorKind::UnexpectedEof => {
increment_counter(self.counter);
return Ok(())},
Err(e) => return Err(e),
}
}

View file

@ -4,6 +4,7 @@
use std::sync::atomic::AtomicU64;
pub use color::Color;
use grid::Grid;
pub mod config;
@ -12,6 +13,8 @@ pub mod grid;
pub mod protocols;
pub mod utils;
mod color;
pub type Canvas = u8;
pub type Coordinate = u16;
@ -46,13 +49,6 @@ fn increment_counter(amount: u64) {
COUNTER.fetch_add(amount, std::sync::atomic::Ordering::Relaxed);
}
#[derive(Debug, PartialEq)]
pub enum Color {
RGB24(u8, u8, u8),
RGBA32(u8, u8, u8, u8),
W8(u8),
}
#[derive(Debug, PartialEq)]
pub enum Protocol {
Text,

View file

@ -1,8 +1,5 @@
#![feature(test)]
#![feature(sync_unsafe_cell)]
#![feature(if_let_guard)]
use std::{
collections::VecDeque,
fs::{create_dir_all, File},
io::{self, Error, ErrorKind},
path::Path,
@ -11,8 +8,9 @@ use std::{
};
use chrono::Local;
use debug_print::{debug_eprintln, debug_println};
use flurry::{
config::{GRID_LENGTH, HOST},
config::{GRID_LENGTH, HOST, IMAGE_SAVE_INTERVAL},
flutclient::FlutClient,
grid::{self, Flut},
COUNTER,
@ -21,12 +19,11 @@ use image::{codecs::jpeg::JpegEncoder, GenericImageView, SubImage};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter},
net::TcpListener,
time::{interval, Instant},
time::{interval, sleep, timeout, Instant},
};
extern crate test;
async fn listen_handle() -> io::Result<()> {
/// This function logs the current amount of changed pixels to stdout every second
async fn pixel_change_stdout_log() -> () {
let mut interval = tokio::time::interval(Duration::from_millis(1000));
loop {
interval.tick().await;
@ -35,15 +32,21 @@ async fn listen_handle() -> io::Result<()> {
}
}
async fn save_image_frames(grids: Arc<[grid::Flut<u32>]>) -> io::Result<()> {
/// This function starts a timer that saves the current grid state every `duration`.
/// These images may then be used for moderation or timelapses
///
/// # Errors
///
/// This function will return an error if it is unable to create or write to the file for the image
async fn save_image_frames(grids: Arc<[grid::Flut<u32>]>, duration: Duration) -> io::Result<()> {
let base_dir = Path::new("./recordings");
let mut timer = interval(Duration::from_secs(5));
let mut timer = interval(duration);
create_dir_all(base_dir)?;
loop {
timer.tick().await;
for grid in grids.as_ref() {
let p = base_dir.join(format!("{}", Local::now().format("%Y-%m-%d %H:%M:%S")));
println!("timer ticked, grid writing to {:?}", p);
debug_println!("timer ticked, grid writing to {:?}", p);
let mut file_writer = File::create(p)?;
let encoder = JpegEncoder::new_with_quality(&mut file_writer, 50);
@ -59,20 +62,37 @@ async fn save_image_frames(grids: Arc<[grid::Flut<u32>]>) -> io::Result<()> {
}
}
async fn handle_flut(flut_listener: TcpListener, grids: Arc<[grid::Flut<u32>]>) -> io::Result<()> {
let mut handles = Vec::new();
loop {
let (mut socket, _) = flut_listener.accept().await?;
let grids = grids.clone();
handles.push(tokio::spawn(async move {
let (reader, writer) = socket.split();
let mut connection = FlutClient::new(reader, writer, grids);
let resp = connection.process_socket().await;
match resp {
Ok(()) => Ok(()),
Err(err) => Err(err),
/// Handle connections made to the socket, keeps a vec of the currently active connections,
/// uses timeout to loop through them and clean them up to stop a memory leak while not throwing
/// everything away
async fn handle_flut(flut_listener: TcpListener, grids: Arc<[grid::Flut<u32>]>) -> () {
let mut handles = VecDeque::new();
tokio::spawn(async {
loop {
if let Some(handle) = handles.pop_front() {
match timeout(Duration::from_secs(10), handle).await {
Ok(Ok(())) => debug_println!("connection closed ok"),
Ok(Err(e)) => debug_eprintln!("connection error {:?}", e),
Err(_) => handles.push_back(handle),
}
} else {
sleep(Duration::from_secs(30)).await;
}
}));
}
});
loop {
if let Ok((mut socket, _)) = flut_listener.accept().await {
let grids = grids.clone();
handles.push_back(tokio::spawn(async move {
let (reader, writer) = socket.split();
let mut connection = FlutClient::new(reader, writer, grids);
let resp = connection.process_socket().await;
match resp {
Ok(()) => Ok(()),
Err(err) => Err(err),
}
}))
};
}
}
@ -89,11 +109,8 @@ async fn main() {
println!("bound flut listener");
let handles = vec![
// log the amount of changed pixels each second
(tokio::spawn(listen_handle())),
// save frames every 5 seconds
(tokio::spawn(save_image_frames(grids.clone()))),
// accept and handle flut connections
(tokio::spawn(pixel_change_stdout_log())),
(tokio::spawn(save_image_frames(grids.clone(), IMAGE_SAVE_INTERVAL))),
(tokio::spawn(handle_flut(flut_listener, grids.clone()))),
];
@ -101,6 +118,3 @@ async fn main() {
println!("joined handle had result {:?}", handle.await);
}
}
#[cfg(test)]
mod tests {}