improve: double buffering

This commit is contained in:
bQUARKz 2026-04-14 11:40:12 +01:00
parent 0082258f7c
commit 333edf0df9
Signed by: bquarkz
SSH Key Fingerprint: SHA256:Z7dgqoglWwoK6j6u4QC87OveEq74WOhFN+gitsxtkf8

View File

@ -1,10 +1,6 @@
use axum::extract::State; use axum::extract::State;
use axum::http::StatusCode; use axum::http::StatusCode;
use axum::{ use axum::{extract::Json, routing::post, Router};
extract::Json,
routing::post,
Router,
};
use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender}; use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::BTreeMap; use std::collections::BTreeMap;
@ -18,6 +14,7 @@ use std::time::Duration;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::signal; use tokio::signal;
/// Domain model for incoming metrics.
#[derive(Debug, Deserialize, Serialize)] #[derive(Debug, Deserialize, Serialize)]
struct Metric { struct Metric {
metric: String, metric: String,
@ -25,23 +22,31 @@ struct Metric {
value: f64, value: f64,
} }
enum QueueCommand { /// Commands accepted by the ingest/aggregator stage.
enum IngestCommand {
WriteLine(String), WriteLine(String),
Shutdown, Shutdown,
} }
/// Batches sent from the aggregator thread to the disk writer thread.
enum FlushCommand {
WriteBatch(Vec<u8>, usize, bool),
Shutdown,
}
/// Shared application state exposed to request handlers.
#[derive(Clone)] #[derive(Clone)]
struct BaseState { struct BaseState {
sender: Sender<QueueCommand>, ingest_tx: Sender<IngestCommand>,
queue_size: Arc<AtomicUsize>, pending_writes: Arc<AtomicUsize>,
running: Arc<AtomicBool>, running: Arc<AtomicBool>,
} }
impl BaseState { impl BaseState {
fn new(sender: Sender<QueueCommand>) -> Self { fn new(ingest_tx: Sender<IngestCommand>) -> Self {
BaseState { Self {
sender, ingest_tx,
queue_size: Arc::new(AtomicUsize::new(0)), pending_writes: Arc::new(AtomicUsize::new(0)),
running: Arc::new(AtomicBool::new(true)), running: Arc::new(AtomicBool::new(true)),
} }
} }
@ -49,42 +54,62 @@ impl BaseState {
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let (tx, rx) = bounded::<QueueCommand>(1000); let (ingest_tx, ingest_rx) = bounded::<IngestCommand>(1024);
let state = BaseState::new(tx.clone()); let (flush_tx, flush_rx) = bounded::<FlushCommand>(128);
let state = BaseState::new(ingest_tx.clone());
println!("Starting Simple TSDB... listening on: localhost <port: 3000>"); println!("Starting Simple TSDB... listening on: localhost <port: 3000>");
let writer = spawn_writer_facade(rx, &state); let aggregator = spawn_aggregator_facade(ingest_rx, flush_tx, &state);
let disk_writer = spawn_disk_writer_facade(flush_rx, &state);
let counter = spawn_counter_facade(&state); let counter = spawn_counter_facade(&state);
let app = Router::new().route("/metrics", post(handler)).with_state(state.clone()); let app = Router::new()
let listener = TcpListener::bind("0.0.0.0:3000").await.expect("error when opening port 3000"); .route("/metrics", post(handler))
.with_state(state.clone());
let listener = TcpListener::bind("0.0.0.0:3000")
.await
.expect("error when opening port 3000");
let start_time = std::time::SystemTime::now(); let start_time = std::time::SystemTime::now();
axum::serve(listener, app).with_graceful_shutdown(shutdown_signal(state.clone())).await.expect("internal server error");
writer.join().expect("error thread panic"); axum::serve(listener, app)
counter.join().expect("error thread panic"); .with_graceful_shutdown(shutdown_signal(state.clone()))
.await
.expect("internal server error");
aggregator.join().expect("aggregator thread panic");
disk_writer.join().expect("disk writer thread panic");
state.running.store(false, Ordering::Relaxed);
counter.join().expect("counter thread panic");
println!("\nServer Stopped. Analysing metrics...\n"); println!("\nServer Stopped. Analysing metrics...\n");
let elapsed = start_time.elapsed().expect("error when measuring elapsed time");
let elapsed = start_time
.elapsed()
.expect("error when measuring elapsed time");
print_metrics_report(elapsed, "data/metrics.ndjson"); print_metrics_report(elapsed, "data/metrics.ndjson");
} }
fn spawn_counter_facade(state: &BaseState) -> JoinHandle<()> { fn spawn_counter_facade(state: &BaseState) -> JoinHandle<()> {
let s2 = state.clone(); let state = state.clone();
let counter = thread::spawn(move || counter_facade(s2)); thread::spawn(move || counter_facade(state))
counter
} }
fn counter_facade(state: BaseState) { fn counter_facade(state: BaseState) {
let spinner = ['⠁', '⠂', '⠄', '⡀', '⢀', '⠠', '⠐', '⠈']; let spinner = ['⠁', '⠂', '⠄', '⡀', '⢀', '⠠', '⠐', '⠈'];
let mut i = 0; let mut i = 0;
while state.running.load(Ordering::Relaxed) while state.running.load(Ordering::Relaxed) {
{ let size = state.pending_writes.load(Ordering::Relaxed);
let size = state.queue_size.load(Ordering::Relaxed);
let spin = spinner[i % spinner.len()]; let spin = spinner[i % spinner.len()];
print!("\r\x1b[2K{} pending_writes: {:5}", spin, size); print!("\r\x1b[2K{} pending_writes: {:5}", spin, size);
std::io::stdout().flush().unwrap(); std::io::stdout().flush().expect("stdout flush failed");
i += 1; i += 1;
thread::sleep(Duration::from_millis(80)); thread::sleep(Duration::from_millis(80));
@ -93,74 +118,143 @@ fn counter_facade(state: BaseState) {
println!("\r\x1b[2K✔ done"); println!("\r\x1b[2K✔ done");
} }
fn spawn_writer_facade(rx: Receiver<QueueCommand>, state: &BaseState) -> JoinHandle<()> { fn spawn_aggregator_facade(
let s1 = state.clone(); ingest_rx: Receiver<IngestCommand>,
let writer = thread::spawn(move || writer_facade(s1, rx)); flush_tx: Sender<FlushCommand>,
writer state: &BaseState,
) -> JoinHandle<()> {
let state = state.clone();
thread::spawn(move || aggregator_facade(state, ingest_rx, flush_tx))
} }
fn writer_facade(state: BaseState, rx: Receiver<QueueCommand>) { fn aggregator_facade(
std::fs::create_dir_all("data").expect("error when creating data dir"); _state: BaseState,
let file = OpenOptions::new() ingest_rx: Receiver<IngestCommand>,
.append(true) flush_tx: Sender<FlushCommand>,
.create(true) ) {
.open("data/metrics.ndjson") // Capacity in bytes per batch - 64Kb.
.expect("error when opening file"); let buffer_capacity = 64 * 1024;
let mut writer = BufWriter::new(file);
let buffer_flush = |writer: &mut BufWriter<File>, buffer: &mut Vec<String>| -> usize { // Double buffer: while one batch is on its way to disk, we keep filling the other.
let amount = buffer.len(); let mut front_buffer = Vec::<u8>::with_capacity(buffer_capacity);
if amount <= 0 { let mut back_buffer = Vec::<u8>::with_capacity(buffer_capacity);
return 0usize; let mut front_count: usize = 0;
let flush_current_batch = |front_buffer: &mut Vec<u8>,
back_buffer: &mut Vec<u8>,
front_count: &mut usize,
flush_tx: &Sender<FlushCommand>| {
if *front_count == 0 {
return;
} }
for line in buffer.drain(..) { std::mem::swap(front_buffer, back_buffer);
writer.write_all(line.as_bytes()).unwrap(); let batch = std::mem::take(back_buffer);
writer.write_all(b"\n").unwrap(); let count = *front_count;
} *front_count = 0;
// thread::sleep(Duration::from_millis(20)); // delay of 20ms - simulate ~50ops/s flush_tx
amount .send(FlushCommand::WriteBatch(batch, count, false))
.expect("error when sending batch to disk writer");
}; };
let flush_writer = |writer: &mut BufWriter<File>| {
writer.flush().expect("error when flushing writer");
};
let shutdown = |writer: &mut BufWriter<File>, buffer: &mut Vec<String>| {
state.running.store(false, Ordering::Relaxed);
let amount = buffer_flush(writer, buffer);
state.queue_size.fetch_sub(amount, Ordering::Relaxed);
flush_writer(writer);
};
let buffer_capacity = 50;
let mut buffer: Vec<String> = Vec::with_capacity(buffer_capacity);
loop { loop {
match rx.recv_timeout(Duration::from_millis(100)) { match ingest_rx.recv_timeout(Duration::from_millis(100)) {
Ok(QueueCommand::WriteLine(line)) => { Ok(IngestCommand::WriteLine(line)) => {
buffer.push(line); front_buffer.extend_from_slice(line.as_bytes());
if buffer.len() >= buffer_capacity { front_buffer.push(b'\n');
let amount = buffer_flush(&mut writer, &mut buffer); front_count += 1;
state.queue_size.fetch_sub(amount, Ordering::Relaxed);
if front_buffer.len() >= buffer_capacity {
flush_current_batch(
&mut front_buffer,
&mut back_buffer,
&mut front_count,
&flush_tx,
);
} }
} }
Err(RecvTimeoutError::Timeout) => { Err(RecvTimeoutError::Timeout) => {
if !buffer.is_empty() { if front_count > 0 {
let amount = buffer_flush(&mut writer, &mut buffer); flush_current_batch(
state.queue_size.fetch_sub(amount, Ordering::Relaxed); &mut front_buffer,
&mut back_buffer,
&mut front_count,
&flush_tx,
);
} }
} }
Ok(QueueCommand::Shutdown) => { Ok(IngestCommand::Shutdown) => {
println!("\nStarting shutdown..."); if front_count > 0 {
shutdown(&mut writer, &mut buffer); flush_current_batch(
break; // quit the loop &mut front_buffer,
&mut back_buffer,
&mut front_count,
&flush_tx,
);
}
flush_tx
.send(FlushCommand::Shutdown)
.expect("error when sending shutdown to disk writer");
break;
} }
Err(RecvTimeoutError::Disconnected) => { Err(RecvTimeoutError::Disconnected) => {
println!("error when receiving command: disconnected"); if front_count > 0 {
shutdown(&mut writer, &mut buffer); flush_current_batch(
break; // quit the loop &mut front_buffer,
&mut back_buffer,
&mut front_count,
&flush_tx,
);
}
let _ = flush_tx.send(FlushCommand::Shutdown);
break;
}
}
}
}
fn spawn_disk_writer_facade(
flush_rx: Receiver<FlushCommand>,
state: &BaseState,
) -> JoinHandle<()> {
let state = state.clone();
thread::spawn(move || disk_writer_facade(state, flush_rx))
}
fn disk_writer_facade(state: BaseState, flush_rx: Receiver<FlushCommand>) {
std::fs::create_dir_all("data").expect("error when creating data dir");
let file = OpenOptions::new()
.append(true)
.create(true)
//.truncate(true)
.open("data/metrics.ndjson")
.expect("error when opening file");
let mut writer = BufWriter::new(file);
loop {
match flush_rx.recv() {
Ok(FlushCommand::WriteBatch(batch, count, flush)) => {
writer
.write_all(&batch)
.expect("error when writing batch to file");
if flush { writer.flush().expect("error when flushing writer"); }
state.pending_writes.fetch_sub(count, Ordering::Relaxed);
}
Ok(FlushCommand::Shutdown) => {
writer.flush().expect("error when flushing writer");
break;
}
Err(_) => {
writer.flush().expect("error when flushing writer");
break;
} }
} }
} }
@ -168,23 +262,32 @@ fn writer_facade(state: BaseState, rx: Receiver<QueueCommand>) {
async fn handler( async fn handler(
State(state): State<BaseState>, State(state): State<BaseState>,
Json(metric): Json<Metric>) -> StatusCode { Json(metric): Json<Metric>,
) -> StatusCode {
let line = serde_json::to_string(&metric).expect("error when serializing metric"); let line = serde_json::to_string(&metric).expect("error when serializing metric");
state.queue_size.fetch_add(1, Ordering::Relaxed);
match state.sender.try_send(QueueCommand::WriteLine(line)) { state.pending_writes.fetch_add(1, Ordering::Relaxed);
Ok(_) => {
StatusCode::OK match state.ingest_tx.try_send(IngestCommand::WriteLine(line)) {
}, Ok(_) => StatusCode::OK,
Err(_) => { Err(_) => {
state.queue_size.fetch_sub(1, Ordering::Relaxed); state.pending_writes.fetch_sub(1, Ordering::Relaxed);
StatusCode::SERVICE_UNAVAILABLE StatusCode::SERVICE_UNAVAILABLE
}, }
} }
} }
async fn shutdown_signal(state: BaseState) { async fn shutdown_signal(state: BaseState) {
signal::ctrl_c().await.expect("fail when waiting for Ctrl+C"); signal::ctrl_c()
state.sender.send(QueueCommand::Shutdown).expect("error when sending flush signal"); .await
.expect("fail when waiting for Ctrl+C");
println!("\nStarting shutdown...");
state
.ingest_tx
.send(IngestCommand::Shutdown)
.expect("error when sending shutdown signal");
} }
fn print_metrics_report(elapsed: Duration, path: &str) { fn print_metrics_report(elapsed: Duration, path: &str) {
@ -228,10 +331,13 @@ fn print_metrics_report(elapsed: Duration, path: &str) {
let elapsed_time = elapsed.as_secs_f64().max(0.001); let elapsed_time = elapsed.as_secs_f64().max(0.001);
let rate = total as f64 / elapsed_time; let rate = total as f64 / elapsed_time;
println!("Number of valid metrics: {total} in {elapsed_time}s ({rate} metrics/s)"); println!("Number of valid metrics: {total} in {elapsed_time}s ({rate} metrics/s)");
if invalid_lines > 0 { if invalid_lines > 0 {
println!("Ignored values: {invalid_lines}"); println!("Ignored values: {invalid_lines}");
} }
println!(); println!();
for (metric, count) in &counts { for (metric, count) in &counts {