feat: jpg buffer (#13)

Added a buffer to the FlutGrid to store encoded jpeg data of the canvas. This buffer is stored behind a RwLock to allow for arbitrary concurrent read operations.

save_image_frames has also been rewritten to make use of this buffer rather than encoding the jpeg on the fly.

An update thread has been added to regularly (currently every 20 ms) update the contents of the buffer.
This commit is contained in:
peppidesu 2024-10-19 16:09:58 +02:00 committed by GitHub
commit 26a827ba14
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 46 additions and 18 deletions

View file

@ -3,6 +3,7 @@ use std::time::Duration;
pub const GRID_LENGTH: usize = 1;
pub const HOST: &str = "0.0.0.0:7791";
pub const IMAGE_SAVE_INTERVAL: Duration = Duration::from_secs(5);
pub const JPEG_UPDATE_INTERVAL: Duration = Duration::from_millis(20);
pub const HELP_TEXT: &[u8] = b"Flurry is a pixelflut implementation, this means you can use commands to get and set pixels in the canvas
SIZE returns the size of the canvas

View file

@ -1,6 +1,7 @@
use std::cell::SyncUnsafeCell;
use image::{GenericImageView, Rgb};
use tokio::sync::{RwLock, RwLockReadGuard};
use crate::Coordinate;
@ -15,6 +16,7 @@ pub struct Flut<T> {
size_x: usize,
size_y: usize,
cells: SyncUnsafeCell<Vec<T>>,
jpgbuf: RwLock<Vec<u8>>
}
impl<T: Clone> Flut<T> {
@ -27,6 +29,7 @@ impl<T: Clone> Flut<T> {
size_x,
size_y,
cells: vec.into(),
jpgbuf: RwLock::new(Vec::new())
}
}
@ -44,6 +47,9 @@ impl<T> Flut<T> {
}
Some((y * self.size_x) + x)
}
pub async fn read_jpg_buffer(&self) -> RwLockReadGuard<'_, Vec<u8>> {
self.jpgbuf.read().await
}
}
impl<T> Grid<Coordinate, T> for Flut<T> {
@ -78,6 +84,20 @@ impl GenericImageView for Flut<u32> {
let [r, g, b, _a] = pixel.to_be_bytes();
Rgb::from([r, g, b])
}
}
impl Flut<u32> {
pub async fn update_jpg_buffer(&self) {
let mut jpgbuf = self.jpgbuf.write().await;
jpgbuf.clear();
let encoder = image::codecs::jpeg::JpegEncoder::new_with_quality(&mut *jpgbuf, 50);
let subimage = self.view(0, 0, self.width(), self.height()).to_image();
match subimage.write_with_encoder(encoder) {
Ok(_) => {}
Err(err) => eprintln!("{}", err),
}
}
}
#[cfg(test)]

View file

@ -1,6 +1,7 @@
use std::{
convert::Infallible,
fs::{create_dir_all, File},
io::{self},
io::{self, Write as _},
path::Path,
sync::Arc,
time::Duration,
@ -8,16 +9,19 @@ use std::{
use chrono::Local;
use flurry::{
config::{GRID_LENGTH, HOST, IMAGE_SAVE_INTERVAL},
config::{GRID_LENGTH, HOST, IMAGE_SAVE_INTERVAL, JPEG_UPDATE_INTERVAL},
flutclient::FlutClient,
grid::{self, Flut},
COUNTER,
};
use image::{codecs::jpeg::JpegEncoder, GenericImageView, SubImage};
use tokio::{net::TcpListener, time::interval};
use tokio::{
net::TcpListener,
time::interval
};
type Never = Infallible;
/// This function logs the current amount of changed pixels to stdout every second
async fn pixel_change_stdout_log() -> io::Result<()> {
async fn pixel_change_stdout_log() -> io::Result<Never> {
let mut interval = tokio::time::interval(Duration::from_millis(1000));
loop {
interval.tick().await;
@ -32,25 +36,17 @@ async fn pixel_change_stdout_log() -> io::Result<()> {
/// # Errors
///
/// This function will return an error if it is unable to create or write to the file for the image
async fn save_image_frames(grids: Arc<[grid::Flut<u32>]>, duration: Duration) -> io::Result<()> {
async fn save_image_frames(grids: Arc<[grid::Flut<u32>]>, duration: Duration) -> io::Result<Never> {
let base_dir = Path::new("./recordings");
let mut timer = interval(duration);
create_dir_all(base_dir)?;
loop {
timer.tick().await;
for grid in grids.as_ref() {
let p = base_dir.join(format!("{}", Local::now().format("%Y-%m-%d %H:%M:%S")));
let p = base_dir.join(format!("{}", Local::now().format("%Y-%m-%d_%H-%M-%S.jpg")));
let mut file_writer = File::create(p)?;
let encoder = JpegEncoder::new_with_quality(&mut file_writer, 50);
grid.view(0, 0, grid.width(), grid.height()).to_image();
let sub_image = SubImage::new(grid, 0, 0, grid.width(), grid.height());
let image = sub_image.to_image();
match image.write_with_encoder(encoder) {
Ok(_) => {}
Err(err) => eprintln!("{}", err),
}
file_writer.write_all(&grid.read_jpg_buffer().await)?;
}
}
}
@ -58,7 +54,7 @@ async fn save_image_frames(grids: Arc<[grid::Flut<u32>]>, duration: Duration) ->
/// Handle connections made to the socket, keeps a vec of the currently active connections,
/// uses timeout to loop through them and clean them up to stop a memory leak while not throwing
/// everything away
async fn handle_flut(flut_listener: TcpListener, grids: Arc<[grid::Flut<u32>]>) -> io::Result<()> {
async fn handle_flut(flut_listener: TcpListener, grids: Arc<[grid::Flut<u32>]>) -> io::Result<Never> {
let mut handles = Vec::new();
loop {
let (mut socket, _) = flut_listener.accept().await?;
@ -75,11 +71,21 @@ async fn handle_flut(flut_listener: TcpListener, grids: Arc<[grid::Flut<u32>]>)
}
}
async fn jpeg_update_loop(grids: Arc<[Flut<u32>]>) -> io::Result<Never> {
let mut interval = interval(JPEG_UPDATE_INTERVAL);
loop {
interval.tick().await;
for grid in grids.as_ref() {
grid.update_jpg_buffer().await;
}
}
}
#[tokio::main]
#[allow(clippy::needless_return)]
async fn main() {
println!("created grids");
let grids: Arc<[Flut<u32>; GRID_LENGTH]> = [grid::Flut::init(800, 600, 0xff_00_ff_ff)].into();
println!("created grids");
let Ok(flut_listener) = TcpListener::bind(HOST).await else {
eprintln!("Was unable to bind to {HOST}, please check if a different process is bound");
@ -91,6 +97,7 @@ async fn main() {
(tokio::spawn(pixel_change_stdout_log())),
(tokio::spawn(save_image_frames(grids.clone(), IMAGE_SAVE_INTERVAL))),
(tokio::spawn(handle_flut(flut_listener, grids.clone()))),
(tokio::spawn(jpeg_update_loop(grids.clone())))
];
for handle in handles {