Skip to content
Snippets Groups Projects
Commit 60746d57 authored by Dorian Weber's avatar Dorian Weber
Browse files

Restructured some of the files in order to implement additional examples,...

Restructured some of the files in order to implement additional examples, implemented the car ferry scenario and added the ability to wait for the termination of two competing futures, enabling timeouts.
parent 53b9db55
No related merge requests found
use simcore_rs::{Time, SimContext, Facility, simulation, Process};
use simcore_rs::{Time, SimContext, Facility, simulation, Fiber};
use rand::{distributions::Uniform, rngs::SmallRng, SeedableRng, Rng};
use criterion::{Criterion, BenchmarkId, criterion_group, criterion_main, BatchSize};
......@@ -29,7 +29,7 @@ fn barbershop(c: &mut Criterion) {
},
|shop| simulation(
shop,
|sim| Process::new(sim_main(sim))
|sim| Fiber::new(sim_main(sim))
),
BatchSize::SmallInput
)
......
use simcore_rs::{Process, Time, SimContext, Facility, RandomVar, simulation};
use simcore_rs::{Time, SimContext, Facility, RandomVar, simulation};
use rand::{distributions::Uniform, rngs::SmallRng, SeedableRng, Rng};
// helper constants
......@@ -61,6 +61,6 @@ fn main() {
// global data
Shared { joe: Facility::new(), rv: RandomVar::new() },
// simulation entry point
|sim| Process::new(sim_main(sim))
|sim| sim.process(sim_main(sim))
);
}
use simcore_rs::{Fiber, Time, SimContext, Sender, Receiver, RandomVar, channel, simulation};
use rand::{distributions::Uniform, rngs::SmallRng, SeedableRng, Rng};
const SIM_DURATION: Time = 8.0*60.0*60.0;
const HARBOR_DISTANCE: Time = 60.0;
const CAR_ARRIVAL_DELAY: Time = 20.0;
const CAR_LOAD_TIME: Time = 10.0;
const FERRY_TIMEOUT: Time = 5.0;
const FERRY_CAPACITY: usize = 5;
struct Ferry {
cargo: Vec<Car>,
harbors: [Receiver<Car>; 2]
}
struct Car {
delay: Time
}
impl Ferry {
async fn actions(mut self, sim: SimContext<'_>) {
loop {
for harbor_id in 0..self.harbors.len() {
// unload all of the cars
for car in self.cargo.drain(..) {
sim.advance(car.delay).await;
}
// wait until the cars have arrived or a timeout has occurred
while self.cargo.len() < FERRY_CAPACITY {
// have the receive-operation compete with the advance
if let Some(car) = sim.until(
self.harbors[harbor_id].recv(),
async { sim.advance(FERRY_TIMEOUT).await; None }
).await {
// another car arrived in time
sim.advance(car.delay).await;
self.cargo.push(car);
} else {
// the timeout has been triggered
break;
}
}
// return to the other harbor
sim.advance(HARBOR_DISTANCE).await;
}
}
}
}
async fn harbor(sim: SimContext<'_>, pier: Sender<Car>) {
loop {
sim.advance(CAR_ARRIVAL_DELAY).await;
pier.send(Car { delay: CAR_LOAD_TIME });
}
}
async fn sim_main(sim: SimContext<'_>) {
let ch1 = channel();
let ch2 = channel();
let ferry = Ferry {
cargo: Vec::new(),
harbors: [ch1.1, ch2.1]
};
sim.activate(harbor(sim, ch1.0));
sim.activate(harbor(sim, ch2.0));
sim.activate(ferry.actions(sim));
sim.advance(SIM_DURATION).await;
}
fn main() {
simulation(
// global data
(),
// simulation entry point
|sim| sim.process(sim_main(sim))
);
}
......@@ -40,7 +40,7 @@ async fn tokenize(input: Receiver<char>, output: Sender<Token>) {
fn main() {
let exec = Executor::new();
let spawner = exec.spawner();
let input = b"He\xff\x02lo SAM!".iter().cloned();
let input = b"He\xff\x02lo IST!".iter().cloned();
exec.run(async {
let (s1, r1) = channel();
......@@ -177,23 +177,23 @@ struct Channel<T> {
/// Creates a channel and returns a pair of read and write ends.
fn channel<T>() -> (Sender<T>, Receiver<T>) {
let chan = Rc::new(Channel {
let channel = Rc::new(Channel {
slot: RefCell::new(None)
});
(Sender { chan: chan.clone() }, Receiver { chan })
(Sender { channel: channel.clone() }, Receiver { channel })
}
/// Write-end of a channel.
struct Sender<T> { chan: Rc<Channel<T>> }
struct Sender<T> { channel: Rc<Channel<T>> }
/// Read-end of a channel.
struct Receiver<T> { chan: Rc<Channel<T>> }
struct Receiver<T> { channel: Rc<Channel<T>> }
impl<T: Clone> Sender<T> {
/// Method used to push an element into the channel.
/// Blocks until the previous element has been consumed.
pub fn send<'s>(&'s self, elem: T) -> impl Future<Output = ()> + 's {
SendFuture { chan: &self.chan, elem }
SendFuture { channel: &self.channel, elem }
}
}
......@@ -201,27 +201,27 @@ impl<T: Clone> Receiver<T> {
/// Method used to consume an element from the channel.
/// Blocks until an element can be consumed.
pub fn recv<'r>(&'r self) -> impl Future<Output = Option<T>> + 'r {
ReceiveFuture { chan: &self.chan }
ReceiveFuture { channel: &self.channel }
}
}
/// A future that pushes an element into a channel.
struct SendFuture<'c,T> { chan: &'c Rc<Channel<T>>, elem: T }
struct SendFuture<'c,T> { channel: &'c Rc<Channel<T>>, elem: T }
/// A future that consumes an element from a channel.
struct ReceiveFuture<'c,T> { chan: &'c Rc<Channel<T>> }
struct ReceiveFuture<'c,T> { channel: &'c Rc<Channel<T>> }
impl<T: Clone> Future for SendFuture<'_,T> {
type Output = ();
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
// check if there is space for the element
if self.chan.slot.borrow().is_none() {
if self.channel.slot.borrow().is_none() {
// replace the empty element with ours
self.chan.slot.replace(Some(self.elem.clone()));
self.channel.slot.replace(Some(self.elem.clone()));
Poll::Ready(())
} else {
// check back at a later time
// try again at a later time
Poll::Pending
}
}
......@@ -232,14 +232,14 @@ impl<T> Future for ReceiveFuture<'_,T> {
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
// check if there is an element in the channel
if let Some(c) = self.chan.slot.borrow_mut().take() {
if let Some(c) = self.channel.slot.borrow_mut().take() {
// return it
Poll::Ready(Some(c))
} else if Rc::strong_count(self.chan) == 1 {
} else if Rc::strong_count(self.channel) == 1 {
// check if the sender disconnected
Poll::Ready(None)
} else {
// check back at a later time
// try again at a later time
Poll::Pending
}
}
......
This diff is collapsed.
use simcore_rs::{Time, SimContext, Sender, Receiver, channel, simulation};
fn main() {
simulation(
(),
|sim| sim.process(async move {
println!("[{}] Begin", sim.now());
let (sx, rx) = channel();
sim.activate(async move {
for i in 0.. {
sim.advance(1.0).await;
sx.send(i).await.ok();
}
});
sim.until(async move {
while let Some(i) = rx.recv().await {
println!("[{}] Received {}", sim.now(), i);
if i >= 3 { break; }
}
println!("[{}] Exit Receiver Process", sim.now());
}, sim.advance(5.0)).await;
sim.advance(10.0).await;
println!("[{}] End", sim.now());
})
);
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment