Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • weber/simcore-rs
  • markeffl/simcore-rs
2 results
Show changes
Commits on Source (39)
/target /target
/.idea/ /.idea/
/uml/ /uml/
/odemx-lite/related documents/
This diff is collapsed.
[package] [package]
name = "simcore-rs" name = "simcore-rs"
version = "0.1.0" version = "0.1.0"
authors = ["Dorian Weber <weber@informatik.hu-berlin.de>"] authors = [
edition = "2018" "Dorian Weber <weber@informatik.hu-berlin.de>",
]
edition = "2024"
[dev-dependencies]
rand = { version = "0.9", default-features = false, features = ["small_rng"] }
rand_distr = "0.5"
rayon = "1.10"
[profile.release] [profile.release]
lto = true lto = "thin"
debug = "full"
opt-level = 3 opt-level = 3
[dependencies] [profile.bench]
rand = { version = "0.7", default-features = false, features = ["small_rng"] } lto = true
debug = false
[dev-dependencies]
criterion = "0.3.3"
[[bench]]
name = "barbershop"
harness = false
Copyright 2024 Dorian Weber
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
documentation files (the “Software”), to deal in the Software without restriction, including without limitation the
rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\ No newline at end of file
# A Minimalistic Simulation Framework in Rust
## Introduction
This Rust project provides a minimally-viable library for creating and running simulations using asynchronous processes. By leveraging Rust's concurrency features, it allows you to simulate various scenarios, making it a valuable tool for studying the mechanics of simulation libraries in projects involving complex, process-based models.
This simulation core is not meant to be used in production code, see [odem-rs] for an earnest attempt.
## Features
The project includes:
- **Core Library (`lib.rs`)**: Defines reusable components for building simulators.
- **Utility Modules (`util.rs`)**: Provides support for common simulation tasks.
- **Example Models**: Demonstrates different aspects of the framework through three examples:
- `barbershop`
- `ferry`
- `philosophers`
- **Decompressor-Tokenizer (`bin/coroutines.rs`)**: An educational example with an executor, a corresponding spawner, and a channel that shows the mechanism without using unsafe code.
- **Decompressor-Tokenizer (`c/coroutines.c`)**: The same example implemented manually in C to illustrate the transformation performed by the Rust compiler on `async`/`await`.
## Getting Started
### Prerequisites
- **Rust Compiler**: Latest stable version is recommended. [Install Rust][Rust-compiler]
- no other dependencies
### Building the Project
To build the project, run:
```sh
cargo build
```
### Running the Examples
The examples are located in the `examples` directory and demonstrate different types of simulations:
- **Barbershop**: A discrete event simulation of a barbershop where customers arrive randomly, wait if the barber is busy, receive service, and depart. It tracks customer wait times and provides statistical analysis over the simulated period. This is a minimalistic model containing processes and time- and state-based synchronization.
- **Ferry**: Simulates a car-ferry system where cars arrive at multiple harbors and wait to be transported by ferries to other harbors. It tracks statistics like car wait times, ferry cargo lengths, and ferry load times. This example demonstrates complex synchronization, featuring channels and speculative execution.
- **Philosophers**: Simulates the classic Dining Philosophers problem, where philosophers alternate between thinking and eating, requiring coordination to avoid deadlocks. It tracks the time until a deadlock occurs and collects statistical data over multiple simulation runs. This example highlights parallel execution of multiple simulations and concurrent resource access.
To run an example, use:
```sh
cargo run --example <example_name>
```
For example, to run the barbershop simulation:
```sh
cargo run --example barbershop
```
## License
This project is licensed under the MIT License. See the [LICENSE](LICENSE) file for details.
## Acknowledgments
Special thanks to the contributors of the Rust community for creating powerful tools that make projects like this
possible. Extra-special thanks to Lukas Markeffsky for helping me to refine this library both through fruitful
discussions and by resolving its soundness problems through a refactoring of the code.
[Rust-compiler]: https://rust.sh
[odem-rs]: https://crates.io/crates/odem-rs
use simcore_rs::{Time, SimContext, Facility, simulation};
use rand::{distributions::Uniform, rngs::SmallRng, SeedableRng, Rng};
use criterion::{Criterion, BenchmarkId, criterion_group, criterion_main, BatchSize};
use std::time::Duration;
const SEED_A : u64 = 100000;
const SEED_S : u64 = 200000;
const RANGE : u32 = 8;
const STEP : Time = 100000.0;
fn barbershop(c: &mut Criterion) {
let mut group = c.benchmark_group("Barbershop");
let mut rng_a = SmallRng::seed_from_u64(SEED_A);
let mut rng_s = SmallRng::seed_from_u64(SEED_S);
group.measurement_time(Duration::from_secs_f64(30.0));
group.confidence_level(0.99);
for stop_time in (1..=RANGE).map(|c| Time::from(c)*STEP) {
group.bench_with_input(
BenchmarkId::from_parameter(stop_time),
&stop_time,
|b, &stop_time|
b.iter_batched(
|| {
(SmallRng::from_rng(&mut rng_a).unwrap(),
SmallRng::from_rng(&mut rng_s).unwrap())
},
|(rng_a, rng_s)| {
let shop = BarberShop {
stop_time, rng_a, rng_s, joe: &Facility::new()
};
simulation(|sim| shop.actions(sim));
},
BatchSize::SmallInput
)
);
}
group.finish();
}
criterion_group!(benches, barbershop);
criterion_main!(benches);
/* *************************** Barbershop Example *************************** */
/// Barbershop process.
struct BarberShop<'j> {
stop_time: Time, rng_a: SmallRng, rng_s: SmallRng, joe: &'j Facility
}
impl<'j> BarberShop<'j> {
async fn actions(self, sim: SimContext<'j>) {
// unpack the barber shop structure for easier access
let Self {
stop_time, mut rng_a, mut rng_s, joe
} = self;
// activate a process to generate the customers
sim.activate(async move {
let dist = Uniform::new(12.0, 24.0);
// wait some time before activating the first customer
sim.advance(rng_a.sample(dist)).await;
// generate new customers until the store closes officially
while sim.now() < stop_time {
// activate the next customer
sim.activate(Customer {
joe, rng: SmallRng::from_seed(rng_s.gen())
}.actions(sim));
// wait some time before activating the next customer
sim.advance(rng_a.sample(dist)).await;
}
});
// wait until the store closes
sim.advance(self.stop_time).await;
// finish processing the queue (no more customers arrive)
joe.seize().await;
}
}
/// Customer process with access to the barber and a random number generator.
struct Customer<'j> { joe: &'j Facility, rng: SmallRng }
impl Customer<'_> {
pub async fn actions(mut self, sim: SimContext<'_>) {
// access the barber
self.joe.seize().await;
// spend time
sim.advance(self.rng.gen_range(12.0, 18.0)).await;
// release the barber
self.joe.release();
}
}
#!/usr/bin/make
.SUFFIXES:
.PHONY: all run clean
TAR = coroutines
SRC = $(wildcard *.c)
OBJ = $(SRC:%.c=%.o)
DEP = $(OBJ:%.o=%.d)
-include $(DEP)
CFLAGS = -std=c11 -Wall -pedantic -MMD -MP
%.o: %.c
$(CC) $(CFLAGS) $< -c
$(TAR): $(OBJ)
$(CC) $(CFLAGS) $^ -o $@
all: $(TAR)
run: all
./$(TAR)
clean:
$(RM) $(RMFILES) $(TAR) $(OBJ) $(DEP)
#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>
/* Define either of them to enable the corresponding rewritten forms of the
* routines or both to have both of them rewritten. */
#define DECOMPRESS_COROUTINE
// #define TOKENIZE_COROUTINE
/* Helper macro to simplify tracing of the function calls and messages. */
#define TRACE(...) do { \
fprintf(stderr, __VA_ARGS__); \
putc('\n', stderr); \
} while (0)
/* Helper macro to catch array overflows for extreme inputs. */
#define CATCH_OVERFLOW(ARR, LEN) do { \
if ((LEN) >= sizeof(ARR)/sizeof(*ARR)) { \
fprintf(stderr, "PANIC: Array " #ARR " overflow detected, abort\n"); \
exit(-1); \
} \
} while (0)
/* Enumeration of possible token tags. */
enum Tag { WORD, PUNCT };
/* Names of the token tags. */
static const char *TOKEN_TAG[] = {
[WORD] = "Word", [PUNCT] = "Punct"
};
/* Token type with tag and value. */
typedef struct {
enum Tag tag;
char val[256];
size_t len;
} Token;
/* Primitive token channel for buffering multiple detected tokens. */
static struct {
Token token[256];
size_t len;
} token_chan;
/* Function that adds another character to the token value. */
void add_to_token(char c) {
Token *token = &token_chan.token[token_chan.len];
CATCH_OVERFLOW(token->val, token->len);
token->val[token->len++] = c;
}
/* Function that adds the corresponding tag and closes token construction. */
void got_token(enum Tag tag) {
CATCH_OVERFLOW(token_chan.token, token_chan.len);
Token *token = &token_chan.token[token_chan.len];
token->val[token->len] = '\0';
token->tag = tag;
TRACE("got_token(%s) = \"%s\"", TOKEN_TAG[tag], token->val);
++token_chan.len;
}
/* Stackless coroutine-version of the decompress-routine. */
int co_decompress(void) {
static int pc, l, c;
switch (pc) {
case 0: while (1) {
c = getchar();
if (c == EOF)
return EOF;
if (c == 0xFF) {
l = getchar();
c = getchar();
while (l--) {
TRACE("nextchar() = '%c'", c);
pc = 1;
return c;
case 1:;
}
} else {
TRACE("nextchar() = '%c'", c);
pc = 2;
return c;
case 2:;
}
}}
}
/* Stackless coroutine-version of the tokenize-routine. */
void co_tokenize(int c) {
static int pc = 1;
switch (pc) {
case 0: while (1) {
pc = 1;
return;
case 1:;
TRACE("emit('%c')", c);
if (c == EOF)
return;
if (isalpha(c)) {
do {
add_to_token(c);
pc = 2;
return;
case 2:;
TRACE("emit('%c')", c);
} while (isalpha(c));
got_token(WORD);
}
add_to_token(c);
got_token(PUNCT);
}}
}
/* Decodes RLE-encoded input and pushes it into the tokenizer coroutine. */
void decompress(void) {
while (1) {
int c = getchar();
if (c == EOF)
break;
if (c == 0xFF) {
int l = getchar();
c = getchar();
while (l--) {
co_tokenize(c);
}
} else
co_tokenize(c);
}
co_tokenize(EOF);
}
/* Calls the decompressor-coroutine for decoding RLE-encoded input and
* constructs token. */
void tokenize(void) {
while (1) {
int c = co_decompress();
if (c == EOF)
break;
if (isalpha(c)) {
do {
add_to_token(c);
c = co_decompress();
} while (isalpha(c));
got_token(WORD);
}
add_to_token(c);
got_token(PUNCT);
}
}
/* Prints all token currently present in the token channel. */
void printToken(void) {
for (size_t i = 0; i < token_chan.len; ++i) {
Token *token = &token_chan.token[i];
TRACE(
"Token: {\n"
"\ttag: %s,\n"
"\tval: \"%s\"\n"
"}",
TOKEN_TAG[token->tag],
token->val
);
token->len = 0;
}
token_chan.len = 0;
}
/* Program entry. */
int main() {
#if defined(TOKENIZE_COROUTINE) && defined(DECOMPRESS_COROUTINE)
fprintf(stderr, "Decompress Coroutine, Tokenize Coroutine\n");
for (int c; (c = co_decompress()) != EOF;) {
co_tokenize(c);
printToken();
}
#elif defined(TOKENIZE_COROUTINE)
fprintf(stderr, "Tokenize Routine, Decompress Coroutine\n");
tokenize();
#elif defined(DECOMPRESS_COROUTINE)
fprintf(stderr, "Decompress Routine, Tokenize Coroutine\n");
decompress();
#else
#error "At least one (or both) of TOKENIZE_COROUTINE or DECOMPRESS_COROUTINE should be defined."
#endif
return 0;
}
//! A discrete event simulation of a barbershop using `simcore_rs`, the
//! simulator core developed as part of my (Dorian Weber) dissertation.
//!
//! This module models a simple barbershop scenario where customers arrive at
//! random intervals, wait if the barber is busy, receive service, and then
//! depart. The simulation tracks customer wait times and provides statistical
//! analysis over the simulated period.
//!
//! # Notes
//! - The simulation assumes a continuous operation of the barbershop over the
//! specified duration.
//! - Customers arriving after the shop closes are not admitted; however, the
//! barber will finish servicing any remaining customers.
use rand::{rngs::SmallRng, Rng};
use simcore_rs::{
util::{Facility, RandomVariable},
Process, Sim, Time,
};
use std::cell::RefCell;
// Helper constants for random number generator seeds.
const SEED_A: u64 = 100_000;
const SEED_S: u64 = 200_000;
/// Globally shared data for the barbershop simulation.
struct Barbershop {
/// Random number generator for customer arrival times.
rng_a: RefCell<SmallRng>,
/// Random number generator for service times.
rng_s: RefCell<SmallRng>,
/// Facility representing the barber (Joe).
joe: Facility,
/// Statistical accumulator for customer wait times.
wait_time: RandomVariable,
}
/// Represents a customer in the barbershop simulation.
struct Customer;
impl Customer {
/// Defines the actions performed by a customer in the simulation.
///
/// The customer arrives at the barbershop, waits for the barber to be
/// available, gets a haircut (spends time being serviced), and then leaves.
pub async fn actions(self, sim: Sim<'_, Barbershop>) {
// Record the arrival time for wait time calculation.
let arrival_time = sim.now();
// Seize the barber (wait if not available).
sim.global().joe.seize().await;
// Calculate and record the customer's wait time.
sim.global().wait_time.tabulate(sim.now() - arrival_time);
// Simulate the time taken for the haircut.
let cut = sim.global().rng_s.borrow_mut().random_range(12.0..18.0);
sim.advance(cut).await;
// Release the barber for the next customer.
sim.global().joe.release();
}
}
/// The main simulation function.
///
/// This function initializes the customer arrival process, runs the simulation
/// for the specified duration, and ensures that all customers are serviced
/// before the simulation ends.
async fn sim_main(sim: Sim<'_, Barbershop>, duration: Time) {
// Activate a process to generate customers at random intervals.
sim.activate(async move {
loop {
// Wait for a random time before the next customer arrives.
let wait_time = sim.global().rng_a.borrow_mut().random_range(12.0..24.0);
sim.advance(wait_time).await;
// If the simulation time exceeds the duration, stop generating customers.
if sim.now() >= duration {
return;
}
// Activate a new customer process.
sim.activate(Customer.actions(sim));
}
});
// Run the simulation until the store closes.
sim.advance(duration).await;
// Ensure the barber finishes servicing any remaining customers.
sim.global().joe.seize().await;
}
/// Entry point for the barbershop simulation.
///
/// Initializes the simulation environment and runs the simulation for a
/// specified duration.
fn main() {
use rand::SeedableRng;
// Run the simulation and collect the result.
let result = simcore_rs::simulation(
// Initialize the global data for the simulation.
Barbershop {
rng_a: RefCell::new(SmallRng::seed_from_u64(SEED_A)),
rng_s: RefCell::new(SmallRng::seed_from_u64(SEED_S)),
joe: Facility::new(),
wait_time: RandomVariable::new(),
},
// Simulation entry point.
|sim| Process::new(sim, sim_main(sim, 3.0 * 7.0 * 24.0 * 60.0)),
);
// Print statistics after the simulation ends.
println!("wait_time: {:#.3?}", result.wait_time);
}
//! A discrete event simulation of a car-ferry system using `simcore_rs`, the
//! simulator core developed as part of my (Dorian Weber) dissertation.
//!
//! This module models a ferry system where cars arrive at multiple harbors and
//! wait to be transported by ferries to other harbors. The simulation tracks
//! various statistics such as car wait times, ferry cargo lengths, and ferry
//! load times over the simulated period.
//!
//! # Notes
//!
//! - The simulation assumes continuous operation over the specified duration.
//! - Cars that are not picked up by the end of the simulation are accounted for
//! in the wait time statistics.
//! - The ferry routes are set up in such a way that each ferry starts at a
//! different harbor and cycles through all harbors.
use rand::{rngs::SmallRng, Rng, SeedableRng};
use rand_distr::{Distribution, Exp, Normal};
use simcore_rs::{
util::{channel, select, RandomVariable, Receiver, Sender},
Process, Sim, Time,
};
use std::cell::RefCell;
// Constants for simulation parameters.
const SEED: u64 = 100_000;
const FERRY_COUNT: usize = 2;
const HARBOR_COUNT: usize = 4;
const HARBOR_DISTANCE: Time = 10.0;
const FERRY_TIMEOUT: Time = 5.0;
const FERRY_CAPACITY: usize = 5;
/// Shared global data for the ferry simulation.
struct Ferries {
/// Master random number generator for seeding.
master_rng: RefCell<SmallRng>,
/// Statistical accumulator for ferry cargo lengths.
ferry_cargo_len: RandomVariable,
/// Statistical accumulator for ferry load times.
ferry_load_time: RandomVariable,
/// Statistical accumulator for car wait times.
car_wait_time: RandomVariable,
}
/// Represents a car in the simulation.
#[derive(Debug)]
struct Car {
/// Time when the car arrives at the pier.
arrival_time: Time,
/// Duration required to load the car onto the ferry.
load_duration: Time,
}
/// Represents a pier (harbor) where cars arrive and wait for ferries.
struct Pier {
/// Random number generator specific to this pier.
rng: SmallRng,
/// Channel to send cars to the ferry.
landing_site: Sender<Car>,
}
/// Represents a ferry that transports cars between harbors.
struct Ferry {
/// Cargo hold containing the cars on the ferry.
cargo: Vec<Car>,
/// Timeout duration for ferry departure.
timeout: Time,
/// Time taken to travel between harbors.
travel_time: Time,
/// Receivers from piers to accept arriving cars.
piers: Vec<Receiver<Car>>,
}
impl Pier {
/// Simulates the actions of a pier in the simulation.
///
/// Cars arrive at the pier following an exponential distribution
/// and are sent to the ferry when it arrives.
async fn actions(mut self, sim: Sim<'_, Ferries>) {
// Arrival and loading time distributions.
let arrival_delay = Exp::new(0.1).unwrap();
let loading_delay = Normal::new(0.5, 0.2).unwrap();
loop {
// Wait for the next car to arrive.
sim.advance(arrival_delay.sample(&mut self.rng)).await;
// Create a new car with arrival and load times.
self.landing_site
.send(Car {
arrival_time: sim.now(),
load_duration: loading_delay
.sample_iter(&mut self.rng)
.find(|&val| val >= 0.0)
.unwrap(),
})
.await
.expect("No ferries in the simulation");
}
}
}
impl Ferry {
/// Simulates the actions of a ferry in the simulation.
///
/// The ferry travels between harbors, loads cars, waits for a timeout or
/// until it reaches capacity, and then moves to the next harbor.
async fn actions(mut self, sim: Sim<'_, Ferries>) {
loop {
for pier in self.piers.iter() {
// Unload the cars at the current pier.
for car in self.cargo.drain(..) {
sim.advance(car.load_duration).await;
}
let begin_loading = sim.now();
// Load cars until capacity is reached or timeout occurs.
while self.cargo.len() < self.cargo.capacity() {
match select(sim, pier.recv(), async {
sim.advance(self.timeout).await;
None
})
.await
{
// A car arrived before timeout.
Some(car) => {
sim.global()
.car_wait_time
.tabulate(sim.now() - car.arrival_time);
sim.advance(car.load_duration).await;
self.cargo.push(car);
}
// Timeout occurred; depart to next harbor.
None => break,
}
}
// Record ferry loading statistics.
sim.global()
.ferry_load_time
.tabulate(sim.now() - begin_loading);
sim.global()
.ferry_cargo_len
.tabulate(self.cargo.len() as f64);
// Travel to the next harbor.
sim.advance(self.travel_time).await;
}
}
}
}
/// The main simulation function.
///
/// Sets up the piers and ferries, activates their processes, and runs the simulation
/// for the specified duration.
async fn sim_main(sim: Sim<'_, Ferries>, duration: Time, ferries: usize, harbors: usize) {
let mut ports = Vec::with_capacity(harbors);
// Create all the harbors (piers).
for _ in 0..harbors {
let (sx, rx) = channel();
let harbor = Pier {
rng: SmallRng::from_seed(sim.global().master_rng.borrow_mut().random()),
landing_site: sx,
};
sim.activate(harbor.actions(sim));
ports.push(rx);
}
// Create all the ferries.
for i in 0..ferries {
let ferry = Ferry {
cargo: Vec::with_capacity(FERRY_CAPACITY),
timeout: FERRY_TIMEOUT,
travel_time: HARBOR_DISTANCE,
piers: ports
.iter()
.skip(i)
.chain(ports.iter().take(i))
.cloned()
.collect(),
};
sim.activate(ferry.actions(sim));
}
// Run the simulation for the specified duration.
sim.advance(duration).await;
// Handle any cars that weren't picked up by a ferry.
for port in ports {
for _ in 0..port.len() {
let car = port.recv().await.unwrap();
sim.global()
.car_wait_time
.tabulate(sim.now() - car.arrival_time);
}
}
}
/// Entry point for the ferry simulation.
///
/// Initializes the simulation environment and runs the simulation for one week.
fn main() {
let result = simcore_rs::simulation(
// Global shared data.
Ferries {
master_rng: RefCell::new(SmallRng::seed_from_u64(SEED)),
ferry_cargo_len: RandomVariable::new(),
ferry_load_time: RandomVariable::new(),
car_wait_time: RandomVariable::new(),
},
// Simulation entry point.
|sim| {
Process::new(
sim,
sim_main(sim, 24.0 * 60.0 * 365.0, FERRY_COUNT, HARBOR_COUNT),
)
},
);
// Output simulation statistics.
println!("Number of harbors: {}", HARBOR_COUNT);
println!("Number of ferries: {}", FERRY_COUNT);
println!("Car wait time: {:#.3?}", result.car_wait_time);
println!("Ferry cargo len: {:#.3?}", result.ferry_cargo_len);
println!("Ferry load time: {:#.3?}", result.ferry_load_time);
}
//! A discrete event simulation of the Dining Philosophers problem using
//! `simcore_rs`, the simulator core developed as part of my (Dorian Weber)
//! dissertation.
//!
//! This module models the classic Dining Philosophers problem, where multiple
//! philosophers sit around a table and alternate between thinking and eating.
//! They need two forks to eat and must coordinate to avoid deadlocks. The
//! simulation tracks the time until a deadlock occurs and collects statistical
//! data over multiple runs.
//!
//! # Notes
//!
//! - The simulation detects deadlocks when all philosophers are waiting for
//! forks and none can proceed.
//! - The statistical data collected includes the time until deadlock over
//! multiple simulation runs.
use rand::{rngs::SmallRng, Rng, SeedableRng};
use rand_distr::{Exp, Normal, Uniform};
use rayon::prelude::*;
use simcore_rs::{
util::{until, Control, RandomVariable},
Process, Sim, Time,
};
use std::{
cell::{Cell, RefCell},
rc::Rc,
};
// Constants for simulation parameters.
const PHILOSOPHER_COUNT: usize = 5;
const SEED: u64 = 100_000;
/// Shared global data for the simulation.
struct Philosophers {
/// Master random number generator for seeding.
master_rng: RefCell<SmallRng>,
/// Cell to store the simulation duration until deadlock.
sim_duration: Cell<Time>,
}
/// Represents the shared table with forks for the philosophers.
struct Table {
/// Controls for each fork, indicating whether it is held.
forks: Vec<Control<bool>>,
/// Counter for the number of forks currently held.
forks_held: Control<usize>,
/// Counter for the number of forks currently awaited.
forks_awaited: Control<usize>,
}
impl Table {
/// Creates a new table with a specified number of forks.
fn new(forks: usize) -> Self {
Table {
forks: (0..forks).map(|_| Control::new(false)).collect(),
forks_held: Control::default(),
forks_awaited: Control::default(),
}
}
/// Acquires a fork at a given position, blocking until it is available.
async fn acquire_fork(&self, i: usize) {
let num = i % self.forks.len();
self.forks_awaited.set(self.forks_awaited.get() + 1);
until(&self.forks[num], |fork| !fork.get()).await;
self.forks[num].set(true);
self.forks_awaited.set(self.forks_awaited.get() - 1);
self.forks_held.set(self.forks_held.get() + 1);
}
/// Releases a fork at a given position back to the table.
fn release_fork(&self, i: usize) {
self.forks[i % self.forks.len()].set(false);
self.forks_held.set(self.forks_held.get() - 1);
}
}
/// Represents a philosopher in the simulation.
struct Philosopher {
/// Reference to the shared table.
table: Rc<Table>,
/// The seat index of the philosopher at the table.
seat: usize,
/// Random number generator for this philosopher.
rng: SmallRng,
}
impl Philosopher {
/// Simulates the actions of a philosopher.
///
/// The philosopher alternates between thinking and eating, and tries to
/// acquire forks.
async fn actions(mut self, sim: Sim<'_, Philosophers>) {
let thinking_duration = Exp::new(1.0).unwrap();
let artificial_delay = Uniform::new(0.1, 0.2).unwrap();
let eating_duration = Normal::new(0.5, 0.2).unwrap();
let rng = &mut self.rng;
loop {
// Spend some time pondering the nature of things.
sim.advance(rng.sample(thinking_duration)).await;
// Acquire the first fork.
self.table.acquire_fork(self.seat).await;
// Introduce an artificial delay to leave room for deadlocks.
sim.advance(rng.sample(artificial_delay)).await;
// Acquire the second fork.
self.table.acquire_fork(self.seat + 1).await;
// Spend some time eating.
sim.advance(
rng.sample_iter(eating_duration).find(|&v| v >= 0.0).unwrap(),
)
.await;
// Release the forks.
self.table.release_fork(self.seat + 1);
self.table.release_fork(self.seat);
}
}
}
/// Runs a single simulation instance of the Dining Philosophers problem.
///
/// Initializes the table and philosophers, and waits until a deadlock occurs.
async fn sim_main(sim: Sim<'_, Philosophers>, count: usize) {
let table = Rc::new(Table::new(count));
// Create the philosopher processes and seat them.
for i in 0..count {
sim.activate(
Philosopher {
table: table.clone(),
seat: i,
rng: SmallRng::from_seed(sim.global().master_rng.borrow_mut().random()),
}
.actions(sim),
);
}
// Wait for the precise configuration indicating a deadlock.
until(
(&table.forks_held, &table.forks_awaited),
|(held, awaited)| held.get() == count && awaited.get() == count,
)
.await;
// Record the current simulation time.
sim.global().sim_duration.set(sim.now());
}
/// Runs multiple simulation instances in parallel and collects statistics.
///
/// # Arguments
///
/// * `count` - The number of philosophers (and forks) in each simulation.
/// * `reruns` - The number of simulation runs to perform.
///
/// # Returns
///
/// A `RandomVar` containing statistical data of simulation durations until
/// deadlock.
fn philosophers(count: usize, reruns: usize) -> RandomVariable {
// Use thread-based parallelism to concurrently run simulation models.
(1..=reruns)
.into_par_iter()
.map(|i| {
simcore_rs::simulation(
// Global data.
Philosophers {
master_rng: RefCell::new(SmallRng::seed_from_u64(i as u64 * SEED)),
sim_duration: Cell::default(),
},
// Simulation entry point.
|sim| Process::new(sim, sim_main(sim, count)),
)
.sim_duration
.get()
})
.fold(RandomVariable::new, |var, duration| {
var.tabulate(duration);
var
})
.reduce(RandomVariable::new, |var_a, var_b| {
var_a.merge(&var_b);
var_a
})
}
/// Entry point for the Dining Philosophers simulation.
///
/// Runs multiple simulations and prints out the statistical results.
fn main() {
const EXPERIMENT_COUNT: usize = 500;
let sim_duration = philosophers(PHILOSOPHER_COUNT, EXPERIMENT_COUNT);
println!("Simulation duration until deadlock: {:#?}", sim_duration);
}
hard_tabs=true
barbershop.slx: SLX-64 AN206 Lines: 7,086 Errors: 0 Warnings: 0 Lines/Second: 680,384 Peak Memory: 111 MB
Execution begins
[100000] mean 0.514 ms, stddev 0.052 ms, half-width 0.001 ms
[200000] mean 1.034 ms, stddev 0.069 ms, half-width 0.002 ms
[300000] mean 1.552 ms, stddev 0.073 ms, half-width 0.002 ms
[400000] mean 2.069 ms, stddev 0.089 ms, half-width 0.002 ms
[500000] mean 2.592 ms, stddev 0.116 ms, half-width 0.003 ms
[600000] mean 3.106 ms, stddev 0.121 ms, half-width 0.003 ms
[700000] mean 3.618 ms, stddev 0.127 ms, half-width 0.003 ms
[800000] mean 4.136 ms, stddev 0.150 ms, half-width 0.004 ms
Execution complete
Objects created: 44 passive and 1,999,959,827 active Pucks created: 2,000,039,828 Peak Memory: 111 MB Time: 3.10 Minutes
import <h7>
// enable this macro for the benchmark
#define BENCH
module barbershop {
facility joe;
queue joeq;
rn_stream rng_a seed = 100000;
rn_stream rng_s seed = 200000;
constant double STOP_TIME = 480000.0;
constant int BENCH_SAMPLES = 10000;
constant int BENCH_RANGE = 8;
constant int BENCH_STEP = 100000;
class Customer {
actions {
#ifndef BENCH
enqueue joeq;
#endif
seize joe;
#ifndef BENCH
depart joeq;
#endif
advance rv_uniform(rng_s, 12.0, 18.0);
release joe;
}
}
procedure run(double stop_time) {
arrivals: Customer
iat = rv_uniform(rng_a, 12.0, 24.0)
until_time = stop_time;
advance stop_time;
wait until FNU(joe);
}
procedure main() {
#ifdef BENCH
double samples[BENCH_SAMPLES];
double real_start, real_end;
double stop_time;
double mean, stddev, half_width;
int i, j;
for (j = 1; j <= BENCH_RANGE; j += 1) {
stop_time = BENCH_STEP*j;
for (i = 1; i <= BENCH_SAMPLES; ++i) {
real_start = real_time();
run(stop_time);
real_end = real_time();
samples[i] = (real_end - real_start)*1000.0;
clear joe;
clear system;
}
if (BENCH_SAMPLES > 1) {
build_mean_ci(samples, BENCH_SAMPLES, 0.99, mean, stddev, half_width);
print(stop_time, mean, stddev, half_width) "[_] mean _.___ ms, stddev _.___ ms, half-width _.___ ms\n";
} else {
print(stop_time, samples[1]) "[_] sample _.___ ms\n";
}
}
#else
run(STOP_TIME);
report queue_set;
report joe;
#endif
}
}
\ No newline at end of file
use simcore_rs::{Time, SimContext, Facility, RandomVar, simulation};
use rand::{distributions::Uniform, rngs::SmallRng, SeedableRng, Rng};
// helper constants
const STOP_TIME: Time = 480000.0;
const SEED_A : u64 = 100000;
const SEED_S : u64 = 200000;
fn main() {
// pseudo random number generators referenced from within the main process
let mut rng_a = SmallRng::seed_from_u64(SEED_A);
let mut rng_s = SmallRng::seed_from_u64(SEED_S);
// the lifetimes are automatically extended until the end of the scope
let joe = &Facility::new();
let rv = &RandomVar::new();
// the main process
simulation(|sim| async move {
// activate a process to generate the customers
sim.activate(async move {
let dist = Uniform::new(12.0, 24.0);
// wait some time before activating the first customer
sim.advance(rng_a.sample(dist)).await;
// generate new customers until the store closes officially
while sim.now() < STOP_TIME {
// activate the next customer
sim.activate(Customer {
joe, rv, rng: SmallRng::from_seed(rng_s.gen())
}.actions(sim));
// wait some time before activating the next customer
sim.advance(rng_a.sample(dist)).await;
}
});
// wait until the store closes
sim.advance(STOP_TIME).await;
// finish processing the queue (no more customers arrive)
joe.seize().await;
});
println!("Stats: {:.3}", rv);
}
/// Customer process with access to the barber and a random number generator.
struct Customer<'j> { joe: &'j Facility, rv: &'j RandomVar, rng: SmallRng }
impl Customer<'_> {
pub async fn actions(mut self, sim: SimContext<'_>) {
// access the barber and record the time for the report
let time = sim.now();
self.joe.seize().await;
self.rv.tabulate(sim.now() - time);
// spend time
sim.advance(self.rng.gen_range(12.0, 18.0)).await;
// release the barber
self.joe.release();
}
}
use self::Token::*; #![forbid(unsafe_code)]
use std::rc::Rc; use std::{pin::pin, rc::Rc};
async fn decompress(mut input: impl Iterator<Item=u8>, #[derive(Clone, Debug)]
output: Sender<char>) { #[allow(dead_code)]
while let Some(c) = input.next() { enum Token {
Word(String),
Punct(char),
}
async fn decompress<I>(mut inp: I, out: Sender<char>)
where
I: Iterator<Item = u8>,
{
while let Some(c) = inp.next() {
if c == 0xFF { if c == 0xFF {
let len = input.next().unwrap(); let len = inp.next().unwrap();
let c = input.next().unwrap(); let c = inp.next().unwrap();
for _ in 0..len { for _ in 0..len {
output.send(c as char).await; out.send(c as char).await;
} }
} else { } else {
output.send(c as char).await; out.send(c as char).await;
} }
} }
} }
#[derive(Clone, Debug)] async fn tokenize(inp: Receiver<char>, out: Sender<Token>) {
enum Token { WORD(String), PUNCT(char) } while let Some(mut c) = inp.recv().await {
async fn tokenize(input: Receiver<char>, output: Sender<Token>) {
while let Some(mut c) = input.recv().await {
if c.is_alphabetic() { if c.is_alphabetic() {
let mut text = c.to_string(); let mut text = c.to_string();
while let Some(new_c) = input.recv().await { while let Some(new_c) = inp.recv().await {
if new_c.is_alphabetic() { if new_c.is_alphabetic() {
text.push(new_c); text.push(new_c);
} else { } else {
c = new_c; break; c = new_c;
break;
} }
} }
output.send(WORD(text)).await; out.send(Token::Word(text)).await;
} }
output.send(PUNCT(c)).await; out.send(Token::Punct(c)).await;
} }
} }
fn main() { fn main() {
let exec = Executor::new(); let exec = Executor::new();
let spawner = exec.spawner(); let spawner = exec.spawner();
let input = b"He\xff\x02lo SAM!".iter().cloned(); let input = b"F\xff\x02o!".iter().cloned();
exec.run(async { exec.run(async {
let (s1, r1) = channel(); let (s1, r1) = channel::<char>();
let (s2, r2) = channel(); let (s2, r2) = channel::<Token>();
spawner.spawn(decompress(input, s1)); spawner.spawn(decompress(input, s1));
spawner.spawn(tokenize(r1, s2)); spawner.spawn(tokenize(r1, s2));
while let Some(token) = r2.recv().await { while let Some(token) = r2.recv().await {
println!("{:?}", token); println!("{:?}", token);
} }
...@@ -60,65 +67,56 @@ fn main() { ...@@ -60,65 +67,56 @@ fn main() {
// additional imports for the necessary execution environment // additional imports for the necessary execution environment
use std::{ use std::{
pin::Pin,
cell::RefCell, cell::RefCell,
future::Future, future::Future,
task::{self, Poll, Context}, mem::swap,
mem::swap pin::Pin,
task::{self, Context, Poll},
}; };
/* ************************** executor environment ************************** */ /* ************************** executor environment ************************** */
// heterogeneous, pinned list of futures // heterogeneous, pinned list of futures
type FutureQ = Vec<Pin<Box<dyn Future<Output=()>>>>; type FutureQ = Vec<Pin<Box<dyn Future<Output = ()>>>>;
/// A simple executor that drives the event loop. /// A simple executor that drives the event loop.
struct Executor { struct Executor {
sched: RefCell<FutureQ> sched: RefCell<FutureQ>,
} }
impl Executor { impl Executor {
/// Constructs a new executor. /// Constructs a new executor.
pub fn new() -> Self { pub fn new() -> Self {
Executor { Executor {
sched: RefCell::new(vec![]) sched: RefCell::new(vec![]),
} }
} }
/// Creates and returns a Spawner that can be used to insert new futures /// Creates and returns a Spawner that can be used to insert new futures
/// into the executors future queue. /// into the executors future queue.
pub fn spawner(&self) -> Spawner { pub fn spawner(&self) -> Spawner {
Spawner { sched: &self.sched } Spawner { sched: &self.sched }
} }
/// Runs a future to completion. /// Runs a future to completion.
pub fn run<F: Future>(&self, mut future: F) -> F::Output { pub fn run<F: Future>(&self, mut future: F) -> F::Output {
// construct a custom context to pass into the poll()-method // construct a custom context to pass into the poll()-method
let waker = unsafe { let mut cx = Context::from_waker(task::Waker::noop());
task::Waker::from_raw(task::RawWaker::new(
std::ptr::null(), // pin the passed future for the duration of this function
&EXECUTOR_WAKER_VTABLE let mut future = pin!(future);
))
};
let mut cx = task::Context::from_waker(&waker);
// pin the passed future for the duration of this function;
// note: this is safe since we're not moving it around
let mut future = unsafe {
Pin::new_unchecked(&mut future)
};
let mut sched = FutureQ::new(); let mut sched = FutureQ::new();
// implement a busy-wait event loop for simplicity // implement a busy-wait event loop for simplicity
loop { loop {
// poll the primary future, allowing secondary futures to spawn // poll the primary future, allowing secondary futures to spawn
if let Poll::Ready(val) = future.as_mut().poll(&mut cx) { if let Poll::Ready(val) = future.as_mut().poll(&mut cx) {
break val; break val;
} }
// swap the scheduled futures with an empty queue // swap the scheduled futures with an empty queue
swap(&mut sched, &mut self.sched.borrow_mut()); swap(&mut sched, &mut self.sched.borrow_mut());
// iterate over all secondary futures presently scheduled // iterate over all secondary futures presently scheduled
for mut future in sched.drain(..) { for mut future in sched.drain(..) {
// if they are not completed, reschedule // if they are not completed, reschedule
...@@ -131,115 +129,107 @@ impl Executor { ...@@ -131,115 +129,107 @@ impl Executor {
} }
/// A handle to the executors future queue that can be used to insert new tasks. /// A handle to the executors future queue that can be used to insert new tasks.
struct Spawner<'a> { sched: &'a RefCell<FutureQ> } struct Spawner<'a> {
sched: &'a RefCell<FutureQ>,
impl<'a> Spawner<'a> {
pub fn spawn(&self, future: impl Future<Output=()> + 'static) {
self.sched.borrow_mut().push(Box::pin(future));
}
} }
/* **************************** specialized waker *************************** */ impl Spawner<'_> {
pub fn spawn(&self, future: impl Future<Output = ()> + 'static) {
/// Simple waker that isn't really used (yet), except to show the mechanism. self.sched.borrow_mut().push(Box::pin(future));
struct TrivialWaker;
impl TrivialWaker {
unsafe fn clone(_this: *const ()) -> task::RawWaker {
unimplemented!()
}
unsafe fn wake(_this: *const ()) {
unimplemented!()
}
unsafe fn wake_by_ref(_this: *const ()) {
unimplemented!()
} }
unsafe fn drop(_this: *const ()) {}
} }
/// Virtual function table for the simple waker.
static EXECUTOR_WAKER_VTABLE: task::RawWakerVTable = task::RawWakerVTable::new(
TrivialWaker::clone,
TrivialWaker::wake,
TrivialWaker::wake_by_ref,
TrivialWaker::drop
);
/* ************************** asynchronous wrappers ************************* */ /* ************************** asynchronous wrappers ************************* */
/// Simple channel with space for only one element. /// Simple channel with space for only one element.
struct Channel<T> { struct Channel<T> {
slot: RefCell<Option<T>> slot: RefCell<Option<T>>,
} }
/// Creates a channel and returns a pair of read and write ends. /// Creates a channel and returns a pair of read and write ends.
fn channel<T>() -> (Sender<T>, Receiver<T>) { fn channel<T>() -> (Sender<T>, Receiver<T>) {
let chan = Rc::new(Channel { let channel = Rc::new(Channel {
slot: RefCell::new(None) slot: RefCell::new(None),
}); });
(Sender { chan: chan.clone() }, Receiver { chan }) (
Sender {
channel: channel.clone(),
},
Receiver { channel },
)
} }
/// Write-end of a channel. /// Write-end of a channel.
struct Sender<T> { chan: Rc<Channel<T>> } struct Sender<T> {
channel: Rc<Channel<T>>,
}
/// Read-end of a channel. /// Read-end of a channel.
struct Receiver<T> { chan: Rc<Channel<T>> } struct Receiver<T> {
channel: Rc<Channel<T>>,
}
impl<T: Clone> Sender<T> { impl<T: Clone> Sender<T> {
/// Method used to push an element into the channel. /// Method used to push an element into the channel.
/// Blocks until the previous element has been consumed. /// Blocks until the previous element has been consumed.
pub fn send<'s>(&'s self, elem: T) -> impl Future<Output = ()> + 's { pub fn send(&self, elem: T) -> impl Future<Output = ()> + '_ {
SendFuture { chan: &self.chan, elem } SendFuture {
channel: &self.channel,
elem,
}
} }
} }
impl<T: Clone> Receiver<T> { impl<T: Clone> Receiver<T> {
/// Method used to consume an element from the channel. /// Method used to consume an element from the channel.
/// Blocks until an element can be consumed. /// Blocks until an element can be consumed.
pub fn recv<'r>(&'r self) -> impl Future<Output = Option<T>> + 'r { pub fn recv(&self) -> impl Future<Output = Option<T>> + '_ {
ReceiveFuture { chan: &self.chan } ReceiveFuture {
channel: &self.channel,
}
} }
} }
/// A future that pushes an element into a channel. /// A future that pushes an element into a channel.
struct SendFuture<'c,T> { chan: &'c Rc<Channel<T>>, elem: T } struct SendFuture<'c, T> {
channel: &'c Rc<Channel<T>>,
elem: T,
}
/// A future that consumes an element from a channel. /// A future that consumes an element from a channel.
struct ReceiveFuture<'c,T> { chan: &'c Rc<Channel<T>> } struct ReceiveFuture<'c, T> {
channel: &'c Rc<Channel<T>>,
}
impl<T: Clone> Future for SendFuture<'_,T> { impl<T: Clone> Future for SendFuture<'_, T> {
type Output = (); type Output = ();
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
// check if there is space for the element // check if there is space for the element
if self.chan.slot.borrow().is_none() { if self.channel.slot.borrow().is_none() {
// replace the empty element with ours // replace the empty element with ours
self.chan.slot.replace(Some(self.elem.clone())); self.channel.slot.replace(Some(self.elem.clone()));
Poll::Ready(()) Poll::Ready(())
} else { } else {
// check back at a later time // try again at a later time
Poll::Pending Poll::Pending
} }
} }
} }
impl<T> Future for ReceiveFuture<'_,T> { impl<T> Future for ReceiveFuture<'_, T> {
type Output = Option<T>; type Output = Option<T>;
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
// check if there is an element in the channel // check if there is an element in the channel
if let Some(c) = self.chan.slot.borrow_mut().take() { if let Some(c) = self.channel.slot.borrow_mut().take() {
// return it // return it
Poll::Ready(Some(c)) Poll::Ready(Some(c))
} else if Rc::strong_count(self.chan) == 1 { } else if Rc::strong_count(self.channel) == 1 {
// check if the sender disconnected // check if the sender disconnected
Poll::Ready(None) Poll::Ready(None)
} else { } else {
// check back at a later time // try again at a later time
Poll::Pending Poll::Pending
} }
} }
......
use simcore_rs::{Time, SimContext, Facility, RandomVar, simulation};
use rand::{distributions::Uniform, rngs::SmallRng, SeedableRng, Rng};
// helper constants
const STOP_TIME: Time = 480000.0;
const SEED_A : u64 = 100000;
const SEED_S : u64 = 200000;
fn main() {
// pseudo random number generators referenced from within the main process
let mut rng_a = SmallRng::seed_from_u64(SEED_A);
let mut rng_s = SmallRng::seed_from_u64(SEED_S);
// the lifetimes are automatically extended
let joe = &Facility::new();
let rv = &RandomVar::new();
// the main process
simulation(|sim| async move {
// activate a process to generate the customers
sim.activate(async move {
let dist = Uniform::new(12.0, 24.0);
// wait some time before activating the first customer
sim.advance(rng_a.sample(dist)).await;
// generate new customers until the store closes officially
while sim.now() < STOP_TIME {
// activate the next customer
sim.activate(Customer {
joe, rv, rng: SmallRng::from_seed(rng_s.gen())
}.actions(sim));
// wait some time before activating the next customer
sim.advance(rng_a.sample(dist)).await;
}
});
// wait until the store closes
sim.advance(STOP_TIME).await;
// finish processing the queue (no more customers arrive)
joe.seize().await;
joe.release();
});
println!("Stats: {:.3}", rv);
}
/// Customer process with access to the barber and a random number generator.
struct Customer<'j> { joe: &'j Facility, rv: &'j RandomVar, rng: SmallRng }
impl Customer<'_> {
pub async fn actions(mut self, sim: SimContext<'_>) {
// access the barber and record the time for the report
let time = sim.now();
self.joe.seize().await;
self.rv.tabulate(sim.now() - time);
// spend time
sim.advance(self.rng.gen_range(12.0, 18.0)).await;
// release the barber
self.joe.release();
}
}
This diff is collapsed.
use simcore_rs::util::channel;
use simcore_rs::*;
fn main() {
simulation((), |sim| {
Process::new(sim, async move {
let (sx, rx) = channel();
sim.activate(async move {
rx.recv().await;
});
sim.advance(1.0).await;
sx.send(1).await.unwrap();
})
});
}
This diff is collapsed.