From 89d7ad00217e5eb031f0e3c82a3c292198bd0197 Mon Sep 17 00:00:00 2001 From: bQUARKz Date: Tue, 24 Mar 2026 08:17:53 +0000 Subject: [PATCH] improvements phase: toy -> med --- Cargo.lock | 16 ++++ Cargo.toml | 3 +- scripts/test/send.lua | 7 ++ scripts/test/send.sh | 11 +++ scripts/test/spawn.sh | 2 +- src/main.rs | 173 ++++++++++++++++++++++++++++++++++-------- 6 files changed, 179 insertions(+), 33 deletions(-) create mode 100644 scripts/test/send.lua create mode 100755 scripts/test/send.sh diff --git a/Cargo.lock b/Cargo.lock index d733e21..b8a1214 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -66,6 +66,21 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + [[package]] name = "errno" version = "0.3.14" @@ -375,6 +390,7 @@ name = "simple-tsdb" version = "0.1.0" dependencies = [ "axum", + "crossbeam-channel", "serde", "serde_json", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 75f7644..4d50e38 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,4 +7,5 @@ edition = "2024" tokio = { version = "1.50.0", features = ["signal", "rt", "rt-multi-thread", "macros"] } axum = "0.8.8" serde = { version = "1.0.228", features = ["derive"] } -serde_json = "1.0.149" \ No newline at end of file +serde_json = "1.0.149" +crossbeam-channel = "0.5.15" \ No newline at end of file diff --git a/scripts/test/send.lua b/scripts/test/send.lua new file mode 100644 index 0000000..f8cc37f --- /dev/null +++ b/scripts/test/send.lua @@ -0,0 +1,7 @@ +wrk.method = "POST" +wrk.headers["Content-Type"] = "application/json" + +function request() + local body = '{"metric":"cpu_usage","timestamp":' .. os.time() .. ',"value":42.3}' + return wrk.format(nil, "/metrics", nil, body) +end \ No newline at end of file diff --git a/scripts/test/send.sh b/scripts/test/send.sh new file mode 100755 index 0000000..a65b3a6 --- /dev/null +++ b/scripts/test/send.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +set -euo pipefail + +THREADS=${1:-4} +CONNECTIONS=${2:-10} +DURATION_SECS=${3:-30} + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +wrk -t"$THREADS" -c"$CONNECTIONS" -d"$DURATION_SECS"s -s "$SCRIPT_DIR/send.lua" http://localhost:3000 \ No newline at end of file diff --git a/scripts/test/spawn.sh b/scripts/test/spawn.sh index f19dc02..1c18922 100755 --- a/scripts/test/spawn.sh +++ b/scripts/test/spawn.sh @@ -23,7 +23,7 @@ cleanup() { echo "All workers stopped." } -trap cleanup SIGINT SIGTERM EXIT +trap cleanup SIGINT SIGTERM echo "Starting $THREADS workers with RATE=$RATE each..." diff --git a/src/main.rs b/src/main.rs index d3ebf90..e366b99 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,14 +1,19 @@ use axum::extract::State; +use axum::http::StatusCode; use axum::{ extract::Json, routing::post, Router, }; +use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender}; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::fs::{File, OpenOptions}; -use std::io::{BufRead, BufReader, Write}; -use std::sync::{Arc, Mutex}; +use std::io::{BufRead, BufReader, BufWriter, Write}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::Arc; +use std::thread; +use std::thread::JoinHandle; use std::time::Duration; use tokio::net::TcpListener; use tokio::signal; @@ -20,60 +25,166 @@ struct Metric { value: f64, } +enum QueueCommand { + WriteLine(String), + Shutdown, +} + #[derive(Clone)] struct BaseState { - file: Arc>, + sender: Sender, + queue_size: Arc, + running: Arc, } impl BaseState { - fn new(file: File) -> Self { + fn new(sender: Sender) -> Self { BaseState { - file: Arc::new(Mutex::new(file)), + sender, + queue_size: Arc::new(AtomicUsize::new(0)), + running: Arc::new(AtomicBool::new(true)), } } } #[tokio::main] async fn main() { + let (tx, rx) = bounded::(1000); + let state = BaseState::new(tx.clone()); + println!("Starting Simple TSDB... listening on: localhost "); + + let writer = spawn_writer_facade(rx, &state); + let counter = spawn_counter_facade(&state); + + let app = Router::new().route("/metrics", post(handler)).with_state(state.clone()); + let listener = TcpListener::bind("0.0.0.0:3000").await.expect("error when opening port 3000"); + + let start_time = std::time::SystemTime::now(); + axum::serve(listener, app).with_graceful_shutdown(shutdown_signal(state.clone())).await.expect("internal server error"); + + writer.join().expect("error thread panic"); + counter.join().expect("error thread panic"); + println!("\nServer Stopped. Analysing metrics...\n"); + let elapsed = start_time.elapsed().expect("error when measuring elapsed time"); + print_metrics_report(elapsed, "data/metrics.ndjson"); +} + +fn spawn_counter_facade(state: &BaseState) -> JoinHandle<()> { + let s2 = state.clone(); + let counter = thread::spawn(move || counter_facade(s2)); + counter +} + +fn counter_facade(state: BaseState) { + let spinner = ['⠁','⠂','⠄','⡀','⢀','⠠','⠐','⠈']; + let mut i = 0; + + while state.running.load(Ordering::Relaxed) + { + let size = state.queue_size.load(Ordering::Relaxed); + let spin = spinner[i % spinner.len()]; + print!("\r\x1b[2K{} pending_writes: {:5}", spin, size); + std::io::stdout().flush().unwrap(); + + i += 1; + thread::sleep(Duration::from_millis(80)); + } + + println!("\r\x1b[2K✔ done"); +} + +fn spawn_writer_facade(rx: Receiver, state: &BaseState) -> JoinHandle<()> { + let s1 = state.clone(); + let writer = thread::spawn(move || writer_facade(s1, rx)); + writer +} + +fn writer_facade(state: BaseState, rx: Receiver) { + std::fs::create_dir_all("data").expect("error when creating data dir"); let file = OpenOptions::new() .append(true) .create(true) .open("data/metrics.ndjson") .expect("error when opening file"); - let state = BaseState::new(file); - let app = Router::new().route("/metrics", post(receive_metric)).with_state(state.clone()); - let listener = TcpListener::bind("0.0.0.0:3000").await.expect("error when opening port 3000"); - println!("Starting Simple TSDB... listening on: localhost "); + let mut writer = BufWriter::new(file); - let start_time = std::time::SystemTime::now(); - axum::serve(listener, app).with_graceful_shutdown(shutdown_signal()).await.expect("internal server error"); + let buffer_flush = |writer: &mut BufWriter, buffer: &mut Vec| -> usize { + let amount = buffer.len(); + if amount <= 0 { + return 0usize; + } - { - let mut file = state.file.lock().expect("error when locking file"); - file.flush().expect("error when final flushing"); + for line in buffer.drain(..) { + writer.write_all(line.as_bytes()).unwrap(); + writer.write_all(b"\n").unwrap(); + } + + // thread::sleep(Duration::from_millis(20)); // delay of 20ms - simulate ~50ops/s + amount + }; + + let flush_writer = |writer: &mut BufWriter| { + writer.flush().expect("error when flushing writer"); + }; + + let shutdown = |writer: &mut BufWriter, buffer: &mut Vec| { + state.running.store(false, Ordering::Relaxed); + let amount = buffer_flush(writer, buffer); + state.queue_size.fetch_sub(amount, Ordering::Relaxed); + flush_writer(writer); + }; + + let buffer_capacity = 50; + let mut buffer: Vec = Vec::with_capacity(buffer_capacity); + + loop { + match rx.recv_timeout(Duration::from_millis(100)) { + Ok(QueueCommand::WriteLine(line)) => { + buffer.push(line); + if buffer.len() >= buffer_capacity { + let amount = buffer_flush(&mut writer, &mut buffer); + state.queue_size.fetch_sub(amount, Ordering::Relaxed); + } + } + Err(RecvTimeoutError::Timeout) => { + if !buffer.is_empty() { + let amount = buffer_flush(&mut writer, &mut buffer); + state.queue_size.fetch_sub(amount, Ordering::Relaxed); + } + } + Ok(QueueCommand::Shutdown) => { + println!("\nStarting shutdown..."); + shutdown(&mut writer, &mut buffer); + break; // quit the loop + } + Err(RecvTimeoutError::Disconnected) => { + println!("error when receiving command: disconnected"); + shutdown(&mut writer, &mut buffer); + break; // quit the loop + } + } } - - let elapsed = start_time.elapsed().expect("error when measuring elapsed time"); - println!("\nStoping server. Analysing metrics...\n"); - print_metrics_report(elapsed, "data/metrics.ndjson"); } -async fn receive_metric( +async fn handler( State(state): State, - Json(metric): Json) -> &'static str { - println!("metric received: {:?}", metric); - + Json(metric): Json) -> StatusCode { let line = serde_json::to_string(&metric).expect("error when serializing metric"); - - let mut file = state.file.lock().expect("error when locking file"); - writeln!(file, "{line}").expect("error when writing to file"); - - "ok" + state.queue_size.fetch_add(1, Ordering::Relaxed); + match state.sender.try_send(QueueCommand::WriteLine(line)) { + Ok(_) => { + StatusCode::OK + }, + Err(_) => { + state.queue_size.fetch_sub(1, Ordering::Relaxed); + StatusCode::SERVICE_UNAVAILABLE + }, + } } -async fn shutdown_signal() { +async fn shutdown_signal(state: BaseState) { signal::ctrl_c().await.expect("fail when waiting for Ctrl+C"); - println!("\nCtrl+C ack. Starting shutdown..."); + state.sender.send(QueueCommand::Shutdown).expect("error when sending flush signal"); } fn print_metrics_report(elapsed: Duration, path: &str) { @@ -115,8 +226,8 @@ fn print_metrics_report(elapsed: Duration, path: &str) { return; } - let elapsed_time = elapsed.as_secs(); - let rate = total as f64 / elapsed_time as f64; + let elapsed_time = elapsed.as_secs_f64().max(0.001); + let rate = total as f64 / elapsed_time; println!("Number of valid metrics: {total} in {elapsed_time}s ({rate} metrics/s)"); if invalid_lines > 0 { println!("Ignored values: {invalid_lines}");