156 lines
4.8 KiB
Rust
156 lines
4.8 KiB
Rust
use std::collections::VecDeque;
|
|
|
|
use prometeu_bytecode::HeapRef;
|
|
|
|
/// Deterministic cooperative scheduler core.
|
|
///
|
|
/// Policy:
|
|
/// - FIFO for ready coroutines.
|
|
/// - Sleeping coroutines are ordered by `wake_tick` and moved to ready when `wake_tick <= current_tick`.
|
|
/// - No randomness, no preemption, no context switching here.
|
|
#[derive(Debug, Default)]
|
|
pub struct Scheduler {
|
|
/// Queue of runnable coroutines (FIFO)
|
|
ready_queue: VecDeque<HeapRef>,
|
|
/// Sleeping list kept sorted by (wake_tick, insertion_order)
|
|
sleeping: Vec<SleepEntry>,
|
|
/// Currently selected coroutine (purely informational here)
|
|
current: Option<HeapRef>,
|
|
/// Monotonic counter to preserve deterministic order for same wake_tick
|
|
next_seq: u64,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
|
struct SleepEntry {
|
|
wake_tick: u64,
|
|
seq: u64,
|
|
coro: HeapRef,
|
|
}
|
|
|
|
impl Scheduler {
|
|
pub fn new() -> Self {
|
|
Self::default()
|
|
}
|
|
|
|
// ---------- Ready queue operations ----------
|
|
|
|
/// Enqueue a coroutine to the ready FIFO.
|
|
pub fn enqueue_ready(&mut self, coro: HeapRef) {
|
|
self.ready_queue.push_back(coro);
|
|
}
|
|
|
|
/// Dequeue next ready coroutine (front of FIFO).
|
|
pub fn dequeue_next(&mut self) -> Option<HeapRef> {
|
|
self.ready_queue.pop_front()
|
|
}
|
|
|
|
#[cfg(test)]
|
|
pub fn is_ready_empty(&self) -> bool {
|
|
self.ready_queue.is_empty()
|
|
}
|
|
#[cfg(test)]
|
|
pub fn ready_len(&self) -> usize {
|
|
self.ready_queue.len()
|
|
}
|
|
|
|
// ---------- Current tracking (no switching here) ----------
|
|
pub fn set_current(&mut self, coro: Option<HeapRef>) {
|
|
self.current = coro;
|
|
}
|
|
#[cfg(test)]
|
|
pub fn current(&self) -> Option<HeapRef> {
|
|
self.current
|
|
}
|
|
pub fn clear_current(&mut self) {
|
|
self.current = None;
|
|
}
|
|
|
|
// ---------- Sleeping operations ----------
|
|
|
|
/// Put a coroutine to sleep until `wake_tick`.
|
|
/// A coroutine is woken when `current_tick >= wake_tick`.
|
|
/// The sleeping list is kept stably ordered to guarantee determinism.
|
|
pub fn sleep_until(&mut self, coro: HeapRef, wake_tick: u64) {
|
|
let entry = SleepEntry { wake_tick, seq: self.next_seq, coro };
|
|
self.next_seq = self.next_seq.wrapping_add(1);
|
|
|
|
// Binary search insertion point by wake_tick, then by seq to keep total order deterministic
|
|
let idx = match self.sleeping.binary_search_by(|e| {
|
|
if e.wake_tick == entry.wake_tick {
|
|
e.seq.cmp(&entry.seq)
|
|
} else {
|
|
e.wake_tick.cmp(&entry.wake_tick)
|
|
}
|
|
}) {
|
|
Ok(i) => i, // equal element position; insert after to preserve FIFO among equals
|
|
Err(i) => i,
|
|
};
|
|
self.sleeping.insert(idx, entry);
|
|
}
|
|
|
|
/// Move all sleeping coroutines with `wake_tick <= current_tick` to ready queue (FIFO by wake order).
|
|
pub fn wake_ready(&mut self, current_tick: u64) {
|
|
// Find split point where wake_tick > current_tick
|
|
let split = self.sleeping.partition_point(|e| e.wake_tick <= current_tick);
|
|
if split == 0 {
|
|
return;
|
|
}
|
|
let mut ready_slice: Vec<SleepEntry> = self.sleeping.drain(0..split).collect();
|
|
// Already in order by (wake_tick, seq); push in that order to preserve determinism
|
|
for e in ready_slice.drain(..) {
|
|
self.ready_queue.push_back(e.coro);
|
|
}
|
|
}
|
|
|
|
/// Returns true if there are any sleeping coroutines.
|
|
pub fn has_sleeping(&self) -> bool {
|
|
!self.sleeping.is_empty()
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
|
|
fn hr(id: u32) -> HeapRef {
|
|
HeapRef(id)
|
|
}
|
|
|
|
#[test]
|
|
fn fifo_ready_queue_is_deterministic() {
|
|
let mut s = Scheduler::new();
|
|
s.enqueue_ready(hr(1));
|
|
s.enqueue_ready(hr(2));
|
|
s.enqueue_ready(hr(3));
|
|
|
|
assert_eq!(s.ready_len(), 3);
|
|
assert_eq!(s.dequeue_next(), Some(hr(1)));
|
|
assert_eq!(s.dequeue_next(), Some(hr(2)));
|
|
assert_eq!(s.dequeue_next(), Some(hr(3)));
|
|
assert_eq!(s.dequeue_next(), None);
|
|
}
|
|
|
|
#[test]
|
|
fn sleeping_wake_is_stable_and_fifo_by_insertion() {
|
|
let mut s = Scheduler::new();
|
|
s.sleep_until(hr(10), 5); // first with wake at 5
|
|
s.sleep_until(hr(11), 5); // second with same wake
|
|
s.sleep_until(hr(12), 6); // later wake
|
|
|
|
// Before tick 5: nothing wakes
|
|
s.wake_ready(4);
|
|
assert!(s.is_ready_empty());
|
|
|
|
// At tick 5: first two wake in insertion order
|
|
s.wake_ready(5);
|
|
assert_eq!(s.dequeue_next(), Some(hr(10)));
|
|
assert_eq!(s.dequeue_next(), Some(hr(11)));
|
|
assert!(s.is_ready_empty());
|
|
|
|
// At tick 6: last one wakes
|
|
s.wake_ready(6);
|
|
assert_eq!(s.dequeue_next(), Some(hr(12)));
|
|
assert!(s.is_ready_empty());
|
|
}
|
|
}
|