From 4f269b7cfee03519999eb294c214b2a4f4ce7312 Mon Sep 17 00:00:00 2001 From: Dorian Weber <weber@informatik.hu-berlin.de> Date: Fri, 1 Nov 2024 17:23:28 +0100 Subject: [PATCH] Refactoring of unrelated fields in the Calendar into the Simulator, documentation, some performance improvements, and renaming of some structures to bring them closer to ODEM-rs. --- Cargo.toml | 1 + build.rs | 10 +- examples/barbershop.rs | 16 +-- examples/ferry.rs | 8 +- examples/philosophers.rs | 6 +- src/lib.rs | 278 +++++++++++++++++++++------------------ src/util.rs | 90 ++++++------- 7 files changed, 216 insertions(+), 193 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 919ef13..a6c6229 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [profile.release] lto = true +debug = "full" opt-level = 3 [profile.bench] diff --git a/build.rs b/build.rs index 3c92fb4..b90be49 100644 --- a/build.rs +++ b/build.rs @@ -97,13 +97,15 @@ fn main() { // there is no need to stop the build altogether if the compilation failed if let Err(msg) = result { - println!("cargo:warning=Compiling ODEMx-lite has failed."); + println!("cargo::warning=Compiling ODEMx-lite has failed."); println!( - "cargo:warning=This library will still work, \ + "cargo::warning=This library will still work, \ the C++ benchmarks will not!" ); - println!("cargo:warning={}", msg); + println!("cargo::warning={}", msg); } else { - println!("cargo:rustc-cfg=feature=\"odemx\""); + println!("cargo::rustc-cfg=odemx"); } + + println!("cargo::rustc-check-cfg=cfg(odemx)"); } diff --git a/examples/barbershop.rs b/examples/barbershop.rs index 948e9dd..6738127 100644 --- a/examples/barbershop.rs +++ b/examples/barbershop.rs @@ -54,8 +54,8 @@ impl Customer { sim.global().wait_time.tabulate(sim.now() - arrival_time); // Simulate the time taken for the haircut. - sim.advance(sim.global().rng_s.borrow_mut().gen_range(12.0..18.0)) - .await; + let cut = sim.global().rng_s.borrow_mut().gen_range(12.0..18.0); + sim.advance(cut).await; // Release the barber for the next customer. sim.global().joe.release(); @@ -72,8 +72,8 @@ async fn sim_main(sim: Sim<'_, Barbershop>, duration: Time) { sim.activate(async move { loop { // Wait for a random time before the next customer arrives. - sim.advance(sim.global().rng_a.borrow_mut().gen_range(12.0..24.0)) - .await; + let wait_time = sim.global().rng_a.borrow_mut().gen_range(12.0..24.0); + sim.advance(wait_time).await; // If the simulation time exceeds the duration, stop generating customers. if sim.now() >= duration { @@ -110,11 +110,11 @@ fn main() { wait_time: RandomVar::new(), }, // Simulation entry point. - |sim| Process::new(sim, sim_main(sim, 60.0 * 24.0 * 7.0 * 3.0)), // Simulate for 3 weeks. + |sim| Process::new(sim, sim_main(sim, 3.0 * 7.0 * 24.0 * 60.0)), ); // Print statistics after the simulation ends. - println!("wait_time: {:#.3}", result.wait_time); + println!("wait_time: {:#.3?}", result.wait_time); } #[cfg(test)] @@ -134,7 +134,7 @@ mod bench { criterion_group, AxisScale, BatchSize, BenchmarkId, Criterion, PlotConfiguration, }; - #[cfg(feature = "odemx")] + #[cfg(odemx)] mod odemx { use std::os::raw::c_double; @@ -199,7 +199,7 @@ mod bench { } // Benchmark the C++ implementation. - #[cfg(feature = "odemx")] + #[cfg(odemx)] group.bench_function(BenchmarkId::new("ODEMx", sim_duration), |b| { b.iter(|| unsafe { odemx::barbershop(sim_duration as _) }) }); diff --git a/examples/ferry.rs b/examples/ferry.rs index cd2babd..0d31a28 100644 --- a/examples/ferry.rs +++ b/examples/ferry.rs @@ -216,7 +216,7 @@ fn main() { |sim| { Process::new( sim, - sim_main(sim, 24.0 * 60.0 * 7.0, FERRY_COUNT, HARBOR_COUNT), + sim_main(sim, 24.0 * 60.0 * 365.0, FERRY_COUNT, HARBOR_COUNT), ) }, ); @@ -224,9 +224,9 @@ fn main() { // Output simulation statistics. println!("Number of harbors: {}", HARBOR_COUNT); println!("Number of ferries: {}", FERRY_COUNT); - println!("Car wait time: {:#.3}", result.car_wait_time); - println!("Ferry cargo len: {:#.3}", result.ferry_cargo_len); - println!("Ferry load time: {:#.3}", result.ferry_load_time); + println!("Car wait time: {:#.3?}", result.car_wait_time); + println!("Ferry cargo len: {:#.3?}", result.ferry_cargo_len); + println!("Ferry load time: {:#.3?}", result.ferry_load_time); } #[cfg(test)] diff --git a/examples/philosophers.rs b/examples/philosophers.rs index 02d4e45..d3cdedf 100644 --- a/examples/philosophers.rs +++ b/examples/philosophers.rs @@ -183,14 +183,14 @@ fn philosophers(count: usize, reruns: usize) -> RandomVar { .get() }) .fold( - || RandomVar::new(), + RandomVar::new, |var, duration| { var.tabulate(duration); var }, ) .reduce( - || RandomVar::new(), + RandomVar::new, |var_a, var_b| { var_a.merge(&var_b); var_a @@ -206,7 +206,7 @@ fn main() { const EXPERIMENT_COUNT: usize = 500; let sim_duration = philosophers(PHILOSOPHER_COUNT, EXPERIMENT_COUNT); - println!("Simulation duration until deadlock: {:#}", sim_duration); + println!("Simulation duration until deadlock: {:#?}", sim_duration); } #[cfg(test)] diff --git a/src/lib.rs b/src/lib.rs index 92e3d03..d71854b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,13 +9,16 @@ //! //! - **`simulation`**: Runs a single simulation given a shared global state //! and a main process function. It sets up the scheduler (`Calendar`), -//! initializes the simulation context (`Sim`), and manages the event loop -//! until the main process terminates. +//! initializes a handle to the simulation context (`Sim`), and manages the +//! event loop until the main process terminates. //! -//! - **`Sim`**: A lightweight handle to the scheduler, providing methods for -//! processes to interact with the simulation. Processes can use it to -//! schedule themselves or other processes, advance simulation time, retrieve -//! the current time, and access shared global data. +//! - **`Simulator`**: The simulation context, containing the event calendar, +//! an owning registry for all processes, and a reference to the shared data. +//! +//! - **`Sim`**: A lightweight handle to the simulation context, providing +//! methods for processes to interact with the simulation. Processes can use +//! it to schedule themselves or other processes, advance simulation time, +//! retrieve the current model-time, and access shared global data. //! //! - **`Process`**: Wraps a `Future` to represent a process in the simulation. //! It can be scheduled to run at specific model times and can also act as a @@ -52,12 +55,10 @@ //! or `Sync` safe. //! - Processes manage their own scheduling and can be reactivated by the //! simulation context. -//! - Unsafe code is used internally (e.g., raw pointers in `Sim`), but safety -//! is maintained as long as simulation contexts do not escape the closure -//! passed to `simulation` which is enforced by the type system. +//! - Unsafe code is used internally, but safety is maintained as long as +//! strongly owned processes do not escape the closure passed to `simulation` //! - Wakers are customized to integrate with the simulation's process -//! scheduling, and care is taken to prevent concurrency bugs, assuming -//! single-threaded execution. +//! scheduling. //! //! This framework is intended for educational purposes as part of my (Dorian //! Weber) dissertation to illustrate how asynchronous Rust can be used to @@ -69,7 +70,7 @@ use std::{ cmp::Ordering, collections::BinaryHeap, future::{poll_fn, Future}, - hash::{BuildHasherDefault, Hash, Hasher}, + hash::{Hash, Hasher}, mem::ManuallyDrop, pin::Pin, rc::{Rc, Weak}, @@ -87,15 +88,19 @@ pub type Time = f64; /// process. pub fn simulation<G, F>(shared: G, main: F) -> G where - F: FnOnce(Sim<'_, G>) -> Process<'_, G>, + F: for<'s> FnOnce(Sim<'s, G>) -> Process<'s, G>, { { - let sched; - let process_storage = Rc::default(); - - // create a fresh scheduler and a handle to it - sched = Calendar::new(&shared, Rc::downgrade(&process_storage)); - let sim = Sim { handle: &sched }; + let simulator; + let registry = Rc::default(); + + // create a fresh simulation context and a handle to it + simulator = Simulator { + shared: &shared, + calendar: RefCell::new(Calendar::default()), + registry: Rc::downgrade(®istry), + }; + let sim = Sim { handle: &simulator }; // construct a custom context to pass into the poll()-method let event_waker = StateEventWaker::new(sim); @@ -104,57 +109,67 @@ where // evaluate the passed function for the main process and schedule it let root = main(sim); - sched.schedule(root.clone()); + simulator.calendar.borrow_mut().schedule(root.clone()); // pop processes until empty or the main process terminates - while let Some(process) = sched.next_process() { - if process.poll(&mut cx).is_ready() { - if process == root { - break; - } - process_storage.borrow_mut().shift_remove(&process); + while let Some(process) = simulator.next_process() { + if process.poll(&mut cx).is_ready() && process == root { + break; + } + + // drop the active process if only the weak reference of the + // `active` field in the event calendar remains + if Rc::weak_count(&process.0) <= 1 { + registry.borrow_mut().swap_remove(&process); } } - // Here we drop `process_storage` first and `sched` after. Since `process_storage` - // is an `IndexSet` (and not a `HashSet`), all processes are guaranteed to be dropped, - // even if a destructor of a future unwinds. This ensures that any weak process - // wakers leaked out of the `simulation` function can no longer be upgraded. + // Here we drop `registry` first and `simulator` after. Since `registry` + // is an `IndexSet` (and not a `HashSet`), all processes are guaranteed + // to be dropped, even if a destructor of a future unwinds. This ensures + // that any weak process wakers leaked out of the `simulation` function + // can no longer be upgraded. } // return the global data shared } -// priority queue of time-process-pairs using time as the key -type EventQ<'s, G> = BinaryHeap<NextEvent<'s, G>>; +/// Private simulation context that holds a reference to the shared data, the +/// event calendar, and the owning container for all processes. +struct Simulator<'s, G> { + /// A reference to the globally shared data. + shared: &'s G, + + /// The scheduler for processes. + calendar: RefCell<Calendar<'s, G>>, + + /// The container that has strong ownership of the processes during a + /// simulation run. + registry: Weak<RefCell<indexmap::IndexSet<StrongProcess<'s, G>>>> +} -type FxIndexSet<T> = indexmap::IndexSet<T, BuildHasherDefault<rustc_hash::FxHasher>>; +impl<'s, G> Simulator<'s, G> { + /// Returns the next process according to the event calendar. + fn next_process(&self) -> Option<StrongProcess<'s, G>> { + self.calendar.borrow_mut().next_process() + } +} /// The (private) scheduler for processes. struct Calendar<'s, G> { /// The current simulation time. - now: Cell<Time>, - - inner: RefCell<CalendarInner<'s, G>>, - - /// Globally-accessible data. - shared: &'s G, - - process_storage: Weak<RefCell<FxIndexSet<StrongProcess<'s, G>>>>, -} - -struct CalendarInner<'s, G> { + now: Time, /// The event-calendar organized chronologically. - calendar: EventQ<'s, G>, - + calendar: BinaryHeap<NextEvent<'s, G>>, /// The currently active process. active: Option<Process<'s, G>>, } -impl<G> Default for CalendarInner<'_, G> { +impl<G> Default for Calendar<'_, G> { fn default() -> Self { Self { + now: Time::default(), calendar: Default::default(), active: Default::default(), } @@ -162,58 +177,47 @@ impl<G> Default for CalendarInner<'_, G> { } impl<'s, G> Calendar<'s, G> { - /// Creates a new scheduler. - #[inline] - fn new( - shared: &'s G, - process_storage: Weak<RefCell<FxIndexSet<StrongProcess<'s, G>>>>, - ) -> Self { - Self { - now: Cell::default(), - inner: RefCell::default(), - shared, - process_storage, - } - } - /// Schedules a process at the current simulation time. #[inline] - fn schedule(&self, process: Process<'s, G>) { + fn schedule(&mut self, process: Process<'s, G>) { self.schedule_in(Time::default(), process); } /// Schedules a process at a later simulation time. #[inline] - fn schedule_in(&self, dt: Time, process: Process<'s, G>) { - let calender = &mut self.inner.borrow_mut().calendar; - debug_assert!( - !calender.iter().any(|NextEvent(_, p)| *p == process), - "already scheduled" - ); - calender.push(NextEvent(self.now.get() + dt, process)); + fn schedule_in(&mut self, dt: Time, process: Process<'s, G>) { + let calender = &mut self.calendar; + let strong_process = process.upgrade().expect("already terminated"); + let is_scheduled = &strong_process.0.is_scheduled; + + assert!(!is_scheduled.get(), "already scheduled"); + is_scheduled.set(true); + + calender.push(NextEvent(self.now + dt, process)); } /// Removes the process with the next event time from the calendar and /// activates it. #[inline] - fn next_process(&self) -> Option<StrongProcess<'s, G>> { - let inner = &mut *self.inner.borrow_mut(); + fn next_process(&mut self) -> Option<StrongProcess<'s, G>> { loop { - let NextEvent(now, process) = inner.calendar.pop()?; + let NextEvent(now, process) = self.calendar.pop()?; let Some(strong_process) = process.upgrade() else { // process was terminated after being queued continue; }; - self.now.set(now); - inner.active = Some(process); - break Some(strong_process); + strong_process.0.is_scheduled.set(false); + self.now = now; + self.active = Some(process); + + return Some(strong_process); } } } -/// A light-weight handle to the scheduler. +/// A light-weight handle to the simulation context. pub struct Sim<'s, G = ()> { - handle: &'s Calendar<'s, G>, + handle: &'s Simulator<'s, G>, } // this allows the creation of copies @@ -231,8 +235,7 @@ impl<'s, G> Sim<'s, G> { /// Returns a (reference-counted) copy of the currently active process. #[inline] pub fn active(&self) -> Process<'s, G> { - self.sched() - .inner + self.handle.calendar .borrow() .active .as_ref() @@ -252,72 +255,72 @@ impl<'s, G> Sim<'s, G> { /// Reactivates a process that has been suspended with wait(). #[inline] pub fn reactivate(&self, process: Process<'s, G>) { - assert!(!process.is_terminated()); - self.sched().schedule(process); + self.handle.calendar.borrow_mut().schedule(process); } /// Reactivates the currently active process after some time has passed. #[inline] pub async fn advance(&self, dt: Time) { - self.sched().schedule_in(dt, self.active()); + let active = self.active(); + self.handle.calendar.borrow_mut().schedule_in(dt, active); sleep().await } /// Returns the current simulation time. #[inline] pub fn now(&self) -> Time { - self.sched().now.get() + self.handle.calendar.borrow().now } /// Returns a shared reference to the global data. #[inline] - pub fn global(&self) -> &G { - self.sched().shared - } - - /// Private function to get a safe reference to the scheduler. - #[inline] - fn sched(&self) -> &Calendar<'s, G> { - self.handle + pub fn global(&self) -> &'s G { + self.handle.shared } } /// A bare-bone process type that can also be used as a waker. -pub struct Process<'s, G>(Weak<RefCell<ProcessInner<'s, G>>>); +pub struct Process<'s, G>(Weak<ProcessInner<'s, G>>); /// A private accessor to the inner parts of a [`Process`]. /// -/// **WARNING**: This must never leak into user code! Forgetting a `StrongProcess` causes UB! -struct StrongProcess<'s, G>(Rc<RefCell<ProcessInner<'s, G>>>); +/// **WARNING**: This must never leak into user code! Forgetting a +/// `StrongProcess` (`std::mem::forget`) causes *undefined behavior* because +/// it will keep the processes alive past the simulation, leading to dangling +/// pointers due to the lifetime erasure experienced by the wakers. +struct StrongProcess<'s, G>(Rc<ProcessInner<'s, G>>); /// The private details of the [`Process`](struct.Process.html) type. struct ProcessInner<'s, G> { /// The simulation context needed to implement the `Waker` interface. context: Sim<'s, G>, + /// A boolean flag indicating whether this process is currently scheduled. + is_scheduled: Cell<bool>, /// `Some` [`Future`] associated with this process or `None` if it has been /// terminated externally. /// /// [`Future`]: https://doc.rust-lang.org/std/future/trait.Future.html - state: Option<Pin<Box<dyn Future<Output = ()> + 's>>>, + state: RefCell<Option<Pin<Box<dyn Future<Output = ()> + 's>>>>, } impl<'s, G> Process<'s, G> { /// Combines a future and a simulation context to a process. #[inline] pub fn new(sim: Sim<'s, G>, fut: impl Future<Output = ()> + 's) -> Self { - let strong_process = StrongProcess(Rc::new(RefCell::new(ProcessInner { + let strong_process = StrongProcess(Rc::new(ProcessInner { context: sim, - state: Some(Box::pin(fut)), - }))); + is_scheduled: Cell::new(false), + state: RefCell::new(Some(Box::pin(fut))), + })); let process = strong_process.downgrade(); - let process_storage = sim + let registry = sim .handle - .process_storage + .registry .upgrade() .expect("attempted to create process after simulation ended"); - process_storage.borrow_mut().insert(strong_process); + registry.borrow_mut().insert(strong_process); process } @@ -334,24 +337,24 @@ impl<'s, G> Process<'s, G> { // ok, we're already terminated return; }; - let sim = strong_process.0.borrow().context; + let sim = strong_process.0.context; assert!( - sim.sched() - .inner + sim.handle.calendar .borrow() .active .as_ref() .is_none_or(|active| active != self), "attempted to terminate active process" ); - if let Some(process_storage) = sim.sched().process_storage.upgrade() { - process_storage.borrow_mut().shift_remove(&strong_process); + if let Some(registry) = sim.handle.registry.upgrade() { + registry.borrow_mut().swap_remove(&strong_process); } else { - // This can happen if we're dropping the `process_storage` and the destructor - // of another process tries to terminate this process. In this case, we need to - // manually drop our future now to ensure that there are no dangling references - // from our future to their future. - strong_process.0.borrow_mut().state = None; + // This can happen if we're dropping the `registry` and the + // destructor of another process tries to terminate this process. + // In this case, we need to manually drop our future now to ensure + // that there are no dangling references from our future to their + // future. + *strong_process.0.state.borrow_mut() = None; }; drop(strong_process); assert!(self.is_terminated(), "failed to terminate process"); @@ -363,12 +366,13 @@ impl<'s, G> Process<'s, G> { unsafe { task::Waker::from_raw(self.raw_waker()) } } + /// Returns whether this process has finished its life-cycle. #[inline] - fn is_terminated(&self) -> bool { + pub fn is_terminated(&self) -> bool { self.upgrade() - .is_none_or(|strong_process| strong_process.0.borrow().state.is_none()) + .is_none_or(|strong_process| strong_process.0.state.borrow().is_none()) } - + /// Gets private access to the process. /// /// # Safety @@ -386,18 +390,25 @@ impl<'s, G> StrongProcess<'s, G> { #[inline] fn poll(&self, cx: &mut Context<'_>) -> Poll<()> { self.0 - .borrow_mut() .state + .borrow_mut() .as_mut() .expect("attempted to poll terminated task") .as_mut() .poll(cx) } + /// Converts the owning process into a weaker referencing process. #[inline] fn downgrade(&self) -> Process<'s, G> { Process(Rc::downgrade(&self.0)) } + + /// Returns whether this process is currently scheduled. + #[inline] + fn is_scheduled(&self) -> bool { + self.0.is_scheduled.get() + } } // Increases the reference counter of this process. @@ -447,7 +458,7 @@ impl<G> Eq for StrongProcess<'_, G> {} // hash processes for pointer equality impl<G> Hash for StrongProcess<'_, G> { fn hash<H: Hasher>(&self, state: &mut H) { - self.0.as_ptr().hash(state); + Rc::as_ptr(&self.0).hash(state); } } @@ -495,31 +506,38 @@ impl<'s, G> Process<'s, G> { unsafe fn clone(this: *const ()) -> task::RawWaker { let waker_ref = - &*ManuallyDrop::new(Weak::from_raw(this as *const RefCell<ProcessInner<'_, G>>)); + &*ManuallyDrop::new(Weak::from_raw(this as *const ProcessInner<'_, G>)); // increase the reference counter once - let waker = waker_ref.clone(); - // this is technically unsafe because Wakers are Send + Sync and so this // call might be executed from a different thread, creating a data race // hazard; we leave preventing this as an exercise to the reader! + let waker = waker_ref.clone(); + task::RawWaker::new(Weak::into_raw(waker) as *const (), &Self::VTABLE) } unsafe fn wake(this: *const ()) { - let waker = Weak::from_raw(this as *const RefCell<ProcessInner<'_, G>>); + let waker = Weak::from_raw(this as *const ProcessInner<'_, G>); let process = Process(waker); process.wake_impl(); } unsafe fn wake_by_ref(this: *const ()) { // keep the waker alive by incrementing the reference count - let waker = Weak::from_raw(this as *const RefCell<ProcessInner<'_, G>>); + let waker = Weak::from_raw(this as *const ProcessInner<'_, G>); let process = &*ManuallyDrop::new(Process(waker)); + + // this is technically unsafe because Wakers are Send + Sync and so this + // call might be executed from a different thread, creating a data race + // hazard; we leave preventing this as an exercise to the reader! process.clone().wake_impl(); } fn wake_impl(self) { + // this is technically unsafe because Wakers are Send + Sync and so this + // call might be executed from a different thread, creating a data race + // hazard; we leave preventing this as an exercise to the reader! let Some(strong_process) = self.upgrade() else { // this can happen if a synchronization structure forgets to clean // up registered Waker objects on destruct; this would lead to @@ -527,11 +545,9 @@ impl<'s, G> Process<'s, G> { panic!("attempted to wake a terminated process"); }; - // this is technically unsafe because Wakers are Send + Sync and so this - // call might be executed from a different thread, creating a data race - // hazard; we leave preventing this as an exercise to the reader! - let sim = strong_process.0.borrow().context; - sim.reactivate(self); + if !strong_process.is_scheduled() { + strong_process.0.context.reactivate(self); + } } unsafe fn drop(this: *const ()) { @@ -563,9 +579,11 @@ impl<'s, G> StateEventWaker<'s, G> { } /// Constructs a new waker using only a reference. - /// - /// This function is unsafe because it is up to the user to ensure that the - /// waker doesn't outlive the reference. + /// + /// # Safety + /// + /// This function is unsafe because it is up to the caller to ensure that + /// the waker doesn't outlive the reference. #[inline] unsafe fn as_waker(&self) -> task::Waker { task::Waker::from_raw(task::RawWaker::new( @@ -652,6 +670,7 @@ mod tests { me: i32, drop_order: &'s RefCell<Vec<i32>>, } + impl Drop for PushDropOrderOnDrop<'_> { fn drop(&mut self) { self.drop_order.borrow_mut().push(self.me); @@ -706,6 +725,7 @@ mod tests { })) .unwrap_err(); + shared.drop_order.borrow_mut().sort(); assert_eq!(*shared.drop_order.borrow(), [1, 2, 3]); shared.channel.take().unwrap().wake(); diff --git a/src/util.rs b/src/util.rs index 0f0cde3..0f728fa 100644 --- a/src/util.rs +++ b/src/util.rs @@ -41,20 +41,13 @@ use crate::{sleep, waker, Process, Sim}; use std::{ cell::{Cell, RefCell}, collections::VecDeque, - fmt::{Display, Formatter}, + fmt, future::Future, pin::Pin, rc::{Rc, Weak}, task::{self, Context, Poll}, }; -/// 😿 -/// -/// This is fundamentally broken and needs to be replaced. -fn deep_will_wake(a: &task::Waker, b: &task::Waker) -> bool { - a.data() == b.data() && a.vtable() == b.vtable() -} - /// GPSS-inspired facility that houses one exclusive resource to be used by /// arbitrary many processes. #[derive(Default)] @@ -140,7 +133,7 @@ where use std::mem::transmute; // create a one-shot channel that reactivates the caller on write - let promise = &Promise::new(sim.active().waker()); + let promise = Promise::new(sim.active().waker()); // this unsafe block shortens the guaranteed lifetimes of the processes // contained in the scheduler; it is safe because the constructed future @@ -152,10 +145,10 @@ where { // create the two competing processes // a future optimization would be to keep them on the stack - let p1 = Process::new(sim, async move { + let p1 = Process::new(sim, async { promise.fulfill(either.await); }); - let p2 = Process::new(sim, async move { + let p2 = Process::new(sim, async { promise.fulfill(or.await); }); @@ -166,9 +159,10 @@ where } } - // `p1` and `p2` borrow from `promise` and must therefore be dropped before `promise` - // is dropped. In particular, we must ensure that the future returned by the outer - // `async fn` outlives the futures crated by the `async` blocks above. + // `p1` and `p2` borrow from `promise` and must therefore be dropped + // before `promise` is dropped. In particular, we must ensure that the + // future returned by the outer `async fn` outlives the futures crated + // by the `async` blocks above. let _g1 = TerminateOnDrop(p1.clone()); let _g2 = TerminateOnDrop(p2.clone()); @@ -238,7 +232,7 @@ impl<T> Drop for Sender<T> { if let Some(chan) = self.0.upgrade() { // check if we're the last sender to drop if Rc::weak_count(&chan) == 1 { - // awake all the waiting receivers so that they get to return + // awake all waiting receivers so that they get to return for process in chan.borrow_mut().waiting.drain(..) { process.wake(); } @@ -278,7 +272,7 @@ impl<T> Channel<T> { /// Private method that unregisters a previously registered waker. fn unregister(&mut self, waker: task::Waker) { - self.waiting.retain(|elem| !deep_will_wake(&waker, elem)); + self.waiting.retain(|elem| !waker.will_wake(elem)); } /// Private method inserting a message into the queue. @@ -370,7 +364,7 @@ impl<T> Drop for ReceiveFuture<'_, T> { /// [wakers]: https://doc.rust-lang.org/std/task/struct.Waker.html /// [`until()`]: fn.until.html /// [`Control`]: struct.Control.html -pub trait Controlled { +pub trait Publisher { /// Allows a [waker] to subscribe to the controlled expression to be /// informed about any and all state changes. /// @@ -390,13 +384,19 @@ pub trait Controlled { /// Unsubscribes a previously subscribed [waker] from the controlled /// expression. /// + /// # Safety + /// This method is unsafe, because it asserts that the waker that it + /// receives has been registered using [subscribe] previously, and has not + /// been unsubscribed from yet. + /// /// [waker]: https://doc.rust-lang.org/std/task/struct.Waker.html + /// [subscribe]: Self::subscribe unsafe fn unsubscribe(&self, waker: &task::Waker); } /// Guarding structure that ensures that waker and controlled expression are /// valid and fixed in space. -pub struct WakerSpan<'s, C: ?Sized + Controlled> { +pub struct WakerSpan<'s, C: ?Sized + Publisher> { /// The controlled expression. cv: &'s C, /// The waker. @@ -404,7 +404,7 @@ pub struct WakerSpan<'s, C: ?Sized + Controlled> { } // implement a constructor for the guard -impl<'s, C: ?Sized + Controlled> WakerSpan<'s, C> { +impl<'s, C: ?Sized + Publisher> WakerSpan<'s, C> { /// Subscribes a [waker] to a controlled expression for a certain duration. /// /// This method subscribes the waker to the controlled expression and @@ -424,25 +424,26 @@ impl<'s, C: ?Sized + Controlled> WakerSpan<'s, C> { } // implement drop for the guard -impl<C: ?Sized + Controlled> Drop for WakerSpan<'_, C> { +impl<C: ?Sized + Publisher> Drop for WakerSpan<'_, C> { #[inline] fn drop(&mut self) { + // this is safe because we subscribed in the only public constructor unsafe { self.cv.unsubscribe(self.waker); } } } -// a pointer to a controlled expression is also a controlled expression -impl<T: Controlled> Controlled for &'_ T { +// a reference of a controlled expression is also a controlled expression +impl<T: Publisher> Publisher for &'_ T { #[inline] unsafe fn subscribe(&self, waker: &task::Waker) { - Controlled::subscribe(*self, waker); + Publisher::subscribe(*self, waker); } #[inline] unsafe fn unsubscribe(&self, waker: &task::Waker) { - Controlled::unsubscribe(*self, waker); + Publisher::unsubscribe(*self, waker); } } @@ -506,7 +507,7 @@ impl<T> Control<T> { } // implement the trait marking control variables as controlled expressions -impl<T: Copy> Controlled for Control<T> { +impl<T: Copy> Publisher for Control<T> { #[inline] unsafe fn subscribe(&self, waker: &task::Waker) { self.waiting.borrow_mut().push(waker.clone()); @@ -514,7 +515,7 @@ impl<T: Copy> Controlled for Control<T> { unsafe fn unsubscribe(&self, waker: &task::Waker) { let mut waiting = self.waiting.borrow_mut(); - let pos = waiting.iter().position(|w| deep_will_wake(w, waker)); + let pos = waiting.iter().position(|w| w.will_wake(waker)); if let Some(pos) = pos { waiting.remove(pos); @@ -533,14 +534,12 @@ impl<T: Default> Default for Control<T> { } } -/// An internal macro to generate implementations of the [`Controlled`] trait -/// for tuples of [`Controlled`] expressions. -/// -/// [`Controlled`]: trait.Controlled.html +/// An internal macro to generate implementations of the [`Publisher`] trait +/// for tuples of expressions. macro_rules! controlled_tuple_impl { // base rule generating an implementation for a concrete tuple ($($T:ident -> $ID:tt),* .) => { - impl<$($T:Controlled),*> Controlled for ($($T,)*) { + impl<$($T:Publisher),*> Publisher for ($($T,)*) { unsafe fn subscribe(&self, _waker: &task::Waker) { $(self.$ID.subscribe(_waker);)* } @@ -567,29 +566,30 @@ controlled_tuple_impl! { } /// Returns a future that suspends until an arbitrary boolean condition -/// involving [`Controlled`] expressions evaluates to `true`. -/// -/// [`Controlled`]: trait.Controlled.html +/// involving control expressions evaluates to `true`. #[inline] -pub async fn until<C: Controlled>(cntl: C, cond: impl Fn(&C) -> bool) { - if !cond(&cntl) { +pub async fn until<C: Publisher>(set: C, pred: impl Fn(&C) -> bool) { + if !pred(&set) { let waker = waker().await; - let span = WakerSpan::new(&cntl, &waker); + let _span = WakerSpan::new(&set, &waker); loop { sleep().await; - if cond(&cntl) { + if pred(&set) { break; } } - - drop(span); } } /* ************************* statistical facilities ************************* */ /// A simple collector for statistical data, inspired by SLX's random_variable. +/// +/// # Warning +/// This implementation is not numerically stable, since it keeps sums of the +/// passed values and of the squared values. It would be better to use a +/// numerically stable online algorithm instead. #[derive(Clone)] pub struct RandomVar { total: Cell<u32>, @@ -658,19 +658,19 @@ impl Default for RandomVar { } } -impl Display for RandomVar { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { +impl fmt::Debug for RandomVar { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let total = self.total.get(); let mean = self.sum.get() / f64::from(total); let variance = self.sqr.get() / f64::from(total) - mean * mean; let std_dev = variance.sqrt(); f.debug_struct("RandomVar") - .field("total", &total) - .field("mean", &mean) - .field("std_dev", &std_dev) + .field("n", &total) .field("min", &self.min.get()) .field("max", &self.max.get()) + .field("mean", &mean) + .field("sdev", &std_dev) .finish() } } -- GitLab