diff --git a/src/main.rs b/src/main.rs index e366b99..365c6dc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,6 @@ use axum::extract::State; use axum::http::StatusCode; -use axum::{ - extract::Json, - routing::post, - Router, -}; +use axum::{extract::Json, routing::post, Router}; use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender}; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; @@ -18,6 +14,7 @@ use std::time::Duration; use tokio::net::TcpListener; use tokio::signal; +/// Domain model for incoming metrics. #[derive(Debug, Deserialize, Serialize)] struct Metric { metric: String, @@ -25,23 +22,31 @@ struct Metric { value: f64, } -enum QueueCommand { +/// Commands accepted by the ingest/aggregator stage. +enum IngestCommand { WriteLine(String), Shutdown, } +/// Batches sent from the aggregator thread to the disk writer thread. +enum FlushCommand { + WriteBatch(Vec<u8>, usize, bool), + Shutdown, +} + +/// Shared application state exposed to request handlers. #[derive(Clone)] struct BaseState { - sender: Sender<QueueCommand>, - queue_size: Arc<AtomicUsize>, + ingest_tx: Sender<IngestCommand>, + pending_writes: Arc<AtomicUsize>, running: Arc<AtomicBool>, } impl BaseState { - fn new(sender: Sender<QueueCommand>) -> Self { - BaseState { - sender, - queue_size: Arc::new(AtomicUsize::new(0)), + fn new(ingest_tx: Sender<IngestCommand>) -> Self { + Self { + ingest_tx, + pending_writes: Arc::new(AtomicUsize::new(0)), running: Arc::new(AtomicBool::new(true)), } } @@ -49,42 +54,62 @@ impl BaseState { #[tokio::main] async fn main() { - let (tx, rx) = bounded::<QueueCommand>(1000); - let state = BaseState::new(tx.clone()); + let (ingest_tx, ingest_rx) = bounded::<IngestCommand>(1024); + let (flush_tx, flush_rx) = bounded::<FlushCommand>(128); + + let state = BaseState::new(ingest_tx.clone()); + println!("Starting Simple TSDB... 
listening on: localhost "); - let writer = spawn_writer_facade(rx, &state); + let aggregator = spawn_aggregator_facade(ingest_rx, flush_tx, &state); + let disk_writer = spawn_disk_writer_facade(flush_rx, &state); let counter = spawn_counter_facade(&state); - let app = Router::new().route("/metrics", post(handler)).with_state(state.clone()); - let listener = TcpListener::bind("0.0.0.0:3000").await.expect("error when opening port 3000"); + let app = Router::new() + .route("/metrics", post(handler)) + .with_state(state.clone()); + + let listener = TcpListener::bind("0.0.0.0:3000") + .await + .expect("error when opening port 3000"); let start_time = std::time::SystemTime::now(); - axum::serve(listener, app).with_graceful_shutdown(shutdown_signal(state.clone())).await.expect("internal server error"); - writer.join().expect("error thread panic"); - counter.join().expect("error thread panic"); + axum::serve(listener, app) + .with_graceful_shutdown(shutdown_signal(state.clone())) + .await + .expect("internal server error"); + + aggregator.join().expect("aggregator thread panic"); + disk_writer.join().expect("disk writer thread panic"); + + state.running.store(false, Ordering::Relaxed); + counter.join().expect("counter thread panic"); + println!("\nServer Stopped. 
Analysing metrics...\n"); - let elapsed = start_time.elapsed().expect("error when measuring elapsed time"); + + let elapsed = start_time + .elapsed() + .expect("error when measuring elapsed time"); + print_metrics_report(elapsed, "data/metrics.ndjson"); } fn spawn_counter_facade(state: &BaseState) -> JoinHandle<()> { - let s2 = state.clone(); - let counter = thread::spawn(move || counter_facade(s2)); - counter + let state = state.clone(); + thread::spawn(move || counter_facade(state)) } fn counter_facade(state: BaseState) { - let spinner = ['⠁','⠂','⠄','⡀','⢀','⠠','⠐','⠈']; + let spinner = ['⠁', '⠂', '⠄', '⡀', '⢀', '⠠', '⠐', '⠈']; let mut i = 0; - while state.running.load(Ordering::Relaxed) - { - let size = state.queue_size.load(Ordering::Relaxed); + while state.running.load(Ordering::Relaxed) { + let size = state.pending_writes.load(Ordering::Relaxed); let spin = spinner[i % spinner.len()]; + print!("\r\x1b[2K{} pending_writes: {:5}", spin, size); - std::io::stdout().flush().unwrap(); + std::io::stdout().flush().expect("stdout flush failed"); i += 1; thread::sleep(Duration::from_millis(80)); @@ -93,74 +118,143 @@ fn counter_facade(state: BaseState) { println!("\r\x1b[2K✔ done"); } -fn spawn_writer_facade(rx: Receiver<QueueCommand>, state: &BaseState) -> JoinHandle<()> { - let s1 = state.clone(); - let writer = thread::spawn(move || writer_facade(s1, rx)); - writer +fn spawn_aggregator_facade( + ingest_rx: Receiver<IngestCommand>, + flush_tx: Sender<FlushCommand>, + state: &BaseState, +) -> JoinHandle<()> { + let state = state.clone(); + thread::spawn(move || aggregator_facade(state, ingest_rx, flush_tx)) } -fn writer_facade(state: BaseState, rx: Receiver<QueueCommand>) { - std::fs::create_dir_all("data").expect("error when creating data dir"); - let file = OpenOptions::new() - .append(true) - .create(true) - .open("data/metrics.ndjson") - .expect("error when opening file"); - let mut writer = BufWriter::new(file); +fn aggregator_facade( + _state: BaseState, + ingest_rx: Receiver<IngestCommand>, + flush_tx: Sender<FlushCommand>, +) { + // Capacity 
in bytes per batch - 64Kb. + let buffer_capacity = 64 * 1024; - let buffer_flush = |writer: &mut BufWriter<File>, buffer: &mut Vec<String>| -> usize { - let amount = buffer.len(); - if amount <= 0 { - return 0usize; + // Double buffer: while one batch is on its way to disk, we keep filling the other. + let mut front_buffer = Vec::<u8>::with_capacity(buffer_capacity); + let mut back_buffer = Vec::<u8>::with_capacity(buffer_capacity); + let mut front_count: usize = 0; + + let flush_current_batch = |front_buffer: &mut Vec<u8>, + back_buffer: &mut Vec<u8>, + front_count: &mut usize, + flush_tx: &Sender<FlushCommand>| { + if *front_count == 0 { + return; } - for line in buffer.drain(..) { - writer.write_all(line.as_bytes()).unwrap(); - writer.write_all(b"\n").unwrap(); - } + std::mem::swap(front_buffer, back_buffer); + let batch = std::mem::take(back_buffer); + let count = *front_count; + *front_count = 0; - // thread::sleep(Duration::from_millis(20)); // delay of 20ms - simulate ~50ops/s - amount + flush_tx + .send(FlushCommand::WriteBatch(batch, count, false)) + .expect("error when sending batch to disk writer"); }; - let flush_writer = |writer: &mut BufWriter<File>| { - writer.flush().expect("error when flushing writer"); - }; - - let shutdown = |writer: &mut BufWriter<File>, buffer: &mut Vec<String>| { - state.running.store(false, Ordering::Relaxed); - let amount = buffer_flush(writer, buffer); - state.queue_size.fetch_sub(amount, Ordering::Relaxed); - flush_writer(writer); - }; - - let buffer_capacity = 50; - let mut buffer: Vec<String> = Vec::with_capacity(buffer_capacity); - loop { - match rx.recv_timeout(Duration::from_millis(100)) { - Ok(QueueCommand::WriteLine(line)) => { - buffer.push(line); - if buffer.len() >= buffer_capacity { - let amount = buffer_flush(&mut writer, &mut buffer); - state.queue_size.fetch_sub(amount, Ordering::Relaxed); + match ingest_rx.recv_timeout(Duration::from_millis(100)) { + Ok(IngestCommand::WriteLine(line)) => { + front_buffer.extend_from_slice(line.as_bytes()); + front_buffer.push(b'\n'); + 
front_count += 1; + + if front_buffer.len() >= buffer_capacity { + flush_current_batch( + &mut front_buffer, + &mut back_buffer, + &mut front_count, + &flush_tx, + ); } } Err(RecvTimeoutError::Timeout) => { - if !buffer.is_empty() { - let amount = buffer_flush(&mut writer, &mut buffer); - state.queue_size.fetch_sub(amount, Ordering::Relaxed); + if front_count > 0 { + flush_current_batch( + &mut front_buffer, + &mut back_buffer, + &mut front_count, + &flush_tx, + ); } } - Ok(QueueCommand::Shutdown) => { - println!("\nStarting shutdown..."); - shutdown(&mut writer, &mut buffer); - break; // quit the loop + Ok(IngestCommand::Shutdown) => { + if front_count > 0 { + flush_current_batch( + &mut front_buffer, + &mut back_buffer, + &mut front_count, + &flush_tx, + ); + } + + flush_tx + .send(FlushCommand::Shutdown) + .expect("error when sending shutdown to disk writer"); + + break; } Err(RecvTimeoutError::Disconnected) => { - println!("error when receiving command: disconnected"); - shutdown(&mut writer, &mut buffer); - break; // quit the loop + if front_count > 0 { + flush_current_batch( + &mut front_buffer, + &mut back_buffer, + &mut front_count, + &flush_tx, + ); + } + + let _ = flush_tx.send(FlushCommand::Shutdown); + break; + } + } + } +} + +fn spawn_disk_writer_facade( + flush_rx: Receiver<FlushCommand>, + state: &BaseState, +) -> JoinHandle<()> { + let state = state.clone(); + thread::spawn(move || disk_writer_facade(state, flush_rx)) +} + +fn disk_writer_facade(state: BaseState, flush_rx: Receiver<FlushCommand>) { + std::fs::create_dir_all("data").expect("error when creating data dir"); + + let file = OpenOptions::new() + .append(true) + .create(true) + //.truncate(true) + .open("data/metrics.ndjson") + .expect("error when opening file"); + + let mut writer = BufWriter::new(file); + + loop { + match flush_rx.recv() { + Ok(FlushCommand::WriteBatch(batch, count, flush)) => { + writer + .write_all(&batch) + .expect("error when writing batch to file"); + + if flush { writer.flush().expect("error 
when flushing writer"); } + + state.pending_writes.fetch_sub(count, Ordering::Relaxed); + } + Ok(FlushCommand::Shutdown) => { + writer.flush().expect("error when flushing writer"); + break; + } + Err(_) => { + writer.flush().expect("error when flushing writer"); + break; } } } @@ -168,23 +262,32 @@ fn writer_facade(state: BaseState, rx: Receiver<QueueCommand>) { async fn handler( State(state): State<BaseState>, - Json(metric): Json<Metric>) -> StatusCode { + Json(metric): Json<Metric>, +) -> StatusCode { let line = serde_json::to_string(&metric).expect("error when serializing metric"); - state.queue_size.fetch_add(1, Ordering::Relaxed); - match state.sender.try_send(QueueCommand::WriteLine(line)) { - Ok(_) => { - StatusCode::OK - }, + + state.pending_writes.fetch_add(1, Ordering::Relaxed); + + match state.ingest_tx.try_send(IngestCommand::WriteLine(line)) { + Ok(_) => StatusCode::OK, Err(_) => { - state.queue_size.fetch_sub(1, Ordering::Relaxed); + state.pending_writes.fetch_sub(1, Ordering::Relaxed); StatusCode::SERVICE_UNAVAILABLE - }, + } } } async fn shutdown_signal(state: BaseState) { - signal::ctrl_c().await.expect("fail when waiting for Ctrl+C"); - state.sender.send(QueueCommand::Shutdown).expect("error when sending flush signal"); + signal::ctrl_c() + .await + .expect("fail when waiting for Ctrl+C"); + + println!("\nStarting shutdown..."); + + state + .ingest_tx + .send(IngestCommand::Shutdown) + .expect("error when sending shutdown signal"); } fn print_metrics_report(elapsed: Duration, path: &str) { @@ -228,10 +331,13 @@ fn print_metrics_report(elapsed: Duration, path: &str) { let elapsed_time = elapsed.as_secs_f64().max(0.001); let rate = total as f64 / elapsed_time; + println!("Number of valid metrics: {total} in {elapsed_time}s ({rate} metrics/s)"); + if invalid_lines > 0 { println!("Ignored values: {invalid_lines}"); } + println!(); for (metric, count) in &counts {