improvements phase: toy -> med
This commit is contained in:
parent
50846646f6
commit
89d7ad0021
16
Cargo.lock
generated
16
Cargo.lock
generated
@ -66,6 +66,21 @@ version = "1.11.1"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33"
|
checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "crossbeam-channel"
|
||||||
|
version = "0.5.15"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2"
|
||||||
|
dependencies = [
|
||||||
|
"crossbeam-utils",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "crossbeam-utils"
|
||||||
|
version = "0.8.21"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "errno"
|
name = "errno"
|
||||||
version = "0.3.14"
|
version = "0.3.14"
|
||||||
@ -375,6 +390,7 @@ name = "simple-tsdb"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"axum",
|
"axum",
|
||||||
|
"crossbeam-channel",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
|||||||
@ -7,4 +7,5 @@ edition = "2024"
|
|||||||
tokio = { version = "1.50.0", features = ["signal", "rt", "rt-multi-thread", "macros"] }
|
tokio = { version = "1.50.0", features = ["signal", "rt", "rt-multi-thread", "macros"] }
|
||||||
axum = "0.8.8"
|
axum = "0.8.8"
|
||||||
serde = { version = "1.0.228", features = ["derive"] }
|
serde = { version = "1.0.228", features = ["derive"] }
|
||||||
serde_json = "1.0.149"
|
serde_json = "1.0.149"
|
||||||
|
crossbeam-channel = "0.5.15"
|
||||||
7
scripts/test/send.lua
Normal file
7
scripts/test/send.lua
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
wrk.method = "POST"
|
||||||
|
wrk.headers["Content-Type"] = "application/json"
|
||||||
|
|
||||||
|
function request()
|
||||||
|
local body = '{"metric":"cpu_usage","timestamp":' .. os.time() .. ',"value":42.3}'
|
||||||
|
return wrk.format(nil, "/metrics", nil, body)
|
||||||
|
end
|
||||||
11
scripts/test/send.sh
Executable file
11
scripts/test/send.sh
Executable file
@ -0,0 +1,11 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
set -euo pipefail
|
||||||
|
|
||||||
|
THREADS=${1:-4}
|
||||||
|
CONNECTIONS=${2:-10}
|
||||||
|
DURATION_SECS=${3:-30}
|
||||||
|
|
||||||
|
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||||
|
|
||||||
|
wrk -t"$THREADS" -c"$CONNECTIONS" -d"$DURATION_SECS"s -s "$SCRIPT_DIR/send.lua" http://localhost:3000
|
||||||
@ -23,7 +23,7 @@ cleanup() {
|
|||||||
echo "All workers stopped."
|
echo "All workers stopped."
|
||||||
}
|
}
|
||||||
|
|
||||||
trap cleanup SIGINT SIGTERM EXIT
|
trap cleanup SIGINT SIGTERM
|
||||||
|
|
||||||
echo "Starting $THREADS workers with RATE=$RATE each..."
|
echo "Starting $THREADS workers with RATE=$RATE each..."
|
||||||
|
|
||||||
|
|||||||
173
src/main.rs
173
src/main.rs
@ -1,14 +1,19 @@
|
|||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
|
use axum::http::StatusCode;
|
||||||
use axum::{
|
use axum::{
|
||||||
extract::Json,
|
extract::Json,
|
||||||
routing::post,
|
routing::post,
|
||||||
Router,
|
Router,
|
||||||
};
|
};
|
||||||
|
use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
use std::fs::{File, OpenOptions};
|
use std::fs::{File, OpenOptions};
|
||||||
use std::io::{BufRead, BufReader, Write};
|
use std::io::{BufRead, BufReader, BufWriter, Write};
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::thread;
|
||||||
|
use std::thread::JoinHandle;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
use tokio::signal;
|
use tokio::signal;
|
||||||
@ -20,60 +25,166 @@ struct Metric {
|
|||||||
value: f64,
|
value: f64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
enum QueueCommand {
|
||||||
|
WriteLine(String),
|
||||||
|
Shutdown,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct BaseState {
|
struct BaseState {
|
||||||
file: Arc<Mutex<File>>,
|
sender: Sender<QueueCommand>,
|
||||||
|
queue_size: Arc<AtomicUsize>,
|
||||||
|
running: Arc<AtomicBool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BaseState {
|
impl BaseState {
|
||||||
fn new(file: File) -> Self {
|
fn new(sender: Sender<QueueCommand>) -> Self {
|
||||||
BaseState {
|
BaseState {
|
||||||
file: Arc::new(Mutex::new(file)),
|
sender,
|
||||||
|
queue_size: Arc::new(AtomicUsize::new(0)),
|
||||||
|
running: Arc::new(AtomicBool::new(true)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
|
let (tx, rx) = bounded::<QueueCommand>(1000);
|
||||||
|
let state = BaseState::new(tx.clone());
|
||||||
|
println!("Starting Simple TSDB... listening on: localhost <port: 3000>");
|
||||||
|
|
||||||
|
let writer = spawn_writer_facade(rx, &state);
|
||||||
|
let counter = spawn_counter_facade(&state);
|
||||||
|
|
||||||
|
let app = Router::new().route("/metrics", post(handler)).with_state(state.clone());
|
||||||
|
let listener = TcpListener::bind("0.0.0.0:3000").await.expect("error when opening port 3000");
|
||||||
|
|
||||||
|
let start_time = std::time::SystemTime::now();
|
||||||
|
axum::serve(listener, app).with_graceful_shutdown(shutdown_signal(state.clone())).await.expect("internal server error");
|
||||||
|
|
||||||
|
writer.join().expect("error thread panic");
|
||||||
|
counter.join().expect("error thread panic");
|
||||||
|
println!("\nServer Stopped. Analysing metrics...\n");
|
||||||
|
let elapsed = start_time.elapsed().expect("error when measuring elapsed time");
|
||||||
|
print_metrics_report(elapsed, "data/metrics.ndjson");
|
||||||
|
}
|
||||||
|
|
||||||
|
fn spawn_counter_facade(state: &BaseState) -> JoinHandle<()> {
|
||||||
|
let s2 = state.clone();
|
||||||
|
let counter = thread::spawn(move || counter_facade(s2));
|
||||||
|
counter
|
||||||
|
}
|
||||||
|
|
||||||
|
fn counter_facade(state: BaseState) {
|
||||||
|
let spinner = ['⠁','⠂','⠄','⡀','⢀','⠠','⠐','⠈'];
|
||||||
|
let mut i = 0;
|
||||||
|
|
||||||
|
while state.running.load(Ordering::Relaxed)
|
||||||
|
{
|
||||||
|
let size = state.queue_size.load(Ordering::Relaxed);
|
||||||
|
let spin = spinner[i % spinner.len()];
|
||||||
|
print!("\r\x1b[2K{} pending_writes: {:5}", spin, size);
|
||||||
|
std::io::stdout().flush().unwrap();
|
||||||
|
|
||||||
|
i += 1;
|
||||||
|
thread::sleep(Duration::from_millis(80));
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("\r\x1b[2K✔ done");
|
||||||
|
}
|
||||||
|
|
||||||
|
fn spawn_writer_facade(rx: Receiver<QueueCommand>, state: &BaseState) -> JoinHandle<()> {
|
||||||
|
let s1 = state.clone();
|
||||||
|
let writer = thread::spawn(move || writer_facade(s1, rx));
|
||||||
|
writer
|
||||||
|
}
|
||||||
|
|
||||||
|
fn writer_facade(state: BaseState, rx: Receiver<QueueCommand>) {
|
||||||
|
std::fs::create_dir_all("data").expect("error when creating data dir");
|
||||||
let file = OpenOptions::new()
|
let file = OpenOptions::new()
|
||||||
.append(true)
|
.append(true)
|
||||||
.create(true)
|
.create(true)
|
||||||
.open("data/metrics.ndjson")
|
.open("data/metrics.ndjson")
|
||||||
.expect("error when opening file");
|
.expect("error when opening file");
|
||||||
let state = BaseState::new(file);
|
let mut writer = BufWriter::new(file);
|
||||||
let app = Router::new().route("/metrics", post(receive_metric)).with_state(state.clone());
|
|
||||||
let listener = TcpListener::bind("0.0.0.0:3000").await.expect("error when opening port 3000");
|
|
||||||
println!("Starting Simple TSDB... listening on: localhost <port: 3000>");
|
|
||||||
|
|
||||||
let start_time = std::time::SystemTime::now();
|
let buffer_flush = |writer: &mut BufWriter<File>, buffer: &mut Vec<String>| -> usize {
|
||||||
axum::serve(listener, app).with_graceful_shutdown(shutdown_signal()).await.expect("internal server error");
|
let amount = buffer.len();
|
||||||
|
if amount <= 0 {
|
||||||
|
return 0usize;
|
||||||
|
}
|
||||||
|
|
||||||
{
|
for line in buffer.drain(..) {
|
||||||
let mut file = state.file.lock().expect("error when locking file");
|
writer.write_all(line.as_bytes()).unwrap();
|
||||||
file.flush().expect("error when final flushing");
|
writer.write_all(b"\n").unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
// thread::sleep(Duration::from_millis(20)); // delay of 20ms - simulate ~50ops/s
|
||||||
|
amount
|
||||||
|
};
|
||||||
|
|
||||||
|
let flush_writer = |writer: &mut BufWriter<File>| {
|
||||||
|
writer.flush().expect("error when flushing writer");
|
||||||
|
};
|
||||||
|
|
||||||
|
let shutdown = |writer: &mut BufWriter<File>, buffer: &mut Vec<String>| {
|
||||||
|
state.running.store(false, Ordering::Relaxed);
|
||||||
|
let amount = buffer_flush(writer, buffer);
|
||||||
|
state.queue_size.fetch_sub(amount, Ordering::Relaxed);
|
||||||
|
flush_writer(writer);
|
||||||
|
};
|
||||||
|
|
||||||
|
let buffer_capacity = 50;
|
||||||
|
let mut buffer: Vec<String> = Vec::with_capacity(buffer_capacity);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match rx.recv_timeout(Duration::from_millis(100)) {
|
||||||
|
Ok(QueueCommand::WriteLine(line)) => {
|
||||||
|
buffer.push(line);
|
||||||
|
if buffer.len() >= buffer_capacity {
|
||||||
|
let amount = buffer_flush(&mut writer, &mut buffer);
|
||||||
|
state.queue_size.fetch_sub(amount, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(RecvTimeoutError::Timeout) => {
|
||||||
|
if !buffer.is_empty() {
|
||||||
|
let amount = buffer_flush(&mut writer, &mut buffer);
|
||||||
|
state.queue_size.fetch_sub(amount, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(QueueCommand::Shutdown) => {
|
||||||
|
println!("\nStarting shutdown...");
|
||||||
|
shutdown(&mut writer, &mut buffer);
|
||||||
|
break; // quit the loop
|
||||||
|
}
|
||||||
|
Err(RecvTimeoutError::Disconnected) => {
|
||||||
|
println!("error when receiving command: disconnected");
|
||||||
|
shutdown(&mut writer, &mut buffer);
|
||||||
|
break; // quit the loop
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let elapsed = start_time.elapsed().expect("error when measuring elapsed time");
|
|
||||||
println!("\nStoping server. Analysing metrics...\n");
|
|
||||||
print_metrics_report(elapsed, "data/metrics.ndjson");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn receive_metric(
|
async fn handler(
|
||||||
State(state): State<BaseState>,
|
State(state): State<BaseState>,
|
||||||
Json(metric): Json<Metric>) -> &'static str {
|
Json(metric): Json<Metric>) -> StatusCode {
|
||||||
println!("metric received: {:?}", metric);
|
|
||||||
|
|
||||||
let line = serde_json::to_string(&metric).expect("error when serializing metric");
|
let line = serde_json::to_string(&metric).expect("error when serializing metric");
|
||||||
|
state.queue_size.fetch_add(1, Ordering::Relaxed);
|
||||||
let mut file = state.file.lock().expect("error when locking file");
|
match state.sender.try_send(QueueCommand::WriteLine(line)) {
|
||||||
writeln!(file, "{line}").expect("error when writing to file");
|
Ok(_) => {
|
||||||
|
StatusCode::OK
|
||||||
"ok"
|
},
|
||||||
|
Err(_) => {
|
||||||
|
state.queue_size.fetch_sub(1, Ordering::Relaxed);
|
||||||
|
StatusCode::SERVICE_UNAVAILABLE
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn shutdown_signal() {
|
async fn shutdown_signal(state: BaseState) {
|
||||||
signal::ctrl_c().await.expect("fail when waiting for Ctrl+C");
|
signal::ctrl_c().await.expect("fail when waiting for Ctrl+C");
|
||||||
println!("\nCtrl+C ack. Starting shutdown...");
|
state.sender.send(QueueCommand::Shutdown).expect("error when sending flush signal");
|
||||||
}
|
}
|
||||||
|
|
||||||
fn print_metrics_report(elapsed: Duration, path: &str) {
|
fn print_metrics_report(elapsed: Duration, path: &str) {
|
||||||
@ -115,8 +226,8 @@ fn print_metrics_report(elapsed: Duration, path: &str) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let elapsed_time = elapsed.as_secs();
|
let elapsed_time = elapsed.as_secs_f64().max(0.001);
|
||||||
let rate = total as f64 / elapsed_time as f64;
|
let rate = total as f64 / elapsed_time;
|
||||||
println!("Number of valid metrics: {total} in {elapsed_time}s ({rate} metrics/s)");
|
println!("Number of valid metrics: {total} in {elapsed_time}s ({rate} metrics/s)");
|
||||||
if invalid_lines > 0 {
|
if invalid_lines > 0 {
|
||||||
println!("Ignored values: {invalid_lines}");
|
println!("Ignored values: {invalid_lines}");
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user