From c9a9dfb0bc2835c73c98e75e1ab08340fd4de419 Mon Sep 17 00:00:00 2001 From: Lukas Markeffsky <lukas.markeffsky@informatik.hu-berlin.de> Date: Tue, 29 Oct 2024 13:33:17 +0100 Subject: [PATCH] possibly sound --- Cargo.lock | 32 +- Cargo.toml | 2 + build.rs | 214 ++++--- examples/barbershop.rs | 357 ++++++----- examples/ferry.rs | 566 +++++++++-------- examples/philosophers.rs | 458 +++++++------- examples/slx/mod.rs | 180 +++--- src/bin/coroutines.rs | 371 +++++------ src/lib.rs | 1255 ++++++++++++++++++++++---------------- src/main.rs | 29 +- src/util.rs | 887 ++++++++++++++------------- 11 files changed, 2301 insertions(+), 2050 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 100f032..f6d8aa0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "aho-corasick" @@ -189,6 +189,12 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" +[[package]] +name = "equivalent" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" + [[package]] name = "getrandom" version = "0.2.15" @@ -210,12 +216,28 @@ dependencies = [ "crunchy", ] +[[package]] +name = "hashbrown" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" + [[package]] name = "hermit-abi" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fbf6a919d6cf397374f7dfeeea91d974c7c0a7221d0d0f4f20d859d329e53fcc" +[[package]] +name = "indexmap" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" +dependencies = [ + "equivalent", + "hashbrown", +] + [[package]] name = "is-terminal" version = "0.4.13" @@ -459,6 +481,12 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" +[[package]] +name = "rustc-hash" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152" + [[package]] name = "ryu" version = "1.0.18" @@ -518,10 +546,12 @@ version = "0.1.0" dependencies = [ "cc", "criterion", + "indexmap", "itertools 0.13.0", "rand", "rand_distr", "rayon", + "rustc-hash", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index db7cbc5..919ef13 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,8 @@ lto = true rand = { version = "0.8", default-features = false, features = ["small_rng"] } rand_distr = "0.4" rayon = "1.5" +indexmap = "2" +rustc-hash = "2" [dev-dependencies] criterion = { version = "0.5.1", features = ["html_reports"]} diff --git a/build.rs b/build.rs index 2131f3e..3c92fb4 100644 --- a/build.rs +++ b/build.rs @@ -1,111 +1,109 @@ - // compiles the ODEMx-library and the C++ examples to link against fn main() { - // attempt to translate the library using the native platform's compiler - let result = cc::Build::new() - .cpp(true) - .define("RUST_FFI", None) - .includes(&[ - "odemx-lite/include", - "odemx-lite/external/CppLog/include" - ]) - .flag_if_supported("-std=c++14") - .flag_if_supported("-Wno-potentially-evaluated-expression") - .flag_if_supported("-Wno-deprecated-declarations") - .warnings(false) - .files(&[ - "odemx-lite/src/base/Comparators.cpp", - "odemx-lite/src/base/Continuous.cpp", - "odemx-lite/src/base/DefaultSimulation.cpp", - "odemx-lite/src/base/Event.cpp", - "odemx-lite/src/base/ExecutionList.cpp", - "odemx-lite/src/base/Process.cpp", - "odemx-lite/src/base/Sched.cpp", - "odemx-lite/src/base/Scheduler.cpp", - "odemx-lite/src/base/Simulation.cpp", - "odemx-lite/src/base/continuous/Continuous.cpp", - "odemx-lite/src/base/continuous/DfDt.cpp", - "odemx-lite/src/base/continuous/JacobiMatrix.cpp", - "odemx-lite/src/base/continuous/Monitor.cpp", - "odemx-lite/src/base/continuous/ODEObject.cpp", - "odemx-lite/src/base/continuous/ODESolver.cpp", - "odemx-lite/src/base/continuous/Rate.cpp", - "odemx-lite/src/base/continuous/State.cpp", - "odemx-lite/src/base/continuous/StateEvent.cpp", - "odemx-lite/src/base/continuous/VariableContainer.cpp", - "odemx-lite/src/base/control/ControlBase.cpp", - "odemx-lite/src/coroutine/Coroutine.cpp", - "odemx-lite/src/coroutine/CoroutineContext.cpp", - "odemx-lite/src/coroutine/ucFiber.cpp", - "odemx-lite/src/data/buffer/SimRecordBuffer.cpp", - "odemx-lite/src/data/buffer/StatisticsBuffer.cpp", - "odemx-lite/src/data/LoggingManager.cpp", - "odemx-lite/src/data/ManagedChannels.cpp", - "odemx-lite/src/data/output/ErrorWriter.cpp", - "odemx-lite/src/data/output/GermanTime.cpp", - "odemx-lite/src/data/output/Iso8601Time.cpp", - "odemx-lite/src/data/output/OStreamReport.cpp", - "odemx-lite/src/data/output/OStreamWriter.cpp", - "odemx-lite/src/data/output/TimeFormat.cpp", - "odemx-lite/src/data/Producer.cpp", - "odemx-lite/src/data/Report.cpp", - "odemx-lite/src/data/ReportProducer.cpp", - "odemx-lite/src/data/ReportTable.cpp", - "odemx-lite/src/data/SimRecord.cpp", - "odemx-lite/src/data/SimRecordFilter.cpp", - "odemx-lite/src/protocol/Device.cpp", - "odemx-lite/src/protocol/Entity.cpp", - "odemx-lite/src/protocol/ErrorModelDraw.cpp", - "odemx-lite/src/protocol/Layer.cpp", - "odemx-lite/src/protocol/Medium.cpp", - "odemx-lite/src/protocol/Sap.cpp", - "odemx-lite/src/protocol/Service.cpp", - "odemx-lite/src/protocol/ServiceProvider.cpp", - "odemx-lite/src/protocol/Stack.cpp", - "odemx-lite/src/random/ContinuousConst.cpp", - "odemx-lite/src/random/ContinuousDist.cpp", - "odemx-lite/src/random/DiscreteConst.cpp", - "odemx-lite/src/random/DiscreteDist.cpp", - "odemx-lite/src/random/Dist.cpp", - "odemx-lite/src/random/DistContext.cpp", - "odemx-lite/src/random/Draw.cpp", - "odemx-lite/src/random/Erlang.cpp", - "odemx-lite/src/random/NegativeExponential.cpp", - "odemx-lite/src/random/Normal.cpp", - "odemx-lite/src/random/Poisson.cpp", - "odemx-lite/src/random/RandomInt.cpp", - "odemx-lite/src/random/Uniform.cpp", - "odemx-lite/src/statistics/Accumulate.cpp", - "odemx-lite/src/statistics/Count.cpp", - "odemx-lite/src/statistics/Histogram.cpp", - "odemx-lite/src/statistics/Regression.cpp", - "odemx-lite/src/statistics/Sum.cpp", - "odemx-lite/src/statistics/Tab.cpp", - "odemx-lite/src/statistics/Tally.cpp", - "odemx-lite/src/synchronization/Bin.cpp", - "odemx-lite/src/synchronization/CondQ.cpp", - "odemx-lite/src/synchronization/IMemory.cpp", - "odemx-lite/src/synchronization/Memory.cpp", - "odemx-lite/src/synchronization/ProcessQueue.cpp", - "odemx-lite/src/synchronization/Queue.cpp", - "odemx-lite/src/synchronization/Res.cpp", - "odemx-lite/src/synchronization/Timer.cpp", - "odemx-lite/src/synchronization/Wait.cpp", - "odemx-lite/src/synchronization/WaitQ.cpp", - // example scenarios - "cpp/Barbershop/main.cpp", - "cpp/Ferry/main.cpp", - "cpp/Philosophers/main.cpp", - ]) - .try_compile("odemx"); - - // there is no need to stop the build altogether if the compilation failed - if let Err(msg) = result { - println!("cargo:warning=Compiling ODEMx-lite has failed."); - println!("cargo:warning=This library will still work, \ - the C++ benchmarks will not!"); - println!("cargo:warning={}", msg); - } else { - println!("cargo:rustc-cfg=feature=\"odemx\""); - } + // attempt to translate the library using the native platform's compiler + let result = cc::Build::new() + .cpp(true) + .define("RUST_FFI", None) + .includes(&["odemx-lite/include", "odemx-lite/external/CppLog/include"]) + .flag_if_supported("-std=c++14") + .flag_if_supported("-Wno-potentially-evaluated-expression") + .flag_if_supported("-Wno-deprecated-declarations") + .warnings(false) + .files(&[ + "odemx-lite/src/base/Comparators.cpp", + "odemx-lite/src/base/Continuous.cpp", + "odemx-lite/src/base/DefaultSimulation.cpp", + "odemx-lite/src/base/Event.cpp", + "odemx-lite/src/base/ExecutionList.cpp", + "odemx-lite/src/base/Process.cpp", + "odemx-lite/src/base/Sched.cpp", + "odemx-lite/src/base/Scheduler.cpp", + "odemx-lite/src/base/Simulation.cpp", + "odemx-lite/src/base/continuous/Continuous.cpp", + "odemx-lite/src/base/continuous/DfDt.cpp", + "odemx-lite/src/base/continuous/JacobiMatrix.cpp", + "odemx-lite/src/base/continuous/Monitor.cpp", + "odemx-lite/src/base/continuous/ODEObject.cpp", + "odemx-lite/src/base/continuous/ODESolver.cpp", + "odemx-lite/src/base/continuous/Rate.cpp", + "odemx-lite/src/base/continuous/State.cpp", + "odemx-lite/src/base/continuous/StateEvent.cpp", + "odemx-lite/src/base/continuous/VariableContainer.cpp", + "odemx-lite/src/base/control/ControlBase.cpp", + "odemx-lite/src/coroutine/Coroutine.cpp", + "odemx-lite/src/coroutine/CoroutineContext.cpp", + "odemx-lite/src/coroutine/ucFiber.cpp", + "odemx-lite/src/data/buffer/SimRecordBuffer.cpp", + "odemx-lite/src/data/buffer/StatisticsBuffer.cpp", + "odemx-lite/src/data/LoggingManager.cpp", + "odemx-lite/src/data/ManagedChannels.cpp", + "odemx-lite/src/data/output/ErrorWriter.cpp", + "odemx-lite/src/data/output/GermanTime.cpp", + "odemx-lite/src/data/output/Iso8601Time.cpp", + "odemx-lite/src/data/output/OStreamReport.cpp", + "odemx-lite/src/data/output/OStreamWriter.cpp", + "odemx-lite/src/data/output/TimeFormat.cpp", + "odemx-lite/src/data/Producer.cpp", + "odemx-lite/src/data/Report.cpp", + "odemx-lite/src/data/ReportProducer.cpp", + "odemx-lite/src/data/ReportTable.cpp", + "odemx-lite/src/data/SimRecord.cpp", + "odemx-lite/src/data/SimRecordFilter.cpp", + "odemx-lite/src/protocol/Device.cpp", + "odemx-lite/src/protocol/Entity.cpp", + "odemx-lite/src/protocol/ErrorModelDraw.cpp", + "odemx-lite/src/protocol/Layer.cpp", + "odemx-lite/src/protocol/Medium.cpp", + "odemx-lite/src/protocol/Sap.cpp", + "odemx-lite/src/protocol/Service.cpp", + "odemx-lite/src/protocol/ServiceProvider.cpp", + "odemx-lite/src/protocol/Stack.cpp", + "odemx-lite/src/random/ContinuousConst.cpp", + "odemx-lite/src/random/ContinuousDist.cpp", + "odemx-lite/src/random/DiscreteConst.cpp", + "odemx-lite/src/random/DiscreteDist.cpp", + "odemx-lite/src/random/Dist.cpp", + "odemx-lite/src/random/DistContext.cpp", + "odemx-lite/src/random/Draw.cpp", + "odemx-lite/src/random/Erlang.cpp", + "odemx-lite/src/random/NegativeExponential.cpp", + "odemx-lite/src/random/Normal.cpp", + "odemx-lite/src/random/Poisson.cpp", + "odemx-lite/src/random/RandomInt.cpp", + "odemx-lite/src/random/Uniform.cpp", + "odemx-lite/src/statistics/Accumulate.cpp", + "odemx-lite/src/statistics/Count.cpp", + "odemx-lite/src/statistics/Histogram.cpp", + "odemx-lite/src/statistics/Regression.cpp", + "odemx-lite/src/statistics/Sum.cpp", + "odemx-lite/src/statistics/Tab.cpp", + "odemx-lite/src/statistics/Tally.cpp", + "odemx-lite/src/synchronization/Bin.cpp", + "odemx-lite/src/synchronization/CondQ.cpp", + "odemx-lite/src/synchronization/IMemory.cpp", + "odemx-lite/src/synchronization/Memory.cpp", + "odemx-lite/src/synchronization/ProcessQueue.cpp", + "odemx-lite/src/synchronization/Queue.cpp", + "odemx-lite/src/synchronization/Res.cpp", + "odemx-lite/src/synchronization/Timer.cpp", + "odemx-lite/src/synchronization/Wait.cpp", + "odemx-lite/src/synchronization/WaitQ.cpp", + // example scenarios + "cpp/Barbershop/main.cpp", + "cpp/Ferry/main.cpp", + "cpp/Philosophers/main.cpp", + ]) + .try_compile("odemx"); + + // there is no need to stop the build altogether if the compilation failed + if let Err(msg) = result { + println!("cargo:warning=Compiling ODEMx-lite has failed."); + println!( + "cargo:warning=This library will still work, \ + the C++ benchmarks will not!" + ); + println!("cargo:warning={}", msg); + } else { + println!("cargo:rustc-cfg=feature=\"odemx\""); + } } diff --git a/examples/barbershop.rs b/examples/barbershop.rs index cb744fe..948e9dd 100644 --- a/examples/barbershop.rs +++ b/examples/barbershop.rs @@ -1,63 +1,65 @@ //! A discrete event simulation of a barbershop using `simcore_rs`, the //! simulator core developed as part of my (Dorian Weber) dissertation. -//! +//! //! This module models a simple barbershop scenario where customers arrive at //! random intervals, wait if the barber is busy, receive service, and then //! depart. The simulation tracks customer wait times and provides statistical //! analysis over the simulated period. -//! +//! //! # Notes //! - The simulation assumes a continuous operation of the barbershop over the //! specified duration. //! - Customers arriving after the shop closes are not admitted; however, the //! barber will finish servicing any remaining customers. -use simcore_rs::{Time, Sim, Process, util::{Facility, RandomVar}}; -use rand::{Rng, rngs::SmallRng}; +use rand::{rngs::SmallRng, Rng}; +use simcore_rs::{ + util::{Facility, RandomVar}, + Process, Sim, Time, +}; use std::cell::RefCell; // Helper constants for random number generator seeds. -const SEED_A: u64 = 100_000; -const SEED_S: u64 = 200_000; +const SEED_A: u64 = 100_000; +const SEED_S: u64 = 200_000; /// Globally shared data for the barbershop simulation. struct Barbershop { - /// Random number generator for customer arrival times. - rng_a: RefCell<SmallRng>, - /// Random number generator for service times. - rng_s: RefCell<SmallRng>, - /// Facility representing the barber (Joe). - joe: Facility, - /// Statistical accumulator for customer wait times. - wait_time: RandomVar + /// Random number generator for customer arrival times. + rng_a: RefCell<SmallRng>, + /// Random number generator for service times. + rng_s: RefCell<SmallRng>, + /// Facility representing the barber (Joe). + joe: Facility, + /// Statistical accumulator for customer wait times. + wait_time: RandomVar, } /// Represents a customer in the barbershop simulation. struct Customer; impl Customer { - /// Defines the actions performed by a customer in the simulation. - /// - /// The customer arrives at the barbershop, waits for the barber to be - /// available, gets a haircut (spends time being serviced), and then leaves. - pub async fn actions(self, sim: Sim<'_, Barbershop>) { - // Record the arrival time for wait time calculation. - let arrival_time = sim.now(); - - // Seize the barber (wait if not available). - sim.global().joe.seize().await; - - // Calculate and record the customer's wait time. - sim.global().wait_time.tabulate(sim.now() - arrival_time); - - // Simulate the time taken for the haircut. - sim.advance( - sim.global().rng_s.borrow_mut().gen_range(12.0..18.0) - ).await; - - // Release the barber for the next customer. - sim.global().joe.release(); - } + /// Defines the actions performed by a customer in the simulation. + /// + /// The customer arrives at the barbershop, waits for the barber to be + /// available, gets a haircut (spends time being serviced), and then leaves. + pub async fn actions(self, sim: Sim<'_, Barbershop>) { + // Record the arrival time for wait time calculation. + let arrival_time = sim.now(); + + // Seize the barber (wait if not available). + sim.global().joe.seize().await; + + // Calculate and record the customer's wait time. + sim.global().wait_time.tabulate(sim.now() - arrival_time); + + // Simulate the time taken for the haircut. + sim.advance(sim.global().rng_s.borrow_mut().gen_range(12.0..18.0)) + .await; + + // Release the barber for the next customer. + sim.global().joe.release(); + } } /// The main simulation function. @@ -66,27 +68,28 @@ impl Customer { /// for the specified duration, and ensures that all customers are serviced /// before the simulation ends. async fn sim_main(sim: Sim<'_, Barbershop>, duration: Time) { - // Activate a process to generate customers at random intervals. - sim.activate(async move { - loop { - // Wait for a random time before the next customer arrives. - sim.advance( - sim.global().rng_a.borrow_mut().gen_range(12.0..24.0) - ).await; - - // If the simulation time exceeds the duration, stop generating customers. - if sim.now() >= duration { return; } - - // Activate a new customer process. - sim.activate(Customer.actions(sim)); - } - }); - - // Run the simulation until the store closes. - sim.advance(duration).await; - - // Ensure the barber finishes servicing any remaining customers. - sim.global().joe.seize().await; + // Activate a process to generate customers at random intervals. + sim.activate(async move { + loop { + // Wait for a random time before the next customer arrives. + sim.advance(sim.global().rng_a.borrow_mut().gen_range(12.0..24.0)) + .await; + + // If the simulation time exceeds the duration, stop generating customers. + if sim.now() >= duration { + return; + } + + // Activate a new customer process. + sim.activate(Customer.actions(sim)); + } + }); + + // Run the simulation until the store closes. + sim.advance(duration).await; + + // Ensure the barber finishes servicing any remaining customers. + sim.global().joe.seize().await; } /// Entry point for the barbershop simulation. @@ -95,24 +98,23 @@ async fn sim_main(sim: Sim<'_, Barbershop>, duration: Time) { /// specified duration. #[cfg(not(test))] fn main() { - use rand::SeedableRng; - - // Run the simulation and collect the result. - let result = simcore_rs::simulation( - // Initialize the global data for the simulation. - Barbershop { - rng_a: RefCell::new(SmallRng::seed_from_u64(SEED_A)), - rng_s: RefCell::new(SmallRng::seed_from_u64(SEED_S)), - joe: Facility::new(), - wait_time: RandomVar::new() - }, - // Simulation entry point. - |sim| - Process::new(sim, sim_main(sim, 60.0 * 24.0 * 7.0 * 3.0)) // Simulate for 3 weeks. - ); - - // Print statistics after the simulation ends. - println!("wait_time: {:#.3}", result.wait_time); + use rand::SeedableRng; + + // Run the simulation and collect the result. + let result = simcore_rs::simulation( + // Initialize the global data for the simulation. + Barbershop { + rng_a: RefCell::new(SmallRng::seed_from_u64(SEED_A)), + rng_s: RefCell::new(SmallRng::seed_from_u64(SEED_S)), + joe: Facility::new(), + wait_time: RandomVar::new(), + }, + // Simulation entry point. + |sim| Process::new(sim, sim_main(sim, 60.0 * 24.0 * 7.0 * 3.0)), // Simulate for 3 weeks. + ); + + // Print statistics after the simulation ends. + println!("wait_time: {:#.3}", result.wait_time); } #[cfg(test)] @@ -127,111 +129,102 @@ mod slx; /// implementations and simulation durations. #[cfg(test)] mod bench { - use super::*; - use criterion::{Criterion, BenchmarkId, BatchSize, PlotConfiguration, - AxisScale, criterion_group}; - - #[cfg(feature = "odemx")] - mod odemx { - use std::os::raw::c_double; - - #[link(name = "odemx", kind = "static")] - extern { - pub fn barbershop(duration: c_double); - } - } - - #[cfg(windows)] - const SLX_PATH: &'static str = "C:\\Wolverine\\SLX"; - const RANGE: u32 = 10; - const STEP: Time = 1000.0; - - /// Benchmarks the barbershop simulation using different implementations and - /// simulation durations. - fn barbershop_bench(c: &mut Criterion) { - use rand::{rngs::SmallRng, SeedableRng}; - - let mut group = c.benchmark_group("Barbershop"); - - // Set up the benchmark parameters. - group.confidence_level(0.99); - group.plot_config( - PlotConfiguration::default() - .summary_scale(AxisScale::Logarithmic) - ); - - #[cfg(windows)] - let slx_path = { - let path = slx::slx_version(SLX_PATH); - - if path.is_none() { - println!("SLX not found, skipping SLX benchmarks!"); - } else { - println!("Using SLX program at {:?}", path.as_ref().unwrap()); - } - - path - }; - - // Vary the length of the simulation run. - for sim_duration in (0..RANGE).map(|c| Time::from(1 << c) * STEP) { - #[cfg(windows)] - if let Some(path) = slx_path.as_ref() { - let duration = sim_duration.to_string(); - let args = [ - "/silent", - "/stdout", - "/noicon", - "/nowarn", - "/noxwarn", - "/#BENCH", - "slx\\barbershop.slx", - duration.as_str() - ]; - - // Benchmark the SLX implementation. - group.bench_function( - BenchmarkId::new("SLX", sim_duration), - |b| b.iter_custom(|iters| - slx::slx_bench( - path.as_os_str(), - &args, - iters as usize - ).expect("couldn't benchmark the SLX program") - ) - ); - } - - // Benchmark the C++ implementation. - #[cfg(feature = "odemx")] - group.bench_function( - BenchmarkId::new("ODEMx", sim_duration), - |b| b.iter( - || unsafe { odemx::barbershop(sim_duration as _) } - ) - ); - - // Benchmark the Rust implementation. - group.bench_function( - BenchmarkId::new("Rust", sim_duration), - |b| b.iter_batched( - || Barbershop { - rng_a: RefCell::new(SmallRng::seed_from_u64(SEED_A)), - rng_s: RefCell::new(SmallRng::seed_from_u64(SEED_S)), - joe: Facility::new(), - wait_time: RandomVar::new() - }, - |shared| simcore_rs::simulation( - shared, - |sim| Process::new(sim, sim_main(sim, sim_duration)) - ), - BatchSize::SmallInput - ) - ); - } - - group.finish(); - } - - criterion_group!(benches, barbershop_bench); + use super::*; + use criterion::{ + criterion_group, AxisScale, BatchSize, BenchmarkId, Criterion, PlotConfiguration, + }; + + #[cfg(feature = "odemx")] + mod odemx { + use std::os::raw::c_double; + + #[link(name = "odemx", kind = "static")] + extern "C" { + pub fn barbershop(duration: c_double); + } + } + + #[cfg(windows)] + const SLX_PATH: &'static str = "C:\\Wolverine\\SLX"; + const RANGE: u32 = 10; + const STEP: Time = 1000.0; + + /// Benchmarks the barbershop simulation using different implementations and + /// simulation durations. + fn barbershop_bench(c: &mut Criterion) { + use rand::{rngs::SmallRng, SeedableRng}; + + let mut group = c.benchmark_group("Barbershop"); + + // Set up the benchmark parameters. + group.confidence_level(0.99); + group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic)); + + #[cfg(windows)] + let slx_path = { + let path = slx::slx_version(SLX_PATH); + + if path.is_none() { + println!("SLX not found, skipping SLX benchmarks!"); + } else { + println!("Using SLX program at {:?}", path.as_ref().unwrap()); + } + + path + }; + + // Vary the length of the simulation run. + for sim_duration in (0..RANGE).map(|c| Time::from(1 << c) * STEP) { + #[cfg(windows)] + if let Some(path) = slx_path.as_ref() { + let duration = sim_duration.to_string(); + let args = [ + "/silent", + "/stdout", + "/noicon", + "/nowarn", + "/noxwarn", + "/#BENCH", + "slx\\barbershop.slx", + duration.as_str(), + ]; + + // Benchmark the SLX implementation. + group.bench_function(BenchmarkId::new("SLX", sim_duration), |b| { + b.iter_custom(|iters| { + slx::slx_bench(path.as_os_str(), &args, iters as usize) + .expect("couldn't benchmark the SLX program") + }) + }); + } + + // Benchmark the C++ implementation. + #[cfg(feature = "odemx")] + group.bench_function(BenchmarkId::new("ODEMx", sim_duration), |b| { + b.iter(|| unsafe { odemx::barbershop(sim_duration as _) }) + }); + + // Benchmark the Rust implementation. + group.bench_function(BenchmarkId::new("Rust", sim_duration), |b| { + b.iter_batched( + || Barbershop { + rng_a: RefCell::new(SmallRng::seed_from_u64(SEED_A)), + rng_s: RefCell::new(SmallRng::seed_from_u64(SEED_S)), + joe: Facility::new(), + wait_time: RandomVar::new(), + }, + |shared| { + simcore_rs::simulation(shared, |sim| { + Process::new(sim, sim_main(sim, sim_duration)) + }) + }, + BatchSize::SmallInput, + ) + }); + } + + group.finish(); + } + + criterion_group!(benches, barbershop_bench); } diff --git a/examples/ferry.rs b/examples/ferry.rs index e1c6582..cd2babd 100644 --- a/examples/ferry.rs +++ b/examples/ferry.rs @@ -1,199 +1,202 @@ //! A discrete event simulation of a car-ferry system using `simcore_rs`, the //! simulator core developed as part of my (Dorian Weber) dissertation. -//! +//! //! This module models a ferry system where cars arrive at multiple harbors and //! wait to be transported by ferries to other harbors. The simulation tracks //! various statistics such as car wait times, ferry cargo lengths, and ferry //! load times over the simulated period. -//! +//! //! # Notes -//! +//! //! - The simulation assumes continuous operation over the specified duration. //! - Cars that are not picked up by the end of the simulation are accounted for //! in the wait time statistics. //! - The ferry routes are set up in such a way that each ferry starts at a //! different harbor and cycles through all harbors. -use simcore_rs::{Time, Sim, Process, util::{Sender, Receiver, RandomVar, select, channel}}; -use rand::{rngs::SmallRng, SeedableRng, Rng}; -use rand_distr::{Exp, Normal, Distribution}; +use rand::{rngs::SmallRng, Rng, SeedableRng}; +use rand_distr::{Distribution, Exp, Normal}; +use simcore_rs::{ + util::{channel, select, RandomVar, Receiver, Sender}, + Process, Sim, Time, +}; use std::cell::RefCell; // Constants for simulation parameters. const SEED: u64 = 100_000; const FERRY_COUNT: usize = 2; const HARBOR_COUNT: usize = 4; -const HARBOR_DISTANCE: Time = 10.0; -const FERRY_TIMEOUT : Time = 5.0; -const FERRY_CAPACITY : usize = 5; +const HARBOR_DISTANCE: Time = 10.0; +const FERRY_TIMEOUT: Time = 5.0; +const FERRY_CAPACITY: usize = 5; /// Shared global data for the ferry simulation. struct Ferries { - /// Master random number generator for seeding. - master_rng: RefCell<SmallRng>, - /// Statistical accumulator for ferry cargo lengths. - ferry_cargo_len: RandomVar, - /// Statistical accumulator for ferry load times. - ferry_load_time: RandomVar, - /// Statistical accumulator for car wait times. - car_wait_time: RandomVar, + /// Master random number generator for seeding. + master_rng: RefCell<SmallRng>, + /// Statistical accumulator for ferry cargo lengths. + ferry_cargo_len: RandomVar, + /// Statistical accumulator for ferry load times. + ferry_load_time: RandomVar, + /// Statistical accumulator for car wait times. + car_wait_time: RandomVar, } /// Represents a car in the simulation. #[derive(Debug)] struct Car { - /// Time when the car arrives at the pier. - arrival_time: Time, - /// Duration required to load the car onto the ferry. - load_duration: Time, + /// Time when the car arrives at the pier. + arrival_time: Time, + /// Duration required to load the car onto the ferry. + load_duration: Time, } /// Represents a pier (harbor) where cars arrive and wait for ferries. struct Pier { - /// Random number generator specific to this pier. - rng: SmallRng, - /// Channel to send cars to the ferry. - landing_site: Sender<Car>, + /// Random number generator specific to this pier. + rng: SmallRng, + /// Channel to send cars to the ferry. + landing_site: Sender<Car>, } /// Represents a ferry that transports cars between harbors. struct Ferry { - /// Cargo hold containing the cars on the ferry. - cargo: Vec<Car>, - /// Timeout duration for ferry departure. - timeout: Time, - /// Time taken to travel between harbors. - travel_time: Time, - /// Receivers from piers to accept arriving cars. - piers: Vec<Receiver<Car>>, + /// Cargo hold containing the cars on the ferry. + cargo: Vec<Car>, + /// Timeout duration for ferry departure. + timeout: Time, + /// Time taken to travel between harbors. + travel_time: Time, + /// Receivers from piers to accept arriving cars. + piers: Vec<Receiver<Car>>, } impl Pier { - /// Simulates the actions of a pier in the simulation. - /// - /// Cars arrive at the pier following an exponential distribution - /// and are sent to the ferry when it arrives. - async fn actions(mut self, sim: Sim<'_, Ferries>) { - // Arrival and loading time distributions. - let arrival_delay = Exp::new(0.1).unwrap(); - let loading_delay = Normal::new(0.5, 0.2).unwrap(); - - loop { - // Wait for the next car to arrive. - sim.advance(arrival_delay.sample(&mut self.rng)).await; - - // Create a new car with arrival and load times. - self.landing_site.send(Car { - arrival_time: sim.now(), - load_duration: loading_delay - .sample_iter(&mut self.rng) - .find(|&val| val >= 0.0) - .unwrap(), - }).await.expect("No ferries in the simulation"); - } - } + /// Simulates the actions of a pier in the simulation. + /// + /// Cars arrive at the pier following an exponential distribution + /// and are sent to the ferry when it arrives. + async fn actions(mut self, sim: Sim<'_, Ferries>) { + // Arrival and loading time distributions. + let arrival_delay = Exp::new(0.1).unwrap(); + let loading_delay = Normal::new(0.5, 0.2).unwrap(); + + loop { + // Wait for the next car to arrive. + sim.advance(arrival_delay.sample(&mut self.rng)).await; + + // Create a new car with arrival and load times. + self.landing_site + .send(Car { + arrival_time: sim.now(), + load_duration: loading_delay + .sample_iter(&mut self.rng) + .find(|&val| val >= 0.0) + .unwrap(), + }) + .await + .expect("No ferries in the simulation"); + } + } } impl Ferry { - /// Simulates the actions of a ferry in the simulation. - /// - /// The ferry travels between harbors, loads cars, waits for a timeout or - /// until it reaches capacity, and then moves to the next harbor. - async fn actions(mut self, sim: Sim<'_, Ferries>) { - loop { - for pier in self.piers.iter() { - // Unload the cars at the current pier. - for car in self.cargo.drain(..) { - sim.advance(car.load_duration).await; - } - - let begin_loading = sim.now(); - - // Load cars until capacity is reached or timeout occurs. - while self.cargo.len() < self.cargo.capacity() { - match select(sim, pier.recv(), async { - sim.advance(self.timeout).await; - None - }).await { - // A car arrived before timeout. - Some(car) => { - sim.global() - .car_wait_time - .tabulate(sim.now() - car.arrival_time); - sim.advance(car.load_duration).await; - self.cargo.push(car); - } - // Timeout occurred; depart to next harbor. - None => break, - } - } - - // Record ferry loading statistics. - sim.global() - .ferry_load_time - .tabulate(sim.now() - begin_loading); - sim.global() - .ferry_cargo_len - .tabulate(self.cargo.len() as f64); - - // Travel to the next harbor. - sim.advance(self.travel_time).await; - } - } - } + /// Simulates the actions of a ferry in the simulation. + /// + /// The ferry travels between harbors, loads cars, waits for a timeout or + /// until it reaches capacity, and then moves to the next harbor. + async fn actions(mut self, sim: Sim<'_, Ferries>) { + loop { + for pier in self.piers.iter() { + // Unload the cars at the current pier. + for car in self.cargo.drain(..) { + sim.advance(car.load_duration).await; + } + + let begin_loading = sim.now(); + + // Load cars until capacity is reached or timeout occurs. + while self.cargo.len() < self.cargo.capacity() { + match select(sim, pier.recv(), async { + sim.advance(self.timeout).await; + None + }) + .await + { + // A car arrived before timeout. + Some(car) => { + sim.global() + .car_wait_time + .tabulate(sim.now() - car.arrival_time); + sim.advance(car.load_duration).await; + self.cargo.push(car); + } + // Timeout occurred; depart to next harbor. + None => break, + } + } + + // Record ferry loading statistics. + sim.global() + .ferry_load_time + .tabulate(sim.now() - begin_loading); + sim.global() + .ferry_cargo_len + .tabulate(self.cargo.len() as f64); + + // Travel to the next harbor. + sim.advance(self.travel_time).await; + } + } + } } /// The main simulation function. /// /// Sets up the piers and ferries, activates their processes, and runs the simulation /// for the specified duration. -async fn sim_main( - sim: Sim<'_, Ferries>, - duration: Time, - ferries: usize, - harbors: usize, -) { - let mut ports = Vec::with_capacity(harbors); - - // Create all the harbors (piers). - for _ in 0..harbors { - let (sx, rx) = channel(); - let harbor = Pier { - rng: SmallRng::from_seed(sim.global().master_rng.borrow_mut().gen()), - landing_site: sx, - }; - sim.activate(harbor.actions(sim)); - ports.push(rx); - } - - // Create all the ferries. - for i in 0..ferries { - let ferry = Ferry { - cargo: Vec::with_capacity(FERRY_CAPACITY), - timeout: FERRY_TIMEOUT, - travel_time: HARBOR_DISTANCE, - piers: ports - .iter() - .skip(i) - .chain(ports.iter().take(i)) - .cloned() - .collect(), - }; - sim.activate(ferry.actions(sim)); - } - - // Run the simulation for the specified duration. - sim.advance(duration).await; - - // Handle any cars that weren't picked up by a ferry. - for port in ports { - for _ in 0..port.len() { - let car = port.recv().await.unwrap(); - sim.global() - .car_wait_time - .tabulate(sim.now() - car.arrival_time); - } - } +async fn sim_main(sim: Sim<'_, Ferries>, duration: Time, ferries: usize, harbors: usize) { + let mut ports = Vec::with_capacity(harbors); + + // Create all the harbors (piers). + for _ in 0..harbors { + let (sx, rx) = channel(); + let harbor = Pier { + rng: SmallRng::from_seed(sim.global().master_rng.borrow_mut().gen()), + landing_site: sx, + }; + sim.activate(harbor.actions(sim)); + ports.push(rx); + } + + // Create all the ferries. + for i in 0..ferries { + let ferry = Ferry { + cargo: Vec::with_capacity(FERRY_CAPACITY), + timeout: FERRY_TIMEOUT, + travel_time: HARBOR_DISTANCE, + piers: ports + .iter() + .skip(i) + .chain(ports.iter().take(i)) + .cloned() + .collect(), + }; + sim.activate(ferry.actions(sim)); + } + + // Run the simulation for the specified duration. + sim.advance(duration).await; + + // Handle any cars that weren't picked up by a ferry. + for port in ports { + for _ in 0..port.len() { + let car = port.recv().await.unwrap(); + sim.global() + .car_wait_time + .tabulate(sim.now() - car.arrival_time); + } + } } /// Entry point for the ferry simulation. @@ -201,27 +204,29 @@ async fn sim_main( /// Initializes the simulation environment and runs the simulation for one week. #[cfg(not(test))] fn main() { - let result = simcore_rs::simulation( - // Global shared data. - Ferries { - master_rng: RefCell::new(SmallRng::seed_from_u64(SEED)), - ferry_cargo_len: RandomVar::new(), - ferry_load_time: RandomVar::new(), - car_wait_time: RandomVar::new(), - }, - // Simulation entry point. - |sim| Process::new( - sim, - sim_main(sim, 24.0 * 60.0 * 7.0, FERRY_COUNT, HARBOR_COUNT), - ), - ); - - // Output simulation statistics. - println!("Number of harbors: {}", HARBOR_COUNT); - println!("Number of ferries: {}", FERRY_COUNT); - println!("Car wait time: {:#.3}", result.car_wait_time); - println!("Ferry cargo len: {:#.3}", result.ferry_cargo_len); - println!("Ferry load time: {:#.3}", result.ferry_load_time); + let result = simcore_rs::simulation( + // Global shared data. + Ferries { + master_rng: RefCell::new(SmallRng::seed_from_u64(SEED)), + ferry_cargo_len: RandomVar::new(), + ferry_load_time: RandomVar::new(), + car_wait_time: RandomVar::new(), + }, + // Simulation entry point. + |sim| { + Process::new( + sim, + sim_main(sim, 24.0 * 60.0 * 7.0, FERRY_COUNT, HARBOR_COUNT), + ) + }, + ); + + // Output simulation statistics. + println!("Number of harbors: {}", HARBOR_COUNT); + println!("Number of ferries: {}", FERRY_COUNT); + println!("Car wait time: {:#.3}", result.car_wait_time); + println!("Ferry cargo len: {:#.3}", result.ferry_cargo_len); + println!("Ferry load time: {:#.3}", result.ferry_load_time); } #[cfg(test)] @@ -236,124 +241,105 @@ mod slx; /// implementations and simulation durations. #[cfg(test)] mod bench { - use super::*; - use criterion::{Criterion, BenchmarkId, BatchSize, PlotConfiguration, - AxisScale, criterion_group}; - - #[cfg(feature = "odemx")] - mod odemx { - use std::os::raw::{c_double, c_uint}; - - #[link(name = "odemx", kind = "static")] - extern { - pub fn ferry(duration: c_double, ferries: c_uint, harbors: c_uint); - } - } - - #[cfg(windows)] - const SLX_PATH: &'static str = "C:\\Wolverine\\SLX"; - const RANGE: u32 = 10; - const STEP: Time = 1000.0; - - /// Benchmarks the ferry simulation using different implementations and - /// simulation durations. - fn ferry_bench(c: &mut Criterion) { - let mut group = c.benchmark_group("Ferry"); - - // Set up the benchmark parameters. - group.confidence_level(0.99); - group.plot_config( - PlotConfiguration::default() - .summary_scale(AxisScale::Logarithmic), - ); - - #[cfg(windows)] - let slx_path = { - let path = slx::slx_version(SLX_PATH); - - if path.is_none() { - println!("SLX not found, skipping SLX benchmarks!"); - } else { - println!("Using SLX program at {:?}", path.as_ref().unwrap()); - } - - path - }; - - // Vary the length of the simulation run. - for (i, sim_duration) in (0..RANGE) - .map(|c| Time::from(1 << c) * STEP) - .enumerate() - { - #[cfg(windows)] - if let Some(path) = slx_path.as_ref() { - let duration = sim_duration.to_string(); - let args = [ - "/silent", - "/stdout", - "/noicon", - "/nowarn", - "/noxwarn", - "/#BENCH", - "slx\\ferry.slx", - duration.as_str(), - ]; - - // Benchmark the SLX implementation. - group.bench_function( - BenchmarkId::new("SLX", sim_duration), - |b| b.iter_custom(|iters| - slx::slx_bench( - path.as_os_str(), - &args, - iters as usize, - ) - .expect("Couldn't benchmark the SLX program"), - ), - ); - } - - // Benchmark the C++ implementation. - #[cfg(feature = "odemx")] - group.bench_function( - BenchmarkId::new("ODEMx", sim_duration), - |b| { - b.iter(|| unsafe { - odemx::ferry( - sim_duration as _, - FERRY_COUNT as _, - HARBOR_COUNT as _, - ) - }) - }, - ); - - // Benchmark the Rust implementation. - group.bench_function( - BenchmarkId::new("Rust", sim_duration), - |b| b.iter_batched( - || Ferries { - master_rng: RefCell::new(SmallRng::seed_from_u64( - SEED * ((i + 1) as u64), - )), - ferry_cargo_len: RandomVar::new(), - ferry_load_time: RandomVar::new(), - car_wait_time: RandomVar::new(), - }, - |shared| simcore_rs::simulation( - shared, - |sim| Process::new( - sim, - sim_main(sim, sim_duration, FERRY_COUNT, HARBOR_COUNT), - ), - ), - BatchSize::SmallInput, - ), - ); - } - - group.finish(); - } - - criterion_group!(benches, ferry_bench); + use super::*; + use criterion::{ + criterion_group, AxisScale, BatchSize, BenchmarkId, Criterion, PlotConfiguration, + }; + + #[cfg(feature = "odemx")] + mod odemx { + use std::os::raw::{c_double, c_uint}; + + #[link(name = "odemx", kind = "static")] + extern "C" { + pub fn ferry(duration: c_double, ferries: c_uint, harbors: c_uint); + } + } + + #[cfg(windows)] + const SLX_PATH: &'static str = "C:\\Wolverine\\SLX"; + const RANGE: u32 = 10; + const STEP: Time = 1000.0; + + /// Benchmarks the ferry simulation using different implementations and + /// simulation durations. + fn ferry_bench(c: &mut Criterion) { + let mut group = c.benchmark_group("Ferry"); + + // Set up the benchmark parameters. + group.confidence_level(0.99); + group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic)); + + #[cfg(windows)] + let slx_path = { + let path = slx::slx_version(SLX_PATH); + + if path.is_none() { + println!("SLX not found, skipping SLX benchmarks!"); + } else { + println!("Using SLX program at {:?}", path.as_ref().unwrap()); + } + + path + }; + + // Vary the length of the simulation run. + for (i, sim_duration) in (0..RANGE).map(|c| Time::from(1 << c) * STEP).enumerate() { + #[cfg(windows)] + if let Some(path) = slx_path.as_ref() { + let duration = sim_duration.to_string(); + let args = [ + "/silent", + "/stdout", + "/noicon", + "/nowarn", + "/noxwarn", + "/#BENCH", + "slx\\ferry.slx", + duration.as_str(), + ]; + + // Benchmark the SLX implementation. + group.bench_function(BenchmarkId::new("SLX", sim_duration), |b| { + b.iter_custom(|iters| { + slx::slx_bench(path.as_os_str(), &args, iters as usize) + .expect("Couldn't benchmark the SLX program") + }) + }); + } + + // Benchmark the C++ implementation. + #[cfg(feature = "odemx")] + group.bench_function(BenchmarkId::new("ODEMx", sim_duration), |b| { + b.iter(|| unsafe { + odemx::ferry(sim_duration as _, FERRY_COUNT as _, HARBOR_COUNT as _) + }) + }); + + // Benchmark the Rust implementation. + group.bench_function(BenchmarkId::new("Rust", sim_duration), |b| { + b.iter_batched( + || Ferries { + master_rng: RefCell::new(SmallRng::seed_from_u64(SEED * ((i + 1) as u64))), + ferry_cargo_len: RandomVar::new(), + ferry_load_time: RandomVar::new(), + car_wait_time: RandomVar::new(), + }, + |shared| { + simcore_rs::simulation(shared, |sim| { + Process::new( + sim, + sim_main(sim, sim_duration, FERRY_COUNT, HARBOR_COUNT), + ) + }) + }, + BatchSize::SmallInput, + ) + }); + } + + group.finish(); + } + + criterion_group!(benches, ferry_bench); } diff --git a/examples/philosophers.rs b/examples/philosophers.rs index 86e3456..02d4e45 100644 --- a/examples/philosophers.rs +++ b/examples/philosophers.rs @@ -1,25 +1,31 @@ //! A discrete event simulation of the Dining Philosophers problem using //! `simcore_rs`, the simulator core developed as part of my (Dorian Weber) //! dissertation. -//! +//! //! This module models the classic Dining Philosophers problem, where multiple //! philosophers sit around a table and alternate between thinking and eating. //! They need two forks to eat and must coordinate to avoid deadlocks. The //! simulation tracks the time until a deadlock occurs and collects statistical //! data over multiple runs. -//! +//! //! # Notes -//! +//! //! - The simulation detects deadlocks when all philosophers are waiting for //! forks and none can proceed. //! - The statistical data collected includes the time until deadlock over //! multiple simulation runs. -use simcore_rs::{Sim, Time, Process, util::{RandomVar, Control, until}}; -use std::{rc::Rc, cell::{RefCell, Cell}}; -use rand::{rngs::SmallRng, SeedableRng, Rng}; -use rand_distr::{Exp, Normal, Uniform, Distribution}; +use rand::{rngs::SmallRng, Rng, SeedableRng}; +use rand_distr::{Distribution, Exp, Normal, Uniform}; use rayon::prelude::*; +use simcore_rs::{ + util::{until, Control, RandomVar}, + Process, Sim, Time, +}; +use std::{ + cell::{Cell, RefCell}, + rc::Rc, +}; // Constants for simulation parameters. const PHILOSOPHER_COUNT: usize = 5; @@ -27,122 +33,125 @@ const SEED: u64 = 100_000; /// Shared global data for the simulation. struct Philosophers { - /// Master random number generator for seeding. - master_rng: RefCell<SmallRng>, - /// Cell to store the simulation duration until deadlock. - sim_duration: Cell<Time>, + /// Master random number generator for seeding. + master_rng: RefCell<SmallRng>, + /// Cell to store the simulation duration until deadlock. + sim_duration: Cell<Time>, } /// Represents the shared table with forks for the philosophers. struct Table { - /// Controls for each fork, indicating whether it is held. - forks: Vec<Control<bool>>, - /// Counter for the number of forks currently held. - forks_held: Control<usize>, - /// Counter for the number of forks currently awaited. - forks_awaited: Control<usize>, + /// Controls for each fork, indicating whether it is held. + forks: Vec<Control<bool>>, + /// Counter for the number of forks currently held. + forks_held: Control<usize>, + /// Counter for the number of forks currently awaited. + forks_awaited: Control<usize>, } impl Table { - /// Creates a new table with a specified number of forks. - fn new(forks: usize) -> Self { - Table { - forks: (0..forks).map(|_| Control::new(false)).collect(), - forks_held: Control::default(), - forks_awaited: Control::default(), - } - } - - /// Acquires a fork at a given position, blocking until it is available. - async fn acquire_fork(&self, i: usize) { - let num = i % self.forks.len(); - self.forks_awaited.set(self.forks_awaited.get() + 1); - until(&self.forks[num], |fork| !fork.get()).await; - self.forks[num].set(true); - self.forks_awaited.set(self.forks_awaited.get() - 1); - self.forks_held.set(self.forks_held.get() + 1); - } - - /// Releases a fork at a given position back to the table. - fn release_fork(&self, i: usize) { - self.forks[i % self.forks.len()].set(false); - self.forks_held.set(self.forks_held.get() - 1); - } + /// Creates a new table with a specified number of forks. + fn new(forks: usize) -> Self { + Table { + forks: (0..forks).map(|_| Control::new(false)).collect(), + forks_held: Control::default(), + forks_awaited: Control::default(), + } + } + + /// Acquires a fork at a given position, blocking until it is available. + async fn acquire_fork(&self, i: usize) { + let num = i % self.forks.len(); + self.forks_awaited.set(self.forks_awaited.get() + 1); + until(&self.forks[num], |fork| !fork.get()).await; + self.forks[num].set(true); + self.forks_awaited.set(self.forks_awaited.get() - 1); + self.forks_held.set(self.forks_held.get() + 1); + } + + /// Releases a fork at a given position back to the table. + fn release_fork(&self, i: usize) { + self.forks[i % self.forks.len()].set(false); + self.forks_held.set(self.forks_held.get() - 1); + } } /// Represents a philosopher in the simulation. struct Philosopher { - /// Reference to the shared table. - table: Rc<Table>, - /// The seat index of the philosopher at the table. - seat: usize, - /// Random number generator for this philosopher. - rng: SmallRng, + /// Reference to the shared table. + table: Rc<Table>, + /// The seat index of the philosopher at the table. + seat: usize, + /// Random number generator for this philosopher. + rng: SmallRng, } impl Philosopher { - /// Simulates the actions of a philosopher. - /// - /// The philosopher alternates between thinking and eating, and tries to - /// acquire forks. - async fn actions(mut self, sim: Sim<'_, Philosophers>) { - let thinking_duration = Exp::new(1.0).unwrap(); - let artificial_delay = Uniform::new(0.1, 0.2); - let eating_duration = Normal::new(0.5, 0.2).unwrap(); - - loop { - // Spend some time pondering the nature of things. - sim.advance(thinking_duration.sample(&mut self.rng)).await; - - // Acquire the first fork. - self.table.acquire_fork(self.seat).await; - - // Introduce an artificial delay to leave room for deadlocks. - sim.advance(artificial_delay.sample(&mut self.rng)).await; - - // Acquire the second fork. - self.table.acquire_fork(self.seat + 1).await; - - // Spend some time eating. - sim.advance( - eating_duration - .sample_iter(&mut self.rng) - .find(|&val| val >= 0.0) - .unwrap(), - ).await; - - // Release the forks. - self.table.release_fork(self.seat + 1); - self.table.release_fork(self.seat); - } - } + /// Simulates the actions of a philosopher. + /// + /// The philosopher alternates between thinking and eating, and tries to + /// acquire forks. + async fn actions(mut self, sim: Sim<'_, Philosophers>) { + let thinking_duration = Exp::new(1.0).unwrap(); + let artificial_delay = Uniform::new(0.1, 0.2); + let eating_duration = Normal::new(0.5, 0.2).unwrap(); + + loop { + // Spend some time pondering the nature of things. + sim.advance(thinking_duration.sample(&mut self.rng)).await; + + // Acquire the first fork. + self.table.acquire_fork(self.seat).await; + + // Introduce an artificial delay to leave room for deadlocks. + sim.advance(artificial_delay.sample(&mut self.rng)).await; + + // Acquire the second fork. + self.table.acquire_fork(self.seat + 1).await; + + // Spend some time eating. + sim.advance( + eating_duration + .sample_iter(&mut self.rng) + .find(|&val| val >= 0.0) + .unwrap(), + ) + .await; + + // Release the forks. + self.table.release_fork(self.seat + 1); + self.table.release_fork(self.seat); + } + } } /// Runs a single simulation instance of the Dining Philosophers problem. /// /// Initializes the table and philosophers, and waits until a deadlock occurs. async fn sim_main(sim: Sim<'_, Philosophers>, count: usize) { - let table = Rc::new(Table::new(count)); - - // Create the philosopher processes and seat them. - for i in 0..count { - sim.activate( - Philosopher { - table: table.clone(), - seat: i, - rng: SmallRng::from_seed(sim.global().master_rng.borrow_mut().gen()), - }.actions(sim), - ); - } - - // Wait for the precise configuration indicating a deadlock. - until( - (&table.forks_held, &table.forks_awaited), - |(held, awaited)| held.get() == count && awaited.get() == count, - ).await; - - // Record the current simulation time. - sim.global().sim_duration.set(sim.now()); + let table = Rc::new(Table::new(count)); + + // Create the philosopher processes and seat them. + for i in 0..count { + sim.activate( + Philosopher { + table: table.clone(), + seat: i, + rng: SmallRng::from_seed(sim.global().master_rng.borrow_mut().gen()), + } + .actions(sim), + ); + } + + // Wait for the precise configuration indicating a deadlock. + until( + (&table.forks_held, &table.forks_awaited), + |(held, awaited)| held.get() == count && awaited.get() == count, + ) + .await; + + // Record the current simulation time. + sim.global().sim_duration.set(sim.now()); } /// Runs multiple simulation instances in parallel and collects statistics. @@ -157,36 +166,36 @@ async fn sim_main(sim: Sim<'_, Philosophers>, count: usize) { /// A `RandomVar` containing statistical data of simulation durations until /// deadlock. fn philosophers(count: usize, reruns: usize) -> RandomVar { - // Use thread-based parallelism to concurrently run simulation models. - (1..=reruns) - .into_par_iter() - .map(|i| { - simcore_rs::simulation( - // Global data. - Philosophers { - master_rng: RefCell::new(SmallRng::seed_from_u64(i as u64 * SEED)), - sim_duration: Cell::default(), - }, - // Simulation entry point. - |sim| Process::new(sim, sim_main(sim, count)), - ) - .sim_duration - .get() - }) - .fold( - || RandomVar::new(), - |var, duration| { - var.tabulate(duration); - var - }, - ) - .reduce( - || RandomVar::new(), - |var_a, var_b| { - var_a.merge(&var_b); - var_a - }, - ) + // Use thread-based parallelism to concurrently run simulation models. + (1..=reruns) + .into_par_iter() + .map(|i| { + simcore_rs::simulation( + // Global data. + Philosophers { + master_rng: RefCell::new(SmallRng::seed_from_u64(i as u64 * SEED)), + sim_duration: Cell::default(), + }, + // Simulation entry point. + |sim| Process::new(sim, sim_main(sim, count)), + ) + .sim_duration + .get() + }) + .fold( + || RandomVar::new(), + |var, duration| { + var.tabulate(duration); + var + }, + ) + .reduce( + || RandomVar::new(), + |var_a, var_b| { + var_a.merge(&var_b); + var_a + }, + ) } /// Entry point for the Dining Philosophers simulation. @@ -194,10 +203,10 @@ fn philosophers(count: usize, reruns: usize) -> RandomVar { /// Runs multiple simulations and prints out the statistical results. #[cfg(not(test))] fn main() { - const EXPERIMENT_COUNT: usize = 500; - - let sim_duration = philosophers(PHILOSOPHER_COUNT, EXPERIMENT_COUNT); - println!("Simulation duration until deadlock: {:#}", sim_duration); + const EXPERIMENT_COUNT: usize = 500; + + let sim_duration = philosophers(PHILOSOPHER_COUNT, EXPERIMENT_COUNT); + println!("Simulation duration until deadlock: {:#}", sim_duration); } #[cfg(test)] @@ -212,96 +221,87 @@ mod slx; /// of reruns. #[cfg(test)] mod bench { - use super::*; - use criterion::{criterion_group, AxisScale, BenchmarkId, Criterion, PlotConfiguration}; - - #[cfg(feature = "odemx")] - mod odemx { - use std::os::raw::c_uint; - - #[link(name = "odemx", kind = "static")] - extern "C" { - pub fn philosophers(count: c_uint, reruns: c_uint); - } - } - - #[cfg(windows)] - const SLX_PATH: &'static str = "C:\\Wolverine\\SLX"; - const RANGE: u32 = 10; - const STEP: usize = 3; - - /// Benchmarks the Dining Philosophers simulation using different numbers of - /// reruns. - fn philosopher_bench(c: &mut Criterion) { - let mut group = c.benchmark_group("Philosophers"); - - // Set up the benchmark parameters. - group.confidence_level(0.99); - group.plot_config( - PlotConfiguration::default().summary_scale(AxisScale::Logarithmic), - ); - - #[cfg(windows)] - let slx_path = { - let path = slx::slx_version(SLX_PATH); - - if path.is_none() { - println!("SLX not found, skipping SLX benchmarks!"); - } else { - println!("Using SLX program at {:?}", path.as_ref().unwrap()); - } - - path - }; - - // Vary the number of performed reruns in each experiment. - for experiment_count in (0..RANGE).map(|c| (1 << c) * STEP) { - #[cfg(windows)] - if let Some(path) = slx_path.as_ref() { - let count = experiment_count.to_string(); - let args = [ - "/silent", - "/stdout", - "/noicon", - "/nowarn", - "/noxwarn", - "/#BENCH", - "slx\\philosophers.slx", - count.as_str(), - ]; - - // Benchmark the SLX implementation. - group.bench_function( - BenchmarkId::new("SLX", experiment_count), - |b| { - b.iter_custom(|iters| { - slx::slx_bench(path.as_os_str(), &args, iters as usize) - .expect("Couldn't benchmark the SLX program") - }) - }, - ); - } - - // Benchmark the C++ implementation. - #[cfg(feature = "odemx")] - group.bench_function( - BenchmarkId::new("ODEMx", experiment_count), - |b| { - b.iter(|| unsafe { - odemx::philosophers(PHILOSOPHER_COUNT as _, experiment_count as _) - }) - }, - ); - - // Benchmark the Rust implementation. - group.bench_function( - BenchmarkId::new("Rust", experiment_count), - |b| b.iter(|| philosophers(PHILOSOPHER_COUNT, experiment_count)), - ); - } - - group.finish(); - } - - criterion_group!(benches, philosopher_bench); + use super::*; + use criterion::{criterion_group, AxisScale, BenchmarkId, Criterion, PlotConfiguration}; + + #[cfg(feature = "odemx")] + mod odemx { + use std::os::raw::c_uint; + + #[link(name = "odemx", kind = "static")] + extern "C" { + pub fn philosophers(count: c_uint, reruns: c_uint); + } + } + + #[cfg(windows)] + const SLX_PATH: &'static str = "C:\\Wolverine\\SLX"; + const RANGE: u32 = 10; + const STEP: usize = 3; + + /// Benchmarks the Dining Philosophers simulation using different numbers of + /// reruns. + fn philosopher_bench(c: &mut Criterion) { + let mut group = c.benchmark_group("Philosophers"); + + // Set up the benchmark parameters. + group.confidence_level(0.99); + group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic)); + + #[cfg(windows)] + let slx_path = { + let path = slx::slx_version(SLX_PATH); + + if path.is_none() { + println!("SLX not found, skipping SLX benchmarks!"); + } else { + println!("Using SLX program at {:?}", path.as_ref().unwrap()); + } + + path + }; + + // Vary the number of performed reruns in each experiment. + for experiment_count in (0..RANGE).map(|c| (1 << c) * STEP) { + #[cfg(windows)] + if let Some(path) = slx_path.as_ref() { + let count = experiment_count.to_string(); + let args = [ + "/silent", + "/stdout", + "/noicon", + "/nowarn", + "/noxwarn", + "/#BENCH", + "slx\\philosophers.slx", + count.as_str(), + ]; + + // Benchmark the SLX implementation. + group.bench_function(BenchmarkId::new("SLX", experiment_count), |b| { + b.iter_custom(|iters| { + slx::slx_bench(path.as_os_str(), &args, iters as usize) + .expect("Couldn't benchmark the SLX program") + }) + }); + } + + // Benchmark the C++ implementation. + #[cfg(feature = "odemx")] + group.bench_function(BenchmarkId::new("ODEMx", experiment_count), |b| { + b.iter(|| unsafe { + odemx::philosophers(PHILOSOPHER_COUNT as _, experiment_count as _) + }) + }); + + // Benchmark the Rust implementation. + group.bench_function(BenchmarkId::new("Rust", experiment_count), |b| { + b.iter(|| philosophers(PHILOSOPHER_COUNT, experiment_count)) + }); + } + + group.finish(); + } + + criterion_group!(benches, philosopher_bench); } diff --git a/examples/slx/mod.rs b/examples/slx/mod.rs index 089844a..0fb39b1 100644 --- a/examples/slx/mod.rs +++ b/examples/slx/mod.rs @@ -1,102 +1,112 @@ -use std::{process::Command, time::Duration, path::Path, iter::once}; -use std::ffi::{OsString, OsStr}; use itertools::Itertools; +use std::ffi::{OsStr, OsString}; +use std::{iter::once, path::Path, process::Command, time::Duration}; /// Runs the SLX runtime environment once for an SLX program that measures its /// own duration and returns it on the standard output. -/// +/// /// Errors during this execution are returned back to the caller. -fn slx_run_once(program: &OsStr, arguments: &[&str], iterations: usize) -> Result<Duration, (i32, String)> { - // start the SLX runtime and wait for its return - let rc = Command::new(program) - .args(arguments) - .arg(iterations.to_string()) - .output() - .map_err(|err| (-1, format!("Failed to run the SLX program: {}", err)))?; - - if !rc.status.success() { - // check the error code and add a description to it - Err(rc.status - .code() - .map_or_else( - || (-10000, "Unknown error code".to_string()), - |code| { - (code, match code { - -10001 => "Unable to find a security key with SLX permission".to_string(), - -10002 => "The command line includes an invalid option".to_string(), - -10003 => "The source file cannot be found".to_string(), - -10004 => "The specified/implied output file cannot be written".to_string(), - -10005 => "The program contains compile-time errors".to_string(), - -10006 => "The program has terminated with a run-time error".to_string(), - -10007 => "The /genrts option failed".to_string(), - _ => format!("Unknown error code: {}", code) - }) - } - ) - ) - } else { - Ok(Duration::from_secs_f64( - (&String::from_utf8_lossy(&rc.stdout)).trim().parse().unwrap() - )) - } +fn slx_run_once( + program: &OsStr, + arguments: &[&str], + iterations: usize, +) -> Result<Duration, (i32, String)> { + // start the SLX runtime and wait for its return + let rc = Command::new(program) + .args(arguments) + .arg(iterations.to_string()) + .output() + .map_err(|err| (-1, format!("Failed to run the SLX program: {}", err)))?; + + if !rc.status.success() { + // check the error code and add a description to it + Err(rc.status.code().map_or_else( + || (-10000, "Unknown error code".to_string()), + |code| { + ( + code, + match code { + -10001 => "Unable to find a security key with SLX permission".to_string(), + -10002 => "The command line includes an invalid option".to_string(), + -10003 => "The source file cannot be found".to_string(), + -10004 => "The specified/implied output file cannot be written".to_string(), + -10005 => "The program contains compile-time errors".to_string(), + -10006 => "The program has terminated with a run-time error".to_string(), + -10007 => "The /genrts option failed".to_string(), + _ => format!("Unknown error code: {}", code), + }, + ) + }, + )) + } else { + Ok(Duration::from_secs_f64( + (&String::from_utf8_lossy(&rc.stdout)) + .trim() + .parse() + .unwrap(), + )) + } } /// Repeats runs of an SLX program for a specified number of times and collects /// the total (real-time) duration of those combined runs. -/// +/// /// The SLX program in question has to report its own real-time duration using /// the standard output. -pub fn slx_bench(program: &OsStr, arguments: &[&str], iterations: usize) -> Result<Duration, String> { - // try to complete the iterations in a single run of the SLX program - slx_run_once(program, arguments, iterations) - .or_else(|(code, desc)| { - if code == -10006 { - // Runtime error: this happens when the free version of SLX - // exceeds its total instance budget, either through a - // complicated scenario or too many iterations in one call. - // We can still make this work by running SLX multiple times - // with lower iteration counts and combining the timings. - // This effectively trades benchmark speed with simulation model - // size. - (1..) - .map(|i| iterations >> i) - .take_while(|&chunk_size| chunk_size > 0) - // .inspect(|chunk_size| println!("reducing the chunk size to {}", chunk_size)) - .map(|chunk_size| { - (0..(iterations - 1)/chunk_size) - .map(|_| chunk_size) - .chain(once((iterations - 1) % chunk_size + 1)) - .map(|size| - slx_run_once(program, arguments, size) - ) - .fold_ok(Duration::default(), |duration, run| duration + run) - }) - .find_map(|result| result.ok()) - .ok_or(desc) - } else { - Err(desc) - } - }) +pub fn slx_bench( + program: &OsStr, + arguments: &[&str], + iterations: usize, +) -> Result<Duration, String> { + // try to complete the iterations in a single run of the SLX program + slx_run_once(program, arguments, iterations).or_else(|(code, desc)| { + if code == -10006 { + // Runtime error: this happens when the free version of SLX + // exceeds its total instance budget, either through a + // complicated scenario or too many iterations in one call. + // We can still make this work by running SLX multiple times + // with lower iteration counts and combining the timings. + // This effectively trades benchmark speed with simulation model + // size. + (1..) + .map(|i| iterations >> i) + .take_while(|&chunk_size| chunk_size > 0) + // .inspect(|chunk_size| println!("reducing the chunk size to {}", chunk_size)) + .map(|chunk_size| { + (0..(iterations - 1) / chunk_size) + .map(|_| chunk_size) + .chain(once((iterations - 1) % chunk_size + 1)) + .map(|size| slx_run_once(program, arguments, size)) + .fold_ok(Duration::default(), |duration, run| duration + run) + }) + .find_map(|result| result.ok()) + .ok_or(desc) + } else { + Err(desc) + } + }) } /// Attempts to find the best SLX version for the benchmarks, falling back on /// the free version if no SLX license can be found. pub fn slx_version(base: impl AsRef<Path>) -> Option<OsString> { - if base.as_ref().exists() { - let programs = ["se64.exe", "se32.exe", "sse.exe"]; - let args = ["/silent", "/noicon", "/nowarn", "/noxwarn", "bad_file.slx"]; - - for path in programs.iter().map(|p| base.as_ref().join(p)) { - match Command::new(&path).args(&args).status() { - Err(_) => continue, - Ok(rc) => if let Some(code) = rc.code() { - if code == -10003 { - return Some(path.into_os_string()) - } - } - } - } - } - - None + if base.as_ref().exists() { + let programs = ["se64.exe", "se32.exe", "sse.exe"]; + let args = ["/silent", "/noicon", "/nowarn", "/noxwarn", "bad_file.slx"]; + + for path in programs.iter().map(|p| base.as_ref().join(p)) { + match Command::new(&path).args(&args).status() { + Err(_) => continue, + Ok(rc) => { + if let Some(code) = rc.code() { + if code == -10003 { + return Some(path.into_os_string()); + } + } + } + } + } + } + + None } diff --git a/src/bin/coroutines.rs b/src/bin/coroutines.rs index 2204b68..0da0f7d 100644 --- a/src/bin/coroutines.rs +++ b/src/bin/coroutines.rs @@ -2,57 +2,63 @@ use self::Token::*; use std::rc::Rc; async fn decompress<I>(mut inp: I, out: Sender<char>) -where I: Iterator<Item = u8> { - while let Some(c) = inp.next() { - if c == 0xFF { - let len = inp.next().unwrap(); - let c = inp.next().unwrap(); - - for _ in 0..len { - out.send(c as char).await; - } - } else { - out.send(c as char).await; - } - } +where + I: Iterator<Item = u8>, +{ + while let Some(c) = inp.next() { + if c == 0xFF { + let len = inp.next().unwrap(); + let c = inp.next().unwrap(); + + for _ in 0..len { + out.send(c as char).await; + } + } else { + out.send(c as char).await; + } + } } #[derive(Clone, Debug)] -enum Token { WORD(String), PUNCT(char) } +enum Token { + WORD(String), + PUNCT(char), +} async fn tokenize(inp: Receiver<char>, out: Sender<Token>) { - while let Some(mut c) = inp.recv().await { - if c.is_alphabetic() { - let mut text = c.to_string(); - while let Some(new_c) = inp.recv().await { - if new_c.is_alphabetic() { - text.push(new_c); - } else { - c = new_c; break; - } - } - out.send(WORD(text)).await; - } - out.send(PUNCT(c)).await; - } + while let Some(mut c) = inp.recv().await { + if c.is_alphabetic() { + let mut text = c.to_string(); + while let Some(new_c) = inp.recv().await { + if new_c.is_alphabetic() { + text.push(new_c); + } else { + c = new_c; + break; + } + } + out.send(WORD(text)).await; + } + out.send(PUNCT(c)).await; + } } fn main() { - let exec = Executor::new(); - let spawner = exec.spawner(); - let input = b"He\xff\x02lo IST!".iter().cloned(); - - exec.run(async { - let (s1, r1) = channel::<char>(); - let (s2, r2) = channel::<Token>(); - - spawner.spawn(decompress(input, s1)); - spawner.spawn(tokenize(r1, s2)); - - while let Some(token) = r2.recv().await { - println!("{:?}", token); - } - }); + let exec = Executor::new(); + let spawner = exec.spawner(); + let input = b"He\xff\x02lo IST!".iter().cloned(); + + exec.run(async { + let (s1, r1) = channel::<char>(); + let (s2, r2) = channel::<Token>(); + + spawner.spawn(decompress(input, s1)); + spawner.spawn(tokenize(r1, s2)); + + while let Some(token) = r2.recv().await { + println!("{:?}", token); + } + }); } /* **************************** library contents **************************** */ @@ -60,83 +66,83 @@ fn main() { // additional imports for the necessary execution environment use std::{ - pin::Pin, - cell::RefCell, - future::Future, - task::{self, Poll, Context}, - mem::swap + cell::RefCell, + future::Future, + mem::swap, + pin::Pin, + task::{self, Context, Poll}, }; /* ************************** executor environment ************************** */ // heterogeneous, pinned list of futures -type FutureQ = Vec<Pin<Box<dyn Future<Output=()>>>>; +type FutureQ = Vec<Pin<Box<dyn Future<Output = ()>>>>; /// A simple executor that drives the event loop. struct Executor { - sched: RefCell<FutureQ> + sched: RefCell<FutureQ>, } impl Executor { - /// Constructs a new executor. - pub fn new() -> Self { - Executor { - sched: RefCell::new(vec![]) - } - } - - /// Creates and returns a Spawner that can be used to insert new futures - /// into the executors future queue. - pub fn spawner(&self) -> Spawner { - Spawner { sched: &self.sched } - } - - /// Runs a future to completion. - pub fn run<F: Future>(&self, mut future: F) -> F::Output { - // construct a custom context to pass into the poll()-method - let waker = unsafe { - task::Waker::from_raw(task::RawWaker::new( - std::ptr::null(), - &EXECUTOR_WAKER_VTABLE - )) - }; - let mut cx = task::Context::from_waker(&waker); - - // pin the passed future for the duration of this function; - // note: this is safe since we're not moving it around - let mut future = unsafe { - Pin::new_unchecked(&mut future) - }; - let mut sched = FutureQ::new(); - - // implement a busy-wait event loop for simplicity - loop { - // poll the primary future, allowing secondary futures to spawn - if let Poll::Ready(val) = future.as_mut().poll(&mut cx) { - break val; - } - - // swap the scheduled futures with an empty queue - swap(&mut sched, &mut self.sched.borrow_mut()); - - // iterate over all secondary futures presently scheduled - for mut future in sched.drain(..) { - // if they are not completed, reschedule - if future.as_mut().poll(&mut cx).is_pending() { - self.sched.borrow_mut().push(future); - } - } - } - } + /// Constructs a new executor. + pub fn new() -> Self { + Executor { + sched: RefCell::new(vec![]), + } + } + + /// Creates and returns a Spawner that can be used to insert new futures + /// into the executors future queue. + pub fn spawner(&self) -> Spawner { + Spawner { sched: &self.sched } + } + + /// Runs a future to completion. + pub fn run<F: Future>(&self, mut future: F) -> F::Output { + // construct a custom context to pass into the poll()-method + let waker = unsafe { + task::Waker::from_raw(task::RawWaker::new( + std::ptr::null(), + &EXECUTOR_WAKER_VTABLE, + )) + }; + let mut cx = task::Context::from_waker(&waker); + + // pin the passed future for the duration of this function; + // note: this is safe since we're not moving it around + let mut future = unsafe { Pin::new_unchecked(&mut future) }; + let mut sched = FutureQ::new(); + + // implement a busy-wait event loop for simplicity + loop { + // poll the primary future, allowing secondary futures to spawn + if let Poll::Ready(val) = future.as_mut().poll(&mut cx) { + break val; + } + + // swap the scheduled futures with an empty queue + swap(&mut sched, &mut self.sched.borrow_mut()); + + // iterate over all secondary futures presently scheduled + for mut future in sched.drain(..) { + // if they are not completed, reschedule + if future.as_mut().poll(&mut cx).is_pending() { + self.sched.borrow_mut().push(future); + } + } + } + } } /// A handle to the executors future queue that can be used to insert new tasks. -struct Spawner<'a> { sched: &'a RefCell<FutureQ> } +struct Spawner<'a> { + sched: &'a RefCell<FutureQ>, +} impl<'a> Spawner<'a> { - pub fn spawn(&self, future: impl Future<Output=()> + 'static) { - self.sched.borrow_mut().push(Box::pin(future)); - } + pub fn spawn(&self, future: impl Future<Output = ()> + 'static) { + self.sched.borrow_mut().push(Box::pin(future)); + } } /* **************************** specialized waker *************************** */ @@ -145,102 +151,121 @@ impl<'a> Spawner<'a> { struct TrivialWaker; impl TrivialWaker { - unsafe fn clone(_this: *const ()) -> task::RawWaker { - unimplemented!() - } - - unsafe fn wake(_this: *const ()) { - unimplemented!() - } - - unsafe fn wake_by_ref(_this: *const ()) { - unimplemented!() - } - - unsafe fn drop(_this: *const ()) {} + unsafe fn clone(_this: *const ()) -> task::RawWaker { + unimplemented!() + } + + unsafe fn wake(_this: *const ()) { + unimplemented!() + } + + unsafe fn wake_by_ref(_this: *const ()) { + unimplemented!() + } + + unsafe fn drop(_this: *const ()) {} } /// Virtual function table for the simple waker. static EXECUTOR_WAKER_VTABLE: task::RawWakerVTable = task::RawWakerVTable::new( - TrivialWaker::clone, - TrivialWaker::wake, - TrivialWaker::wake_by_ref, - TrivialWaker::drop + TrivialWaker::clone, + TrivialWaker::wake, + TrivialWaker::wake_by_ref, + TrivialWaker::drop, ); /* ************************** asynchronous wrappers ************************* */ /// Simple channel with space for only one element. struct Channel<T> { - slot: RefCell<Option<T>> + slot: RefCell<Option<T>>, } /// Creates a channel and returns a pair of read and write ends. fn channel<T>() -> (Sender<T>, Receiver<T>) { - let channel = Rc::new(Channel { - slot: RefCell::new(None) - }); - (Sender { channel: channel.clone() }, Receiver { channel }) + let channel = Rc::new(Channel { + slot: RefCell::new(None), + }); + ( + Sender { + channel: channel.clone(), + }, + Receiver { channel }, + ) } /// Write-end of a channel. -struct Sender<T> { channel: Rc<Channel<T>> } +struct Sender<T> { + channel: Rc<Channel<T>>, +} /// Read-end of a channel. -struct Receiver<T> { channel: Rc<Channel<T>> } +struct Receiver<T> { + channel: Rc<Channel<T>>, +} impl<T: Clone> Sender<T> { - /// Method used to push an element into the channel. - /// Blocks until the previous element has been consumed. - pub fn send<'s>(&'s self, elem: T) -> impl Future<Output = ()> + 's { - SendFuture { channel: &self.channel, elem } - } + /// Method used to push an element into the channel. + /// Blocks until the previous element has been consumed. + pub fn send<'s>(&'s self, elem: T) -> impl Future<Output = ()> + 's { + SendFuture { + channel: &self.channel, + elem, + } + } } impl<T: Clone> Receiver<T> { - /// Method used to consume an element from the channel. - /// Blocks until an element can be consumed. - pub fn recv<'r>(&'r self) -> impl Future<Output = Option<T>> + 'r { - ReceiveFuture { channel: &self.channel } - } + /// Method used to consume an element from the channel. + /// Blocks until an element can be consumed. + pub fn recv<'r>(&'r self) -> impl Future<Output = Option<T>> + 'r { + ReceiveFuture { + channel: &self.channel, + } + } } /// A future that pushes an element into a channel. -struct SendFuture<'c,T> { channel: &'c Rc<Channel<T>>, elem: T } +struct SendFuture<'c, T> { + channel: &'c Rc<Channel<T>>, + elem: T, +} /// A future that consumes an element from a channel. -struct ReceiveFuture<'c,T> { channel: &'c Rc<Channel<T>> } - -impl<T: Clone> Future for SendFuture<'_,T> { - type Output = (); - - fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> { - // check if there is space for the element - if self.channel.slot.borrow().is_none() { - // replace the empty element with ours - self.channel.slot.replace(Some(self.elem.clone())); - Poll::Ready(()) - } else { - // try again at a later time - Poll::Pending - } - } -} - -impl<T> Future for ReceiveFuture<'_,T> { - type Output = Option<T>; - - fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> { - // check if there is an element in the channel - if let Some(c) = self.channel.slot.borrow_mut().take() { - // return it - Poll::Ready(Some(c)) - } else if Rc::strong_count(self.channel) == 1 { - // check if the sender disconnected - Poll::Ready(None) - } else { - // try again at a later time - Poll::Pending - } - } +struct ReceiveFuture<'c, T> { + channel: &'c Rc<Channel<T>>, +} + +impl<T: Clone> Future for SendFuture<'_, T> { + type Output = (); + + fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> { + // check if there is space for the element + if self.channel.slot.borrow().is_none() { + // replace the empty element with ours + self.channel.slot.replace(Some(self.elem.clone())); + Poll::Ready(()) + } else { + // try again at a later time + Poll::Pending + } + } +} + +impl<T> Future for ReceiveFuture<'_, T> { + type Output = Option<T>; + + fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> { + // check if there is an element in the channel + if let Some(c) = self.channel.slot.borrow_mut().take() { + // return it + Poll::Ready(Some(c)) + } else if Rc::strong_count(self.channel) == 1 { + // check if the sender disconnected + Poll::Ready(None) + } else { + // try again at a later time + Poll::Pending + } + } } diff --git a/src/lib.rs b/src/lib.rs index 8915037..92e3d03 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,542 +1,713 @@ -//! A minimal discrete-event simulation framework using Rust's async/await. -//! -//! This module provides a simple discrete-event simulation environment where -//! processes are represented as asynchronous tasks (`Future`s). The simulation -//! advances in discrete time steps, executing processes scheduled at specific -//! simulation times. -//! -//! Key components: -//! -//! - **`simulation`**: Runs a single simulation given a shared global state -//! and a main process function. It sets up the scheduler (`Calendar`), -//! initializes the simulation context (`Sim`), and manages the event loop -//! until the main process terminates. -//! -//! - **`Sim`**: A lightweight handle to the scheduler, providing methods for -//! processes to interact with the simulation. Processes can use it to -//! schedule themselves or other processes, advance simulation time, retrieve -//! the current time, and access shared global data. -//! -//! - **`Process`**: Wraps a `Future` to represent a process in the simulation. -//! It can be scheduled to run at specific model times and can also act as a -//! waker to resume suspended processes. -//! -//! - **`Calendar`**: The scheduler that maintains the current model time and a -//! priority queue of processes scheduled to run at future times. -//! -//! - **Utility functions**: -//! - `sleep()`: A future that processes can await to suspend execution until -//! reactivated. -//! - `waker()`: A future that returns the current waker, allowing processes -//! to interact with the task system. -//! -//! **Example usage:** -//! -//! ```rust -//! use simcore_rs::*; -//! -//! async fn sim_main(sim: Sim<'_>) { -//! // Main Process code here -//! println!("Process started at time {}", sim.now()); -//! sim.advance(5.0).await; // Suspend for 5 units of time -//! println!("Process resumed at time {}", sim.now()); -//! } -//! -//! // Run the simulation with empty shared data -//! let shared_data = simulation((), |sim| Process::new(sim, sim_main(sim))); -//! ``` -//! -//! **Notes:** -//! -//! - The simulation is designed for single-threaded execution and is not `Send` -//! or `Sync` safe. -//! - Processes manage their own scheduling and can be reactivated by the -//! simulation context. -//! - Unsafe code is used internally (e.g., raw pointers in `Sim`), but safety -//! is maintained as long as simulation contexts do not escape the closure -//! passed to `simulation` which is enforced by the type system. -//! - Wakers are customized to integrate with the simulation's process -//! scheduling, and care is taken to prevent concurrency bugs, assuming -//! single-threaded execution. -//! -//! This framework is intended for educational purposes as part of my (Dorian -//! Weber) dissertation to illustrate how asynchronous Rust can be used to -//! implement a discrete-event simulation framework. It may not cover all edge -//! cases or be suitable for production use. - -use std::{ - cell::{Cell, RefCell}, - cmp::Ordering, - collections::BinaryHeap, - future::Future, - pin::Pin, - rc::Rc, - task::{self, Context, Poll} -}; - -pub mod util; - -// simple time type -pub type Time = f64; - -/// Performs a single simulation run. -/// -/// Input is a function that takes a simulation context and returns the first -/// process. -pub fn simulation<G,F>(shared: G, main: F) -> G -where F: FnOnce(Sim<'_,G>) -> Process<'_,G> { - // create a fresh scheduler and a handle to it - let sched = Calendar::new(shared); - let sim = Sim { handle: &sched }; - - // construct a custom context to pass into the poll()-method - let event_waker = StateEventWaker::new(sim); - let waker = unsafe { event_waker.as_waker() }; - let mut cx = Context::from_waker(&waker); - - // evaluate the passed function for the main process and schedule it - let root = main(sim); - sched.schedule(root.clone()); - - // pop processes until empty or the main process terminates - while let Some(process) = sched.next_process() { - if process.poll(&mut cx).is_ready() && process == root { break; } - } - - // clear the scheduler before it is dropped to break the dependency - // cycle between the processes and the scheduler that we made possible - // by using a raw pointer for the simulation context - sched.clear(); - - // return the global data - sched.shared -} - -// priority queue of time-process-pairs using time as the key -type EventQ<'s,G> = BinaryHeap<NextEvent<'s,G>>; - -/// The (private) scheduler for processes. -struct Calendar<'s,G> { - /// The current simulation time. - now: Cell<Time>, - - /// The event-calendar organized chronologically. - calendar: RefCell<EventQ<'s,G>>, - - /// The currently active process. - active: RefCell<Option<Process<'s,G>>>, - - /// Globally-accessible data. - shared: G -} - -impl<'s,G> Calendar<'s,G> { - /// Creates a new scheduler. - #[inline] - fn new(shared: G) -> Self { - Self { - now: Cell::default(), - calendar: RefCell::default(), - active: RefCell::default(), - shared - } - } - - /// Clears the scheduler, dropping all the contained processes. - fn clear(&self) { - // this is a surprisingly delicate operation because any of the - // processes that are still present in the event-queue may re- or - // de-schedule processes upon drop, with the consequence of writing - // into the event-queue while it's being cleared (and therefore in an - // inconsistent state); swapping out the event queue with an empty one - // circumvents this issue completely, since clearing and dropping now - // behave like atomic operations - - // atomically replace the event queue with an empty one - let mut events = self.calendar.replace(EventQ::default()); - - // now clear the old one; the calendar may not be empty after this - // operation if any destructors scheduled new processes - events.clear(); - - // replace the contents of the calendar with the old (empty) event queue - // and drop the temporary one as an optimization - self.calendar.replace(events); - - // the user has to be actively malicious by scheduling new processes - // upon dropping them to make it this far, and we don't have all day - assert!(self.calendar.borrow().is_empty(), - "Please don't activate new processes on drop."); - - // replace the active process with an already terminated one - self.active.replace(None); - } - - /// Schedules a process at the current simulation time. - #[inline] - fn schedule(&self, process: Process<'s,G>) { - self.schedule_in(Time::default(), process); - } - - /// Schedules a process at a later simulation time. - #[inline] - fn schedule_in(&self, dt: Time, process: Process<'s,G>) { - self.calendar.borrow_mut().push( - NextEvent(self.now.get() + dt, process) - ); - } - - /// Removes the process with the next event time from the calendar and - /// activates it. - #[inline] - fn next_process(&self) -> Option<Process<'s,G>> { - let NextEvent(now, process) = self.calendar.borrow_mut().pop()?; - self.now.set(now); - self.active.replace(Some(process.clone())); - Some(process) - } -} - -/// A light-weight handle to the scheduler. -pub struct Sim<'s,G = ()> { handle: *const Calendar<'s,G> } - -// this allows the creation of copies -impl<'s,G> Clone for Sim<'s,G> { - #[inline] - fn clone(&self) -> Self { *self } -} - -// this allows moving the context without invalidating it (copy semantics) -impl<'s,G> Copy for Sim<'s,G> {} - -impl<'s,G> Sim<'s,G> { - /// Returns a (reference-counted) copy of the currently active process. - #[inline] - pub fn active(&self) -> Process<'s,G> { - self.sched().active.borrow().clone().expect("no active process") - } - - /// Activates a new process with the given future. - #[inline] - pub fn activate<F>(&self, f: F) - where F: Future<Output = ()> + 's { - self.reactivate(Process::new(*self, f)); - } - - /// Reactivates a process that has been suspended with wait(). - #[inline] - pub fn reactivate(&self, process: Process<'s,G>) { - assert!(process.0.borrow().state.is_some()); - self.sched().schedule(process); - } - - /// Reactivates the currently active process after some time has passed. - #[inline] - pub async fn advance(&self, dt: Time) { - self.sched().schedule_in(dt, self.active()); - sleep().await - } - - /// Returns the current simulation time. - #[inline] - pub fn now(&self) -> Time { - self.sched().now.get() - } - - /// Returns a shared reference to the global data. - #[inline] - pub fn global(&self) -> &G { - &self.sched().shared - } - - /// Private function to get a safe reference to the scheduler. - #[inline] - fn sched(&self) -> &Calendar<'s,G> { - // This is safe if no simulation context escapes from the closure - // passed to simulation() which is enforced through the use of - // higher-order trait-bounds. - unsafe { &*self.handle } - } -} - -/// A bare-bone process type that can also be used as a waker. -pub struct Process<'s,G>(Rc<RefCell<Inner<'s,G>>>); - -/// The private details of the [`Process`](struct.Process.html) type. -struct Inner<'s,G> { - /// The simulation context needed to implement the `Waker` interface. - context: Sim<'s,G>, - /// `Some` [`Future`] associated with this process or `None` if it has been - /// terminated externally. - /// - /// [`Future`]: https://doc.rust-lang.org/std/future/trait.Future.html - state: Option<Pin<Box<dyn Future<Output = ()> + 's>>> -} - -impl<'s,G> Process<'s,G> { - /// Combines a future and a simulation context to a process. - #[inline] - pub fn new(sim: Sim<'s,G>, fut: impl Future<Output = ()> + 's) -> Self { - Process(Rc::new(RefCell::new(Inner { - context: sim, state: Some(Box::pin(fut)) - }))) - } - - /// Releases the [`Future`] contained in this process. - /// - /// This will also execute all the destructors for the local variables - /// initialized by this process. - /// - /// [`Future`]: https://doc.rust-lang.org/std/future/trait.Future.html - #[inline] - pub fn terminate(&self) { - self.0.borrow_mut().state.take(); - } - - /// Returns a `Waker` for this process. - #[inline] - pub fn waker(self) -> task::Waker { - unsafe { - task::Waker::from_raw(self.raw_waker()) - } - } - - /// Private function for polling the process. - #[inline] - fn poll(&self, cx: &mut Context<'_>) -> Poll<()> { - if let Some(fut) = self.0.borrow_mut().state.as_mut() { - fut.as_mut().poll(cx) - } else { - Poll::Ready(()) - } - } -} - -// Increases the reference counter of this process. -impl<'s,G> Clone for Process<'s,G> { - #[inline] - fn clone(&self) -> Self { - Process(self.0.clone()) - } -} - -// allows processes to be compared for equality -impl<G> PartialEq for Process<'_,G> { - #[inline] - fn eq(&self, other: &Self) -> bool { - Rc::ptr_eq(&self.0, &other.0) - } -} - -// marks the equality-relation as total -impl<G> Eq for Process<'_,G> {} - -/// Time-process-pair that has a total order defined based on the time. -struct NextEvent<'p,G>(Time, Process<'p,G>); - -impl<G> PartialEq for NextEvent<'_,G> { - #[inline] - fn eq(&self, other: &Self) -> bool { - self.0 == other.0 - } -} - -impl<G> Eq for NextEvent<'_,G> {} - -impl<G> PartialOrd for NextEvent<'_,G> { - #[inline] - fn partial_cmp(&self, other: &Self) -> Option<Ordering> { - Some(self.cmp(other)) - } -} - -impl<G> Ord for NextEvent<'_,G> { - #[inline] - fn cmp(&self, other: &Self) -> Ordering { - self.0.partial_cmp(&other.0) - .expect("illegal event time NaN").reverse() - } -} - -/* **************************** specialized waker *************************** */ - -impl<'s,G> Process<'s,G> { - /// Virtual function table for the waker. - const VTABLE: task::RawWakerVTable = task::RawWakerVTable::new( - Self::clone, - Self::wake, - Self::wake_by_ref, - Self::drop - ); - - /// Constructs a raw waker from a simulation context and a process. - #[inline] - fn raw_waker(self) -> task::RawWaker { - task::RawWaker::new( - Rc::into_raw(self.0) as *const (), - &Self::VTABLE - ) - } - - unsafe fn clone(this: *const ()) -> task::RawWaker { - let waker = Rc::from_raw( - this as *const RefCell<Inner<'_,G>> - ); - - // increase the reference counter once - let _ = Rc::into_raw(waker.clone()); - - // this is technically unsafe because Wakers are Send + Sync and so this - // call might be executed from a different thread, creating a data race - // hazard; we leave preventing this as an exercise to the reader! - task::RawWaker::new( - Rc::into_raw(waker) as *const (), - &Self::VTABLE - ) - } - - unsafe fn wake(this: *const ()) { - let waker = Rc::from_raw( - this as *const RefCell<Inner<'_,G>> - ); - - // this can happen if a synchronization structure forgets to clean - // up registered Waker objects on destruct; this would lead to - // hard-to-diagnose bugs if we were to ignore it - assert!(waker.borrow().state.is_some(), - "Attempted to wake a terminated process."); - - // this is technically unsafe because Wakers are Send + Sync and so this - // call might be executed from a different thread, creating a data race - // hazard; we leave preventing this as an exercise to the reader! - let sim = waker.borrow().context; - sim.reactivate(Process(waker)); - } - - unsafe fn wake_by_ref(this: *const ()) { - let waker = Rc::from_raw( - this as *const RefCell<Inner<'_,G>> - ); - - // keep the waker alive by incrementing the reference count - let _ = Rc::into_raw(waker.clone()); - - // this can happen if a synchronization structure forgets to clean - // up registered Waker objects on destruct; this would lead to - // hard-to-diagnose bugs if we were to ignore it - assert!(waker.borrow().state.is_some(), - "Attempted to wake a terminated process."); - - // this is technically unsafe because Wakers are Send + Sync and so this - // call might be executed from a different thread, creating a data race - // hazard; we leave preventing this as an exercise to the reader! - let sim = waker.borrow().context; - sim.reactivate(Process(waker)); - } - - unsafe fn drop(this: *const ()) { - // this is technically unsafe because Wakers are Send + Sync and so this - // call might be executed from a different thread, creating a data race - // hazard; we leave preventing this as an exercise to the reader! - Rc::from_raw( - this as *const RefCell<Inner<'_,G>> - ); - } -} - -/// Complex waker that is used to implement state events. -/// -/// This is the shallow version that is created on the stack of the function -/// running the event loop. It assumes that the stored process is the currently -/// active one and creates the deep version when it is cloned. -struct StateEventWaker<'s,G> { context: Sim<'s,G> } - -impl<'s,G> StateEventWaker<'s,G> { - /// Virtual function table for the waker. - const VTABLE: task::RawWakerVTable = task::RawWakerVTable::new( - Self::clone, - Self::wake, - Self::wake_by_ref, - Self::drop - ); - - /// Creates a new (shallow) waker using only the simulation context. - #[inline] - fn new(sim: Sim<'s,G>) -> Self { - StateEventWaker { context: sim } - } - - /// Constructs a new waker using only a reference. - /// - /// This function is unsafe because it is up to the user to ensure that the - /// waker doesn't outlive the reference. - #[inline] - unsafe fn as_waker(&self) -> task::Waker { - task::Waker::from_raw( - task::RawWaker::new( - self as *const _ as *const (), - &Self::VTABLE - ) - ) - } - - unsafe fn clone(this: *const ()) -> task::RawWaker { - // return the currently active process as a raw waker - (*(this as *const Self)).context.active().raw_waker() - } - - unsafe fn wake(_this: *const ()) { - // waking the active process can safely be ignored - } - - unsafe fn wake_by_ref(_this: *const ()) { - // waking the active process can safely be ignored - } - - unsafe fn drop(_this: *const ()) { - // memory is released in the main event loop - } -} - -/* *************************** specialized futures ************************** */ - -/// Returns a future that unconditionally puts the calling process to sleep. -#[inline] -pub fn sleep() -> impl Future<Output = ()> { - Sleep { ready: false } -} - -/// Returns a future that can be awaited to produce the currently set waker. -#[inline] -pub fn waker() -> impl Future<Output = task::Waker> { - Waker -} - -/// Future that blocks on the first call and returns on the second one. -struct Sleep { ready: bool } - -impl Future for Sleep { - type Output = (); - - #[inline] - fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> { - if self.ready { - Poll::Ready(()) - } else { - self.ready = true; - Poll::Pending - } - } -} - -/// Future that returns immediately with a cloned waker. -struct Waker; - -impl Future for Waker { - type Output = task::Waker; - - #[inline] - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - Poll::Ready(cx.waker().clone()) - } -} +//! A minimal discrete-event simulation framework using Rust's async/await. +//! +//! This module provides a simple discrete-event simulation environment where +//! processes are represented as asynchronous tasks (`Future`s). The simulation +//! advances in discrete time steps, executing processes scheduled at specific +//! simulation times. +//! +//! Key components: +//! +//! - **`simulation`**: Runs a single simulation given a shared global state +//! and a main process function. It sets up the scheduler (`Calendar`), +//! initializes the simulation context (`Sim`), and manages the event loop +//! until the main process terminates. +//! +//! - **`Sim`**: A lightweight handle to the scheduler, providing methods for +//! processes to interact with the simulation. Processes can use it to +//! schedule themselves or other processes, advance simulation time, retrieve +//! the current time, and access shared global data. +//! +//! - **`Process`**: Wraps a `Future` to represent a process in the simulation. +//! It can be scheduled to run at specific model times and can also act as a +//! waker to resume suspended processes. +//! +//! - **`Calendar`**: The scheduler that maintains the current model time and a +//! priority queue of processes scheduled to run at future times. +//! +//! - **Utility functions**: +//! - `sleep()`: A future that processes can await to suspend execution until +//! reactivated. +//! - `waker()`: A future that returns the current waker, allowing processes +//! to interact with the task system. +//! +//! **Example usage:** +//! +//! ```rust +//! use simcore_rs::*; +//! +//! async fn sim_main(sim: Sim<'_>) { +//! // Main Process code here +//! println!("Process started at time {}", sim.now()); +//! sim.advance(5.0).await; // Suspend for 5 units of time +//! println!("Process resumed at time {}", sim.now()); +//! } +//! +//! // Run the simulation with empty shared data +//! let shared_data = simulation((), |sim| Process::new(sim, sim_main(sim))); +//! ``` +//! +//! **Notes:** +//! +//! - The simulation is designed for single-threaded execution and is not `Send` +//! or `Sync` safe. +//! - Processes manage their own scheduling and can be reactivated by the +//! simulation context. +//! - Unsafe code is used internally (e.g., raw pointers in `Sim`), but safety +//! is maintained as long as simulation contexts do not escape the closure +//! passed to `simulation` which is enforced by the type system. +//! - Wakers are customized to integrate with the simulation's process +//! scheduling, and care is taken to prevent concurrency bugs, assuming +//! single-threaded execution. +//! +//! This framework is intended for educational purposes as part of my (Dorian +//! Weber) dissertation to illustrate how asynchronous Rust can be used to +//! implement a discrete-event simulation framework. It may not cover all edge +//! cases or be suitable for production use. + +use std::{ + cell::{Cell, RefCell}, + cmp::Ordering, + collections::BinaryHeap, + future::{poll_fn, Future}, + hash::{BuildHasherDefault, Hash, Hasher}, + mem::ManuallyDrop, + pin::Pin, + rc::{Rc, Weak}, + task::{self, Context, Poll}, +}; + +pub mod util; + +// simple time type +pub type Time = f64; + +/// Performs a single simulation run. +/// +/// Input is a function that takes a simulation context and returns the first +/// process. +pub fn simulation<G, F>(shared: G, main: F) -> G +where + F: FnOnce(Sim<'_, G>) -> Process<'_, G>, +{ + { + let sched; + let process_storage = Rc::default(); + + // create a fresh scheduler and a handle to it + sched = Calendar::new(&shared, Rc::downgrade(&process_storage)); + let sim = Sim { handle: &sched }; + + // construct a custom context to pass into the poll()-method + let event_waker = StateEventWaker::new(sim); + let waker = unsafe { event_waker.as_waker() }; + let mut cx = Context::from_waker(&waker); + + // evaluate the passed function for the main process and schedule it + let root = main(sim); + sched.schedule(root.clone()); + + // pop processes until empty or the main process terminates + while let Some(process) = sched.next_process() { + if process.poll(&mut cx).is_ready() { + if process == root { + break; + } + process_storage.borrow_mut().shift_remove(&process); + } + } + + // Here we drop `process_storage` first and `sched` after. Since `process_storage` + // is an `IndexSet` (and not a `HashSet`), all processes are guaranteed to be dropped, + // even if a destructor of a future unwinds. This ensures that any weak process + // wakers leaked out of the `simulation` function can no longer be upgraded. + } + + // return the global data + shared +} + +// priority queue of time-process-pairs using time as the key +type EventQ<'s, G> = BinaryHeap<NextEvent<'s, G>>; + +type FxIndexSet<T> = indexmap::IndexSet<T, BuildHasherDefault<rustc_hash::FxHasher>>; + +/// The (private) scheduler for processes. +struct Calendar<'s, G> { + /// The current simulation time. + now: Cell<Time>, + + inner: RefCell<CalendarInner<'s, G>>, + + /// Globally-accessible data. + shared: &'s G, + + process_storage: Weak<RefCell<FxIndexSet<StrongProcess<'s, G>>>>, +} + +struct CalendarInner<'s, G> { + /// The event-calendar organized chronologically. + calendar: EventQ<'s, G>, + + /// The currently active process. + active: Option<Process<'s, G>>, +} + +impl<G> Default for CalendarInner<'_, G> { + fn default() -> Self { + Self { + calendar: Default::default(), + active: Default::default(), + } + } +} + +impl<'s, G> Calendar<'s, G> { + /// Creates a new scheduler. + #[inline] + fn new( + shared: &'s G, + process_storage: Weak<RefCell<FxIndexSet<StrongProcess<'s, G>>>>, + ) -> Self { + Self { + now: Cell::default(), + inner: RefCell::default(), + shared, + process_storage, + } + } + + /// Schedules a process at the current simulation time. + #[inline] + fn schedule(&self, process: Process<'s, G>) { + self.schedule_in(Time::default(), process); + } + + /// Schedules a process at a later simulation time. + #[inline] + fn schedule_in(&self, dt: Time, process: Process<'s, G>) { + let calender = &mut self.inner.borrow_mut().calendar; + debug_assert!( + !calender.iter().any(|NextEvent(_, p)| *p == process), + "already scheduled" + ); + calender.push(NextEvent(self.now.get() + dt, process)); + } + + /// Removes the process with the next event time from the calendar and + /// activates it. + #[inline] + fn next_process(&self) -> Option<StrongProcess<'s, G>> { + let inner = &mut *self.inner.borrow_mut(); + loop { + let NextEvent(now, process) = inner.calendar.pop()?; + let Some(strong_process) = process.upgrade() else { + // process was terminated after being queued + continue; + }; + self.now.set(now); + inner.active = Some(process); + break Some(strong_process); + } + } +} + +/// A light-weight handle to the scheduler. +pub struct Sim<'s, G = ()> { + handle: &'s Calendar<'s, G>, +} + +// this allows the creation of copies +impl<'s, G> Clone for Sim<'s, G> { + #[inline] + fn clone(&self) -> Self { + *self + } +} + +// this allows moving the context without invalidating it (copy semantics) +impl<'s, G> Copy for Sim<'s, G> {} + +impl<'s, G> Sim<'s, G> { + /// Returns a (reference-counted) copy of the currently active process. + #[inline] + pub fn active(&self) -> Process<'s, G> { + self.sched() + .inner + .borrow() + .active + .as_ref() + .expect("no active process") + .clone() + } + + /// Activates a new process with the given future. + #[inline] + pub fn activate<F>(&self, f: F) + where + F: Future<Output = ()> + 's, + { + self.reactivate(Process::new(*self, f)); + } + + /// Reactivates a process that has been suspended with wait(). + #[inline] + pub fn reactivate(&self, process: Process<'s, G>) { + assert!(!process.is_terminated()); + self.sched().schedule(process); + } + + /// Reactivates the currently active process after some time has passed. + #[inline] + pub async fn advance(&self, dt: Time) { + self.sched().schedule_in(dt, self.active()); + sleep().await + } + + /// Returns the current simulation time. + #[inline] + pub fn now(&self) -> Time { + self.sched().now.get() + } + + /// Returns a shared reference to the global data. + #[inline] + pub fn global(&self) -> &G { + self.sched().shared + } + + /// Private function to get a safe reference to the scheduler. + #[inline] + fn sched(&self) -> &Calendar<'s, G> { + self.handle + } +} + +/// A bare-bone process type that can also be used as a waker. +pub struct Process<'s, G>(Weak<RefCell<ProcessInner<'s, G>>>); + +/// A private accessor to the inner parts of a [`Process`]. +/// +/// **WARNING**: This must never leak into user code! Forgetting a `StrongProcess` causes UB! +struct StrongProcess<'s, G>(Rc<RefCell<ProcessInner<'s, G>>>); + +/// The private details of the [`Process`](struct.Process.html) type. +struct ProcessInner<'s, G> { + /// The simulation context needed to implement the `Waker` interface. + context: Sim<'s, G>, + /// `Some` [`Future`] associated with this process or `None` if it has been + /// terminated externally. + /// + /// [`Future`]: https://doc.rust-lang.org/std/future/trait.Future.html + state: Option<Pin<Box<dyn Future<Output = ()> + 's>>>, +} + +impl<'s, G> Process<'s, G> { + /// Combines a future and a simulation context to a process. + #[inline] + pub fn new(sim: Sim<'s, G>, fut: impl Future<Output = ()> + 's) -> Self { + let strong_process = StrongProcess(Rc::new(RefCell::new(ProcessInner { + context: sim, + state: Some(Box::pin(fut)), + }))); + + let process = strong_process.downgrade(); + + let process_storage = sim + .handle + .process_storage + .upgrade() + .expect("attempted to create process after simulation ended"); + process_storage.borrow_mut().insert(strong_process); + + process + } + + /// Releases the [`Future`] contained in this process. + /// + /// This will also execute all the destructors for the local variables + /// initialized by this process. + /// + /// [`Future`]: https://doc.rust-lang.org/std/future/trait.Future.html + #[inline] + pub fn terminate(&self) { + let Some(strong_process) = self.upgrade() else { + // ok, we're already terminated + return; + }; + let sim = strong_process.0.borrow().context; + assert!( + sim.sched() + .inner + .borrow() + .active + .as_ref() + .is_none_or(|active| active != self), + "attempted to terminate active process" + ); + if let Some(process_storage) = sim.sched().process_storage.upgrade() { + process_storage.borrow_mut().shift_remove(&strong_process); + } else { + // This can happen if we're dropping the `process_storage` and the destructor + // of another process tries to terminate this process. In this case, we need to + // manually drop our future now to ensure that there are no dangling references + // from our future to their future. + strong_process.0.borrow_mut().state = None; + }; + drop(strong_process); + assert!(self.is_terminated(), "failed to terminate process"); + } + + /// Returns a `Waker` for this process. + #[inline] + pub fn waker(self) -> task::Waker { + unsafe { task::Waker::from_raw(self.raw_waker()) } + } + + #[inline] + fn is_terminated(&self) -> bool { + self.upgrade() + .is_none_or(|strong_process| strong_process.0.borrow().state.is_none()) + } + + /// Gets private access to the process. + /// + /// # Safety + /// + /// The caller must ensure that the [`StrongProcess`] is not leaked. + /// In particular, it must not be passed to user code. + #[inline] + fn upgrade(&self) -> Option<StrongProcess<'s, G>> { + self.0.upgrade().map(StrongProcess) + } +} + +impl<'s, G> StrongProcess<'s, G> { + /// Private function for polling the process. + #[inline] + fn poll(&self, cx: &mut Context<'_>) -> Poll<()> { + self.0 + .borrow_mut() + .state + .as_mut() + .expect("attempted to poll terminated task") + .as_mut() + .poll(cx) + } + + #[inline] + fn downgrade(&self) -> Process<'s, G> { + Process(Rc::downgrade(&self.0)) + } +} + +// Increases the reference counter of this process. +impl<'s, G> Clone for Process<'s, G> { + #[inline] + fn clone(&self) -> Self { + Process(self.0.clone()) + } +} + +// allows processes to be compared for equality +impl<G> PartialEq for Process<'_, G> { + #[inline] + fn eq(&self, other: &Self) -> bool { + Weak::ptr_eq(&self.0, &other.0) + } +} + +// marks the equality-relation as total +impl<G> Eq for Process<'_, G> {} + +// hash processes for pointer equality +impl<G> Hash for Process<'_, G> { + fn hash<H: Hasher>(&self, state: &mut H) { + self.0.as_ptr().hash(state); + } +} + +// allows processes to be compared for equality +impl<G> PartialEq for StrongProcess<'_, G> { + #[inline] + fn eq(&self, other: &Self) -> bool { + Rc::ptr_eq(&self.0, &other.0) + } +} + +impl<'s, G> PartialEq<Process<'s, G>> for StrongProcess<'s, G> { + #[inline] + fn eq(&self, other: &Process<'s, G>) -> bool { + Rc::as_ptr(&self.0) == other.0.as_ptr() + } +} + +// marks the equality-relation as total +impl<G> Eq for StrongProcess<'_, G> {} + +// hash processes for pointer equality +impl<G> Hash for StrongProcess<'_, G> { + fn hash<H: Hasher>(&self, state: &mut H) { + self.0.as_ptr().hash(state); + } +} + +/// Time-process-pair that has a total order defined based on the time. +struct NextEvent<'p, G>(Time, Process<'p, G>); + +impl<G> PartialEq for NextEvent<'_, G> { + #[inline] + fn eq(&self, other: &Self) -> bool { + self.0 == other.0 + } +} + +impl<G> Eq for NextEvent<'_, G> {} + +impl<G> PartialOrd for NextEvent<'_, G> { + #[inline] + fn partial_cmp(&self, other: &Self) -> Option<Ordering> { + Some(self.cmp(other)) + } +} + +impl<G> Ord for NextEvent<'_, G> { + #[inline] + fn cmp(&self, other: &Self) -> Ordering { + self.0 + .partial_cmp(&other.0) + .expect("illegal event time NaN") + .reverse() + } +} + +/* **************************** specialized waker *************************** */ + +impl<'s, G> Process<'s, G> { + /// Virtual function table for the waker. + const VTABLE: task::RawWakerVTable = + task::RawWakerVTable::new(Self::clone, Self::wake, Self::wake_by_ref, Self::drop); + + /// Constructs a raw waker from a simulation context and a process. + #[inline] + fn raw_waker(self) -> task::RawWaker { + task::RawWaker::new(Weak::into_raw(self.0) as *const (), &Self::VTABLE) + } + + unsafe fn clone(this: *const ()) -> task::RawWaker { + let waker_ref = + &*ManuallyDrop::new(Weak::from_raw(this as *const RefCell<ProcessInner<'_, G>>)); + + // increase the reference counter once + let waker = waker_ref.clone(); + + // this is technically unsafe because Wakers are Send + Sync and so this + // call might be executed from a different thread, creating a data race + // hazard; we leave preventing this as an exercise to the reader! + task::RawWaker::new(Weak::into_raw(waker) as *const (), &Self::VTABLE) + } + + unsafe fn wake(this: *const ()) { + let waker = Weak::from_raw(this as *const RefCell<ProcessInner<'_, G>>); + let process = Process(waker); + process.wake_impl(); + } + + unsafe fn wake_by_ref(this: *const ()) { + // keep the waker alive by incrementing the reference count + let waker = Weak::from_raw(this as *const RefCell<ProcessInner<'_, G>>); + let process = &*ManuallyDrop::new(Process(waker)); + process.clone().wake_impl(); + } + + fn wake_impl(self) { + let Some(strong_process) = self.upgrade() else { + // this can happen if a synchronization structure forgets to clean + // up registered Waker objects on destruct; this would lead to + // hard-to-diagnose bugs if we were to ignore it + panic!("attempted to wake a terminated process"); + }; + + // this is technically unsafe because Wakers are Send + Sync and so this + // call might be executed from a different thread, creating a data race + // hazard; we leave preventing this as an exercise to the reader! + let sim = strong_process.0.borrow().context; + sim.reactivate(self); + } + + unsafe fn drop(this: *const ()) { + // this is technically unsafe because Wakers are Send + Sync and so this + // call might be executed from a different thread, creating a data race + // hazard; we leave preventing this as an exercise to the reader! + Weak::from_raw(this as *const RefCell<ProcessInner<'_, G>>); + } +} + +/// Complex waker that is used to implement state events. +/// +/// This is the shallow version that is created on the stack of the function +/// running the event loop. It assumes that the stored process is the currently +/// active one and creates the deep version when it is cloned. +struct StateEventWaker<'s, G> { + context: Sim<'s, G>, +} + +impl<'s, G> StateEventWaker<'s, G> { + /// Virtual function table for the waker. + const VTABLE: task::RawWakerVTable = + task::RawWakerVTable::new(Self::clone, Self::wake, Self::wake_by_ref, Self::drop); + + /// Creates a new (shallow) waker using only the simulation context. + #[inline] + fn new(sim: Sim<'s, G>) -> Self { + StateEventWaker { context: sim } + } + + /// Constructs a new waker using only a reference. + /// + /// This function is unsafe because it is up to the user to ensure that the + /// waker doesn't outlive the reference. + #[inline] + unsafe fn as_waker(&self) -> task::Waker { + task::Waker::from_raw(task::RawWaker::new( + self as *const _ as *const (), + &Self::VTABLE, + )) + } + + unsafe fn clone(this: *const ()) -> task::RawWaker { + // return the currently active process as a raw waker + (*(this as *const Self)).context.active().raw_waker() + } + + unsafe fn wake(_this: *const ()) { + // waking the active process can safely be ignored + } + + unsafe fn wake_by_ref(_this: *const ()) { + // waking the active process can safely be ignored + } + + unsafe fn drop(_this: *const ()) { + // memory is released in the main event loop + } +} + +/* *************************** specialized futures ************************** */ + +/// Returns a future that unconditionally puts the calling process to sleep. +#[inline] +pub fn sleep() -> impl Future<Output = ()> { + let mut ready = false; + poll_fn(move |_cx| { + if ready { + Poll::Ready(()) + } else { + ready = true; + Poll::Pending + } + }) +} + +/// Returns a future that can be awaited to produce the currently set waker. +#[inline] +pub fn waker() -> impl Future<Output = task::Waker> { + poll_fn(|cx| Poll::Ready(cx.waker().clone())) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::panic::{catch_unwind, AssertUnwindSafe}; + + /// Test that waking a leaked waker does not cause UB. + #[test] + #[should_panic = "attempted to wake a terminated process"] + fn leak_waker_simple() { + let shared = RefCell::new(None); + let shared = simulation(shared, |sim| { + Process::new(sim, async move { + poll_fn(|cx| { + *sim.global().borrow_mut() = Some(cx.waker().clone()); + Poll::Ready(()) + }) + .await; + }) + }); + shared.take().unwrap().wake(); + } + + /// Test that all processes are dropped even if a destructor of a process unwinds. + /// This is required to properly invalidate all wakers that may be leaked. + #[test] + #[should_panic = "attempted to wake a terminated process"] + fn leak_waker_unwind() { + struct PanicOnDrop; + impl Drop for PanicOnDrop { + fn drop(&mut self) { + panic!("PanicOnDrop"); + } + } + + struct PushDropOrderOnDrop<'s> { + me: i32, + drop_order: &'s RefCell<Vec<i32>>, + } + impl Drop for PushDropOrderOnDrop<'_> { + fn drop(&mut self) { + self.drop_order.borrow_mut().push(self.me); + } + } + + #[derive(Default)] + struct Shared { + channel: RefCell<Option<task::Waker>>, + drop_order: RefCell<Vec<i32>>, + } + + let shared = Shared::default(); + catch_unwind(AssertUnwindSafe(|| { + simulation(&shared, |sim| { + Process::new(sim, async move { + // This process is dropped 2nd and will begin the unwind. + sim.activate(async move { + let _guard = PushDropOrderOnDrop { + me: 2, + drop_order: &sim.global().drop_order, + }; + let _guard = PanicOnDrop; + sim.advance(2.0).await; + unreachable!(); + }); + + // This process is dropped 3rd, after the unwind started. + // We test that this process cannot be woken from a leaked waker. + sim.activate(async move { + let _guard = PushDropOrderOnDrop { + me: 3, + drop_order: &sim.global().drop_order, + }; + poll_fn(|cx| { + *sim.global().channel.borrow_mut() = Some(cx.waker().clone()); + Poll::Ready(()) + }) + .await; + sim.advance(2.0).await; + unreachable!(); + }); + + let _guard: PushDropOrderOnDrop = PushDropOrderOnDrop { + me: 1, + drop_order: &sim.global().drop_order, + }; + sim.advance(1.0).await; + // Finish the main process here, this will drop the other processes. + }) + }); + })) + .unwrap_err(); + + assert_eq!(*shared.drop_order.borrow(), [1, 2, 3]); + + shared.channel.take().unwrap().wake(); + } +} diff --git a/src/main.rs b/src/main.rs index f35926e..07915ae 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,23 +1,24 @@ - use simcore_rs::*; struct MyStruct<'s>(Sim<'s>); impl Drop for MyStruct<'_> { - fn drop(&mut self) { - eprintln!("Drop"); - self.0.activate(async move { - eprintln!("Start"); - }); - } + fn drop(&mut self) { + eprintln!("Drop"); + self.0.activate(async move { + eprintln!("Start"); + }); + } } fn main() { - simulation((), |sim| Process::new(sim, async move { - sim.activate(async move { - let _s = MyStruct(sim); - sim.advance(2.0).await; - }); - sim.advance(1.0).await; - })); + simulation((), |sim| { + Process::new(sim, async move { + sim.activate(async move { + let _s = MyStruct(sim); + sim.advance(2.0).await; + }); + sim.advance(1.0).await; + }) + }); } diff --git a/src/util.rs b/src/util.rs index bb5b829..0f0cde3 100644 --- a/src/util.rs +++ b/src/util.rs @@ -39,79 +39,89 @@ use crate::{sleep, waker, Process, Sim}; use std::{ - cell::{Cell, RefCell}, - collections::VecDeque, - fmt::{Display, Formatter}, - future::Future, - pin::Pin, - rc::{Rc, Weak}, - task::{self, Context, Poll} + cell::{Cell, RefCell}, + collections::VecDeque, + fmt::{Display, Formatter}, + future::Future, + pin::Pin, + rc::{Rc, Weak}, + task::{self, Context, Poll}, }; +/// 😿 +/// +/// This is fundamentally broken and needs to be replaced. +fn deep_will_wake(a: &task::Waker, b: &task::Waker) -> bool { + a.data() == b.data() && a.vtable() == b.vtable() +} + /// GPSS-inspired facility that houses one exclusive resource to be used by /// arbitrary many processes. #[derive(Default)] pub struct Facility { - /// A queue of waiting-to-be-activated processes. - queue: RefCell<VecDeque<task::Waker>>, - /// A boolean indicating whether the facility is in use or not. - in_use: Cell<bool> + /// A queue of waiting-to-be-activated processes. + queue: RefCell<VecDeque<task::Waker>>, + /// A boolean indicating whether the facility is in use or not. + in_use: Cell<bool>, } impl Facility { - /// Creates a new facility. - pub fn new() -> Self { - Facility { - queue: RefCell::new(VecDeque::new()), - in_use: Cell::new(false) - } - } - - /// Attempts to seize the facility, blocking until it's possible. - pub async fn seize(&self) { - if self.in_use.replace(true) { - let waker = waker().await; - self.queue.borrow_mut().push_back(waker); - sleep().await; - } - } - - /// Releases the facility and activates the next waiting process. - pub fn release(&self) { - if let Some(process) = self.queue.borrow_mut().pop_front() { - process.wake(); - } else { - self.in_use.set(false); - } - } + /// Creates a new facility. + pub fn new() -> Self { + Facility { + queue: RefCell::new(VecDeque::new()), + in_use: Cell::new(false), + } + } + + /// Attempts to seize the facility, blocking until it's possible. + pub async fn seize(&self) { + if self.in_use.replace(true) { + let waker = waker().await; + self.queue.borrow_mut().push_back(waker); + sleep().await; + } + } + + /// Releases the facility and activates the next waiting process. + pub fn release(&self) { + if let Some(process) = self.queue.borrow_mut().pop_front() { + process.wake(); + } else { + self.in_use.set(false); + } + } } /// A one-shot, writable container type that awakens a pre-registered process /// on write. pub struct Promise<T> { - /// The waker of the process to awaken on write. - caller: task::Waker, - /// The written value to be extracted by the caller. - result: Cell<Option<T>> + /// The waker of the process to awaken on write. + caller: task::Waker, + /// The written value to be extracted by the caller. + result: Cell<Option<T>>, } impl<T> Promise<T> { - /// Creates a new promise with an unwritten result. - pub fn new(waker: task::Waker) -> Self { - Self { caller: waker, result: Cell::new(None) } - } - - /// Writes the result value and reawakens the caller. - pub fn fulfill(&self, result: T) { - if self.result.replace(Some(result)).is_none() { - self.caller.wake_by_ref(); - } - } - - /// Extracts the written value and resets the state of the promise. - pub fn redeem(&self) -> Option<T> { - self.result.replace(None) - } + /// Creates a new promise with an unwritten result. + pub fn new(waker: task::Waker) -> Self { + Self { + caller: waker, + result: Cell::new(None), + } + } + + /// Writes the result value and reawakens the caller. + pub fn fulfill(&self, result: T) { + if self.result.replace(Some(result)).is_none() { + self.caller.wake_by_ref(); + } + } + + /// Extracts the written value and resets the state of the promise. + pub fn redeem(&self) -> Option<T> { + self.result.replace(None) + } } /// Returns a future that takes two other futures and completes as soon as @@ -121,45 +131,57 @@ impl<T> Promise<T> { /// function cannot outlive its returned future, enabling us to allow /// references to variables in the local scope. The passed futures may /// compute a value, as long as the return type is identical in both cases. -pub async fn select<'s,'u,G,E,O,R>(sim: Sim<'s,G>, either: E, or: O) -> R -where E: Future<Output = R> + 'u, O: Future<Output = R> + 'u, R: 'u { - use std::mem::transmute; - - // create a one-shot channel that reactivates the caller on write - let promise = &Promise::new(sim.active().waker()); - - // this unsafe block shortens the guaranteed lifetimes of the processes - // contained in the scheduler; it is safe because the constructed future - // ensures that both futures are terminated before the promise and - // itself are terminated, thereby preventing references to the lesser - // constrained processes to survive the call to select() - let sim = unsafe { - transmute::<Sim<'s,G>, Sim<'_,G>>(sim) - }; - - // create the two competing processes - // a future optimization would be to keep them on the stack - let p1 = Process::new(sim, async move { - promise.fulfill(either.await); - }); - let p2 = Process::new(sim, async move { - promise.fulfill(or.await); - }); - - // activate them - sim.reactivate(p1.clone()); - sim.reactivate(p2.clone()); - - // wait for reactivation; the promise will wake us on fulfillment - sleep().await; - - // terminate both processes - // (this is redundant for one of them but doesn't hurt either) - p1.terminate(); - p2.terminate(); - - // extract the promised value - promise.redeem().unwrap() +pub async fn select<'s, 'u, G, E, O, R>(sim: Sim<'s, G>, either: E, or: O) -> R +where + E: Future<Output = R> + 'u, + O: Future<Output = R> + 'u, + R: 'u, +{ + use std::mem::transmute; + + // create a one-shot channel that reactivates the caller on write + let promise = &Promise::new(sim.active().waker()); + + // this unsafe block shortens the guaranteed lifetimes of the processes + // contained in the scheduler; it is safe because the constructed future + // ensures that both futures are terminated before the promise and + // itself are terminated, thereby preventing references to the lesser + // constrained processes to survive the call to select() + let sim = unsafe { transmute::<Sim<'s, G>, Sim<'_, G>>(sim) }; + + { + // create the two competing processes + // a future optimization would be to keep them on the stack + let p1 = Process::new(sim, async move { + promise.fulfill(either.await); + }); + let p2 = Process::new(sim, async move { + promise.fulfill(or.await); + }); + + struct TerminateOnDrop<'s, G>(Process<'s, G>); + impl<G> Drop for TerminateOnDrop<'_, G> { + fn drop(&mut self) { + self.0.terminate(); + } + } + + // `p1` and `p2` borrow from `promise` and must therefore be dropped before `promise` + // is dropped. In particular, we must ensure that the future returned by the outer + // `async fn` outlives the futures crated by the `async` blocks above. + let _g1 = TerminateOnDrop(p1.clone()); + let _g2 = TerminateOnDrop(p2.clone()); + + // activate them + sim.reactivate(p1); + sim.reactivate(p2); + + // wait for reactivation; the promise will wake us on fulfillment + sleep().await; + } + + // extract the promised value + promise.redeem().unwrap() } /// Complex channel with space for infinitely many elements of arbitrary type. @@ -167,19 +189,19 @@ where E: Future<Output = R> + 'u, O: Future<Output = R> + 'u, R: 'u { /// This channel supports arbitrary many readers and writers and uses wakers /// to reactivate suspended processes. struct Channel<T> { - /// A queue of messages. - store: VecDeque<T>, - /// A queue of processes waiting to receive a message. - waiting: VecDeque<task::Waker> + /// A queue of messages. + store: VecDeque<T>, + /// A queue of processes waiting to receive a message. + waiting: VecDeque<task::Waker>, } /// Creates a channel and returns a pair of read and write ends. pub fn channel<T>() -> (Sender<T>, Receiver<T>) { - let channel = Rc::new(RefCell::new(Channel { - store: VecDeque::new(), - waiting: VecDeque::new() - })); - (Sender(Rc::downgrade(&channel)), Receiver(channel)) + let channel = Rc::new(RefCell::new(Channel { + store: VecDeque::new(), + waiting: VecDeque::new(), + })); + (Sender(Rc::downgrade(&channel)), Receiver(channel)) } /// Write-end of a channel. @@ -190,151 +212,153 @@ pub struct Receiver<T>(Rc<RefCell<Channel<T>>>); // Allow senders to be duplicated. impl<T> Clone for Sender<T> { - fn clone(&self) -> Self { - Self(self.0.clone()) - } + fn clone(&self) -> Self { + Self(self.0.clone()) + } } // Allow receivers to be duplicated. impl<T> Clone for Receiver<T> { - fn clone(&self) -> Self { - Self(self.0.clone()) - } + fn clone(&self) -> Self { + Self(self.0.clone()) + } } impl<T: Unpin> Sender<T> { - /// Returns a future that can be awaited to send a message. - pub fn send(&self, elem: T) -> SendFuture<'_,T> { - SendFuture(&self.0, Some(elem)) - } + /// Returns a future that can be awaited to send a message. + pub fn send(&self, elem: T) -> SendFuture<'_, T> { + SendFuture(&self.0, Some(elem)) + } } impl<T> Drop for Sender<T> { - #[inline] - fn drop(&mut self) { - // check if there are still receivers - if let Some(chan) = self.0.upgrade() { - // check if we're the last sender to drop - if Rc::weak_count(&chan) == 1 { - // awake all the waiting receivers so that they get to return - for process in chan.borrow_mut().waiting.drain(..) { - process.wake(); - } - } - } - } + #[inline] + fn drop(&mut self) { + // check if there are still receivers + if let Some(chan) = self.0.upgrade() { + // check if we're the last sender to drop + if Rc::weak_count(&chan) == 1 { + // awake all the waiting receivers so that they get to return + for process in chan.borrow_mut().waiting.drain(..) { + process.wake(); + } + } + } + } } impl<T: Unpin> Receiver<T> { - /// Returns a future that can be awaited to receive a message. - pub fn recv(&self) -> ReceiveFuture<'_,T> { - ReceiveFuture(&self.0, None) - } - - /// Returns the number of elements that can be received before model time - /// has to be consumed. - pub fn len(&self) -> usize { - self.0.borrow().len() - } + /// Returns a future that can be awaited to receive a message. + pub fn recv(&self) -> ReceiveFuture<'_, T> { + ReceiveFuture(&self.0, None) + } + + /// Returns the number of elements that can be received before model time + /// has to be consumed. + pub fn len(&self) -> usize { + self.0.borrow().len() + } } impl<T> Channel<T> { - /// Private method returning the number of elements left in the channel. - fn len(&self) -> usize { self.store.len() } - - /// Private method enqueuing a process into the waiting list. - fn enqueue(&mut self, process: task::Waker) { - self.waiting.push_back(process); - } - - /// Private method removing a process from the waiting list. - fn dequeue(&mut self) -> Option<task::Waker> { - self.waiting.pop_front() - } - - /// Private method that unregisters a previously registered waker. - fn unregister(&mut self, waker: task::Waker) { - self.waiting.retain(|elem| !waker.will_wake(elem)); - } - - /// Private method inserting a message into the queue. - fn send(&mut self, value: T) { - self.store.push_back(value); - } - - /// Private method extracting a message from the queue non-blocking. - fn recv(&mut self) -> Option<T> { - self.store.pop_front() - } + /// Private method returning the number of elements left in the channel. + fn len(&self) -> usize { + self.store.len() + } + + /// Private method enqueuing a process into the waiting list. + fn enqueue(&mut self, process: task::Waker) { + self.waiting.push_back(process); + } + + /// Private method removing a process from the waiting list. + fn dequeue(&mut self) -> Option<task::Waker> { + self.waiting.pop_front() + } + + /// Private method that unregisters a previously registered waker. + fn unregister(&mut self, waker: task::Waker) { + self.waiting.retain(|elem| !deep_will_wake(&waker, elem)); + } + + /// Private method inserting a message into the queue. + fn send(&mut self, value: T) { + self.store.push_back(value); + } + + /// Private method extracting a message from the queue non-blocking. + fn recv(&mut self) -> Option<T> { + self.store.pop_front() + } } /// Future for the [`send()`] operation on a channel sender. /// /// [`send()`]: struct.Sender.html#method.send -pub struct SendFuture<'c,T>(&'c Weak<RefCell<Channel<T>>>, Option<T>); +pub struct SendFuture<'c, T>(&'c Weak<RefCell<Channel<T>>>, Option<T>); /// Future for the [`recv()`] operation on a channel receiver. /// /// [`recv()`]: struct.Receiver.html#method.recv -pub struct ReceiveFuture<'c,T>(&'c Rc<RefCell<Channel<T>>>, Option<task::Waker>); - -impl<T: Unpin> Future for SendFuture<'_,T> { - type Output = Result<(),T>; - - fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> { - let elem = self.1.take().unwrap(); - - // test if there are still readers left to listen - if let Some(channel) = self.0.upgrade() { - let mut channel = channel.borrow_mut(); - channel.send(elem); - - // awake a waiting process - if let Some(process) = channel.dequeue() { - process.wake(); - } - - Poll::Ready(Ok(())) - } else { - Poll::Ready(Err(elem)) - } - } -} - -impl<T: Unpin> Future for ReceiveFuture<'_,T> { - type Output = Option<T>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let mut channel = self.0.borrow_mut(); - - // clear the local copy of our waker to prevent unnecessary - // de-registration attempts in our destructor - self.1 = None; - - if let Some(c) = channel.recv() { - // there are elements in the channel - Poll::Ready(Some(c)) - } else if Rc::weak_count(self.0) == 0 { - // no elements in the channel and no potential senders exist - Poll::Ready(None) - } else { - // no elements, but potential senders exist: check back later - let waker = cx.waker().clone(); - self.1 = Some(waker.clone()); - channel.enqueue(waker); - Poll::Pending - } - } -} - -impl<T> Drop for ReceiveFuture<'_,T> { - #[inline] - fn drop(&mut self) { - // take care to unregister our waker from the channel - if let Some(waker) = self.1.take() { - self.0.borrow_mut().unregister(waker); - } - } +pub struct ReceiveFuture<'c, T>(&'c Rc<RefCell<Channel<T>>>, Option<task::Waker>); + +impl<T: Unpin> Future for SendFuture<'_, T> { + type Output = Result<(), T>; + + fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> { + let elem = self.1.take().unwrap(); + + // test if there are still readers left to listen + if let Some(channel) = self.0.upgrade() { + let mut channel = channel.borrow_mut(); + channel.send(elem); + + // awake a waiting process + if let Some(process) = channel.dequeue() { + process.wake(); + } + + Poll::Ready(Ok(())) + } else { + Poll::Ready(Err(elem)) + } + } +} + +impl<T: Unpin> Future for ReceiveFuture<'_, T> { + type Output = Option<T>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut channel = self.0.borrow_mut(); + + // clear the local copy of our waker to prevent unnecessary + // de-registration attempts in our destructor + self.1 = None; + + if let Some(c) = channel.recv() { + // there are elements in the channel + Poll::Ready(Some(c)) + } else if Rc::weak_count(self.0) == 0 { + // no elements in the channel and no potential senders exist + Poll::Ready(None) + } else { + // no elements, but potential senders exist: check back later + let waker = cx.waker().clone(); + self.1 = Some(waker.clone()); + channel.enqueue(waker); + Poll::Pending + } + } +} + +impl<T> Drop for ReceiveFuture<'_, T> { + #[inline] + fn drop(&mut self) { + // take care to unregister our waker from the channel + if let Some(waker) = self.1.take() { + self.0.borrow_mut().unregister(waker); + } + } } /// A trait signifying that the type implementing it is able to broadcast state @@ -347,75 +371,79 @@ impl<T> Drop for ReceiveFuture<'_,T> { /// [`until()`]: fn.until.html /// [`Control`]: struct.Control.html pub trait Controlled { - /// Allows a [waker] to subscribe to the controlled expression to be - /// informed about any and all state changes. - /// - /// # Safety - /// This method is unsafe, because the controlled expression will call the - /// [`wake_by_ref()`] method on the waker during every change in state. The caller - /// is responsible to ensure the validity of all subscribed wakers at - /// the time of the state change. - /// - /// The [`span()`] method provides a safe alternative. - /// - /// [waker]: https://doc.rust-lang.org/std/task/struct.Waker.html - /// [`span()`]: trait.Controlled.html#method.span - /// [`wake_by_ref()`]: https://doc.rust-lang.org/std/task/struct.Waker.html#method.wake_by_ref - unsafe fn subscribe(&self, waker: &task::Waker); - - /// Unsubscribes a previously subscribed [waker] from the controlled - /// expression. - /// - /// [waker]: https://doc.rust-lang.org/std/task/struct.Waker.html - unsafe fn unsubscribe(&self, waker: &task::Waker); + /// Allows a [waker] to subscribe to the controlled expression to be + /// informed about any and all state changes. + /// + /// # Safety + /// This method is unsafe, because the controlled expression will call the + /// [`wake_by_ref()`] method on the waker during every change in state. The caller + /// is responsible to ensure the validity of all subscribed wakers at + /// the time of the state change. + /// + /// The [`span()`] method provides a safe alternative. + /// + /// [waker]: https://doc.rust-lang.org/std/task/struct.Waker.html + /// [`span()`]: trait.Controlled.html#method.span + /// [`wake_by_ref()`]: https://doc.rust-lang.org/std/task/struct.Waker.html#method.wake_by_ref + unsafe fn subscribe(&self, waker: &task::Waker); + + /// Unsubscribes a previously subscribed [waker] from the controlled + /// expression. + /// + /// [waker]: https://doc.rust-lang.org/std/task/struct.Waker.html + unsafe fn unsubscribe(&self, waker: &task::Waker); } /// Guarding structure that ensures that waker and controlled expression are /// valid and fixed in space. pub struct WakerSpan<'s, C: ?Sized + Controlled> { - /// The controlled expression. - cv: &'s C, - /// The waker. - waker: &'s task::Waker + /// The controlled expression. + cv: &'s C, + /// The waker. + waker: &'s task::Waker, } // implement a constructor for the guard -impl<'s, C: ?Sized + Controlled> WakerSpan<'s,C> { - /// Subscribes a [waker] to a controlled expression for a certain duration. - /// - /// This method subscribes the waker to the controlled expression and - /// returns an object that unsubscribes the waker from the same controlled - /// expression upon drop. The borrow-checker secures the correct lifetimes - /// of waker and controlled expression so that this method can be safe. - /// - /// [waker]: https://doc.rust-lang.org/std/task/struct.Waker.html - #[inline] - pub fn new(cv: &'s C, waker: &'s task::Waker) -> Self { - // this is safe because we bind the lifetime of the waker to the guard - unsafe { cv.subscribe(waker); } - WakerSpan { cv, waker } - } +impl<'s, C: ?Sized + Controlled> WakerSpan<'s, C> { + /// Subscribes a [waker] to a controlled expression for a certain duration. + /// + /// This method subscribes the waker to the controlled expression and + /// returns an object that unsubscribes the waker from the same controlled + /// expression upon drop. The borrow-checker secures the correct lifetimes + /// of waker and controlled expression so that this method can be safe. + /// + /// [waker]: https://doc.rust-lang.org/std/task/struct.Waker.html + #[inline] + pub fn new(cv: &'s C, waker: &'s task::Waker) -> Self { + // this is safe because we bind the lifetime of the waker to the guard + unsafe { + cv.subscribe(waker); + } + WakerSpan { cv, waker } + } } // implement drop for the guard impl<C: ?Sized + Controlled> Drop for WakerSpan<'_, C> { - #[inline] - fn drop(&mut self) { - unsafe { self.cv.unsubscribe(self.waker); } - } + #[inline] + fn drop(&mut self) { + unsafe { + self.cv.unsubscribe(self.waker); + } + } } // a pointer to a controlled expression is also a controlled expression impl<T: Controlled> Controlled for &'_ T { - #[inline] - unsafe fn subscribe(&self, waker: &task::Waker) { - Controlled::subscribe(*self, waker); - } - - #[inline] - unsafe fn unsubscribe(&self, waker: &task::Waker) { - Controlled::unsubscribe(*self, waker); - } + #[inline] + unsafe fn subscribe(&self, waker: &task::Waker) { + Controlled::subscribe(*self, waker); + } + + #[inline] + unsafe fn unsubscribe(&self, waker: &task::Waker) { + Controlled::unsubscribe(*self, waker); + } } /// Marks a variable as a state variable which can be used in conjunction with @@ -427,80 +455,82 @@ impl<T: Controlled> Controlled for &'_ T { /// /// [`until()`]: fn.until.html pub struct Control<T> { - /// The inner value. - value: Cell<T>, - /// A list of waiting processes, identified by their [wakers]. - /// - /// [wakers]: https://doc.rust-lang.org/std/task/struct.Waker.html - waiting: RefCell<Vec<task::Waker>> + /// The inner value. + value: Cell<T>, + /// A list of waiting processes, identified by their [wakers]. + /// + /// [wakers]: https://doc.rust-lang.org/std/task/struct.Waker.html + waiting: RefCell<Vec<task::Waker>>, } impl<T> Control<T> { - /// Creates a new control variable and initializes its value. - #[inline] - pub const fn new(value: T) -> Self { - Self { - value: Cell::new(value), - waiting: RefCell::new(Vec::new()) - } - } - - /// Assigns a new value to the control variable. - /// - /// This can lead to potential activations of waiting processes. - #[inline] - pub fn set(&self, val: T) { - self.notify(); - self.value.set(val); - } - - /// Extracts the current value from the control variable. - #[inline] - pub fn get(&self) -> T where T: Copy { - self.value.get() - } - - /// Notifies all the waiting processes to re-check their state condition. - /// - /// This action is usually performed automatically when the value of the - /// control variable is changed through the [`set()`]-method but may be - /// triggered manually when this mechanism is bypassed somehow. - /// - /// [`set()`]: #method.set - pub fn notify(&self) { - for waker in self.waiting.borrow().iter() { - waker.wake_by_ref(); - } - } + /// Creates a new control variable and initializes its value. + #[inline] + pub const fn new(value: T) -> Self { + Self { + value: Cell::new(value), + waiting: RefCell::new(Vec::new()), + } + } + + /// Assigns a new value to the control variable. + /// + /// This can lead to potential activations of waiting processes. + #[inline] + pub fn set(&self, val: T) { + self.notify(); + self.value.set(val); + } + + /// Extracts the current value from the control variable. + #[inline] + pub fn get(&self) -> T + where + T: Copy, + { + self.value.get() + } + + /// Notifies all the waiting processes to re-check their state condition. + /// + /// This action is usually performed automatically when the value of the + /// control variable is changed through the [`set()`]-method but may be + /// triggered manually when this mechanism is bypassed somehow. + /// + /// [`set()`]: #method.set + pub fn notify(&self) { + for waker in self.waiting.borrow().iter() { + waker.wake_by_ref(); + } + } } // implement the trait marking control variables as controlled expressions impl<T: Copy> Controlled for Control<T> { - #[inline] - unsafe fn subscribe(&self, waker: &task::Waker) { - self.waiting.borrow_mut() - .push(waker.clone()); - } - - unsafe fn unsubscribe(&self, waker: &task::Waker) { - let mut waiting = self.waiting.borrow_mut(); - let pos = waiting.iter().position(|w| w.will_wake(waker)); - - if let Some(pos) = pos { - waiting.remove(pos); - } else { - panic!("attempt to unsubscribe waker that isn't subscribed to"); - } - } + #[inline] + unsafe fn subscribe(&self, waker: &task::Waker) { + self.waiting.borrow_mut().push(waker.clone()); + } + + unsafe fn unsubscribe(&self, waker: &task::Waker) { + let mut waiting = self.waiting.borrow_mut(); + let pos = waiting.iter().position(|w| deep_will_wake(w, waker)); + + if let Some(pos) = pos { + waiting.remove(pos); + } else { + panic!("attempt to unsubscribe waker that isn't subscribed to"); + } + } } impl<T: Default> Default for Control<T> { - fn default() -> Self { - Self { - value: Cell::new(T::default()), - waiting: RefCell::new(Vec::new()) - } - } + fn default() -> Self { + Self { + value: Cell::new(T::default()), + waiting: RefCell::new(Vec::new()), + } + } } /// An internal macro to generate implementations of the [`Controlled`] trait @@ -514,26 +544,26 @@ macro_rules! controlled_tuple_impl { unsafe fn subscribe(&self, _waker: &task::Waker) { $(self.$ID.subscribe(_waker);)* } - + unsafe fn unsubscribe(&self, _waker: &task::Waker) { $(self.$ID.unsubscribe(_waker);)* } } }; - + ($($T:ident -> $ID:tt),* . $HEAD:ident -> $HID:tt $(, $TAIL:ident -> $TID:tt)*) => { controlled_tuple_impl!($($T -> $ID),* .); controlled_tuple_impl!($($T -> $ID,)* $HEAD -> $HID . $($TAIL -> $TID),*); }; - + ($HEAD:ident -> $HID:tt $(, $TAIL:ident -> $TID:tt)* $(,)?) => { controlled_tuple_impl!($HEAD -> $HID . $($TAIL -> $TID),*); }; } controlled_tuple_impl! { - A -> 0, B -> 1, C -> 2, D -> 3, E -> 4, F -> 5, G -> 6, H -> 7, - I -> 8, J -> 9, K ->10, L ->11, M ->12, N ->13, O ->14, P ->15, + A -> 0, B -> 1, C -> 2, D -> 3, E -> 4, F -> 5, G -> 6, H -> 7, + I -> 8, J -> 9, K ->10, L ->11, M ->12, N ->13, O ->14, P ->15, } /// Returns a future that suspends until an arbitrary boolean condition @@ -542,17 +572,19 @@ controlled_tuple_impl! { /// [`Controlled`]: trait.Controlled.html #[inline] pub async fn until<C: Controlled>(cntl: C, cond: impl Fn(&C) -> bool) { - if !cond(&cntl) { - let waker = waker().await; - let span = WakerSpan::new(&cntl, &waker); - - loop { - sleep().await; - if cond(&cntl) { break; } - } - - drop(span); - } + if !cond(&cntl) { + let waker = waker().await; + let span = WakerSpan::new(&cntl, &waker); + + loop { + sleep().await; + if cond(&cntl) { + break; + } + } + + drop(span); + } } /* ************************* statistical facilities ************************* */ @@ -560,82 +592,85 @@ pub async fn until<C: Controlled>(cntl: C, cond: impl Fn(&C) -> bool) { /// A simple collector for statistical data, inspired by SLX's random_variable. #[derive(Clone)] pub struct RandomVar { - total: Cell<u32>, sum: Cell<f64>, sqr: Cell<f64>, - min: Cell<f64>, max: Cell<f64> + total: Cell<u32>, + sum: Cell<f64>, + sqr: Cell<f64>, + min: Cell<f64>, + max: Cell<f64>, } impl RandomVar { - /// Creates a new random variable. - #[inline] - pub fn new() -> Self { - RandomVar::default() - } - - /// Resets all stored statistical data. - pub fn clear(&self) { - self.total.set(0); - self.sum.set(0.0); - self.sqr.set(0.0); - self.min.set(f64::INFINITY); - self.max.set(f64::NEG_INFINITY); - } - - /// Adds another value to the statistical collection. - pub fn tabulate<T: Into<f64>>(&self, val: T) { - let val: f64 = val.into(); - - self.total.set(self.total.get() + 1); - self.sum.set(self.sum.get() + val); - self.sqr.set(self.sqr.get() + val*val); - - if self.min.get() > val { - self.min.set(val); - } - if self.max.get() < val { - self.max.set(val); - } - } - - /// Combines the statistical collection of two random variables into one. - pub fn merge(&self, other: &Self) { - self.total.set(self.total.get() + other.total.get()); - self.sum.set(self.sum.get() + other.sum.get()); - self.sqr.set(self.sqr.get() + other.sqr.get()); - - if self.min.get() > other.min.get() { - self.min.set(other.min.get()); - } - if self.max.get() < other.max.get() { - self.max.set(other.max.get()); - } - } + /// Creates a new random variable. + #[inline] + pub fn new() -> Self { + RandomVar::default() + } + + /// Resets all stored statistical data. + pub fn clear(&self) { + self.total.set(0); + self.sum.set(0.0); + self.sqr.set(0.0); + self.min.set(f64::INFINITY); + self.max.set(f64::NEG_INFINITY); + } + + /// Adds another value to the statistical collection. + pub fn tabulate<T: Into<f64>>(&self, val: T) { + let val: f64 = val.into(); + + self.total.set(self.total.get() + 1); + self.sum.set(self.sum.get() + val); + self.sqr.set(self.sqr.get() + val * val); + + if self.min.get() > val { + self.min.set(val); + } + if self.max.get() < val { + self.max.set(val); + } + } + + /// Combines the statistical collection of two random variables into one. + pub fn merge(&self, other: &Self) { + self.total.set(self.total.get() + other.total.get()); + self.sum.set(self.sum.get() + other.sum.get()); + self.sqr.set(self.sqr.get() + other.sqr.get()); + + if self.min.get() > other.min.get() { + self.min.set(other.min.get()); + } + if self.max.get() < other.max.get() { + self.max.set(other.max.get()); + } + } } impl Default for RandomVar { - fn default() -> Self { - RandomVar { - total: Cell::default(), - sum: Cell::default(), - sqr: Cell::default(), - min: Cell::new(f64::INFINITY), - max: Cell::new(f64::NEG_INFINITY) - } - } + fn default() -> Self { + RandomVar { + total: Cell::default(), + sum: Cell::default(), + sqr: Cell::default(), + min: Cell::new(f64::INFINITY), + max: Cell::new(f64::NEG_INFINITY), + } + } } impl Display for RandomVar { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let total = self.total.get(); - let mean = self.sum.get() / f64::from(total); - let variance = self.sqr.get() / f64::from(total) - mean*mean; - let std_dev = variance.sqrt(); - - f.debug_struct("RandomVar") - .field("total", &total) - .field("mean", &mean) - .field("std_dev", &std_dev) - .field("min", &self.min.get()) - .field("max", &self.max.get()) - .finish() - } + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let total = self.total.get(); + let mean = self.sum.get() / f64::from(total); + let variance = self.sqr.get() / f64::from(total) - mean * mean; + let std_dev = variance.sqrt(); + + f.debug_struct("RandomVar") + .field("total", &total) + .field("mean", &mean) + .field("std_dev", &std_dev) + .field("min", &self.min.get()) + .field("max", &self.max.get()) + .finish() + } } -- GitLab