From 60746d575dd0ffb94cbf5f159274bf313ade7efa Mon Sep 17 00:00:00 2001 From: Dorian Weber <weber@informatik.hu-berlin.de> Date: Sun, 24 Jan 2021 06:17:54 +0100 Subject: [PATCH] Restructured some of the files in order to implement additional examples, implemented the car ferry scenario and added the ability to wait for the termination of two competing futures, enabling timeouts. --- benches/barbershop.rs | 4 +- {src/bin => examples}/barbershop.rs | 4 +- examples/ferry.rs | 81 +++++ src/bin/coroutines.rs | 30 +- src/lib.rs | 454 ++++++++++++++++++++-------- src/main.rs | 30 ++ 6 files changed, 456 insertions(+), 147 deletions(-) rename {src/bin => examples}/barbershop.rs (93%) create mode 100644 examples/ferry.rs create mode 100644 src/main.rs diff --git a/benches/barbershop.rs b/benches/barbershop.rs index b201361..5eb5361 100644 --- a/benches/barbershop.rs +++ b/benches/barbershop.rs @@ -1,4 +1,4 @@ -use simcore_rs::{Time, SimContext, Facility, simulation, Process}; +use simcore_rs::{Time, SimContext, Facility, simulation, Fiber}; use rand::{distributions::Uniform, rngs::SmallRng, SeedableRng, Rng}; use criterion::{Criterion, BenchmarkId, criterion_group, criterion_main, BatchSize}; @@ -29,7 +29,7 @@ fn barbershop(c: &mut Criterion) { }, |shop| simulation( shop, - |sim| Process::new(sim_main(sim)) + |sim| Fiber::new(sim_main(sim)) ), BatchSize::SmallInput ) diff --git a/src/bin/barbershop.rs b/examples/barbershop.rs similarity index 93% rename from src/bin/barbershop.rs rename to examples/barbershop.rs index 2dce680..bd2ecca 100644 --- a/src/bin/barbershop.rs +++ b/examples/barbershop.rs @@ -1,4 +1,4 @@ -use simcore_rs::{Process, Time, SimContext, Facility, RandomVar, simulation}; +use simcore_rs::{Time, SimContext, Facility, RandomVar, simulation}; use rand::{distributions::Uniform, rngs::SmallRng, SeedableRng, Rng}; // helper constants @@ -61,6 +61,6 @@ fn main() { // global data Shared { joe: Facility::new(), rv: RandomVar::new() }, // simulation entry point - |sim| Process::new(sim_main(sim)) + |sim| sim.process(sim_main(sim)) ); } diff --git a/examples/ferry.rs b/examples/ferry.rs new file mode 100644 index 0000000..a04b423 --- /dev/null +++ b/examples/ferry.rs @@ -0,0 +1,81 @@ +use simcore_rs::{Fiber, Time, SimContext, Sender, Receiver, RandomVar, channel, simulation}; +use rand::{distributions::Uniform, rngs::SmallRng, SeedableRng, Rng}; + +const SIM_DURATION: Time = 8.0*60.0*60.0; +const HARBOR_DISTANCE: Time = 60.0; +const CAR_ARRIVAL_DELAY: Time = 20.0; +const CAR_LOAD_TIME: Time = 10.0; +const FERRY_TIMEOUT: Time = 5.0; +const FERRY_CAPACITY: usize = 5; + +struct Ferry { + cargo: Vec<Car>, + harbors: [Receiver<Car>; 2] +} + +struct Car { + delay: Time +} + +impl Ferry { + async fn actions(mut self, sim: SimContext<'_>) { + loop { + for harbor_id in 0..self.harbors.len() { + // unload all of the cars + for car in self.cargo.drain(..) { + sim.advance(car.delay).await; + } + + // wait until the cars have arrived or a timeout has occurred + while self.cargo.len() < FERRY_CAPACITY { + // have the receive-operation compete with the advance + if let Some(car) = sim.until( + self.harbors[harbor_id].recv(), + async { sim.advance(FERRY_TIMEOUT).await; None } + ).await { + // another car arrived in time + sim.advance(car.delay).await; + self.cargo.push(car); + } else { + // the timeout has been triggered + break; + } + } + + // return to the other harbor + sim.advance(HARBOR_DISTANCE).await; + } + } + } +} + +async fn harbor(sim: SimContext<'_>, pier: Sender<Car>) { + loop { + sim.advance(CAR_ARRIVAL_DELAY).await; + pier.send(Car { delay: CAR_LOAD_TIME }); + } +} + +async fn sim_main(sim: SimContext<'_>) { + let ch1 = channel(); + let ch2 = channel(); + let ferry = Ferry { + cargo: Vec::new(), + harbors: [ch1.1, ch2.1] + }; + + sim.activate(harbor(sim, ch1.0)); + sim.activate(harbor(sim, ch2.0)); + + sim.activate(ferry.actions(sim)); + sim.advance(SIM_DURATION).await; +} + +fn main() { + simulation( + // global data + (), + // simulation entry point + |sim| sim.process(sim_main(sim)) + ); +} diff --git a/src/bin/coroutines.rs b/src/bin/coroutines.rs index 830d53f..be9f88c 100644 --- a/src/bin/coroutines.rs +++ b/src/bin/coroutines.rs @@ -40,7 +40,7 @@ async fn tokenize(input: Receiver<char>, output: Sender<Token>) { fn main() { let exec = Executor::new(); let spawner = exec.spawner(); - let input = b"He\xff\x02lo SAM!".iter().cloned(); + let input = b"He\xff\x02lo IST!".iter().cloned(); exec.run(async { let (s1, r1) = channel(); @@ -177,23 +177,23 @@ struct Channel<T> { /// Creates a channel and returns a pair of read and write ends. fn channel<T>() -> (Sender<T>, Receiver<T>) { - let chan = Rc::new(Channel { + let channel = Rc::new(Channel { slot: RefCell::new(None) }); - (Sender { chan: chan.clone() }, Receiver { chan }) + (Sender { channel: channel.clone() }, Receiver { channel }) } /// Write-end of a channel. -struct Sender<T> { chan: Rc<Channel<T>> } +struct Sender<T> { channel: Rc<Channel<T>> } /// Read-end of a channel. -struct Receiver<T> { chan: Rc<Channel<T>> } +struct Receiver<T> { channel: Rc<Channel<T>> } impl<T: Clone> Sender<T> { /// Method used to push an element into the channel. /// Blocks until the previous element has been consumed. pub fn send<'s>(&'s self, elem: T) -> impl Future<Output = ()> + 's { - SendFuture { chan: &self.chan, elem } + SendFuture { channel: &self.channel, elem } } } @@ -201,27 +201,27 @@ impl<T: Clone> Receiver<T> { /// Method used to consume an element from the channel. /// Blocks until an element can be consumed. pub fn recv<'r>(&'r self) -> impl Future<Output = Option<T>> + 'r { - ReceiveFuture { chan: &self.chan } + ReceiveFuture { channel: &self.channel } } } /// A future that pushes an element into a channel. -struct SendFuture<'c,T> { chan: &'c Rc<Channel<T>>, elem: T } +struct SendFuture<'c,T> { channel: &'c Rc<Channel<T>>, elem: T } /// A future that consumes an element from a channel. -struct ReceiveFuture<'c,T> { chan: &'c Rc<Channel<T>> } +struct ReceiveFuture<'c,T> { channel: &'c Rc<Channel<T>> } impl<T: Clone> Future for SendFuture<'_,T> { type Output = (); fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> { // check if there is space for the element - if self.chan.slot.borrow().is_none() { + if self.channel.slot.borrow().is_none() { // replace the empty element with ours - self.chan.slot.replace(Some(self.elem.clone())); + self.channel.slot.replace(Some(self.elem.clone())); Poll::Ready(()) } else { - // check back at a later time + // try again at a later time Poll::Pending } } @@ -232,14 +232,14 @@ impl<T> Future for ReceiveFuture<'_,T> { fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> { // check if there is an element in the channel - if let Some(c) = self.chan.slot.borrow_mut().take() { + if let Some(c) = self.channel.slot.borrow_mut().take() { // return it Poll::Ready(Some(c)) - } else if Rc::strong_count(self.chan) == 1 { + } else if Rc::strong_count(self.channel) == 1 { // check if the sender disconnected Poll::Ready(None) } else { - // check back at a later time + // try again at a later time Poll::Pending } } diff --git a/src/lib.rs b/src/lib.rs index 00893d6..4afd3f0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,41 +14,34 @@ use std::{ // simple time type pub type Time = f64; -// priority queue of time-process-pairs using time as the key -type EventQ<'p> = BinaryHeap<NextEvent<'p>>; - -/// The (private) scheduler for processes. -struct Scheduler<'s,G> { - /// The current simulation time. - now: Cell<Time>, - - /// The event-calendar organized chronologically. - calendar: RefCell<EventQ<'s>>, - - /// The currently active process. - active: RefCell<Process<'s>>, - - /// Globally-accessible data. - shared: G -} - -// these allow us to *move* the context without invalidating it -/// A light-weight handle to the scheduler. -pub struct SimContext<'s,G> { handle: *const Scheduler<'s,G> } - -impl<'s,G> Clone for SimContext<'s,G> { - fn clone(&self) -> Self { *self } -} - -impl<'s,G> Copy for SimContext<'s,G> {} - /// Performs a single simulation run. /// /// Input is a function that takes a simulation context and returns the first /// process. It would be better if the function only needed to return the future /// needed to initialize the first process, but then the function signature gets /// more complicated and is harder to explain in a paper. -pub fn simulation<G>(shared: G, main: impl FnOnce(SimContext<G>) -> Process) { +/// +/// But just in case you're wondering how to do it: +/// ``` +/// # use simcore_rs::{SimContext, Process}; +/// # use std::future::Future; +/// pub trait Active<'s,G> { +/// fn lifecycle(self, sim: SimContext<'s,G>) -> Process<'s,G>; +/// } +/// +/// impl<'s,G,F,R> Active<'s,G> for F +/// where F: FnOnce(SimContext<'s,G>) -> R, +/// R: Future<Output = ()> + 's { +/// fn lifecycle(self, sim: SimContext<'s,G>) -> Process<'s,G> { +/// sim.process(self(sim)) +/// } +/// } +/// ``` +/// You may then change the signature to +/// ``` +/// fn simulation<G>(shared: G, main: impl for<'s> Active<'s,G>) {} +/// ``` +pub fn simulation<G>(shared: G, main: impl FnOnce(SimContext<G>) -> Process<G>) { // create a fresh scheduler and a handle to it let sched = Scheduler::new(shared); let sim = SimContext { handle: &sched }; @@ -73,6 +66,24 @@ pub fn simulation<G>(shared: G, main: impl FnOnce(SimContext<G>) -> Process) { sched.clear(); } +// priority queue of time-process-pairs using time as the key +type EventQ<'s,G> = BinaryHeap<NextEvent<'s,G>>; + +/// The (private) scheduler for processes. +struct Scheduler<'s,G> { + /// The current simulation time. + now: Cell<Time>, + + /// The event-calendar organized chronologically. + calendar: RefCell<EventQ<'s,G>>, + + /// The currently active process. + active: RefCell<Process<'s,G>>, + + /// Globally-accessible data. + shared: G +} + impl<'s,G> Scheduler<'s,G> { /// Creates a new scheduler. fn new(shared: G) -> Self { @@ -84,19 +95,26 @@ impl<'s,G> Scheduler<'s,G> { } } - /// Clears the scheduler. + /// Clears the scheduler, dropping all of the contained processes. fn clear(&self) { + // this is a surprisingly delicate operation because any of the + // processes that are still present in the event-queue may schedule + // additional processes upon dropping, with the consequence of writing + // into the event-queue while it's being cleared (and therefore in an + // inconsistent state); swapping out the calendar with an empty one + // circumvents this issue completely, since clearing and dropping are + // now two separate operations self.active.replace(Process::default()); self.calendar.replace(EventQ::default()); } /// Schedules a process at the current simulation time. - fn schedule(&self, process: Process<'s>) { + fn schedule(&self, process: Process<'s,G>) { self.schedule_in(Time::default(), process); } /// Schedules a process at a later simulation time. - fn schedule_in(&self, dt: Time, process: Process<'s>) { + fn schedule_in(&self, dt: Time, process: Process<'s,G>) { self.calendar.borrow_mut().push( NextEvent(self.now.get() + dt, process) ); @@ -104,7 +122,7 @@ impl<'s,G> Scheduler<'s,G> { /// Removes the process with the next event time from the calendar and /// activates it. - fn next_event(&self) -> Option<Process<'s>> { + fn next_event(&self) -> Option<Process<'s,G>> { let NextEvent(now, process) = self.calendar.borrow_mut().pop()?; self.now.set(now); self.active.replace(process.clone()); @@ -112,22 +130,87 @@ impl<'s,G> Scheduler<'s,G> { } } +/// A light-weight handle to the scheduler. +pub struct SimContext<'s,G = ()> { handle: *const Scheduler<'s,G> } + +// this allows the creation of copies +impl<'s,G> Clone for SimContext<'s,G> { + fn clone(&self) -> Self { *self } +} + +// this allows moving the context without invalidating it (copy semantics) +impl<'s,G> Copy for SimContext<'s,G> {} + impl<'s,G> SimContext<'s,G> { /// Returns a (reference-counted) copy of the currently active process. - pub fn active(&self) -> Process<'s> { + pub fn active(&self) -> Process<'s,G> { self.sched().active.borrow().clone() } + /// Constructs a new process from a future without scheduling it. + pub fn process(&self, fut: impl Future<Output = ()> + 's) -> Process<'s,G> { + Process::new(*self, fut) + } + /// Activates a new process with the given future. pub fn activate(&self, fut: impl Future<Output = ()> + 's) { - self.sched().schedule(Process::new(fut)); + self.sched().schedule(self.process(fut)); } /// Reactivates a process that has been suspended with wait(). - pub fn reactivate(&self, process: Process<'s>) { + pub fn reactivate(&self, process: Process<'s,G>) { + assert!(process.0.borrow().fib.is_some()); self.sched().schedule(process); } + /// Returns a future that takes two other futures and completes as soon as + /// one of them returns, canceling the other future before returning. + /// + /// This design guarantees that the two futures passed as input to this + /// function cannot outlive its returned future, enabling us to allow + /// references to variables in the local scope. The passed futures may + /// compute a value, as long as the return type is identical in both cases. + pub async fn until<'u,E,O,R>(&self, either: E, or: O) -> R + where E: Future<Output = R> + 'u, O: Future<Output = R> + 'u, R: 'u { + use std::mem::transmute; + + // create a one-shot channel that reactivates the caller on write + let promise = &Promise::new(self.active().waker()); + + // this unsafe block shortens the guaranteed lifetimes of the processes + // contained in the scheduler; it is safe because the constructed future + // ensures that both futures are terminated before the promise and + // itself are terminated, thereby preventing references to the lesser + // constrained processes to leak from this function and be reawakened, + // allowing access to dangling pointers + let sim = unsafe { + transmute::<&SimContext<'s,G>, &SimContext<'_,G>>(self) + }; + + // create the two competing processes + let p1 = sim.process(async move { + promise.fulfill(either.await); + }); + let p2 = sim.process(async move { + promise.fulfill(or.await); + }); + + // activate them + sim.reactivate(p1.clone()); + sim.reactivate(p2.clone()); + + // wait for reactivation; the promise will wake us on fulfillment + sim.wait().await; + + // terminate both processes + // (this is redundant for one of them but doesn't hurt) + p1.terminate(); + p2.terminate(); + + // extract the promised value + promise.redeem().unwrap() + } + /// Unconditionally suspends execution until reactivation. pub fn wait(&self) -> impl Future<Output = ()> { Wait { ready: false } @@ -158,60 +241,106 @@ impl<'s,G> SimContext<'s,G> { } } -#[derive(Clone)] -/// Bare-bone process. -pub struct Process<'p>(Pin<Rc<RefCell<dyn Future<Output = ()> + 'p>>>); +/// A single, cooperatively and dynamically dispatched path of execution. +pub struct Fiber<F: ?Sized>(Pin<Box<F>>); -impl<'p> Process<'p> { - /// Creates a new process from a future. - pub fn new(fut: impl Future<Output = ()> + 'p) -> Self { - Process(Rc::pin(RefCell::new(fut))) +impl<F: ?Sized + Future<Output = ()>> Fiber<F> { + /// Private function for polling the future. + #[inline] + fn poll(&mut self, cx: &mut Context) -> Poll<()> { + self.0.as_mut().poll(cx) + } +} + +/// A bare-bone process type that can also be used as a waker. +pub struct Process<'s,G>(Rc<RefCell<Inner<'s,G>>>); + +/// The private details of the [`Process`](struct.Process.html) type. +struct Inner<'s,G> { + sim: SimContext<'s,G>, + fib: Option<Fiber<dyn Future<Output = ()> + 's>> +} + +impl<'s,G> Process<'s,G> { + /// Combines a future and a simulation context into a process. + #[inline] + fn new(sim: SimContext<'s,G>, fut: impl Future<Output = ()> + 's) -> Self { + Process(Rc::new(RefCell::new(Inner { + sim, fib: Some(Fiber(Box::pin(fut))) + }))) } - /// Private function for polling the future. - fn poll(&self, cx: &mut Context) -> Poll<()> { - // this is safe because the process is already pinned + /// Releases the [`Fiber`](struct.Fiber.html) contained in this process. + /// + /// This will also execute all of the destructors for the local variables + /// initialized by this process. + #[inline] + pub fn terminate(&self) { + self.0.borrow_mut().fib.take(); + } + + /// Returns a `Waker` for this process. + #[inline] + pub fn waker(self) -> task::Waker { unsafe { - Pin::new_unchecked(&mut *self.0.borrow_mut()) - }.poll(cx) + task::Waker::from_raw(self.raw_waker()) + } + } + + /// Private function for polling the process. + #[inline] + fn poll(&self, cx: &mut Context) -> Poll<()> { + if let Some(fiber) = self.0.borrow_mut().fib.as_mut() { + fiber.poll(cx) + } else { + Poll::Ready(()) + } } } -// implementation of Rust's version of a default constructor -impl Default for Process<'_> { +impl<'s,G> Default for Process<'s,G> { fn default() -> Self { - Process::new(Wait { ready: false }) + Process(Rc::new(RefCell::new(Inner { + sim: SimContext { handle: std::ptr::null() }, + fib: None + }))) + } +} + +impl<'s,G> Clone for Process<'s,G> { + fn clone(&self) -> Self { + Process(self.0.clone()) } } // allows processes to be compared for equality -impl PartialEq for Process<'_> { +impl<G> PartialEq for Process<'_,G> { fn eq(&self, other: &Self) -> bool { - std::ptr::eq(&*self.0, &*other.0) + Rc::ptr_eq(&self.0, &other.0) } } // marks the equality-relation as total -impl Eq for Process<'_> {} +impl<G> Eq for Process<'_,G> {} /// Time-process-pair that has a total order defined based on the time. -struct NextEvent<'p>(Time, Process<'p>); +struct NextEvent<'p,G>(Time, Process<'p,G>); -impl PartialEq for NextEvent<'_> { +impl<G> PartialEq for NextEvent<'_,G> { fn eq(&self, other: &Self) -> bool { self.0 == other.0 } } -impl Eq for NextEvent<'_> {} +impl<G> Eq for NextEvent<'_,G> {} -impl PartialOrd for NextEvent<'_> { +impl<G> PartialOrd for NextEvent<'_,G> { fn partial_cmp(&self, other: &Self) -> Option<Ordering> { Some(self.cmp(other)) } } -impl Ord for NextEvent<'_> { +impl<G> Ord for NextEvent<'_,G> { fn cmp(&self, other: &Self) -> Ordering { self.0.partial_cmp(&other.0) .expect("illegal event time NaN").reverse() @@ -220,6 +349,89 @@ impl Ord for NextEvent<'_> { /* **************************** specialized waker *************************** */ +impl<'s,G> Process<'s,G> { + /// Virtual function table for the waker. + const VTABLE: task::RawWakerVTable = task::RawWakerVTable::new( + Self::clone, + Self::wake, + Self::wake_by_ref, + Self::drop + ); + + /// Constructs a raw waker from a simulation context and a process. + fn raw_waker(self) -> task::RawWaker { + task::RawWaker::new( + Rc::into_raw(self.0) as *const (), + &Self::VTABLE + ) + } + + unsafe fn clone(this: *const ()) -> task::RawWaker { + let waker = Rc::from_raw( + this as *const RefCell<Inner<G>> + ); + + // increase the reference counter once + Rc::into_raw(waker.clone()); + + // this is technically unsafe because Wakers are Send + Sync and so this + // call might be executed from a different thread, creating a data race + // hazard; we leave preventing this as an exercise to the reader! + task::RawWaker::new( + Rc::into_raw(waker) as *const (), + &Self::VTABLE + ) + } + + unsafe fn wake(this: *const ()) { + let waker = Rc::from_raw( + this as *const RefCell<Inner<G>> + ); + + // this can happen if a synchronization structure forgets to clean + // up registered Waker objects on destruct; this would lead to + // hard-to-diagnose bugs if we were to ignore it + assert!(waker.borrow().fib.is_some(), + "Attempted to wake a terminated process."); + + // this is technically unsafe because Wakers are Send + Sync and so this + // call might be executed from a different thread, creating a data race + // hazard; we leave preventing this as an exercise to the reader! + let sim = waker.borrow().sim; + sim.reactivate(Process(waker)); + } + + unsafe fn wake_by_ref(this: *const ()) { + let waker = Rc::from_raw( + this as *const RefCell<Inner<G>> + ); + + // keep the waker alive + Rc::into_raw(waker.clone()); + + // this can happen if a synchronization structure forgets to clean + // up registered Waker objects on destruct; this would lead to + // hard-to-diagnose bugs if we were to ignore it + assert!(waker.borrow().fib.is_some(), + "Attempted to wake a terminated process."); + + // this is technically unsafe because Wakers are Send + Sync and so this + // call might be executed from a different thread, creating a data race + // hazard; we leave preventing this as an exercise to the reader! + let sim = waker.borrow().sim; + sim.reactivate(Process(waker)); + } + + unsafe fn drop(this: *const ()) { + // this is technically unsafe because Wakers are Send + Sync and so this + // call might be executed from a different thread, creating a data race + // hazard; we leave preventing this as an exercise to the reader! + Rc::from_raw( + this as *const RefCell<Inner<G>> + ); + } +} + /// Complex waker that is used to implement state events. /// /// This is the shallow version that is created on the stack of the function @@ -227,15 +439,6 @@ impl Ord for NextEvent<'_> { /// active one and creates the deep version when it is cloned. struct StateEventWaker<'s,G> { context: SimContext<'s,G> } -/// Deep version of the complex waker used to implement state events. -/// -/// This version actually holds a process as well as a simulation context on the -/// heap. Dropping the waker releases the memory. -struct StateEventWakerDeep<'s,G> { - process: Process<'s>, - context: SimContext<'s,G> -} - impl<'s,G> StateEventWaker<'s,G> { /// Virtual function table for the waker. const VTABLE: task::RawWakerVTable = task::RawWakerVTable::new( @@ -263,70 +466,21 @@ impl<'s,G> StateEventWaker<'s,G> { ) } - /// Creates a deep waker from a reference. - /// - /// This function is safe because it allocates memory on the heap. - fn deep_waker(&self) -> task::RawWaker { - StateEventWakerDeep::raw_waker( - self.context, self.context.active() - ) - } - unsafe fn clone(this: *const ()) -> task::RawWaker { - (&*(this as *const Self)).deep_waker() + // return the currently active process as a raw waker + (&*(this as *const Self)).context.active().raw_waker() } unsafe fn wake(_this: *const ()) { - unreachable!("attempting to awake the active process") + // waking the active process can safely be ignored } unsafe fn wake_by_ref(_this: *const ()) { - unreachable!("attempting to awake the active process") + // waking the active process can safely be ignored } unsafe fn drop(_this: *const ()) { - // memory released in the main event loop - } -} - -impl<'s,G> StateEventWakerDeep<'s,G> { - /// Virtual function table for the deep waker. - const VTABLE: task::RawWakerVTable = task::RawWakerVTable::new( - Self::clone, - Self::wake, - Self::wake_by_ref, - Self::drop - ); - - /// Constructs a raw waker from a simulation context and a process. - fn raw_waker(sim: SimContext<'s,G>, process: Process<'s>) -> task::RawWaker { - let waker = Box::leak(Box::new( - Self { process, context: sim } - )); - - task::RawWaker::new( - waker as *const _ as *const (), - &Self::VTABLE - ) - } - - unsafe fn clone(this: *const ()) -> task::RawWaker { - let waker = &*(this as *const Self); - Self::raw_waker(waker.context, waker.process.clone()) - } - - unsafe fn wake(this: *const ()) { - let waker = Box::from_raw(this as *const Self as *mut Self); - waker.context.reactivate(waker.process); - } - - unsafe fn wake_by_ref(this: *const ()) { - let waker = &*(this as *const Self); - waker.context.reactivate(waker.process.clone()); - } - - unsafe fn drop(this: *const ()) { - Box::from_raw(this as *const Self as *mut Self); + // memory is released in the main event loop } } @@ -403,8 +557,33 @@ impl Facility { } } -/// Complex channel with space for infinitely many elements of arbitrary -/// (non-process) type. +/// A one-shot, writable container type that awakens a preset process on write. +pub struct Promise<T> { + /// The waker of the process to awaken on write. + caller: task::Waker, + /// The written value to be extracted by the caller. + result: Cell<Option<T>> +} + +impl<T> Promise<T> { + /// Create a new promise with an unwritten result. + pub fn new(caller: task::Waker) -> Self { + Self { caller, result: Cell::new(None) } + } + + /// Write the result value and reawaken the caller. + pub fn fulfill(&self, result: T) { + self.result.replace(Some(result)); + self.caller.wake_by_ref(); + } + + /// Extract the written value and reset the state of the promise. + pub fn redeem(&self) -> Option<T> { + self.result.replace(None) + } +} + +/// Complex channel with space for infinitely many elements of arbitrary type. /// /// This channel supports arbitrary many readers and writers and uses wakers /// to reactivate suspended processes. @@ -454,7 +633,7 @@ impl<T: Unpin> Sender<T> { impl<T: Unpin> Receiver<T> { /// Returns a future that can be awaited to receive a message. pub fn recv(&self) -> ReceiveFuture<T> { - ReceiveFuture(&self.0) + ReceiveFuture(&self.0, None) } } @@ -469,6 +648,11 @@ impl<T> Channel<T> { self.waiting.pop_front() } + /// Private method that unregisters a previously registered waker. + fn unregister(&mut self, waker: task::Waker) { + self.waiting.retain(|elem| !waker.will_wake(elem)); + } + /// Private method inserting a message into the queue. fn send(&mut self, value: T) { self.store.push_back(value); @@ -488,7 +672,7 @@ pub struct SendFuture<'c,T>(&'c Weak<RefCell<Channel<T>>>, Option<T>); /// Future for the [`recv()`] operation on a channel receiver. /// /// [`recv()`]: struct.Receiver.html#method.recv -pub struct ReceiveFuture<'c,T>(&'c Rc<RefCell<Channel<T>>>); +pub struct ReceiveFuture<'c,T>(&'c Rc<RefCell<Channel<T>>>, Option<task::Waker>); impl<T: Unpin> Future for SendFuture<'_,T> { type Output = Result<(),T>; @@ -516,9 +700,12 @@ impl<T: Unpin> Future for SendFuture<'_,T> { impl<T: Unpin> Future for ReceiveFuture<'_,T> { type Output = Option<T>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let mut channel = self.0.borrow_mut(); + // clear the local copy of our waker + self.1 = None; + if let Some(c) = channel.recv() { // there are elements in the channel Poll::Ready(Some(c)) @@ -527,12 +714,23 @@ impl<T: Unpin> Future for ReceiveFuture<'_,T> { Poll::Ready(None) } else { // no elements, but potential senders exist: check back later - channel.enqueue(cx.waker().clone()); + let waker = cx.waker().clone(); + self.1 = Some(waker.clone()); + channel.enqueue(waker); Poll::Pending } } } +impl<T> Drop for ReceiveFuture<'_,T> { + fn drop(&mut self) { + // take care to unregister our waker from the channel + if let Some(waker) = self.1.take() { + self.0.borrow_mut().unregister(waker); + } + } +} + /* ************************* statistical facilities ************************* */ /// A simple collector for statistical data, inspired by SLX's random_variable. diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..19540e6 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,30 @@ +use simcore_rs::{Time, SimContext, Sender, Receiver, channel, simulation}; + +fn main() { + simulation( + (), + |sim| sim.process(async move { + println!("[{}] Begin", sim.now()); + + let (sx, rx) = channel(); + sim.activate(async move { + for i in 0.. { + sim.advance(1.0).await; + sx.send(i).await.ok(); + } + }); + + sim.until(async move { + while let Some(i) = rx.recv().await { + println!("[{}] Received {}", sim.now(), i); + if i >= 3 { break; } + } + + println!("[{}] Exit Receiver Process", sim.now()); + }, sim.advance(5.0)).await; + + sim.advance(10.0).await; + println!("[{}] End", sim.now()); + }) + ); +} -- GitLab