diff --git a/benches/barbershop.rs b/benches/barbershop.rs index b201361b674c979fcd9afbd30de3babff0962992..5eb5361de02005f2fd6bed46eeee1e62c510ae90 100644 --- a/benches/barbershop.rs +++ b/benches/barbershop.rs @@ -1,4 +1,4 @@ -use simcore_rs::{Time, SimContext, Facility, simulation, Process}; +use simcore_rs::{Time, SimContext, Facility, simulation, Fiber}; use rand::{distributions::Uniform, rngs::SmallRng, SeedableRng, Rng}; use criterion::{Criterion, BenchmarkId, criterion_group, criterion_main, BatchSize}; @@ -29,7 +29,7 @@ fn barbershop(c: &mut Criterion) { }, |shop| simulation( shop, - |sim| Process::new(sim_main(sim)) + |sim| Fiber::new(sim_main(sim)) ), BatchSize::SmallInput ) diff --git a/src/bin/barbershop.rs b/examples/barbershop.rs similarity index 93% rename from src/bin/barbershop.rs rename to examples/barbershop.rs index 2dce6805145591d0989d38060d30caa9447b1538..bd2ecca278905c3208faa6e03d268d549967004b 100644 --- a/src/bin/barbershop.rs +++ b/examples/barbershop.rs @@ -1,4 +1,4 @@ -use simcore_rs::{Process, Time, SimContext, Facility, RandomVar, simulation}; +use simcore_rs::{Time, SimContext, Facility, RandomVar, simulation}; use rand::{distributions::Uniform, rngs::SmallRng, SeedableRng, Rng}; // helper constants @@ -61,6 +61,6 @@ fn main() { // global data Shared { joe: Facility::new(), rv: RandomVar::new() }, // simulation entry point - |sim| Process::new(sim_main(sim)) + |sim| sim.process(sim_main(sim)) ); } diff --git a/examples/ferry.rs b/examples/ferry.rs new file mode 100644 index 0000000000000000000000000000000000000000..a04b4232985776c1aa0c4010d9f69044e0dfd073 --- /dev/null +++ b/examples/ferry.rs @@ -0,0 +1,81 @@ +use simcore_rs::{Fiber, Time, SimContext, Sender, Receiver, RandomVar, channel, simulation}; +use rand::{distributions::Uniform, rngs::SmallRng, SeedableRng, Rng}; + +const SIM_DURATION: Time = 8.0*60.0*60.0; +const HARBOR_DISTANCE: Time = 60.0; +const CAR_ARRIVAL_DELAY: Time = 20.0; +const CAR_LOAD_TIME: Time = 10.0; +const FERRY_TIMEOUT: Time = 5.0; +const FERRY_CAPACITY: usize = 5; + +struct Ferry { + cargo: Vec<Car>, + harbors: [Receiver<Car>; 2] +} + +struct Car { + delay: Time +} + +impl Ferry { + async fn actions(mut self, sim: SimContext<'_>) { + loop { + for harbor_id in 0..self.harbors.len() { + // unload all of the cars + for car in self.cargo.drain(..) { + sim.advance(car.delay).await; + } + + // wait until the cars have arrived or a timeout has occurred + while self.cargo.len() < FERRY_CAPACITY { + // have the receive-operation compete with the advance + if let Some(car) = sim.until( + self.harbors[harbor_id].recv(), + async { sim.advance(FERRY_TIMEOUT).await; None } + ).await { + // another car arrived in time + sim.advance(car.delay).await; + self.cargo.push(car); + } else { + // the timeout has been triggered + break; + } + } + + // return to the other harbor + sim.advance(HARBOR_DISTANCE).await; + } + } + } +} + +async fn harbor(sim: SimContext<'_>, pier: Sender<Car>) { + loop { + sim.advance(CAR_ARRIVAL_DELAY).await; + pier.send(Car { delay: CAR_LOAD_TIME }); + } +} + +async fn sim_main(sim: SimContext<'_>) { + let ch1 = channel(); + let ch2 = channel(); + let ferry = Ferry { + cargo: Vec::new(), + harbors: [ch1.1, ch2.1] + }; + + sim.activate(harbor(sim, ch1.0)); + sim.activate(harbor(sim, ch2.0)); + + sim.activate(ferry.actions(sim)); + sim.advance(SIM_DURATION).await; +} + +fn main() { + simulation( + // global data + (), + // simulation entry point + |sim| sim.process(sim_main(sim)) + ); +} diff --git a/src/bin/coroutines.rs b/src/bin/coroutines.rs index 830d53fa2ce22f09c08ee98fbab3dbd964b5fbf0..be9f88cd280934b2da67203af570144ae59160fc 100644 --- a/src/bin/coroutines.rs +++ b/src/bin/coroutines.rs @@ -40,7 +40,7 @@ async fn tokenize(input: Receiver<char>, output: Sender<Token>) { fn main() { let exec = Executor::new(); let spawner = exec.spawner(); - let input = b"He\xff\x02lo SAM!".iter().cloned(); + let input = b"He\xff\x02lo IST!".iter().cloned(); exec.run(async { let (s1, r1) = channel(); @@ -177,23 +177,23 @@ struct Channel<T> { /// Creates a channel and returns a pair of read and write ends. fn channel<T>() -> (Sender<T>, Receiver<T>) { - let chan = Rc::new(Channel { + let channel = Rc::new(Channel { slot: RefCell::new(None) }); - (Sender { chan: chan.clone() }, Receiver { chan }) + (Sender { channel: channel.clone() }, Receiver { channel }) } /// Write-end of a channel. -struct Sender<T> { chan: Rc<Channel<T>> } +struct Sender<T> { channel: Rc<Channel<T>> } /// Read-end of a channel. -struct Receiver<T> { chan: Rc<Channel<T>> } +struct Receiver<T> { channel: Rc<Channel<T>> } impl<T: Clone> Sender<T> { /// Method used to push an element into the channel. /// Blocks until the previous element has been consumed. pub fn send<'s>(&'s self, elem: T) -> impl Future<Output = ()> + 's { - SendFuture { chan: &self.chan, elem } + SendFuture { channel: &self.channel, elem } } } @@ -201,27 +201,27 @@ impl<T: Clone> Receiver<T> { /// Method used to consume an element from the channel. /// Blocks until an element can be consumed. pub fn recv<'r>(&'r self) -> impl Future<Output = Option<T>> + 'r { - ReceiveFuture { chan: &self.chan } + ReceiveFuture { channel: &self.channel } } } /// A future that pushes an element into a channel. -struct SendFuture<'c,T> { chan: &'c Rc<Channel<T>>, elem: T } +struct SendFuture<'c,T> { channel: &'c Rc<Channel<T>>, elem: T } /// A future that consumes an element from a channel. -struct ReceiveFuture<'c,T> { chan: &'c Rc<Channel<T>> } +struct ReceiveFuture<'c,T> { channel: &'c Rc<Channel<T>> } impl<T: Clone> Future for SendFuture<'_,T> { type Output = (); fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> { // check if there is space for the element - if self.chan.slot.borrow().is_none() { + if self.channel.slot.borrow().is_none() { // replace the empty element with ours - self.chan.slot.replace(Some(self.elem.clone())); + self.channel.slot.replace(Some(self.elem.clone())); Poll::Ready(()) } else { - // check back at a later time + // try again at a later time Poll::Pending } } @@ -232,14 +232,14 @@ impl<T> Future for ReceiveFuture<'_,T> { fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> { // check if there is an element in the channel - if let Some(c) = self.chan.slot.borrow_mut().take() { + if let Some(c) = self.channel.slot.borrow_mut().take() { // return it Poll::Ready(Some(c)) - } else if Rc::strong_count(self.chan) == 1 { + } else if Rc::strong_count(self.channel) == 1 { // check if the sender disconnected Poll::Ready(None) } else { - // check back at a later time + // try again at a later time Poll::Pending } } diff --git a/src/lib.rs b/src/lib.rs index 00893d633918df6937ab7d2a24a40a858465dad5..4afd3f044096beb069dbe8abb4d01ac2347e5396 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,41 +14,34 @@ use std::{ // simple time type pub type Time = f64; -// priority queue of time-process-pairs using time as the key -type EventQ<'p> = BinaryHeap<NextEvent<'p>>; - -/// The (private) scheduler for processes. -struct Scheduler<'s,G> { - /// The current simulation time. - now: Cell<Time>, - - /// The event-calendar organized chronologically. - calendar: RefCell<EventQ<'s>>, - - /// The currently active process. - active: RefCell<Process<'s>>, - - /// Globally-accessible data. - shared: G -} - -// these allow us to *move* the context without invalidating it -/// A light-weight handle to the scheduler. -pub struct SimContext<'s,G> { handle: *const Scheduler<'s,G> } - -impl<'s,G> Clone for SimContext<'s,G> { - fn clone(&self) -> Self { *self } -} - -impl<'s,G> Copy for SimContext<'s,G> {} - /// Performs a single simulation run. /// /// Input is a function that takes a simulation context and returns the first /// process. It would be better if the function only needed to return the future /// needed to initialize the first process, but then the function signature gets /// more complicated and is harder to explain in a paper. -pub fn simulation<G>(shared: G, main: impl FnOnce(SimContext<G>) -> Process) { +/// +/// But just in case you're wondering how to do it: +/// ``` +/// # use simcore_rs::{SimContext, Process}; +/// # use std::future::Future; +/// pub trait Active<'s,G> { +/// fn lifecycle(self, sim: SimContext<'s,G>) -> Process<'s,G>; +/// } +/// +/// impl<'s,G,F,R> Active<'s,G> for F +/// where F: FnOnce(SimContext<'s,G>) -> R, +/// R: Future<Output = ()> + 's { +/// fn lifecycle(self, sim: SimContext<'s,G>) -> Process<'s,G> { +/// sim.process(self(sim)) +/// } +/// } +/// ``` +/// You may then change the signature to +/// ``` +/// fn simulation<G>(shared: G, main: impl for<'s> Active<'s,G>) {} +/// ``` +pub fn simulation<G>(shared: G, main: impl FnOnce(SimContext<G>) -> Process<G>) { // create a fresh scheduler and a handle to it let sched = Scheduler::new(shared); let sim = SimContext { handle: &sched }; @@ -73,6 +66,24 @@ pub fn simulation<G>(shared: G, main: impl FnOnce(SimContext<G>) -> Process) { sched.clear(); } +// priority queue of time-process-pairs using time as the key +type EventQ<'s,G> = BinaryHeap<NextEvent<'s,G>>; + +/// The (private) scheduler for processes. +struct Scheduler<'s,G> { + /// The current simulation time. + now: Cell<Time>, + + /// The event-calendar organized chronologically. + calendar: RefCell<EventQ<'s,G>>, + + /// The currently active process. + active: RefCell<Process<'s,G>>, + + /// Globally-accessible data. + shared: G +} + impl<'s,G> Scheduler<'s,G> { /// Creates a new scheduler. fn new(shared: G) -> Self { @@ -84,19 +95,26 @@ impl<'s,G> Scheduler<'s,G> { } } - /// Clears the scheduler. + /// Clears the scheduler, dropping all of the contained processes. fn clear(&self) { + // this is a surprisingly delicate operation because any of the + // processes that are still present in the event-queue may schedule + // additional processes upon dropping, with the consequence of writing + // into the event-queue while it's being cleared (and therefore in an + // inconsistent state); swapping out the calendar with an empty one + // circumvents this issue completely, since clearing and dropping are + // now two separate operations self.active.replace(Process::default()); self.calendar.replace(EventQ::default()); } /// Schedules a process at the current simulation time. - fn schedule(&self, process: Process<'s>) { + fn schedule(&self, process: Process<'s,G>) { self.schedule_in(Time::default(), process); } /// Schedules a process at a later simulation time. - fn schedule_in(&self, dt: Time, process: Process<'s>) { + fn schedule_in(&self, dt: Time, process: Process<'s,G>) { self.calendar.borrow_mut().push( NextEvent(self.now.get() + dt, process) ); @@ -104,7 +122,7 @@ impl<'s,G> Scheduler<'s,G> { /// Removes the process with the next event time from the calendar and /// activates it. - fn next_event(&self) -> Option<Process<'s>> { + fn next_event(&self) -> Option<Process<'s,G>> { let NextEvent(now, process) = self.calendar.borrow_mut().pop()?; self.now.set(now); self.active.replace(process.clone()); @@ -112,22 +130,87 @@ impl<'s,G> Scheduler<'s,G> { } } +/// A light-weight handle to the scheduler. +pub struct SimContext<'s,G = ()> { handle: *const Scheduler<'s,G> } + +// this allows the creation of copies +impl<'s,G> Clone for SimContext<'s,G> { + fn clone(&self) -> Self { *self } +} + +// this allows moving the context without invalidating it (copy semantics) +impl<'s,G> Copy for SimContext<'s,G> {} + impl<'s,G> SimContext<'s,G> { /// Returns a (reference-counted) copy of the currently active process. - pub fn active(&self) -> Process<'s> { + pub fn active(&self) -> Process<'s,G> { self.sched().active.borrow().clone() } + /// Constructs a new process from a future without scheduling it. + pub fn process(&self, fut: impl Future<Output = ()> + 's) -> Process<'s,G> { + Process::new(*self, fut) + } + /// Activates a new process with the given future. pub fn activate(&self, fut: impl Future<Output = ()> + 's) { - self.sched().schedule(Process::new(fut)); + self.sched().schedule(self.process(fut)); } /// Reactivates a process that has been suspended with wait(). - pub fn reactivate(&self, process: Process<'s>) { + pub fn reactivate(&self, process: Process<'s,G>) { + assert!(process.0.borrow().fib.is_some()); self.sched().schedule(process); } + /// Returns a future that takes two other futures and completes as soon as + /// one of them returns, canceling the other future before returning. + /// + /// This design guarantees that the two futures passed as input to this + /// function cannot outlive its returned future, enabling us to allow + /// references to variables in the local scope. The passed futures may + /// compute a value, as long as the return type is identical in both cases. + pub async fn until<'u,E,O,R>(&self, either: E, or: O) -> R + where E: Future<Output = R> + 'u, O: Future<Output = R> + 'u, R: 'u { + use std::mem::transmute; + + // create a one-shot channel that reactivates the caller on write + let promise = &Promise::new(self.active().waker()); + + // this unsafe block shortens the guaranteed lifetimes of the processes + // contained in the scheduler; it is safe because the constructed future + // ensures that both futures are terminated before the promise and + // itself are terminated, thereby preventing references to the lesser + // constrained processes to leak from this function and be reawakened, + // allowing access to dangling pointers + let sim = unsafe { + transmute::<&SimContext<'s,G>, &SimContext<'_,G>>(self) + }; + + // create the two competing processes + let p1 = sim.process(async move { + promise.fulfill(either.await); + }); + let p2 = sim.process(async move { + promise.fulfill(or.await); + }); + + // activate them + sim.reactivate(p1.clone()); + sim.reactivate(p2.clone()); + + // wait for reactivation; the promise will wake us on fulfillment + sim.wait().await; + + // terminate both processes + // (this is redundant for one of them but doesn't hurt) + p1.terminate(); + p2.terminate(); + + // extract the promised value + promise.redeem().unwrap() + } + /// Unconditionally suspends execution until reactivation. pub fn wait(&self) -> impl Future<Output = ()> { Wait { ready: false } @@ -158,60 +241,106 @@ impl<'s,G> SimContext<'s,G> { } } -#[derive(Clone)] -/// Bare-bone process. -pub struct Process<'p>(Pin<Rc<RefCell<dyn Future<Output = ()> + 'p>>>); +/// A single, cooperatively and dynamically dispatched path of execution. +pub struct Fiber<F: ?Sized>(Pin<Box<F>>); -impl<'p> Process<'p> { - /// Creates a new process from a future. - pub fn new(fut: impl Future<Output = ()> + 'p) -> Self { - Process(Rc::pin(RefCell::new(fut))) +impl<F: ?Sized + Future<Output = ()>> Fiber<F> { + /// Private function for polling the future. + #[inline] + fn poll(&mut self, cx: &mut Context) -> Poll<()> { + self.0.as_mut().poll(cx) + } +} + +/// A bare-bone process type that can also be used as a waker. +pub struct Process<'s,G>(Rc<RefCell<Inner<'s,G>>>); + +/// The private details of the [`Process`](struct.Process.html) type. +struct Inner<'s,G> { + sim: SimContext<'s,G>, + fib: Option<Fiber<dyn Future<Output = ()> + 's>> +} + +impl<'s,G> Process<'s,G> { + /// Combines a future and a simulation context into a process. + #[inline] + fn new(sim: SimContext<'s,G>, fut: impl Future<Output = ()> + 's) -> Self { + Process(Rc::new(RefCell::new(Inner { + sim, fib: Some(Fiber(Box::pin(fut))) + }))) } - /// Private function for polling the future. - fn poll(&self, cx: &mut Context) -> Poll<()> { - // this is safe because the process is already pinned + /// Releases the [`Fiber`](struct.Fiber.html) contained in this process. + /// + /// This will also execute all of the destructors for the local variables + /// initialized by this process. + #[inline] + pub fn terminate(&self) { + self.0.borrow_mut().fib.take(); + } + + /// Returns a `Waker` for this process. + #[inline] + pub fn waker(self) -> task::Waker { unsafe { - Pin::new_unchecked(&mut *self.0.borrow_mut()) - }.poll(cx) + task::Waker::from_raw(self.raw_waker()) + } + } + + /// Private function for polling the process. + #[inline] + fn poll(&self, cx: &mut Context) -> Poll<()> { + if let Some(fiber) = self.0.borrow_mut().fib.as_mut() { + fiber.poll(cx) + } else { + Poll::Ready(()) + } } } -// implementation of Rust's version of a default constructor -impl Default for Process<'_> { +impl<'s,G> Default for Process<'s,G> { fn default() -> Self { - Process::new(Wait { ready: false }) + Process(Rc::new(RefCell::new(Inner { + sim: SimContext { handle: std::ptr::null() }, + fib: None + }))) + } +} + +impl<'s,G> Clone for Process<'s,G> { + fn clone(&self) -> Self { + Process(self.0.clone()) } } // allows processes to be compared for equality -impl PartialEq for Process<'_> { +impl<G> PartialEq for Process<'_,G> { fn eq(&self, other: &Self) -> bool { - std::ptr::eq(&*self.0, &*other.0) + Rc::ptr_eq(&self.0, &other.0) } } // marks the equality-relation as total -impl Eq for Process<'_> {} +impl<G> Eq for Process<'_,G> {} /// Time-process-pair that has a total order defined based on the time. -struct NextEvent<'p>(Time, Process<'p>); +struct NextEvent<'p,G>(Time, Process<'p,G>); -impl PartialEq for NextEvent<'_> { +impl<G> PartialEq for NextEvent<'_,G> { fn eq(&self, other: &Self) -> bool { self.0 == other.0 } } -impl Eq for NextEvent<'_> {} +impl<G> Eq for NextEvent<'_,G> {} -impl PartialOrd for NextEvent<'_> { +impl<G> PartialOrd for NextEvent<'_,G> { fn partial_cmp(&self, other: &Self) -> Option<Ordering> { Some(self.cmp(other)) } } -impl Ord for NextEvent<'_> { +impl<G> Ord for NextEvent<'_,G> { fn cmp(&self, other: &Self) -> Ordering { self.0.partial_cmp(&other.0) .expect("illegal event time NaN").reverse() @@ -220,6 +349,89 @@ impl Ord for NextEvent<'_> { /* **************************** specialized waker *************************** */ +impl<'s,G> Process<'s,G> { + /// Virtual function table for the waker. + const VTABLE: task::RawWakerVTable = task::RawWakerVTable::new( + Self::clone, + Self::wake, + Self::wake_by_ref, + Self::drop + ); + + /// Constructs a raw waker from a simulation context and a process. + fn raw_waker(self) -> task::RawWaker { + task::RawWaker::new( + Rc::into_raw(self.0) as *const (), + &Self::VTABLE + ) + } + + unsafe fn clone(this: *const ()) -> task::RawWaker { + let waker = Rc::from_raw( + this as *const RefCell<Inner<G>> + ); + + // increase the reference counter once + Rc::into_raw(waker.clone()); + + // this is technically unsafe because Wakers are Send + Sync and so this + // call might be executed from a different thread, creating a data race + // hazard; we leave preventing this as an exercise to the reader! + task::RawWaker::new( + Rc::into_raw(waker) as *const (), + &Self::VTABLE + ) + } + + unsafe fn wake(this: *const ()) { + let waker = Rc::from_raw( + this as *const RefCell<Inner<G>> + ); + + // this can happen if a synchronization structure forgets to clean + // up registered Waker objects on destruct; this would lead to + // hard-to-diagnose bugs if we were to ignore it + assert!(waker.borrow().fib.is_some(), + "Attempted to wake a terminated process."); + + // this is technically unsafe because Wakers are Send + Sync and so this + // call might be executed from a different thread, creating a data race + // hazard; we leave preventing this as an exercise to the reader! + let sim = waker.borrow().sim; + sim.reactivate(Process(waker)); + } + + unsafe fn wake_by_ref(this: *const ()) { + let waker = Rc::from_raw( + this as *const RefCell<Inner<G>> + ); + + // keep the waker alive + Rc::into_raw(waker.clone()); + + // this can happen if a synchronization structure forgets to clean + // up registered Waker objects on destruct; this would lead to + // hard-to-diagnose bugs if we were to ignore it + assert!(waker.borrow().fib.is_some(), + "Attempted to wake a terminated process."); + + // this is technically unsafe because Wakers are Send + Sync and so this + // call might be executed from a different thread, creating a data race + // hazard; we leave preventing this as an exercise to the reader! + let sim = waker.borrow().sim; + sim.reactivate(Process(waker)); + } + + unsafe fn drop(this: *const ()) { + // this is technically unsafe because Wakers are Send + Sync and so this + // call might be executed from a different thread, creating a data race + // hazard; we leave preventing this as an exercise to the reader! + Rc::from_raw( + this as *const RefCell<Inner<G>> + ); + } +} + /// Complex waker that is used to implement state events. /// /// This is the shallow version that is created on the stack of the function @@ -227,15 +439,6 @@ impl Ord for NextEvent<'_> { /// active one and creates the deep version when it is cloned. struct StateEventWaker<'s,G> { context: SimContext<'s,G> } -/// Deep version of the complex waker used to implement state events. -/// -/// This version actually holds a process as well as a simulation context on the -/// heap. Dropping the waker releases the memory. -struct StateEventWakerDeep<'s,G> { - process: Process<'s>, - context: SimContext<'s,G> -} - impl<'s,G> StateEventWaker<'s,G> { /// Virtual function table for the waker. const VTABLE: task::RawWakerVTable = task::RawWakerVTable::new( @@ -263,70 +466,21 @@ impl<'s,G> StateEventWaker<'s,G> { ) } - /// Creates a deep waker from a reference. - /// - /// This function is safe because it allocates memory on the heap. - fn deep_waker(&self) -> task::RawWaker { - StateEventWakerDeep::raw_waker( - self.context, self.context.active() - ) - } - unsafe fn clone(this: *const ()) -> task::RawWaker { - (&*(this as *const Self)).deep_waker() + // return the currently active process as a raw waker + (&*(this as *const Self)).context.active().raw_waker() } unsafe fn wake(_this: *const ()) { - unreachable!("attempting to awake the active process") + // waking the active process can safely be ignored } unsafe fn wake_by_ref(_this: *const ()) { - unreachable!("attempting to awake the active process") + // waking the active process can safely be ignored } unsafe fn drop(_this: *const ()) { - // memory released in the main event loop - } -} - -impl<'s,G> StateEventWakerDeep<'s,G> { - /// Virtual function table for the deep waker. - const VTABLE: task::RawWakerVTable = task::RawWakerVTable::new( - Self::clone, - Self::wake, - Self::wake_by_ref, - Self::drop - ); - - /// Constructs a raw waker from a simulation context and a process. - fn raw_waker(sim: SimContext<'s,G>, process: Process<'s>) -> task::RawWaker { - let waker = Box::leak(Box::new( - Self { process, context: sim } - )); - - task::RawWaker::new( - waker as *const _ as *const (), - &Self::VTABLE - ) - } - - unsafe fn clone(this: *const ()) -> task::RawWaker { - let waker = &*(this as *const Self); - Self::raw_waker(waker.context, waker.process.clone()) - } - - unsafe fn wake(this: *const ()) { - let waker = Box::from_raw(this as *const Self as *mut Self); - waker.context.reactivate(waker.process); - } - - unsafe fn wake_by_ref(this: *const ()) { - let waker = &*(this as *const Self); - waker.context.reactivate(waker.process.clone()); - } - - unsafe fn drop(this: *const ()) { - Box::from_raw(this as *const Self as *mut Self); + // memory is released in the main event loop } } @@ -403,8 +557,33 @@ impl Facility { } } -/// Complex channel with space for infinitely many elements of arbitrary -/// (non-process) type. +/// A one-shot, writable container type that awakens a preset process on write. +pub struct Promise<T> { + /// The waker of the process to awaken on write. + caller: task::Waker, + /// The written value to be extracted by the caller. + result: Cell<Option<T>> +} + +impl<T> Promise<T> { + /// Create a new promise with an unwritten result. + pub fn new(caller: task::Waker) -> Self { + Self { caller, result: Cell::new(None) } + } + + /// Write the result value and reawaken the caller. + pub fn fulfill(&self, result: T) { + self.result.replace(Some(result)); + self.caller.wake_by_ref(); + } + + /// Extract the written value and reset the state of the promise. + pub fn redeem(&self) -> Option<T> { + self.result.replace(None) + } +} + +/// Complex channel with space for infinitely many elements of arbitrary type. /// /// This channel supports arbitrary many readers and writers and uses wakers /// to reactivate suspended processes. @@ -454,7 +633,7 @@ impl<T: Unpin> Sender<T> { impl<T: Unpin> Receiver<T> { /// Returns a future that can be awaited to receive a message. pub fn recv(&self) -> ReceiveFuture<T> { - ReceiveFuture(&self.0) + ReceiveFuture(&self.0, None) } } @@ -469,6 +648,11 @@ impl<T> Channel<T> { self.waiting.pop_front() } + /// Private method that unregisters a previously registered waker. + fn unregister(&mut self, waker: task::Waker) { + self.waiting.retain(|elem| !waker.will_wake(elem)); + } + /// Private method inserting a message into the queue. fn send(&mut self, value: T) { self.store.push_back(value); @@ -488,7 +672,7 @@ pub struct SendFuture<'c,T>(&'c Weak<RefCell<Channel<T>>>, Option<T>); /// Future for the [`recv()`] operation on a channel receiver. /// /// [`recv()`]: struct.Receiver.html#method.recv -pub struct ReceiveFuture<'c,T>(&'c Rc<RefCell<Channel<T>>>); +pub struct ReceiveFuture<'c,T>(&'c Rc<RefCell<Channel<T>>>, Option<task::Waker>); impl<T: Unpin> Future for SendFuture<'_,T> { type Output = Result<(),T>; @@ -516,9 +700,12 @@ impl<T: Unpin> Future for SendFuture<'_,T> { impl<T: Unpin> Future for ReceiveFuture<'_,T> { type Output = Option<T>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let mut channel = self.0.borrow_mut(); + // clear the local copy of our waker + self.1 = None; + if let Some(c) = channel.recv() { // there are elements in the channel Poll::Ready(Some(c)) @@ -527,12 +714,23 @@ impl<T: Unpin> Future for ReceiveFuture<'_,T> { Poll::Ready(None) } else { // no elements, but potential senders exist: check back later - channel.enqueue(cx.waker().clone()); + let waker = cx.waker().clone(); + self.1 = Some(waker.clone()); + channel.enqueue(waker); Poll::Pending } } } +impl<T> Drop for ReceiveFuture<'_,T> { + fn drop(&mut self) { + // take care to unregister our waker from the channel + if let Some(waker) = self.1.take() { + self.0.borrow_mut().unregister(waker); + } + } +} + /* ************************* statistical facilities ************************* */ /// A simple collector for statistical data, inspired by SLX's random_variable. diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000000000000000000000000000000000000..19540e6452ead354ba9ecadf75b6eb53eb47a204 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,30 @@ +use simcore_rs::{Time, SimContext, Sender, Receiver, channel, simulation}; + +fn main() { + simulation( + (), + |sim| sim.process(async move { + println!("[{}] Begin", sim.now()); + + let (sx, rx) = channel(); + sim.activate(async move { + for i in 0.. { + sim.advance(1.0).await; + sx.send(i).await.ok(); + } + }); + + sim.until(async move { + while let Some(i) = rx.recv().await { + println!("[{}] Received {}", sim.now(), i); + if i >= 3 { break; } + } + + println!("[{}] Exit Receiver Process", sim.now()); + }, sim.advance(5.0)).await; + + sim.advance(10.0).await; + println!("[{}] End", sim.now()); + }) + ); +}