diff --git a/crates/console/prometeu-vm/src/heap.rs b/crates/console/prometeu-vm/src/heap.rs index 7b1e5226..946cae23 100644 --- a/crates/console/prometeu-vm/src/heap.rs +++ b/crates/console/prometeu-vm/src/heap.rs @@ -35,6 +35,7 @@ pub enum CoroutineState { /// Stored payload for coroutine objects. #[derive(Debug, Clone)] pub struct CoroutineData { + pub pc: usize, pub state: CoroutineState, pub wake_tick: u64, pub stack: Vec, @@ -100,6 +101,7 @@ impl Heap { /// `payload_len` is 0; stack and frames are stored out-of-line for GC visibility. pub fn allocate_coroutine( &mut self, + pc: usize, state: CoroutineState, wake_tick: u64, stack: Vec, @@ -111,7 +113,7 @@ impl Heap { payload: Vec::new(), array_elems: None, closure_env: None, - coroutine: Some(CoroutineData { state, wake_tick, stack, frames }), + coroutine: Some(CoroutineData { pc, state, wake_tick, stack, frames }), }; let idx = self.objects.len(); self.objects.push(Some(obj)); @@ -125,6 +127,21 @@ impl Heap { self.objects[idx].is_some() } + /// Returns a shared reference to the coroutine data for the given handle, if it is a Coroutine. + pub fn coroutine_data(&self, r: HeapRef) -> Option<&CoroutineData> { + let idx = r.0 as usize; + self.objects.get(idx).and_then(|slot| slot.as_ref()).and_then(|obj| obj.coroutine.as_ref()) + } + + /// Returns a mutable reference to the coroutine data for the given handle, if it is a Coroutine. + pub fn coroutine_data_mut(&mut self, r: HeapRef) -> Option<&mut CoroutineData> { + let idx = r.0 as usize; + self.objects + .get_mut(idx) + .and_then(|slot| slot.as_mut()) + .and_then(|obj| obj.coroutine.as_mut()) + } + /// Get immutable access to an object's header by handle. pub fn header(&self, r: HeapRef) -> Option<&ObjectHeader> { self.objects @@ -363,6 +380,7 @@ mod tests { // Create a coroutine with a small stack containing a HeapRef to verify GC traversal later. let obj_ref = heap.allocate_object(ObjectKind::Bytes, &[4, 5, 6]); let coro = heap.allocate_coroutine( + 0, CoroutineState::Ready, 0, vec![Value::Int32(1), Value::HeapRef(obj_ref)], diff --git a/crates/console/prometeu-vm/src/scheduler.rs b/crates/console/prometeu-vm/src/scheduler.rs index ec94edfa..6b2ae6ba 100644 --- a/crates/console/prometeu-vm/src/scheduler.rs +++ b/crates/console/prometeu-vm/src/scheduler.rs @@ -87,6 +87,9 @@ impl Scheduler { self.ready_queue.push_back(e.coro); } } + + /// Returns true if there are any sleeping coroutines. + pub fn has_sleeping(&self) -> bool { !self.sleeping.is_empty() } } #[cfg(test)] diff --git a/crates/console/prometeu-vm/src/virtual_machine.rs b/crates/console/prometeu-vm/src/virtual_machine.rs index 08170ac0..8bd388e8 100644 --- a/crates/console/prometeu-vm/src/virtual_machine.rs +++ b/crates/console/prometeu-vm/src/virtual_machine.rs @@ -6,12 +6,14 @@ use prometeu_bytecode::isa::core::CoreOpCode as OpCode; use prometeu_bytecode::ProgramImage; use prometeu_bytecode::Value; use crate::roots::{RootVisitor, visit_value_for_roots}; -use crate::heap::Heap; +use crate::heap::{Heap, CoroutineState}; use crate::object::ObjectKind; +use crate::scheduler::Scheduler; use prometeu_bytecode::{ TRAP_BAD_RET_SLOTS, TRAP_DIV_ZERO, TRAP_INVALID_FUNC, TRAP_INVALID_SYSCALL, TRAP_OOB, TRAP_STACK_UNDERFLOW, TRAP_TYPE, TrapInfo, }; +use prometeu_bytecode::HeapRef; use prometeu_hal::vm_fault::VmFault; /// Reason why the Virtual Machine stopped execution during a specific run. @@ -87,13 +89,14 @@ pub struct VirtualMachine { /// Cooperative scheduler: set to true when `YIELD` opcode is executed. /// The runtime/scheduler should only act on this at safepoints (FRAME_SYNC). pub yield_requested: bool, + /// If set, the current coroutine requested to sleep until this tick (inclusive). + pub sleep_requested_until: Option, /// Logical tick counter advanced at each FRAME_SYNC boundary. pub current_tick: u64, - /// If set, the current coroutine is sleeping until this tick (inclusive). - /// While sleeping and before `current_tick >= wake`, the VM will end the - /// logical frame immediately at the start of `step()` and after executing - /// `SLEEP`. - pub sleep_until_tick: Option, + /// Cooperative scheduler instance managing ready/sleeping queues. + pub scheduler: Scheduler, + /// Handle to the currently running coroutine (owns the active VM context). + pub current_coro: Option, } @@ -126,8 +129,10 @@ impl VirtualMachine { last_gc_live_count: 0, capabilities: 0, yield_requested: false, + sleep_requested_until: None, current_tick: 0, - sleep_until_tick: None, + scheduler: Scheduler::new(), + current_coro: None, } } @@ -150,7 +155,9 @@ impl VirtualMachine { self.halted = true; // execution is impossible until a successful load self.last_gc_live_count = 0; self.current_tick = 0; - self.sleep_until_tick = None; + self.sleep_requested_until = None; + self.scheduler = Scheduler::new(); + self.current_coro = None; // Preserve capabilities across loads; firmware may set them per cart. // Only recognized format is loadable: PBS v0 industrial format @@ -253,6 +260,18 @@ impl VirtualMachine { stack_base: 0, func_idx, }); + + // Initialize the main coroutine to own the current execution context. + // State = Running; not enqueued into ready queue. + let main_href = self.heap.allocate_coroutine( + self.pc, + CoroutineState::Running, + 0, + self.operand_stack.clone(), + self.call_stack.clone(), + ); + self.current_coro = Some(main_href); + self.scheduler.set_current(self.current_coro); } /// Executes the VM for a limited number of cycles (budget). @@ -335,15 +354,13 @@ impl VirtualMachine { native: &mut dyn NativeInterface, ctx: &mut HostContext, ) -> Result<(), LogicalFrameEndingReason> { - // If the current coroutine is sleeping and hasn't reached its wake tick, - // immediately end the logical frame to respect suspension semantics. - if let Some(wake) = self.sleep_until_tick { - if self.current_tick < wake { - // Consume FRAME_SYNC cost and perform safepoint duties. - self.cycles += OpCode::FrameSync.cycles(); - self.handle_safepoint(); - return Err(LogicalFrameEndingReason::FrameSync); - } + // If there is no currently running coroutine (e.g., all are sleeping), + // we cannot execute any instruction this frame. End the frame immediately + // with a safepoint to advance tick and potentially wake sleepers. + if self.current_coro.is_none() { + self.cycles += OpCode::FrameSync.cycles(); + self.handle_safepoint(); + return Err(LogicalFrameEndingReason::FrameSync); } if self.halted || self.pc >= self.program.rom.len() { return Ok(()); @@ -434,10 +451,72 @@ impl VirtualMachine { return Err(LogicalFrameEndingReason::Breakpoint); } OpCode::Spawn => { - // Placeholder: spawning is handled by the system runtime in a later PR. - // VM side does not switch; arguments/immediates will be handled when - // coroutine objects and ABI are fully wired. For now, it's a no-op here - // besides normal cycle accounting at the end of step. + // Operands: (fn_id, arg_count) + let (fn_id_u32, arg_count_u32) = instr + .imm_u32x2() + .map_err(|e| LogicalFrameEndingReason::Panic(format!("{:?}", e)))?; + let fn_id = fn_id_u32 as usize; + let arg_count = arg_count_u32 as usize; + + let callee = self.program.functions.get(fn_id).ok_or_else(|| { + self.trap( + TRAP_INVALID_FUNC, + opcode as u16, + format!("Invalid func_id {} in SPAWN", fn_id), + start_pc as u32, + ) + })?; + + let param_slots: u16 = callee.param_slots; + let local_slots: u16 = callee.local_slots; + let entry_pc = callee.code_offset as usize; + + if arg_count as u16 != param_slots { + return Err(self.trap( + TRAP_TYPE, + opcode as u16, + format!( + "SPAWN arg_count mismatch for func {}: expected {}, got {}", + fn_id, param_slots, arg_count + ), + start_pc as u32, + )); + } + + if self.operand_stack.len() < arg_count { + return Err(LogicalFrameEndingReason::Panic(format!( + "Stack underflow during SPAWN to func {}: expected at least {} arguments, got {}", + fn_id, + arg_count, + self.operand_stack.len() + ))); + } + + // Pop args top-first, then reverse to logical order arg1..argN + let mut args: Vec = Vec::with_capacity(arg_count); + for _ in 0..arg_count { + args.push(self.pop().map_err(|e| LogicalFrameEndingReason::Panic(e))?); + } + args.reverse(); + + // Build operand stack for the new coroutine: params followed by zeroed locals + let mut new_stack: Vec = Vec::with_capacity((param_slots + local_slots) as usize); + // Place user args as parameters + for v in args { new_stack.push(v); } + // Zero-init locals + for _ in 0..local_slots { new_stack.push(Value::Null); } + + // Initial frame for the coroutine (sentinel-like return to end-of-rom) + let frames = vec![CallFrame { return_pc: self.program.rom.len() as u32, stack_base: 0, func_idx: fn_id }]; + + let href = self.heap.allocate_coroutine( + entry_pc, + CoroutineState::Ready, + 0, + new_stack, + frames, + ); + self.scheduler.enqueue_ready(href); } OpCode::Yield => { // Cooperative yield: record intent; actual switching only at FRAME_SYNC. @@ -450,7 +529,7 @@ impl VirtualMachine { .imm_u32() .map_err(|e| LogicalFrameEndingReason::Panic(format!("{:?}", e)))? as u64; let wake = self.current_tick.saturating_add(duration); - self.sleep_until_tick = Some(wake); + self.sleep_requested_until = Some(wake); // End the logical frame right after the instruction completes // to ensure no further instructions run until at least next tick. @@ -1167,7 +1246,7 @@ impl VirtualMachine { /// Runs GC if thresholds are reached, clears cooperative yield flag, /// and advances the logical tick counter. fn handle_safepoint(&mut self) { - // GC Safepoint: only at FRAME_SYNC-like boundaries + // 1) GC Safepoint: only at FRAME_SYNC-like boundaries if self.gc_alloc_threshold > 0 { let live_now = self.heap.len(); let since_last = live_now.saturating_sub(self.last_gc_live_count); @@ -1179,8 +1258,8 @@ impl VirtualMachine { } let mut collector = CollectRoots(Vec::new()); self.visit_roots(&mut collector); - - // Add suspended coroutine handles as GC roots so their stacks/frames are scanned + // Add current coroutine and all suspended (ready/sleeping) coroutines as GC roots + if let Some(cur) = self.current_coro { collector.0.push(cur); } let mut coro_roots = self.heap.suspended_coroutine_handles(); collector.0.append(&mut coro_roots); @@ -1192,20 +1271,107 @@ impl VirtualMachine { } } - // Advance logical tick at every frame boundary. + // 2) Advance logical tick and wake sleepers self.current_tick = self.current_tick.wrapping_add(1); + self.scheduler.wake_ready(self.current_tick); - // If we've passed the wake tick, clear the sleep so execution can resume next frame. - if let Some(wake) = self.sleep_until_tick { - if self.current_tick >= wake { - self.sleep_until_tick = None; + // 3) Apply pending transitions for the current coroutine (yield/sleep/finished) + let mut switched_out = false; + if let Some(cur) = self.current_coro { + // Handle sleep request + if let Some(wake) = self.sleep_requested_until.take() { + if let Some(co) = self.heap.coroutine_data_mut(cur) { + // Save execution context into the coroutine object + co.pc = self.pc; + co.stack = std::mem::take(&mut self.operand_stack); + co.frames = std::mem::take(&mut self.call_stack); + co.state = CoroutineState::Sleeping; + co.wake_tick = wake; + } + self.scheduler.sleep_until(cur, wake); + self.current_coro = None; + self.scheduler.clear_current(); + switched_out = true; + } else if self.yield_requested { + if let Some(co) = self.heap.coroutine_data_mut(cur) { + co.pc = self.pc; + co.stack = std::mem::take(&mut self.operand_stack); + co.frames = std::mem::take(&mut self.call_stack); + co.state = CoroutineState::Ready; + } + self.scheduler.enqueue_ready(cur); + self.current_coro = None; + self.scheduler.clear_current(); + switched_out = true; + } else if self.halted || self.pc >= self.program.rom.len() { + // Current finished; save final context and mark Finished + if let Some(co) = self.heap.coroutine_data_mut(cur) { + co.pc = self.pc; + co.stack = std::mem::take(&mut self.operand_stack); + co.frames = std::mem::take(&mut self.call_stack); + co.state = CoroutineState::Finished; + } + self.current_coro = None; + self.scheduler.clear_current(); + switched_out = true; + } else { + // Stays running; nothing to do } } - // Clear cooperative yield request at the safepoint boundary. + // 4) Select next coroutine if needed + if self.current_coro.is_none() { + if let Some(next) = self.scheduler.dequeue_next() { + // Load next context into the VM + if let Some(co) = self.heap.coroutine_data_mut(next) { + self.pc = co.pc; + self.operand_stack = std::mem::take(&mut co.stack); + self.call_stack = std::mem::take(&mut co.frames); + co.state = CoroutineState::Running; + } + self.current_coro = Some(next); + self.scheduler.set_current(self.current_coro); + } else { + // Nothing ready now. If there are sleeping coroutines, we keep VM idle until next frame tick. + // If there are no sleeping coroutines either (i.e., all finished), we can halt deterministically. + if switched_out && !self.scheduler.has_sleeping() { + self.halted = true; + } + } + } else { + // Keep current as scheduler current for observability + self.scheduler.set_current(self.current_coro); + } + + // 5) Clear cooperative yield request at the safepoint boundary. self.yield_requested = false; } + /// Save the currently running VM execution context back into its coroutine object. + /// Must be called only at safepoints. + fn save_current_context_into_coroutine(&mut self) { + if let Some(cur) = self.current_coro { + if let Some(co) = self.heap.coroutine_data_mut(cur) { + co.pc = self.pc; + co.stack = std::mem::take(&mut self.operand_stack); + co.frames = std::mem::take(&mut self.call_stack); + } + } + } + + /// Load a coroutine context from heap into the VM runtime state. + /// Must be called only at safepoints. + fn load_coroutine_context_into_vm(&mut self, coro: HeapRef) { + if let Some(co) = self.heap.coroutine_data_mut(coro) { + self.pc = co.pc; + self.operand_stack = std::mem::take(&mut co.stack); + self.call_stack = std::mem::take(&mut co.frames); + co.state = CoroutineState::Running; + } + self.current_coro = Some(coro); + self.scheduler.set_current(self.current_coro); + } + pub fn trap( &self, code: u32, @@ -1308,6 +1474,8 @@ mod tests { code_len: rom_len, ..Default::default() }]); + // Ensure tests start with a properly initialized main coroutine at func 0 + vm.prepare_call("0"); vm } use crate::HostReturn; @@ -1356,7 +1524,9 @@ mod tests { // Frame 2: still sleeping (tick 1 < wake 2), immediate FrameSync, tick -> 2 let rep2 = vm.run_budget(100, &mut native, &mut ctx).expect("run ok"); assert!(matches!(rep2.reason, LogicalFrameEndingReason::FrameSync)); - assert!(vm.operand_stack.is_empty()); + // In the per-coroutine model, the VM may keep current context intact across idle frames; + // we must not observe any new values pushed before wake. Stack height must be unchanged. + assert_eq!(vm.operand_stack.len(), 0); assert_eq!(vm.current_tick, 2); // Frame 3: wake condition met (current_tick >= wake), execute PUSH_I32 then FRAME_SYNC @@ -1496,7 +1666,7 @@ mod tests { for i in 0..coro_count { let state = if i % 2 == 0 { CoroutineState::Ready } else { CoroutineState::Sleeping }; let wake = if state == CoroutineState::Sleeping { (i / 2) as u64 } else { 0 }; - let _c = vm.heap.allocate_coroutine(state, wake, vec![], vec![]); + let _c = vm.heap.allocate_coroutine(0, state, wake, vec![], vec![]); // Also allocate a tiny byte object to increase GC pressure. let _b = vm.heap.allocate_object(ObjectKind::Bytes, &[i as u8]); } @@ -1706,7 +1876,7 @@ mod tests { rom.extend_from_slice(&(OpCode::Halt as u16).to_le_bytes()); let cp = vec![Value::String("hello".into())]; - let mut vm = VirtualMachine::new(rom, cp); + let mut vm = new_test_vm(rom, cp); let mut native = MockNative; let mut ctx = HostContext::new(None); @@ -2975,21 +3145,23 @@ mod tests { // Allocate an unreachable object (no roots referencing it) let _orphan = vm.heap.allocate_object(ObjectKind::Bytes, &[1, 2, 3]); - assert_eq!(vm.heap.len(), 1); + // +1 for the main coroutine allocated by new_test_vm + assert_eq!(vm.heap.len(), 2); let mut native = MockNative; let mut ctx = HostContext::new(None); // Step 1: NOP — should not run GC vm.step(&mut native, &mut ctx).unwrap(); - assert_eq!(vm.heap.len(), 1, "GC must not run except at safepoints"); + assert_eq!(vm.heap.len(), 2, "GC must not run except at safepoints"); // Step 2: FRAME_SYNC — GC should run and reclaim the unreachable object match vm.step(&mut native, &mut ctx) { Err(LogicalFrameEndingReason::FrameSync) => {} other => panic!("Expected FrameSync, got {:?}", other), } - assert_eq!(vm.heap.len(), 0, "Unreachable object must be reclaimed at FRAME_SYNC"); + // Main coroutine remains + assert_eq!(vm.heap.len(), 1, "Unreachable object must be reclaimed at FRAME_SYNC"); } #[test] @@ -3014,7 +3186,8 @@ mod tests { // Allocate two objects; make one a root by placing it on the operand stack let rooted = vm.heap.allocate_object(ObjectKind::Bytes, &[9, 9]); let unreachable = vm.heap.allocate_object(ObjectKind::Bytes, &[8, 8, 8]); - assert_eq!(vm.heap.len(), 2); + // +1 for main coroutine + assert_eq!(vm.heap.len(), 3); vm.operand_stack.push(Value::HeapRef(rooted)); let mut native = MockNative; @@ -3026,8 +3199,8 @@ mod tests { other => panic!("Expected FrameSync, got {:?}", other), } - // Rooted must survive; unreachable must be collected - assert_eq!(vm.heap.len(), 1); + // Rooted must survive; unreachable must be collected; main coroutine remains + assert_eq!(vm.heap.len(), 2); assert!(vm.heap.is_valid(rooted)); assert!(!vm.heap.is_valid(unreachable)); } @@ -3054,7 +3227,8 @@ mod tests { // Cycle 1: allocate one unreachable object let _h1 = vm.heap.allocate_object(ObjectKind::Bytes, &[1]); - assert_eq!(vm.heap.len(), 1); + // +1 for main coroutine + assert_eq!(vm.heap.len(), 2); let mut native = MockNative; let mut ctx = HostContext::new(None); @@ -3064,17 +3238,18 @@ mod tests { Err(LogicalFrameEndingReason::FrameSync) => {} other => panic!("Expected FrameSync, got {:?}", other), } - assert_eq!(vm.heap.len(), 0); + // Main coroutine remains + assert_eq!(vm.heap.len(), 1); // Cycle 2: allocate again and collect again deterministically let _h2 = vm.heap.allocate_object(ObjectKind::Bytes, &[2]); - assert_eq!(vm.heap.len(), 1); + assert_eq!(vm.heap.len(), 2); // Second FRAME_SYNC should also be reached deterministically match vm.step(&mut native, &mut ctx) { Err(LogicalFrameEndingReason::FrameSync) => {} other => panic!("Expected FrameSync, got {:?}", other), } - assert_eq!(vm.heap.len(), 0); + assert_eq!(vm.heap.len(), 1); } #[test] @@ -3102,7 +3277,8 @@ mod tests { let byte = (i & 0xFF) as u8; let _ = vm.heap.allocate_object(ObjectKind::Bytes, &[byte]); } - assert_eq!(vm.heap.len(), count); + // +1 for main coroutine + assert_eq!(vm.heap.len(), count + 1); let mut native = MockNative; let mut ctx = HostContext::new(None); @@ -3113,7 +3289,7 @@ mod tests { other => panic!("Expected FrameSync, got {:?}", other), } - assert_eq!(vm.heap.len(), 0, "All short-lived objects must be reclaimed deterministically"); + assert_eq!(vm.heap.len(), 1, "All short-lived objects except main coroutine must be reclaimed deterministically"); } #[test] @@ -3139,13 +3315,14 @@ mod tests { // Allocate a heap object and a suspended coroutine that captures it on its stack let captured = vm.heap.allocate_object(ObjectKind::Bytes, &[0xAA, 0xBB]); let _coro = vm.heap.allocate_coroutine( + 0, CoroutineState::Ready, 0, vec![Value::HeapRef(captured)], vec![], ); - assert_eq!(vm.heap.len(), 2, "object + coroutine must be allocated"); + assert_eq!(vm.heap.len(), 3, "object + suspended coroutine + main coroutine must be allocated"); let mut native = MockNative; let mut ctx = HostContext::new(None); @@ -3157,7 +3334,8 @@ mod tests { } assert!(vm.heap.is_valid(captured), "captured object must remain alive"); - assert_eq!(vm.heap.len(), 2, "both coroutine and captured object must survive"); + // Captured object + suspended coroutine + main coroutine + assert_eq!(vm.heap.len(), 3, "both coroutine and captured object must survive (plus main)"); } #[test] @@ -3179,7 +3357,7 @@ mod tests { vm.gc_alloc_threshold = 1; // Allocate a finished coroutine with no external references - let finished = vm.heap.allocate_coroutine(CoroutineState::Finished, 0, vec![], vec![]); + let finished = vm.heap.allocate_coroutine(0, CoroutineState::Finished, 0, vec![], vec![]); assert!(vm.heap.is_valid(finished)); let mut native = MockNative; @@ -3192,7 +3370,8 @@ mod tests { } assert!(!vm.heap.is_valid(finished), "finished coroutine must be collected"); - assert_eq!(vm.heap.len(), 0, "no objects should remain"); + // Main coroutine remains allocated + assert_eq!(vm.heap.len(), 1, "only main coroutine should remain"); } #[test]