diff --git a/Cargo.lock b/Cargo.lock index f6d8aa0f08ef9554be78b44c8eeed69d82e0e159..6f3fcfb750e11c03311d5067879c0c0a621710d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19,9 +19,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" [[package]] name = "anstyle" -version = "1.0.8" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1bec1de6f59aedf83baf9ff929c98f2ad654b97c9510f4e70cf6f661d49fd5b1" +checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9" [[package]] name = "autocfg" @@ -49,9 +49,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.1.31" +version = "1.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2e7962b54006dcfcc61cb72735f4d89bb97061dd6a7ed882ec6b8ee53714c6f" +checksum = "a012a0df96dd6d06ba9a1b29d6402d1a5d77c6befd2566afdc26e10603dc93d7" dependencies = [ "jobserver", "libc", @@ -93,18 +93,18 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.20" +version = "4.5.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b97f376d85a664d5837dbae44bf546e6477a679ff6610010f17276f686d867e8" +checksum = "a8eb5e908ef3a6efbe1ed62520fb7287959888c88485abe072543190ecc66783" dependencies = [ "clap_builder", ] [[package]] name = "clap_builder" -version = "4.5.20" +version = "4.5.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19bc80abd44e4bed93ca373a0704ccbd1b710dc5749406201bb018272808dc54" +checksum = "96b01801b5fc6a0a232407abc821660c9c6d25a1cafc0d4f85f29fb8d9afc121" dependencies = [ "anstyle", "clap_lex", @@ -112,9 +112,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.2" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" +checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" [[package]] name = "criterion" @@ -154,9 +154,9 @@ dependencies = [ [[package]] name = "crossbeam-deque" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" dependencies = [ "crossbeam-epoch", "crossbeam-utils", @@ -173,9 +173,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.20" +version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" [[package]] name = "crunchy" @@ -218,9 +218,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.15.0" +version = "0.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" +checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" [[package]] name = "hermit-abi" @@ -230,9 +230,9 @@ checksum = "fbf6a919d6cf397374f7dfeeea91d974c7c0a7221d0d0f4f20d859d329e53fcc" [[package]] name = "indexmap" -version = "2.6.0" +version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" +checksum = "62f822373a4fe84d4bb149bf54e584a7f4abec90e072ed49cda0edea5b95471f" dependencies = [ "equivalent", "hashbrown", @@ -260,18 +260,18 @@ dependencies = [ [[package]] name = "itertools" -version = "0.13.0" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" dependencies = [ "either", ] [[package]] name = "itoa" -version = "1.0.11" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" [[package]] name = "jobserver" @@ -284,24 +284,25 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.72" +version = "0.3.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a88f1bda2bd75b0452a14784937d796722fdebfe50df998aeb3f0b7603019a9" +checksum = "6717b6b5b077764fb5966237269cb3c64edddde4b14ce42647430a78ced9e7b7" dependencies = [ + "once_cell", "wasm-bindgen", ] [[package]] name = "libc" -version = "0.2.161" +version = "0.2.169" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e9489c2807c139ffd9c1794f4af0ebe86a828db53ecdc7fea2111d0fed085d1" +checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" [[package]] name = "libm" -version = "0.2.8" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +checksum = "8355be11b20d696c8f18f6cc018c4e372165b1fa8126cef092399c9951984ffa" [[package]] name = "log" @@ -376,18 +377,18 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.89" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f139b0662de085916d1fb67d2b4169d1addddda1919e696f3252b740b629986e" +checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.37" +version = "1.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af" +checksum = "0e4dccaaaf89514f546c693ddc140f729f958c247918a13380cccc6078391acc" dependencies = [ "proc-macro2", ] @@ -466,9 +467,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.8" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3" +checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" dependencies = [ "aho-corasick", "memchr", @@ -481,12 +482,6 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" -[[package]] -name = "rustc-hash" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152" - [[package]] name = "ryu" version = "1.0.18" @@ -504,18 +499,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.213" +version = "1.0.217" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ea7893ff5e2466df8d720bb615088341b295f849602c6956047f8f80f0e9bc1" +checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.213" +version = "1.0.217" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e85ad2009c50b58e87caa8cd6dac16bdf511bbfb7af6c33df902396aa480fa5" +checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" dependencies = [ "proc-macro2", "quote", @@ -524,9 +519,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.132" +version = "1.0.135" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03" +checksum = "2b0d7ba2887406110130a978386c4e1befb98c674b4fba677954e4db976630d9" dependencies = [ "itoa", "memchr", @@ -547,18 +542,17 @@ dependencies = [ "cc", "criterion", "indexmap", - "itertools 0.13.0", + "itertools 0.14.0", "rand", "rand_distr", "rayon", - "rustc-hash", ] [[package]] name = "syn" -version = "2.0.85" +version = "2.0.96" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5023162dfcd14ef8f32034d8bcd4cc5ddc61ef7a247c024a33e24e1f24d21b56" +checksum = "d5d0adab1ae378d7f53bdebc67a39f1f151407ef230f0ce2883572f5d8985c80" dependencies = [ "proc-macro2", "quote", @@ -577,9 +571,9 @@ dependencies = [ [[package]] name = "unicode-ident" -version = "1.0.13" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" +checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83" [[package]] name = "walkdir" @@ -599,9 +593,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.95" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e" +checksum = "a474f6281d1d70c17ae7aa6a613c87fce69a127e2624002df63dcb39d6cf6396" dependencies = [ "cfg-if", "once_cell", @@ -610,13 +604,12 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.95" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358" +checksum = "5f89bb38646b4f81674e8f5c3fb81b562be1fd936d84320f3264486418519c79" dependencies = [ "bumpalo", "log", - "once_cell", "proc-macro2", "quote", "syn", @@ -625,9 +618,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.95" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56" +checksum = "2cc6181fd9a7492eef6fef1f33961e3695e4579b9872a6f7c83aee556666d4fe" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -635,9 +628,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.95" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" +checksum = "30d7a95b763d3c45903ed6c81f156801839e5ee968bb07e534c44df0fcd330c2" dependencies = [ "proc-macro2", "quote", @@ -648,15 +641,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.95" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d" +checksum = "943aab3fdaaa029a6e0271b35ea10b72b943135afe9bffca82384098ad0e06a6" [[package]] name = "web-sys" -version = "0.3.72" +version = "0.3.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6488b90108c040df0fe62fa815cbdee25124641df01814dd7282749234c6112" +checksum = "04dd7223427d52553d3702c004d3b2fe07c148165faa56313cb00211e31c12bc" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/Cargo.toml b/Cargo.toml index a6c622983754a42d0a0bc6e52532eaa051eadb5b..9ef2a472a88762f8b77c5260b524ad9cd3216e1a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,16 +17,15 @@ lto = true [dependencies] rand = { version = "0.8", default-features = false, features = ["small_rng"] } rand_distr = "0.4" -rayon = "1.5" +rayon = "1.10" indexmap = "2" -rustc-hash = "2" [dev-dependencies] criterion = { version = "0.5.1", features = ["html_reports"]} -itertools = "0.13" +itertools = "0.14" [build-dependencies] -cc = { version = "1.0", features = ["parallel"] } +cc = { version = "1.2", features = ["parallel"] } [[example]] name = "barbershop" diff --git a/build.rs b/build.rs index b19362b899fa89f1710eef011fc15cbcd76e5954..1b64b1dbce87d5a5395a7c251d300eb5b5d49700 100644 --- a/build.rs +++ b/build.rs @@ -1,105 +1,109 @@ // compiles the ODEMx-library and the C++ examples to link against fn main() { - // attempt to translate the library using the native platform's compiler - let result = cc::Build::new() - .cpp(true) - .define("RUST_FFI", None) - .opt_level(3) - .includes(&["odemx-lite/include", "odemx-lite/external/CppLog/include"]) - .flag_if_supported("-std=c++17") - .flag_if_supported("-std:c++17") - .flag_if_supported("-Wno-potentially-evaluated-expression") - .flag_if_supported("-Wno-deprecated-declarations") - .flag_if_supported("-flto") - .warnings(false) - .debug(false) - .files(&[ - "odemx-lite/src/base/Comparators.cpp", - "odemx-lite/src/base/Continuous.cpp", - "odemx-lite/src/base/DefaultSimulation.cpp", - "odemx-lite/src/base/Event.cpp", - "odemx-lite/src/base/ExecutionList.cpp", - "odemx-lite/src/base/Process.cpp", - "odemx-lite/src/base/Sched.cpp", - "odemx-lite/src/base/Scheduler.cpp", - "odemx-lite/src/base/Simulation.cpp", - "odemx-lite/src/base/control/ControlBase.cpp", - "odemx-lite/src/coroutine/Coroutine.cpp", - "odemx-lite/src/coroutine/CoroutineContext.cpp", - "odemx-lite/src/coroutine/ucFiber.cpp", - "odemx-lite/src/data/buffer/SimRecordBuffer.cpp", - "odemx-lite/src/data/buffer/StatisticsBuffer.cpp", - "odemx-lite/src/data/LoggingManager.cpp", - "odemx-lite/src/data/ManagedChannels.cpp", - "odemx-lite/src/data/output/ErrorWriter.cpp", - "odemx-lite/src/data/output/GermanTime.cpp", - "odemx-lite/src/data/output/Iso8601Time.cpp", - "odemx-lite/src/data/output/OStreamReport.cpp", - "odemx-lite/src/data/output/OStreamWriter.cpp", - "odemx-lite/src/data/output/TimeFormat.cpp", - "odemx-lite/src/data/Producer.cpp", - "odemx-lite/src/data/Report.cpp", - "odemx-lite/src/data/ReportProducer.cpp", - "odemx-lite/src/data/ReportTable.cpp", - "odemx-lite/src/data/SimRecord.cpp", - "odemx-lite/src/data/SimRecordFilter.cpp", - "odemx-lite/src/protocol/Device.cpp", - "odemx-lite/src/protocol/Entity.cpp", - "odemx-lite/src/protocol/ErrorModelDraw.cpp", - "odemx-lite/src/protocol/Layer.cpp", - "odemx-lite/src/protocol/Medium.cpp", - "odemx-lite/src/protocol/Sap.cpp", - "odemx-lite/src/protocol/Service.cpp", - "odemx-lite/src/protocol/ServiceProvider.cpp", - "odemx-lite/src/protocol/Stack.cpp", - "odemx-lite/src/random/ContinuousConst.cpp", - "odemx-lite/src/random/ContinuousDist.cpp", - "odemx-lite/src/random/DiscreteConst.cpp", - "odemx-lite/src/random/DiscreteDist.cpp", - "odemx-lite/src/random/Dist.cpp", - "odemx-lite/src/random/DistContext.cpp", - "odemx-lite/src/random/Draw.cpp", - "odemx-lite/src/random/Erlang.cpp", - "odemx-lite/src/random/NegativeExponential.cpp", - "odemx-lite/src/random/Normal.cpp", - "odemx-lite/src/random/Poisson.cpp", - "odemx-lite/src/random/RandomInt.cpp", - "odemx-lite/src/random/Uniform.cpp", - "odemx-lite/src/statistics/Accumulate.cpp", - "odemx-lite/src/statistics/Count.cpp", - "odemx-lite/src/statistics/Histogram.cpp", - "odemx-lite/src/statistics/Regression.cpp", - "odemx-lite/src/statistics/Sum.cpp", - "odemx-lite/src/statistics/Tab.cpp", - "odemx-lite/src/statistics/Tally.cpp", - "odemx-lite/src/synchronization/Bin.cpp", - "odemx-lite/src/synchronization/CondQ.cpp", - "odemx-lite/src/synchronization/IMemory.cpp", - "odemx-lite/src/synchronization/Memory.cpp", - "odemx-lite/src/synchronization/ProcessQueue.cpp", - "odemx-lite/src/synchronization/Queue.cpp", - "odemx-lite/src/synchronization/Res.cpp", - "odemx-lite/src/synchronization/Timer.cpp", - "odemx-lite/src/synchronization/Wait.cpp", - "odemx-lite/src/synchronization/WaitQ.cpp", - // example scenarios - "cpp/Barbershop/main.cpp", - "cpp/Ferry/main.cpp", - "cpp/Philosophers/main.cpp", - ]) - .try_compile("odemx"); + // only rerun this script if a source file in odemx has changed + println!("cargo:rerun-if-changed=odemx-lite/src"); + println!("cargo:rerun-if-changed=cpp"); + + // attempt to translate the library using the native platform's compiler + let result = cc::Build::new() + .cpp(true) + .define("RUST_FFI", None) + .opt_level(3) + .includes(&["odemx-lite/include", "odemx-lite/external/CppLog/include"]) + .flag_if_supported("-std=c++17") + .flag_if_supported("-std:c++17") + .flag_if_supported("-Wno-potentially-evaluated-expression") + .flag_if_supported("-Wno-deprecated-declarations") + .flag_if_supported("-flto") + .warnings(false) + .debug(false) + .files(&[ + "odemx-lite/src/base/Comparators.cpp", + "odemx-lite/src/base/Continuous.cpp", + "odemx-lite/src/base/DefaultSimulation.cpp", + "odemx-lite/src/base/Event.cpp", + "odemx-lite/src/base/ExecutionList.cpp", + "odemx-lite/src/base/Process.cpp", + "odemx-lite/src/base/Sched.cpp", + "odemx-lite/src/base/Scheduler.cpp", + "odemx-lite/src/base/Simulation.cpp", + "odemx-lite/src/base/control/ControlBase.cpp", + "odemx-lite/src/coroutine/Coroutine.cpp", + "odemx-lite/src/coroutine/CoroutineContext.cpp", + "odemx-lite/src/coroutine/ucFiber.cpp", + "odemx-lite/src/data/buffer/SimRecordBuffer.cpp", + "odemx-lite/src/data/buffer/StatisticsBuffer.cpp", + "odemx-lite/src/data/LoggingManager.cpp", + "odemx-lite/src/data/ManagedChannels.cpp", + "odemx-lite/src/data/output/ErrorWriter.cpp", + "odemx-lite/src/data/output/GermanTime.cpp", + "odemx-lite/src/data/output/Iso8601Time.cpp", + "odemx-lite/src/data/output/OStreamReport.cpp", + "odemx-lite/src/data/output/OStreamWriter.cpp", + "odemx-lite/src/data/output/TimeFormat.cpp", + "odemx-lite/src/data/Producer.cpp", + "odemx-lite/src/data/Report.cpp", + "odemx-lite/src/data/ReportProducer.cpp", + "odemx-lite/src/data/ReportTable.cpp", + "odemx-lite/src/data/SimRecord.cpp", + "odemx-lite/src/data/SimRecordFilter.cpp", + "odemx-lite/src/protocol/Device.cpp", + "odemx-lite/src/protocol/Entity.cpp", + "odemx-lite/src/protocol/ErrorModelDraw.cpp", + "odemx-lite/src/protocol/Layer.cpp", + "odemx-lite/src/protocol/Medium.cpp", + "odemx-lite/src/protocol/Sap.cpp", + "odemx-lite/src/protocol/Service.cpp", + "odemx-lite/src/protocol/ServiceProvider.cpp", + "odemx-lite/src/protocol/Stack.cpp", + "odemx-lite/src/random/ContinuousConst.cpp", + "odemx-lite/src/random/ContinuousDist.cpp", + "odemx-lite/src/random/DiscreteConst.cpp", + "odemx-lite/src/random/DiscreteDist.cpp", + "odemx-lite/src/random/Dist.cpp", + "odemx-lite/src/random/DistContext.cpp", + "odemx-lite/src/random/Draw.cpp", + "odemx-lite/src/random/Erlang.cpp", + "odemx-lite/src/random/NegativeExponential.cpp", + "odemx-lite/src/random/Normal.cpp", + "odemx-lite/src/random/Poisson.cpp", + "odemx-lite/src/random/RandomInt.cpp", + "odemx-lite/src/random/Uniform.cpp", + "odemx-lite/src/statistics/Accumulate.cpp", + "odemx-lite/src/statistics/Count.cpp", + "odemx-lite/src/statistics/Histogram.cpp", + "odemx-lite/src/statistics/Regression.cpp", + "odemx-lite/src/statistics/Sum.cpp", + "odemx-lite/src/statistics/Tab.cpp", + "odemx-lite/src/statistics/Tally.cpp", + "odemx-lite/src/synchronization/Bin.cpp", + "odemx-lite/src/synchronization/CondQ.cpp", + "odemx-lite/src/synchronization/IMemory.cpp", + "odemx-lite/src/synchronization/Memory.cpp", + "odemx-lite/src/synchronization/ProcessQueue.cpp", + "odemx-lite/src/synchronization/Queue.cpp", + "odemx-lite/src/synchronization/Res.cpp", + "odemx-lite/src/synchronization/Timer.cpp", + "odemx-lite/src/synchronization/Wait.cpp", + "odemx-lite/src/synchronization/WaitQ.cpp", + // example scenarios + "cpp/Barbershop/main.cpp", + "cpp/Ferry/main.cpp", + "cpp/Philosophers/main.cpp", + ]) + .try_compile("odemx"); - // there is no need to stop the build altogether if the compilation failed - if let Err(msg) = result { - println!("cargo::warning=Compiling ODEMx-lite has failed."); - println!( - "cargo::warning=This library will still work, \ + // there is no need to stop the build altogether if the compilation failed + if let Err(msg) = result { + println!("cargo::warning=Compiling ODEMx-lite has failed."); + println!( + "cargo::warning=This library will still work, \ the C++ benchmarks will not!" - ); - println!("cargo::warning={}", msg); - } else { - println!("cargo::rustc-cfg=odemx"); - } - - println!("cargo::rustc-check-cfg=cfg(odemx)"); + ); + println!("cargo::warning={}", msg); + } else { + println!("cargo::rustc-cfg=odemx"); + } + + println!("cargo::rustc-check-cfg=cfg(odemx)"); } diff --git a/examples/barbershop.rs b/examples/barbershop.rs index 67381278a1e52103e4a3f3ab4fbc48f24a07cf6f..1bffebd6b2ef0b4738d2eb9800b9dfc923f6d7cf 100644 --- a/examples/barbershop.rs +++ b/examples/barbershop.rs @@ -14,8 +14,8 @@ use rand::{rngs::SmallRng, Rng}; use simcore_rs::{ - util::{Facility, RandomVar}, - Process, Sim, Time, + util::{Facility, RandomVariable}, + Process, Sim, Time, }; use std::cell::RefCell; @@ -25,41 +25,41 @@ const SEED_S: u64 = 200_000; /// Globally shared data for the barbershop simulation. struct Barbershop { - /// Random number generator for customer arrival times. - rng_a: RefCell<SmallRng>, - /// Random number generator for service times. - rng_s: RefCell<SmallRng>, - /// Facility representing the barber (Joe). - joe: Facility, - /// Statistical accumulator for customer wait times. - wait_time: RandomVar, + /// Random number generator for customer arrival times. + rng_a: RefCell<SmallRng>, + /// Random number generator for service times. + rng_s: RefCell<SmallRng>, + /// Facility representing the barber (Joe). + joe: Facility, + /// Statistical accumulator for customer wait times. + wait_time: RandomVariable, } /// Represents a customer in the barbershop simulation. struct Customer; impl Customer { - /// Defines the actions performed by a customer in the simulation. - /// - /// The customer arrives at the barbershop, waits for the barber to be - /// available, gets a haircut (spends time being serviced), and then leaves. - pub async fn actions(self, sim: Sim<'_, Barbershop>) { - // Record the arrival time for wait time calculation. - let arrival_time = sim.now(); - - // Seize the barber (wait if not available). - sim.global().joe.seize().await; - - // Calculate and record the customer's wait time. - sim.global().wait_time.tabulate(sim.now() - arrival_time); - - // Simulate the time taken for the haircut. - let cut = sim.global().rng_s.borrow_mut().gen_range(12.0..18.0); - sim.advance(cut).await; - - // Release the barber for the next customer. - sim.global().joe.release(); - } + /// Defines the actions performed by a customer in the simulation. + /// + /// The customer arrives at the barbershop, waits for the barber to be + /// available, gets a haircut (spends time being serviced), and then leaves. + pub async fn actions(self, sim: Sim<'_, Barbershop>) { + // Record the arrival time for wait time calculation. + let arrival_time = sim.now(); + + // Seize the barber (wait if not available). + sim.global().joe.seize().await; + + // Calculate and record the customer's wait time. + sim.global().wait_time.tabulate(sim.now() - arrival_time); + + // Simulate the time taken for the haircut. + let cut = sim.global().rng_s.borrow_mut().gen_range(12.0..18.0); + sim.advance(cut).await; + + // Release the barber for the next customer. + sim.global().joe.release(); + } } /// The main simulation function. @@ -68,28 +68,28 @@ impl Customer { /// for the specified duration, and ensures that all customers are serviced /// before the simulation ends. async fn sim_main(sim: Sim<'_, Barbershop>, duration: Time) { - // Activate a process to generate customers at random intervals. - sim.activate(async move { - loop { - // Wait for a random time before the next customer arrives. - let wait_time = sim.global().rng_a.borrow_mut().gen_range(12.0..24.0); - sim.advance(wait_time).await; - - // If the simulation time exceeds the duration, stop generating customers. - if sim.now() >= duration { - return; - } - - // Activate a new customer process. - sim.activate(Customer.actions(sim)); - } - }); - - // Run the simulation until the store closes. - sim.advance(duration).await; - - // Ensure the barber finishes servicing any remaining customers. - sim.global().joe.seize().await; + // Activate a process to generate customers at random intervals. + sim.activate(async move { + loop { + // Wait for a random time before the next customer arrives. + let wait_time = sim.global().rng_a.borrow_mut().gen_range(12.0..24.0); + sim.advance(wait_time).await; + + // If the simulation time exceeds the duration, stop generating customers. + if sim.now() >= duration { + return; + } + + // Activate a new customer process. + sim.activate(Customer.actions(sim)); + } + }); + + // Run the simulation until the store closes. + sim.advance(duration).await; + + // Ensure the barber finishes servicing any remaining customers. + sim.global().joe.seize().await; } /// Entry point for the barbershop simulation. @@ -98,23 +98,23 @@ async fn sim_main(sim: Sim<'_, Barbershop>, duration: Time) { /// specified duration. #[cfg(not(test))] fn main() { - use rand::SeedableRng; - - // Run the simulation and collect the result. - let result = simcore_rs::simulation( - // Initialize the global data for the simulation. - Barbershop { - rng_a: RefCell::new(SmallRng::seed_from_u64(SEED_A)), - rng_s: RefCell::new(SmallRng::seed_from_u64(SEED_S)), - joe: Facility::new(), - wait_time: RandomVar::new(), - }, - // Simulation entry point. - |sim| Process::new(sim, sim_main(sim, 3.0 * 7.0 * 24.0 * 60.0)), - ); - - // Print statistics after the simulation ends. - println!("wait_time: {:#.3?}", result.wait_time); + use rand::SeedableRng; + + // Run the simulation and collect the result. + let result = simcore_rs::simulation( + // Initialize the global data for the simulation. + Barbershop { + rng_a: RefCell::new(SmallRng::seed_from_u64(SEED_A)), + rng_s: RefCell::new(SmallRng::seed_from_u64(SEED_S)), + joe: Facility::new(), + wait_time: RandomVariable::new(), + }, + // Simulation entry point. + |sim| Process::new(sim, sim_main(sim, 3.0 * 7.0 * 24.0 * 60.0)), + ); + + // Print statistics after the simulation ends. + println!("wait_time: {:#.3?}", result.wait_time); } #[cfg(test)] @@ -129,102 +129,102 @@ mod slx; /// implementations and simulation durations. #[cfg(test)] mod bench { - use super::*; - use criterion::{ - criterion_group, AxisScale, BatchSize, BenchmarkId, Criterion, PlotConfiguration, - }; - - #[cfg(odemx)] - mod odemx { - use std::os::raw::c_double; - - #[link(name = "odemx", kind = "static")] - extern "C" { - pub fn barbershop(duration: c_double); - } - } - - #[cfg(windows)] - const SLX_PATH: &'static str = "C:\\Wolverine\\SLX"; - const RANGE: u32 = 10; - const STEP: Time = 1000.0; - - /// Benchmarks the barbershop simulation using different implementations and - /// simulation durations. - fn barbershop_bench(c: &mut Criterion) { - use rand::{rngs::SmallRng, SeedableRng}; - - let mut group = c.benchmark_group("Barbershop"); - - // Set up the benchmark parameters. - group.confidence_level(0.99); - group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic)); - - #[cfg(windows)] - let slx_path = { - let path = slx::slx_version(SLX_PATH); - - if path.is_none() { - println!("SLX not found, skipping SLX benchmarks!"); - } else { - println!("Using SLX program at {:?}", path.as_ref().unwrap()); - } - - path - }; - - // Vary the length of the simulation run. - for sim_duration in (0..RANGE).map(|c| Time::from(1 << c) * STEP) { - #[cfg(windows)] - if let Some(path) = slx_path.as_ref() { - let duration = sim_duration.to_string(); - let args = [ - "/silent", - "/stdout", - "/noicon", - "/nowarn", - "/noxwarn", - "/#BENCH", - "slx\\barbershop.slx", - duration.as_str(), - ]; - - // Benchmark the SLX implementation. - group.bench_function(BenchmarkId::new("SLX", sim_duration), |b| { - b.iter_custom(|iters| { - slx::slx_bench(path.as_os_str(), &args, iters as usize) - .expect("couldn't benchmark the SLX program") - }) - }); - } - - // Benchmark the C++ implementation. - #[cfg(odemx)] - group.bench_function(BenchmarkId::new("ODEMx", sim_duration), |b| { - b.iter(|| unsafe { odemx::barbershop(sim_duration as _) }) - }); - - // Benchmark the Rust implementation. - group.bench_function(BenchmarkId::new("Rust", sim_duration), |b| { - b.iter_batched( - || Barbershop { - rng_a: RefCell::new(SmallRng::seed_from_u64(SEED_A)), - rng_s: RefCell::new(SmallRng::seed_from_u64(SEED_S)), - joe: Facility::new(), - wait_time: RandomVar::new(), - }, - |shared| { - simcore_rs::simulation(shared, |sim| { - Process::new(sim, sim_main(sim, sim_duration)) - }) - }, - BatchSize::SmallInput, - ) - }); - } - - group.finish(); - } - - criterion_group!(benches, barbershop_bench); + use super::*; + use criterion::{ + criterion_group, AxisScale, BatchSize, BenchmarkId, Criterion, PlotConfiguration, + }; + + #[cfg(odemx)] + mod odemx { + use std::os::raw::c_double; + + #[link(name = "odemx", kind = "static")] + extern "C" { + pub fn barbershop(duration: c_double); + } + } + + #[cfg(windows)] + const SLX_PATH: &'static str = "C:\\Wolverine\\SLX"; + const RANGE: u32 = 10; + const STEP: Time = 1000.0; + + /// Benchmarks the barbershop simulation using different implementations and + /// simulation durations. + fn barbershop_bench(c: &mut Criterion) { + use rand::{rngs::SmallRng, SeedableRng}; + + let mut group = c.benchmark_group("Barbershop"); + + // Set up the benchmark parameters. + group.confidence_level(0.99); + group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic)); + + #[cfg(windows)] + let slx_path = { + let path = slx::slx_version(SLX_PATH); + + if path.is_none() { + println!("SLX not found, skipping SLX benchmarks!"); + } else { + println!("Using SLX program at {:?}", path.as_ref().unwrap()); + } + + path + }; + + // Vary the length of the simulation run. + for sim_duration in (0..RANGE).map(|c| Time::from(1 << c) * STEP) { + #[cfg(windows)] + if let Some(path) = slx_path.as_ref() { + let duration = sim_duration.to_string(); + let args = [ + "/silent", + "/stdout", + "/noicon", + "/nowarn", + "/noxwarn", + "/#BENCH", + "slx\\barbershop.slx", + duration.as_str(), + ]; + + // Benchmark the SLX implementation. + group.bench_function(BenchmarkId::new("SLX", sim_duration), |b| { + b.iter_custom(|iters| { + slx::slx_bench(path.as_os_str(), &args, iters as usize) + .expect("couldn't benchmark the SLX program") + }) + }); + } + + // Benchmark the C++ implementation. + #[cfg(odemx)] + group.bench_function(BenchmarkId::new("ODEMx", sim_duration), |b| { + b.iter(|| unsafe { odemx::barbershop(sim_duration as _) }) + }); + + // Benchmark the Rust implementation. + group.bench_function(BenchmarkId::new("Rust", sim_duration), |b| { + b.iter_batched( + || Barbershop { + rng_a: RefCell::new(SmallRng::seed_from_u64(SEED_A)), + rng_s: RefCell::new(SmallRng::seed_from_u64(SEED_S)), + joe: Facility::new(), + wait_time: RandomVariable::new(), + }, + |shared| { + simcore_rs::simulation(shared, |sim| { + Process::new(sim, sim_main(sim, sim_duration)) + }) + }, + BatchSize::SmallInput, + ) + }); + } + + group.finish(); + } + + criterion_group!(benches, barbershop_bench); } diff --git a/examples/ferry.rs b/examples/ferry.rs index b80f289c9b02fc2c501186e34482460c03af4daa..cbf0e27e428ab91cdd1e3246305cea470206fc05 100644 --- a/examples/ferry.rs +++ b/examples/ferry.rs @@ -17,8 +17,8 @@ use rand::{rngs::SmallRng, Rng, SeedableRng}; use rand_distr::{Distribution, Exp, Normal}; use simcore_rs::{ - util::{channel, select, RandomVar, Receiver, Sender}, - Process, Sim, Time, + util::{channel, select, RandomVariable, Receiver, Sender}, + Process, Sim, Time, }; use std::cell::RefCell; @@ -32,121 +32,123 @@ const FERRY_CAPACITY: usize = 5; /// Shared global data for the ferry simulation. struct Ferries { - /// Master random number generator for seeding. - master_rng: RefCell<SmallRng>, - /// Statistical accumulator for ferry cargo lengths. - ferry_cargo_len: RandomVar, - /// Statistical accumulator for ferry load times. - ferry_load_time: RandomVar, - /// Statistical accumulator for car wait times. - car_wait_time: RandomVar, + /// Master random number generator for seeding. + master_rng: RefCell<SmallRng>, + /// Statistical accumulator for ferry cargo lengths. + ferry_cargo_len: RandomVariable, + /// Statistical accumulator for ferry load times. + ferry_load_time: RandomVariable, + /// Statistical accumulator for car wait times. + car_wait_time: RandomVariable, } /// Represents a car in the simulation. #[derive(Debug)] struct Car { - /// Time when the car arrives at the pier. - arrival_time: Time, - /// Duration required to load the car onto the ferry. - load_duration: Time, + /// Time when the car arrives at the pier. + arrival_time: Time, + /// Duration required to load the car onto the ferry. + load_duration: Time, } /// Represents a pier (harbor) where cars arrive and wait for ferries. struct Pier { - /// Random number generator specific to this pier. - rng: SmallRng, - /// Channel to send cars to the ferry. - landing_site: Sender<Car>, + /// Random number generator specific to this pier. + rng: SmallRng, + /// Channel to send cars to the ferry. + landing_site: Sender<Car>, } /// Represents a ferry that transports cars between harbors. struct Ferry { - /// Cargo hold containing the cars on the ferry. - cargo: Vec<Car>, - /// Timeout duration for ferry departure. - timeout: Time, - /// Time taken to travel between harbors. - travel_time: Time, - /// Receivers from piers to accept arriving cars. - piers: Vec<Receiver<Car>>, + /// Cargo hold containing the cars on the ferry. + cargo: Vec<Car>, + /// Timeout duration for ferry departure. + timeout: Time, + /// Time taken to travel between harbors. + travel_time: Time, + /// Receivers from piers to accept arriving cars. + piers: Vec<Receiver<Car>>, } impl Pier { - /// Simulates the actions of a pier in the simulation. - /// - /// Cars arrive at the pier following an exponential distribution - /// and are sent to the ferry when it arrives. - async fn actions(mut self, sim: Sim<'_, Ferries>) { - // Arrival and loading time distributions. - let arrival_delay = Exp::new(0.1).unwrap(); - let loading_delay = Normal::new(0.5, 0.2).unwrap(); - - loop { - // Wait for the next car to arrive. - sim.advance(arrival_delay.sample(&mut self.rng)).await; - - // Create a new car with arrival and load times. - self.landing_site - .send(Car { - arrival_time: sim.now(), - load_duration: loading_delay - .sample_iter(&mut self.rng) - .find(|&val| val >= 0.0) - .unwrap(), - }) - .await - .expect("No ferries in the simulation"); - } - } + /// Simulates the actions of a pier in the simulation. + /// + /// Cars arrive at the pier following an exponential distribution + /// and are sent to the ferry when it arrives. + async fn actions(mut self, sim: Sim<'_, Ferries>) { + // Arrival and loading time distributions. + let arrival_delay = Exp::new(0.1).unwrap(); + let loading_delay = Normal::new(0.5, 0.2).unwrap(); + + loop { + // Wait for the next car to arrive. + sim.advance(arrival_delay.sample(&mut self.rng)).await; + + // Create a new car with arrival and load times. + self.landing_site + .send(Car { + arrival_time: sim.now(), + load_duration: loading_delay + .sample_iter(&mut self.rng) + .find(|&val| val >= 0.0) + .unwrap(), + }) + .await + .expect("No ferries in the simulation"); + } + } } impl Ferry { - /// Simulates the actions of a ferry in the simulation. - /// - /// The ferry travels between harbors, loads cars, waits for a timeout or - /// until it reaches capacity, and then moves to the next harbor. - async fn actions(mut self, sim: Sim<'_, Ferries>) { - loop { - for pier in self.piers.iter() { - // Unload the cars at the current pier. - for car in self.cargo.drain(..) { - sim.advance(car.load_duration).await; - } - - let begin_loading = sim.now(); - - // Load cars until capacity is reached or timeout occurs. - while self.cargo.len() < self.cargo.capacity() { - match select(sim, pier.recv(), async { - sim.advance(self.timeout).await; - None - }).await { - // A car arrived before timeout. - Some(car) => { - sim.global() - .car_wait_time - .tabulate(sim.now() - car.arrival_time); - sim.advance(car.load_duration).await; - self.cargo.push(car); - } - // Timeout occurred; depart to next harbor. - None => break, - } - } - - // Record ferry loading statistics. - sim.global() - .ferry_load_time - .tabulate(sim.now() - begin_loading); - sim.global() - .ferry_cargo_len - .tabulate(self.cargo.len() as f64); - - // Travel to the next harbor. - sim.advance(self.travel_time).await; - } - } - } + /// Simulates the actions of a ferry in the simulation. + /// + /// The ferry travels between harbors, loads cars, waits for a timeout or + /// until it reaches capacity, and then moves to the next harbor. + async fn actions(mut self, sim: Sim<'_, Ferries>) { + loop { + for pier in self.piers.iter() { + // Unload the cars at the current pier. + for car in self.cargo.drain(..) { + sim.advance(car.load_duration).await; + } + + let begin_loading = sim.now(); + + // Load cars until capacity is reached or timeout occurs. + while self.cargo.len() < self.cargo.capacity() { + match select(sim, pier.recv(), async { + sim.advance(self.timeout).await; + None + }) + .await + { + // A car arrived before timeout. + Some(car) => { + sim.global() + .car_wait_time + .tabulate(sim.now() - car.arrival_time); + sim.advance(car.load_duration).await; + self.cargo.push(car); + } + // Timeout occurred; depart to next harbor. + None => break, + } + } + + // Record ferry loading statistics. + sim.global() + .ferry_load_time + .tabulate(sim.now() - begin_loading); + sim.global() + .ferry_cargo_len + .tabulate(self.cargo.len() as f64); + + // Travel to the next harbor. + sim.advance(self.travel_time).await; + } + } + } } /// The main simulation function. @@ -154,47 +156,47 @@ impl Ferry { /// Sets up the piers and ferries, activates their processes, and runs the simulation /// for the specified duration. async fn sim_main(sim: Sim<'_, Ferries>, duration: Time, ferries: usize, harbors: usize) { - let mut ports = Vec::with_capacity(harbors); - - // Create all the harbors (piers). - for _ in 0..harbors { - let (sx, rx) = channel(); - let harbor = Pier { - rng: SmallRng::from_seed(sim.global().master_rng.borrow_mut().gen()), - landing_site: sx, - }; - sim.activate(harbor.actions(sim)); - ports.push(rx); - } - - // Create all the ferries. - for i in 0..ferries { - let ferry = Ferry { - cargo: Vec::with_capacity(FERRY_CAPACITY), - timeout: FERRY_TIMEOUT, - travel_time: HARBOR_DISTANCE, - piers: ports - .iter() - .skip(i) - .chain(ports.iter().take(i)) - .cloned() - .collect(), - }; - sim.activate(ferry.actions(sim)); - } - - // Run the simulation for the specified duration. - sim.advance(duration).await; - - // Handle any cars that weren't picked up by a ferry. - for port in ports { - for _ in 0..port.len() { - let car = port.recv().await.unwrap(); - sim.global() - .car_wait_time - .tabulate(sim.now() - car.arrival_time); - } - } + let mut ports = Vec::with_capacity(harbors); + + // Create all the harbors (piers). + for _ in 0..harbors { + let (sx, rx) = channel(); + let harbor = Pier { + rng: SmallRng::from_seed(sim.global().master_rng.borrow_mut().gen()), + landing_site: sx, + }; + sim.activate(harbor.actions(sim)); + ports.push(rx); + } + + // Create all the ferries. + for i in 0..ferries { + let ferry = Ferry { + cargo: Vec::with_capacity(FERRY_CAPACITY), + timeout: FERRY_TIMEOUT, + travel_time: HARBOR_DISTANCE, + piers: ports + .iter() + .skip(i) + .chain(ports.iter().take(i)) + .cloned() + .collect(), + }; + sim.activate(ferry.actions(sim)); + } + + // Run the simulation for the specified duration. + sim.advance(duration).await; + + // Handle any cars that weren't picked up by a ferry. + for port in ports { + for _ in 0..port.len() { + let car = port.recv().await.unwrap(); + sim.global() + .car_wait_time + .tabulate(sim.now() - car.arrival_time); + } + } } /// Entry point for the ferry simulation. @@ -202,29 +204,29 @@ async fn sim_main(sim: Sim<'_, Ferries>, duration: Time, ferries: usize, harbors /// Initializes the simulation environment and runs the simulation for one week. #[cfg(not(test))] fn main() { - let result = simcore_rs::simulation( - // Global shared data. - Ferries { - master_rng: RefCell::new(SmallRng::seed_from_u64(SEED)), - ferry_cargo_len: RandomVar::new(), - ferry_load_time: RandomVar::new(), - car_wait_time: RandomVar::new(), - }, - // Simulation entry point. - |sim| { - Process::new( - sim, - sim_main(sim, 24.0 * 60.0 * 365.0, FERRY_COUNT, HARBOR_COUNT), - ) - }, - ); - - // Output simulation statistics. - println!("Number of harbors: {}", HARBOR_COUNT); - println!("Number of ferries: {}", FERRY_COUNT); - println!("Car wait time: {:#.3?}", result.car_wait_time); - println!("Ferry cargo len: {:#.3?}", result.ferry_cargo_len); - println!("Ferry load time: {:#.3?}", result.ferry_load_time); + let result = simcore_rs::simulation( + // Global shared data. + Ferries { + master_rng: RefCell::new(SmallRng::seed_from_u64(SEED)), + ferry_cargo_len: RandomVariable::new(), + ferry_load_time: RandomVariable::new(), + car_wait_time: RandomVariable::new(), + }, + // Simulation entry point. + |sim| { + Process::new( + sim, + sim_main(sim, 24.0 * 60.0 * 365.0, FERRY_COUNT, HARBOR_COUNT), + ) + }, + ); + + // Output simulation statistics. + println!("Number of harbors: {}", HARBOR_COUNT); + println!("Number of ferries: {}", FERRY_COUNT); + println!("Car wait time: {:#.3?}", result.car_wait_time); + println!("Ferry cargo len: {:#.3?}", result.ferry_cargo_len); + println!("Ferry load time: {:#.3?}", result.ferry_load_time); } #[cfg(test)] @@ -239,105 +241,105 @@ mod slx; /// implementations and simulation durations. #[cfg(test)] mod bench { - use super::*; - use criterion::{ - criterion_group, AxisScale, BatchSize, BenchmarkId, Criterion, PlotConfiguration, - }; - - #[cfg(feature = "odemx")] - mod odemx { - use std::os::raw::{c_double, c_uint}; - - #[link(name = "odemx", kind = "static")] - extern "C" { - pub fn ferry(duration: c_double, ferries: c_uint, harbors: c_uint); - } - } - - #[cfg(windows)] - const SLX_PATH: &'static str = "C:\\Wolverine\\SLX"; - const RANGE: u32 = 10; - const STEP: Time = 1000.0; - - /// Benchmarks the ferry simulation using different implementations and - /// simulation durations. - fn ferry_bench(c: &mut Criterion) { - let mut group = c.benchmark_group("Ferry"); - - // Set up the benchmark parameters. - group.confidence_level(0.99); - group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic)); - - #[cfg(windows)] - let slx_path = { - let path = slx::slx_version(SLX_PATH); - - if path.is_none() { - println!("SLX not found, skipping SLX benchmarks!"); - } else { - println!("Using SLX program at {:?}", path.as_ref().unwrap()); - } - - path - }; - - // Vary the length of the simulation run. - for (i, sim_duration) in (0..RANGE).map(|c| Time::from(1 << c) * STEP).enumerate() { - #[cfg(windows)] - if let Some(path) = slx_path.as_ref() { - let duration = sim_duration.to_string(); - let args = [ - "/silent", - "/stdout", - "/noicon", - "/nowarn", - "/noxwarn", - "/#BENCH", - "slx\\ferry.slx", - duration.as_str(), - ]; - - // Benchmark the SLX implementation. - group.bench_function(BenchmarkId::new("SLX", sim_duration), |b| { - b.iter_custom(|iters| { - slx::slx_bench(path.as_os_str(), &args, iters as usize) - .expect("Couldn't benchmark the SLX program") - }) - }); - } - - // Benchmark the C++ implementation. - #[cfg(feature = "odemx")] - group.bench_function(BenchmarkId::new("ODEMx", sim_duration), |b| { - b.iter(|| unsafe { - odemx::ferry(sim_duration as _, FERRY_COUNT as _, HARBOR_COUNT as _) - }) - }); - - // Benchmark the Rust implementation. - group.bench_function(BenchmarkId::new("Rust", sim_duration), |b| { - b.iter_batched( - || Ferries { - master_rng: RefCell::new(SmallRng::seed_from_u64(SEED * ((i + 1) as u64))), - ferry_cargo_len: RandomVar::new(), - ferry_load_time: RandomVar::new(), - car_wait_time: RandomVar::new(), - }, - |shared| { - simcore_rs::simulation(shared, |sim| { - Process::new( - sim, - sim_main(sim, sim_duration, FERRY_COUNT, HARBOR_COUNT), - ) - }) - }, - BatchSize::SmallInput, - ) - }); - } - - group.finish(); - } - - criterion_group!(benches, ferry_bench); + use super::*; + use criterion::{ + criterion_group, AxisScale, BatchSize, BenchmarkId, Criterion, PlotConfiguration, + }; + + #[cfg(feature = "odemx")] + mod odemx { + use std::os::raw::{c_double, c_uint}; + + #[link(name = "odemx", kind = "static")] + extern "C" { + pub fn ferry(duration: c_double, ferries: c_uint, harbors: c_uint); + } + } + + #[cfg(windows)] + const SLX_PATH: &'static str = "C:\\Wolverine\\SLX"; + const RANGE: u32 = 10; + const STEP: Time = 1000.0; + + /// Benchmarks the ferry simulation using different implementations and + /// simulation durations. + fn ferry_bench(c: &mut Criterion) { + let mut group = c.benchmark_group("Ferry"); + + // Set up the benchmark parameters. + group.confidence_level(0.99); + group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic)); + + #[cfg(windows)] + let slx_path = { + let path = slx::slx_version(SLX_PATH); + + if path.is_none() { + println!("SLX not found, skipping SLX benchmarks!"); + } else { + println!("Using SLX program at {:?}", path.as_ref().unwrap()); + } + + path + }; + + // Vary the length of the simulation run. + for (i, sim_duration) in (0..RANGE).map(|c| Time::from(1 << c) * STEP).enumerate() { + #[cfg(windows)] + if let Some(path) = slx_path.as_ref() { + let duration = sim_duration.to_string(); + let args = [ + "/silent", + "/stdout", + "/noicon", + "/nowarn", + "/noxwarn", + "/#BENCH", + "slx\\ferry.slx", + duration.as_str(), + ]; + + // Benchmark the SLX implementation. + group.bench_function(BenchmarkId::new("SLX", sim_duration), |b| { + b.iter_custom(|iters| { + slx::slx_bench(path.as_os_str(), &args, iters as usize) + .expect("Couldn't benchmark the SLX program") + }) + }); + } + + // Benchmark the C++ implementation. + #[cfg(feature = "odemx")] + group.bench_function(BenchmarkId::new("ODEMx", sim_duration), |b| { + b.iter(|| unsafe { + odemx::ferry(sim_duration as _, FERRY_COUNT as _, HARBOR_COUNT as _) + }) + }); + + // Benchmark the Rust implementation. + group.bench_function(BenchmarkId::new("Rust", sim_duration), |b| { + b.iter_batched( + || Ferries { + master_rng: RefCell::new(SmallRng::seed_from_u64(SEED * ((i + 1) as u64))), + ferry_cargo_len: RandomVariable::new(), + ferry_load_time: RandomVariable::new(), + car_wait_time: RandomVariable::new(), + }, + |shared| { + simcore_rs::simulation(shared, |sim| { + Process::new( + sim, + sim_main(sim, sim_duration, FERRY_COUNT, HARBOR_COUNT), + ) + }) + }, + BatchSize::SmallInput, + ) + }); + } + + group.finish(); + } + + criterion_group!(benches, ferry_bench); } diff --git a/examples/philosophers.rs b/examples/philosophers.rs index d3cdedfe1d921f1ce538716253d907a47db9cc11..6e1e2d152c0eea91571a5ca9e846316d8417a4ec 100644 --- a/examples/philosophers.rs +++ b/examples/philosophers.rs @@ -19,12 +19,12 @@ use rand::{rngs::SmallRng, Rng, SeedableRng}; use rand_distr::{Distribution, Exp, Normal, Uniform}; use rayon::prelude::*; use simcore_rs::{ - util::{until, Control, RandomVar}, - Process, Sim, Time, + util::{until, Control, RandomVariable}, + Process, Sim, Time, }; use std::{ - cell::{Cell, RefCell}, - rc::Rc, + cell::{Cell, RefCell}, + rc::Rc, }; // Constants for simulation parameters. @@ -33,125 +33,125 @@ const SEED: u64 = 100_000; /// Shared global data for the simulation. struct Philosophers { - /// Master random number generator for seeding. - master_rng: RefCell<SmallRng>, - /// Cell to store the simulation duration until deadlock. - sim_duration: Cell<Time>, + /// Master random number generator for seeding. + master_rng: RefCell<SmallRng>, + /// Cell to store the simulation duration until deadlock. + sim_duration: Cell<Time>, } /// Represents the shared table with forks for the philosophers. struct Table { - /// Controls for each fork, indicating whether it is held. - forks: Vec<Control<bool>>, - /// Counter for the number of forks currently held. - forks_held: Control<usize>, - /// Counter for the number of forks currently awaited. - forks_awaited: Control<usize>, + /// Controls for each fork, indicating whether it is held. + forks: Vec<Control<bool>>, + /// Counter for the number of forks currently held. + forks_held: Control<usize>, + /// Counter for the number of forks currently awaited. + forks_awaited: Control<usize>, } impl Table { - /// Creates a new table with a specified number of forks. - fn new(forks: usize) -> Self { - Table { - forks: (0..forks).map(|_| Control::new(false)).collect(), - forks_held: Control::default(), - forks_awaited: Control::default(), - } - } - - /// Acquires a fork at a given position, blocking until it is available. - async fn acquire_fork(&self, i: usize) { - let num = i % self.forks.len(); - self.forks_awaited.set(self.forks_awaited.get() + 1); - until(&self.forks[num], |fork| !fork.get()).await; - self.forks[num].set(true); - self.forks_awaited.set(self.forks_awaited.get() - 1); - self.forks_held.set(self.forks_held.get() + 1); - } - - /// Releases a fork at a given position back to the table. - fn release_fork(&self, i: usize) { - self.forks[i % self.forks.len()].set(false); - self.forks_held.set(self.forks_held.get() - 1); - } + /// Creates a new table with a specified number of forks. + fn new(forks: usize) -> Self { + Table { + forks: (0..forks).map(|_| Control::new(false)).collect(), + forks_held: Control::default(), + forks_awaited: Control::default(), + } + } + + /// Acquires a fork at a given position, blocking until it is available. + async fn acquire_fork(&self, i: usize) { + let num = i % self.forks.len(); + self.forks_awaited.set(self.forks_awaited.get() + 1); + until(&self.forks[num], |fork| !fork.get()).await; + self.forks[num].set(true); + self.forks_awaited.set(self.forks_awaited.get() - 1); + self.forks_held.set(self.forks_held.get() + 1); + } + + /// Releases a fork at a given position back to the table. + fn release_fork(&self, i: usize) { + self.forks[i % self.forks.len()].set(false); + self.forks_held.set(self.forks_held.get() - 1); + } } /// Represents a philosopher in the simulation. struct Philosopher { - /// Reference to the shared table. - table: Rc<Table>, - /// The seat index of the philosopher at the table. - seat: usize, - /// Random number generator for this philosopher. - rng: SmallRng, + /// Reference to the shared table. + table: Rc<Table>, + /// The seat index of the philosopher at the table. + seat: usize, + /// Random number generator for this philosopher. + rng: SmallRng, } impl Philosopher { - /// Simulates the actions of a philosopher. - /// - /// The philosopher alternates between thinking and eating, and tries to - /// acquire forks. - async fn actions(mut self, sim: Sim<'_, Philosophers>) { - let thinking_duration = Exp::new(1.0).unwrap(); - let artificial_delay = Uniform::new(0.1, 0.2); - let eating_duration = Normal::new(0.5, 0.2).unwrap(); - - loop { - // Spend some time pondering the nature of things. - sim.advance(thinking_duration.sample(&mut self.rng)).await; - - // Acquire the first fork. - self.table.acquire_fork(self.seat).await; - - // Introduce an artificial delay to leave room for deadlocks. - sim.advance(artificial_delay.sample(&mut self.rng)).await; - - // Acquire the second fork. - self.table.acquire_fork(self.seat + 1).await; - - // Spend some time eating. - sim.advance( - eating_duration - .sample_iter(&mut self.rng) - .find(|&val| val >= 0.0) - .unwrap(), - ) - .await; - - // Release the forks. - self.table.release_fork(self.seat + 1); - self.table.release_fork(self.seat); - } - } + /// Simulates the actions of a philosopher. + /// + /// The philosopher alternates between thinking and eating, and tries to + /// acquire forks. + async fn actions(mut self, sim: Sim<'_, Philosophers>) { + let thinking_duration = Exp::new(1.0).unwrap(); + let artificial_delay = Uniform::new(0.1, 0.2); + let eating_duration = Normal::new(0.5, 0.2).unwrap(); + + loop { + // Spend some time pondering the nature of things. + sim.advance(thinking_duration.sample(&mut self.rng)).await; + + // Acquire the first fork. + self.table.acquire_fork(self.seat).await; + + // Introduce an artificial delay to leave room for deadlocks. + sim.advance(artificial_delay.sample(&mut self.rng)).await; + + // Acquire the second fork. + self.table.acquire_fork(self.seat + 1).await; + + // Spend some time eating. + sim.advance( + eating_duration + .sample_iter(&mut self.rng) + .find(|&val| val >= 0.0) + .unwrap(), + ) + .await; + + // Release the forks. + self.table.release_fork(self.seat + 1); + self.table.release_fork(self.seat); + } + } } /// Runs a single simulation instance of the Dining Philosophers problem. /// /// Initializes the table and philosophers, and waits until a deadlock occurs. async fn sim_main(sim: Sim<'_, Philosophers>, count: usize) { - let table = Rc::new(Table::new(count)); - - // Create the philosopher processes and seat them. - for i in 0..count { - sim.activate( - Philosopher { - table: table.clone(), - seat: i, - rng: SmallRng::from_seed(sim.global().master_rng.borrow_mut().gen()), - } - .actions(sim), - ); - } - - // Wait for the precise configuration indicating a deadlock. - until( - (&table.forks_held, &table.forks_awaited), - |(held, awaited)| held.get() == count && awaited.get() == count, - ) - .await; - - // Record the current simulation time. - sim.global().sim_duration.set(sim.now()); + let table = Rc::new(Table::new(count)); + + // Create the philosopher processes and seat them. + for i in 0..count { + sim.activate( + Philosopher { + table: table.clone(), + seat: i, + rng: SmallRng::from_seed(sim.global().master_rng.borrow_mut().gen()), + } + .actions(sim), + ); + } + + // Wait for the precise configuration indicating a deadlock. + until( + (&table.forks_held, &table.forks_awaited), + |(held, awaited)| held.get() == count && awaited.get() == count, + ) + .await; + + // Record the current simulation time. + sim.global().sim_duration.set(sim.now()); } /// Runs multiple simulation instances in parallel and collects statistics. @@ -165,37 +165,31 @@ async fn sim_main(sim: Sim<'_, Philosophers>, count: usize) { /// /// A `RandomVar` containing statistical data of simulation durations until /// deadlock. -fn philosophers(count: usize, reruns: usize) -> RandomVar { - // Use thread-based parallelism to concurrently run simulation models. - (1..=reruns) - .into_par_iter() - .map(|i| { - simcore_rs::simulation( - // Global data. - Philosophers { - master_rng: RefCell::new(SmallRng::seed_from_u64(i as u64 * SEED)), - sim_duration: Cell::default(), - }, - // Simulation entry point. - |sim| Process::new(sim, sim_main(sim, count)), - ) - .sim_duration - .get() - }) - .fold( - RandomVar::new, - |var, duration| { - var.tabulate(duration); - var - }, - ) - .reduce( - RandomVar::new, - |var_a, var_b| { - var_a.merge(&var_b); - var_a - }, - ) +fn philosophers(count: usize, reruns: usize) -> RandomVariable { + // Use thread-based parallelism to concurrently run simulation models. + (1..=reruns) + .into_par_iter() + .map(|i| { + simcore_rs::simulation( + // Global data. + Philosophers { + master_rng: RefCell::new(SmallRng::seed_from_u64(i as u64 * SEED)), + sim_duration: Cell::default(), + }, + // Simulation entry point. + |sim| Process::new(sim, sim_main(sim, count)), + ) + .sim_duration + .get() + }) + .fold(RandomVariable::new, |var, duration| { + var.tabulate(duration); + var + }) + .reduce(RandomVariable::new, |var_a, var_b| { + var_a.merge(&var_b); + var_a + }) } /// Entry point for the Dining Philosophers simulation. @@ -203,10 +197,10 @@ fn philosophers(count: usize, reruns: usize) -> RandomVar { /// Runs multiple simulations and prints out the statistical results. #[cfg(not(test))] fn main() { - const EXPERIMENT_COUNT: usize = 500; + const EXPERIMENT_COUNT: usize = 500; - let sim_duration = philosophers(PHILOSOPHER_COUNT, EXPERIMENT_COUNT); - println!("Simulation duration until deadlock: {:#?}", sim_duration); + let sim_duration = philosophers(PHILOSOPHER_COUNT, EXPERIMENT_COUNT); + println!("Simulation duration until deadlock: {:#?}", sim_duration); } #[cfg(test)] @@ -221,87 +215,87 @@ mod slx; /// of reruns. #[cfg(test)] mod bench { - use super::*; - use criterion::{criterion_group, AxisScale, BenchmarkId, Criterion, PlotConfiguration}; - - #[cfg(feature = "odemx")] - mod odemx { - use std::os::raw::c_uint; - - #[link(name = "odemx", kind = "static")] - extern "C" { - pub fn philosophers(count: c_uint, reruns: c_uint); - } - } - - #[cfg(windows)] - const SLX_PATH: &'static str = "C:\\Wolverine\\SLX"; - const RANGE: u32 = 10; - const STEP: usize = 3; - - /// Benchmarks the Dining Philosophers simulation using different numbers of - /// reruns. - fn philosopher_bench(c: &mut Criterion) { - let mut group = c.benchmark_group("Philosophers"); - - // Set up the benchmark parameters. - group.confidence_level(0.99); - group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic)); - - #[cfg(windows)] - let slx_path = { - let path = slx::slx_version(SLX_PATH); - - if path.is_none() { - println!("SLX not found, skipping SLX benchmarks!"); - } else { - println!("Using SLX program at {:?}", path.as_ref().unwrap()); - } - - path - }; - - // Vary the number of performed reruns in each experiment. - for experiment_count in (0..RANGE).map(|c| (1 << c) * STEP) { - #[cfg(windows)] - if let Some(path) = slx_path.as_ref() { - let count = experiment_count.to_string(); - let args = [ - "/silent", - "/stdout", - "/noicon", - "/nowarn", - "/noxwarn", - "/#BENCH", - "slx\\philosophers.slx", - count.as_str(), - ]; - - // Benchmark the SLX implementation. - group.bench_function(BenchmarkId::new("SLX", experiment_count), |b| { - b.iter_custom(|iters| { - slx::slx_bench(path.as_os_str(), &args, iters as usize) - .expect("Couldn't benchmark the SLX program") - }) - }); - } - - // Benchmark the C++ implementation. - #[cfg(feature = "odemx")] - group.bench_function(BenchmarkId::new("ODEMx", experiment_count), |b| { - b.iter(|| unsafe { - odemx::philosophers(PHILOSOPHER_COUNT as _, experiment_count as _) - }) - }); - - // Benchmark the Rust implementation. - group.bench_function(BenchmarkId::new("Rust", experiment_count), |b| { - b.iter(|| philosophers(PHILOSOPHER_COUNT, experiment_count)) - }); - } - - group.finish(); - } - - criterion_group!(benches, philosopher_bench); + use super::*; + use criterion::{criterion_group, AxisScale, BenchmarkId, Criterion, PlotConfiguration}; + + #[cfg(feature = "odemx")] + mod odemx { + use std::os::raw::c_uint; + + #[link(name = "odemx", kind = "static")] + extern "C" { + pub fn philosophers(count: c_uint, reruns: c_uint); + } + } + + #[cfg(windows)] + const SLX_PATH: &'static str = "C:\\Wolverine\\SLX"; + const RANGE: u32 = 10; + const STEP: usize = 3; + + /// Benchmarks the Dining Philosophers simulation using different numbers of + /// reruns. + fn philosopher_bench(c: &mut Criterion) { + let mut group = c.benchmark_group("Philosophers"); + + // Set up the benchmark parameters. + group.confidence_level(0.99); + group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic)); + + #[cfg(windows)] + let slx_path = { + let path = slx::slx_version(SLX_PATH); + + if path.is_none() { + println!("SLX not found, skipping SLX benchmarks!"); + } else { + println!("Using SLX program at {:?}", path.as_ref().unwrap()); + } + + path + }; + + // Vary the number of performed reruns in each experiment. + for experiment_count in (0..RANGE).map(|c| (1 << c) * STEP) { + #[cfg(windows)] + if let Some(path) = slx_path.as_ref() { + let count = experiment_count.to_string(); + let args = [ + "/silent", + "/stdout", + "/noicon", + "/nowarn", + "/noxwarn", + "/#BENCH", + "slx\\philosophers.slx", + count.as_str(), + ]; + + // Benchmark the SLX implementation. + group.bench_function(BenchmarkId::new("SLX", experiment_count), |b| { + b.iter_custom(|iters| { + slx::slx_bench(path.as_os_str(), &args, iters as usize) + .expect("Couldn't benchmark the SLX program") + }) + }); + } + + // Benchmark the C++ implementation. + #[cfg(feature = "odemx")] + group.bench_function(BenchmarkId::new("ODEMx", experiment_count), |b| { + b.iter(|| unsafe { + odemx::philosophers(PHILOSOPHER_COUNT as _, experiment_count as _) + }) + }); + + // Benchmark the Rust implementation. + group.bench_function(BenchmarkId::new("Rust", experiment_count), |b| { + b.iter(|| philosophers(PHILOSOPHER_COUNT, experiment_count)) + }); + } + + group.finish(); + } + + criterion_group!(benches, philosopher_bench); } diff --git a/examples/slx/mod.rs b/examples/slx/mod.rs index 0fb39b1e50fe3b3c61fe9c8648248454850a39da..9b69aa14e40a171d5633db1593af31ccfa1ae383 100644 --- a/examples/slx/mod.rs +++ b/examples/slx/mod.rs @@ -7,45 +7,45 @@ use std::{iter::once, path::Path, process::Command, time::Duration}; /// /// Errors during this execution are returned back to the caller. fn slx_run_once( - program: &OsStr, - arguments: &[&str], - iterations: usize, + program: &OsStr, + arguments: &[&str], + iterations: usize, ) -> Result<Duration, (i32, String)> { - // start the SLX runtime and wait for its return - let rc = Command::new(program) - .args(arguments) - .arg(iterations.to_string()) - .output() - .map_err(|err| (-1, format!("Failed to run the SLX program: {}", err)))?; + // start the SLX runtime and wait for its return + let rc = Command::new(program) + .args(arguments) + .arg(iterations.to_string()) + .output() + .map_err(|err| (-1, format!("Failed to run the SLX program: {}", err)))?; - if !rc.status.success() { - // check the error code and add a description to it - Err(rc.status.code().map_or_else( - || (-10000, "Unknown error code".to_string()), - |code| { - ( - code, - match code { - -10001 => "Unable to find a security key with SLX permission".to_string(), - -10002 => "The command line includes an invalid option".to_string(), - -10003 => "The source file cannot be found".to_string(), - -10004 => "The specified/implied output file cannot be written".to_string(), - -10005 => "The program contains compile-time errors".to_string(), - -10006 => "The program has terminated with a run-time error".to_string(), - -10007 => "The /genrts option failed".to_string(), - _ => format!("Unknown error code: {}", code), - }, - ) - }, - )) - } else { - Ok(Duration::from_secs_f64( - (&String::from_utf8_lossy(&rc.stdout)) - .trim() - .parse() - .unwrap(), - )) - } + if !rc.status.success() { + // check the error code and add a description to it + Err(rc.status.code().map_or_else( + || (-10000, "Unknown error code".to_string()), + |code| { + ( + code, + match code { + -10001 => "Unable to find a security key with SLX permission".to_string(), + -10002 => "The command line includes an invalid option".to_string(), + -10003 => "The source file cannot be found".to_string(), + -10004 => "The specified/implied output file cannot be written".to_string(), + -10005 => "The program contains compile-time errors".to_string(), + -10006 => "The program has terminated with a run-time error".to_string(), + -10007 => "The /genrts option failed".to_string(), + _ => format!("Unknown error code: {}", code), + }, + ) + }, + )) + } else { + Ok(Duration::from_secs_f64( + (&String::from_utf8_lossy(&rc.stdout)) + .trim() + .parse() + .unwrap(), + )) + } } /// Repeats runs of an SLX program for a specified number of times and collects @@ -54,59 +54,59 @@ fn slx_run_once( /// The SLX program in question has to report its own real-time duration using /// the standard output. pub fn slx_bench( - program: &OsStr, - arguments: &[&str], - iterations: usize, + program: &OsStr, + arguments: &[&str], + iterations: usize, ) -> Result<Duration, String> { - // try to complete the iterations in a single run of the SLX program - slx_run_once(program, arguments, iterations).or_else(|(code, desc)| { - if code == -10006 { - // Runtime error: this happens when the free version of SLX - // exceeds its total instance budget, either through a - // complicated scenario or too many iterations in one call. - // We can still make this work by running SLX multiple times - // with lower iteration counts and combining the timings. - // This effectively trades benchmark speed with simulation model - // size. - (1..) - .map(|i| iterations >> i) - .take_while(|&chunk_size| chunk_size > 0) - // .inspect(|chunk_size| println!("reducing the chunk size to {}", chunk_size)) - .map(|chunk_size| { - (0..(iterations - 1) / chunk_size) - .map(|_| chunk_size) - .chain(once((iterations - 1) % chunk_size + 1)) - .map(|size| slx_run_once(program, arguments, size)) - .fold_ok(Duration::default(), |duration, run| duration + run) - }) - .find_map(|result| result.ok()) - .ok_or(desc) - } else { - Err(desc) - } - }) + // try to complete the iterations in a single run of the SLX program + slx_run_once(program, arguments, iterations).or_else(|(code, desc)| { + if code == -10006 { + // Runtime error: this happens when the free version of SLX + // exceeds its total instance budget, either through a + // complicated scenario or too many iterations in one call. + // We can still make this work by running SLX multiple times + // with lower iteration counts and combining the timings. + // This effectively trades benchmark speed with simulation model + // size. + (1..) + .map(|i| iterations >> i) + .take_while(|&chunk_size| chunk_size > 0) + // .inspect(|chunk_size| println!("reducing the chunk size to {}", chunk_size)) + .map(|chunk_size| { + (0..(iterations - 1) / chunk_size) + .map(|_| chunk_size) + .chain(once((iterations - 1) % chunk_size + 1)) + .map(|size| slx_run_once(program, arguments, size)) + .fold_ok(Duration::default(), |duration, run| duration + run) + }) + .find_map(|result| result.ok()) + .ok_or(desc) + } else { + Err(desc) + } + }) } /// Attempts to find the best SLX version for the benchmarks, falling back on /// the free version if no SLX license can be found. pub fn slx_version(base: impl AsRef<Path>) -> Option<OsString> { - if base.as_ref().exists() { - let programs = ["se64.exe", "se32.exe", "sse.exe"]; - let args = ["/silent", "/noicon", "/nowarn", "/noxwarn", "bad_file.slx"]; + if base.as_ref().exists() { + let programs = ["se64.exe", "se32.exe", "sse.exe"]; + let args = ["/silent", "/noicon", "/nowarn", "/noxwarn", "bad_file.slx"]; - for path in programs.iter().map(|p| base.as_ref().join(p)) { - match Command::new(&path).args(&args).status() { - Err(_) => continue, - Ok(rc) => { - if let Some(code) = rc.code() { - if code == -10003 { - return Some(path.into_os_string()); - } - } - } - } - } - } + for path in programs.iter().map(|p| base.as_ref().join(p)) { + match Command::new(&path).args(&args).status() { + Err(_) => continue, + Ok(rc) => { + if let Some(code) = rc.code() { + if code == -10003 { + return Some(path.into_os_string()); + } + } + } + } + } + } - None + None } diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000000000000000000000000000000000000..7c224aabf8b6771d4da37e846690145438e89b2f --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1 @@ +hard_tabs=true diff --git a/src/bin/coroutines.rs b/src/bin/coroutines.rs index 0da0f7d624ba7a286e4d4fb85fd65a135e5eb1fe..5547610db86d4f6978231a84f0323fbe72fc1aea 100644 --- a/src/bin/coroutines.rs +++ b/src/bin/coroutines.rs @@ -3,62 +3,62 @@ use std::rc::Rc; async fn decompress<I>(mut inp: I, out: Sender<char>) where - I: Iterator<Item = u8>, + I: Iterator<Item = u8>, { - while let Some(c) = inp.next() { - if c == 0xFF { - let len = inp.next().unwrap(); - let c = inp.next().unwrap(); - - for _ in 0..len { - out.send(c as char).await; - } - } else { - out.send(c as char).await; - } - } + while let Some(c) = inp.next() { + if c == 0xFF { + let len = inp.next().unwrap(); + let c = inp.next().unwrap(); + + for _ in 0..len { + out.send(c as char).await; + } + } else { + out.send(c as char).await; + } + } } #[derive(Clone, Debug)] enum Token { - WORD(String), - PUNCT(char), + WORD(String), + PUNCT(char), } async fn tokenize(inp: Receiver<char>, out: Sender<Token>) { - while let Some(mut c) = inp.recv().await { - if c.is_alphabetic() { - let mut text = c.to_string(); - while let Some(new_c) = inp.recv().await { - if new_c.is_alphabetic() { - text.push(new_c); - } else { - c = new_c; - break; - } - } - out.send(WORD(text)).await; - } - out.send(PUNCT(c)).await; - } + while let Some(mut c) = inp.recv().await { + if c.is_alphabetic() { + let mut text = c.to_string(); + while let Some(new_c) = inp.recv().await { + if new_c.is_alphabetic() { + text.push(new_c); + } else { + c = new_c; + break; + } + } + out.send(WORD(text)).await; + } + out.send(PUNCT(c)).await; + } } fn main() { - let exec = Executor::new(); - let spawner = exec.spawner(); - let input = b"He\xff\x02lo IST!".iter().cloned(); + let exec = Executor::new(); + let spawner = exec.spawner(); + let input = b"He\xff\x02lo IST!".iter().cloned(); - exec.run(async { - let (s1, r1) = channel::<char>(); - let (s2, r2) = channel::<Token>(); + exec.run(async { + let (s1, r1) = channel::<char>(); + let (s2, r2) = channel::<Token>(); - spawner.spawn(decompress(input, s1)); - spawner.spawn(tokenize(r1, s2)); + spawner.spawn(decompress(input, s1)); + spawner.spawn(tokenize(r1, s2)); - while let Some(token) = r2.recv().await { - println!("{:?}", token); - } - }); + while let Some(token) = r2.recv().await { + println!("{:?}", token); + } + }); } /* **************************** library contents **************************** */ @@ -66,11 +66,11 @@ fn main() { // additional imports for the necessary execution environment use std::{ - cell::RefCell, - future::Future, - mem::swap, - pin::Pin, - task::{self, Context, Poll}, + cell::RefCell, + future::Future, + mem::swap, + pin::Pin, + task::{self, Context, Poll}, }; /* ************************** executor environment ************************** */ @@ -80,69 +80,69 @@ type FutureQ = Vec<Pin<Box<dyn Future<Output = ()>>>>; /// A simple executor that drives the event loop. struct Executor { - sched: RefCell<FutureQ>, + sched: RefCell<FutureQ>, } impl Executor { - /// Constructs a new executor. - pub fn new() -> Self { - Executor { - sched: RefCell::new(vec![]), - } - } - - /// Creates and returns a Spawner that can be used to insert new futures - /// into the executors future queue. - pub fn spawner(&self) -> Spawner { - Spawner { sched: &self.sched } - } - - /// Runs a future to completion. - pub fn run<F: Future>(&self, mut future: F) -> F::Output { - // construct a custom context to pass into the poll()-method - let waker = unsafe { - task::Waker::from_raw(task::RawWaker::new( - std::ptr::null(), - &EXECUTOR_WAKER_VTABLE, - )) - }; - let mut cx = task::Context::from_waker(&waker); - - // pin the passed future for the duration of this function; - // note: this is safe since we're not moving it around - let mut future = unsafe { Pin::new_unchecked(&mut future) }; - let mut sched = FutureQ::new(); - - // implement a busy-wait event loop for simplicity - loop { - // poll the primary future, allowing secondary futures to spawn - if let Poll::Ready(val) = future.as_mut().poll(&mut cx) { - break val; - } - - // swap the scheduled futures with an empty queue - swap(&mut sched, &mut self.sched.borrow_mut()); - - // iterate over all secondary futures presently scheduled - for mut future in sched.drain(..) { - // if they are not completed, reschedule - if future.as_mut().poll(&mut cx).is_pending() { - self.sched.borrow_mut().push(future); - } - } - } - } + /// Constructs a new executor. + pub fn new() -> Self { + Executor { + sched: RefCell::new(vec![]), + } + } + + /// Creates and returns a Spawner that can be used to insert new futures + /// into the executors future queue. + pub fn spawner(&self) -> Spawner { + Spawner { sched: &self.sched } + } + + /// Runs a future to completion. + pub fn run<F: Future>(&self, mut future: F) -> F::Output { + // construct a custom context to pass into the poll()-method + let waker = unsafe { + task::Waker::from_raw(task::RawWaker::new( + std::ptr::null(), + &EXECUTOR_WAKER_VTABLE, + )) + }; + let mut cx = task::Context::from_waker(&waker); + + // pin the passed future for the duration of this function; + // note: this is safe since we're not moving it around + let mut future = unsafe { Pin::new_unchecked(&mut future) }; + let mut sched = FutureQ::new(); + + // implement a busy-wait event loop for simplicity + loop { + // poll the primary future, allowing secondary futures to spawn + if let Poll::Ready(val) = future.as_mut().poll(&mut cx) { + break val; + } + + // swap the scheduled futures with an empty queue + swap(&mut sched, &mut self.sched.borrow_mut()); + + // iterate over all secondary futures presently scheduled + for mut future in sched.drain(..) { + // if they are not completed, reschedule + if future.as_mut().poll(&mut cx).is_pending() { + self.sched.borrow_mut().push(future); + } + } + } + } } /// A handle to the executors future queue that can be used to insert new tasks. struct Spawner<'a> { - sched: &'a RefCell<FutureQ>, + sched: &'a RefCell<FutureQ>, } impl<'a> Spawner<'a> { - pub fn spawn(&self, future: impl Future<Output = ()> + 'static) { - self.sched.borrow_mut().push(Box::pin(future)); - } + pub fn spawn(&self, future: impl Future<Output = ()> + 'static) { + self.sched.borrow_mut().push(Box::pin(future)); + } } /* **************************** specialized waker *************************** */ @@ -151,121 +151,121 @@ impl<'a> Spawner<'a> { struct TrivialWaker; impl TrivialWaker { - unsafe fn clone(_this: *const ()) -> task::RawWaker { - unimplemented!() - } + unsafe fn clone(_this: *const ()) -> task::RawWaker { + unimplemented!() + } - unsafe fn wake(_this: *const ()) { - unimplemented!() - } + unsafe fn wake(_this: *const ()) { + unimplemented!() + } - unsafe fn wake_by_ref(_this: *const ()) { - unimplemented!() - } + unsafe fn wake_by_ref(_this: *const ()) { + unimplemented!() + } - unsafe fn drop(_this: *const ()) {} + unsafe fn drop(_this: *const ()) {} } /// Virtual function table for the simple waker. static EXECUTOR_WAKER_VTABLE: task::RawWakerVTable = task::RawWakerVTable::new( - TrivialWaker::clone, - TrivialWaker::wake, - TrivialWaker::wake_by_ref, - TrivialWaker::drop, + TrivialWaker::clone, + TrivialWaker::wake, + TrivialWaker::wake_by_ref, + TrivialWaker::drop, ); /* ************************** asynchronous wrappers ************************* */ /// Simple channel with space for only one element. struct Channel<T> { - slot: RefCell<Option<T>>, + slot: RefCell<Option<T>>, } /// Creates a channel and returns a pair of read and write ends. fn channel<T>() -> (Sender<T>, Receiver<T>) { - let channel = Rc::new(Channel { - slot: RefCell::new(None), - }); - ( - Sender { - channel: channel.clone(), - }, - Receiver { channel }, - ) + let channel = Rc::new(Channel { + slot: RefCell::new(None), + }); + ( + Sender { + channel: channel.clone(), + }, + Receiver { channel }, + ) } /// Write-end of a channel. struct Sender<T> { - channel: Rc<Channel<T>>, + channel: Rc<Channel<T>>, } /// Read-end of a channel. struct Receiver<T> { - channel: Rc<Channel<T>>, + channel: Rc<Channel<T>>, } impl<T: Clone> Sender<T> { - /// Method used to push an element into the channel. - /// Blocks until the previous element has been consumed. - pub fn send<'s>(&'s self, elem: T) -> impl Future<Output = ()> + 's { - SendFuture { - channel: &self.channel, - elem, - } - } + /// Method used to push an element into the channel. + /// Blocks until the previous element has been consumed. + pub fn send<'s>(&'s self, elem: T) -> impl Future<Output = ()> + 's { + SendFuture { + channel: &self.channel, + elem, + } + } } impl<T: Clone> Receiver<T> { - /// Method used to consume an element from the channel. - /// Blocks until an element can be consumed. - pub fn recv<'r>(&'r self) -> impl Future<Output = Option<T>> + 'r { - ReceiveFuture { - channel: &self.channel, - } - } + /// Method used to consume an element from the channel. + /// Blocks until an element can be consumed. + pub fn recv<'r>(&'r self) -> impl Future<Output = Option<T>> + 'r { + ReceiveFuture { + channel: &self.channel, + } + } } /// A future that pushes an element into a channel. struct SendFuture<'c, T> { - channel: &'c Rc<Channel<T>>, - elem: T, + channel: &'c Rc<Channel<T>>, + elem: T, } /// A future that consumes an element from a channel. struct ReceiveFuture<'c, T> { - channel: &'c Rc<Channel<T>>, + channel: &'c Rc<Channel<T>>, } impl<T: Clone> Future for SendFuture<'_, T> { - type Output = (); - - fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> { - // check if there is space for the element - if self.channel.slot.borrow().is_none() { - // replace the empty element with ours - self.channel.slot.replace(Some(self.elem.clone())); - Poll::Ready(()) - } else { - // try again at a later time - Poll::Pending - } - } + type Output = (); + + fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> { + // check if there is space for the element + if self.channel.slot.borrow().is_none() { + // replace the empty element with ours + self.channel.slot.replace(Some(self.elem.clone())); + Poll::Ready(()) + } else { + // try again at a later time + Poll::Pending + } + } } impl<T> Future for ReceiveFuture<'_, T> { - type Output = Option<T>; - - fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> { - // check if there is an element in the channel - if let Some(c) = self.channel.slot.borrow_mut().take() { - // return it - Poll::Ready(Some(c)) - } else if Rc::strong_count(self.channel) == 1 { - // check if the sender disconnected - Poll::Ready(None) - } else { - // try again at a later time - Poll::Pending - } - } + type Output = Option<T>; + + fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> { + // check if there is an element in the channel + if let Some(c) = self.channel.slot.borrow_mut().take() { + // return it + Poll::Ready(Some(c)) + } else if Rc::strong_count(self.channel) == 1 { + // check if the sender disconnected + Poll::Ready(None) + } else { + // try again at a later time + Poll::Pending + } + } } diff --git a/src/lib.rs b/src/lib.rs index 6dee5e32826d8afd6b07a58cf6aa6d39046ae478..b294e011b54515f2940e62d4671193f4d0196603 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -66,15 +66,15 @@ //! cases or be suitable for production use. use std::{ - cell::{Cell, RefCell}, - cmp::Ordering, - collections::BinaryHeap, - future::{poll_fn, Future}, - hash::{Hash, Hasher}, - mem::ManuallyDrop, - pin::Pin, - rc::{Rc, Weak}, - task::{self, Context, Poll}, + cell::{Cell, RefCell}, + cmp::Ordering, + collections::BinaryHeap, + future::{poll_fn, Future}, + hash::{Hash, Hasher}, + mem::ManuallyDrop, + pin::Pin, + rc::{Rc, Weak}, + task::{self, Context, Poll}, }; pub mod util; @@ -88,216 +88,219 @@ pub type Time = f64; /// process. pub fn simulation<G, F>(shared: G, main: F) -> G where - F: for<'s> FnOnce(Sim<'s, G>) -> Process<'s, G>, + F: for<'s> FnOnce(Sim<'s, G>) -> Process<'s, G>, { - let simulator; - let registry = Rc::default(); - - // create a fresh simulation context and a handle to it - simulator = Simulator::new(&shared, Rc::downgrade(®istry)); - let sim = Sim { handle: &simulator }; - - // construct a custom context to pass into the poll()-method - let event_waker = StateEventWaker::new(sim); - let waker = unsafe { event_waker.as_waker() }; - let mut cx = Context::from_waker(&waker); - - // evaluate the passed function for the main process and schedule it - let root = main(sim); - simulator.schedule(root.clone()); - - // pop processes until empty or the main process terminates - while let Some(process) = simulator.next_process() { - if process.poll(&mut cx).is_ready() && process == root { - break; - } - - // drop the active process if only the weak reference of the - // `active` field in the event calendar remains - if Rc::weak_count(&process.0) <= 1 { - registry.borrow_mut().swap_remove(&process); - } - } - - // Here we drop `registry` first and `simulator` after. Since `registry` - // is an `IndexSet` (and not a `HashSet`), all processes are guaranteed - // to be dropped, even if a destructor of a future unwinds. This ensures - // that any weak process wakers leaked out of the `simulation` function - // can no longer be upgraded. - drop(registry); - - // return the global data - shared + let simulator; + let registry = Rc::default(); + + // create a fresh simulation context and a handle to it + simulator = Simulator::new(&shared, Rc::downgrade(®istry)); + let sim = Sim { handle: &simulator }; + + // construct a custom context to pass into the poll()-method + let event_waker = StateEventWaker::new(sim); + let waker = unsafe { event_waker.as_waker() }; + let mut cx = Context::from_waker(&waker); + + // evaluate the passed function for the main process and schedule it + let root = main(sim); + simulator.schedule(root.clone()); + + // pop processes until empty or the main process terminates + while let Some(process) = simulator.next_process() { + if process.poll(&mut cx).is_ready() && process == root { + break; + } + + // drop the active process if only the weak reference of the + // `active` field in the event calendar remains + if Rc::weak_count(&process.0) <= 1 { + registry.borrow_mut().swap_remove(&process); + } + } + + // Here we drop `registry` first and `simulator` after. Since `registry` + // is an `IndexSet` (and not a `HashSet`), all processes are guaranteed + // to be dropped, even if a destructor of a future unwinds. This ensures + // that any weak process wakers leaked out of the `simulation` function + // can no longer be upgraded. + drop(registry); + + // return the global data + shared } /// Private simulation context that holds a reference to the shared data, the /// event calendar, and the owning container for all processes. struct Simulator<'s, G> { - /// A reference to the globally shared data. - shared: &'s G, - - /// The scheduler for processes. - calendar: RefCell<Calendar<'s, G>>, - - /// The container that has strong ownership of the processes during a - /// simulation run. - registry: Weak<RefCell<indexmap::IndexSet<StrongProcess<'s, G>>>> + /// A reference to the globally shared data. + shared: &'s G, + + /// The scheduler for processes. + calendar: RefCell<Calendar<'s, G>>, + + /// The container that has strong ownership of the processes during a + /// simulation run. + registry: Weak<RefCell<indexmap::IndexSet<StrongProcess<'s, G>>>>, } impl<'s, G> Simulator<'s, G> { - /// Create a new simulator instance referencing shared data and using the - /// registry as owner of the processes. - fn new( - shared: &'s G, - registry: Weak<RefCell<indexmap::IndexSet<StrongProcess<'s, G>>>> - ) -> Self { - Simulator { - shared, - calendar: RefCell::new(Calendar::default()), - registry, - } - } - - /// Returns a (reference-counted) copy of the currently active process. - fn active(&self) -> Process<'s, G> { - self.calendar - .borrow() - .active - .as_ref() - .expect("no active process") - .clone() - } - - /// Returns the current simulation time. - fn now(&self) -> Time { - self.calendar.borrow().now - } - - /// Schedules a process at the current simulation time. - fn schedule(&self, process: Process<'s, G>) { - self.schedule_in(Time::default(), process); - } - - /// Schedules a process at a later simulation time. - fn schedule_in(&self, dt: Time, process: Process<'s, G>) { - self.calendar.borrow_mut().schedule_in(dt, process); - } - - /// Returns the next process according to the event calendar. - fn next_process(&self) -> Option<StrongProcess<'s, G>> { - self.calendar.borrow_mut().next_process() - } + /// Create a new simulator instance referencing shared data and using the + /// registry as owner of the processes. + fn new( + shared: &'s G, + registry: Weak<RefCell<indexmap::IndexSet<StrongProcess<'s, G>>>>, + ) -> Self { + Simulator { + shared, + calendar: RefCell::new(Calendar::default()), + registry, + } + } + + /// Returns a (reference-counted) copy of the currently active process. + fn active(&self) -> Process<'s, G> { + self.calendar + .borrow() + .active + .as_ref() + .expect("no active process") + .clone() + } + + /// Returns the current simulation time. + fn now(&self) -> Time { + self.calendar.borrow().now + } + + /// Schedules a process at the current simulation time. + fn schedule(&self, process: Process<'s, G>) { + self.schedule_in(Time::default(), process); + } + + /// Schedules a process at a later simulation time. + fn schedule_in(&self, dt: Time, process: Process<'s, G>) { + self.calendar.borrow_mut().schedule_in(dt, process); + } + + /// Returns the next process according to the event calendar. + fn next_process(&self) -> Option<StrongProcess<'s, G>> { + self.calendar.borrow_mut().next_process() + } } /// The (private) scheduler for processes. struct Calendar<'s, G> { - /// The current simulation time. - now: Time, - /// The event-calendar organized chronologically. - events: BinaryHeap<NextEvent<'s, G>>, - /// The currently active process. - active: Option<Process<'s, G>>, + /// The current simulation time. + now: Time, + /// The event-calendar organized chronologically. + events: BinaryHeap<NextEvent<'s, G>>, + /// The currently active process. + active: Option<Process<'s, G>>, } impl<G> Default for Calendar<'_, G> { - fn default() -> Self { - Self { - now: Time::default(), - events: Default::default(), - active: Default::default(), - } - } + fn default() -> Self { + Self { + now: Time::default(), + events: Default::default(), + active: Default::default(), + } + } } impl<'s, G> Calendar<'s, G> { - /// Schedules a process at a later simulation time. - #[inline] - fn schedule_in(&mut self, dt: Time, process: Process<'s, G>) { - let calender = &mut self.events; - let strong_process = process.upgrade().expect("already terminated"); - let is_scheduled = &strong_process.0.is_scheduled; - - assert!(!is_scheduled.get(), "already scheduled"); - is_scheduled.set(true); - - calender.push(NextEvent { - move_time: self.now + dt, - process - }); - } - - /// Removes the process with the next event time from the calendar and - /// activates it. - #[inline] - fn next_process(&mut self) -> Option<StrongProcess<'s, G>> { - loop { - let NextEvent { move_time: now, process } = self.events.pop()?; - let Some(strong_process) = process.upgrade() else { - // process was terminated after being queued - continue; - }; - strong_process.0.is_scheduled.set(false); - self.now = now; - self.active = Some(process); - - return Some(strong_process); - } - } + /// Schedules a process at a later simulation time. + #[inline] + fn schedule_in(&mut self, dt: Time, process: Process<'s, G>) { + let calender = &mut self.events; + let strong_process = process.upgrade().expect("already terminated"); + let is_scheduled = &strong_process.0.is_scheduled; + + assert!(!is_scheduled.get(), "already scheduled"); + is_scheduled.set(true); + + calender.push(NextEvent { + move_time: self.now + dt, + process, + }); + } + + /// Removes the process with the next event time from the calendar and + /// activates it. + #[inline] + fn next_process(&mut self) -> Option<StrongProcess<'s, G>> { + loop { + let NextEvent { + move_time: now, + process, + } = self.events.pop()?; + let Some(strong_process) = process.upgrade() else { + // process was terminated after being queued + continue; + }; + strong_process.0.is_scheduled.set(false); + self.now = now; + self.active = Some(process); + + return Some(strong_process); + } + } } /// A light-weight handle to the simulation context. pub struct Sim<'s, G = ()> { - handle: &'s Simulator<'s, G>, + handle: &'s Simulator<'s, G>, } // this allows the creation of copies impl<G> Clone for Sim<'_, G> { - #[inline] - fn clone(&self) -> Self { - *self - } + #[inline] + fn clone(&self) -> Self { + *self + } } // this allows moving the context without invalidating it (copy semantics) impl<G> Copy for Sim<'_, G> {} impl<'s, G> Sim<'s, G> { - /// Returns a (reference-counted) copy of the currently active process. - #[inline] - pub fn active(self) -> Process<'s, G> { - self.handle.active() - } - - /// Activates a new process with the given future. - #[inline] - pub fn activate(self, f: impl Future<Output = ()> + 's) { - self.reactivate(Process::new(self, f)); - } - - /// Reactivates a process that has been suspended with wait(). - #[inline] - pub fn reactivate(self, process: Process<'s, G>) { - self.handle.schedule(process); - } - - /// Reactivates the currently active process after some time has passed. - #[inline] - pub async fn advance(self, dt: Time) { - self.handle.schedule_in(dt, self.handle.active()); - sleep().await - } - - /// Returns the current simulation time. - #[inline] - pub fn now(self) -> Time { - self.handle.now() - } - - /// Returns a shared reference to the global data. - #[inline] - pub fn global(self) -> &'s G { - self.handle.shared - } + /// Returns a (reference-counted) copy of the currently active process. + #[inline] + pub fn active(self) -> Process<'s, G> { + self.handle.active() + } + + /// Activates a new process with the given future. + #[inline] + pub fn activate(self, f: impl Future<Output = ()> + 's) { + self.reactivate(Process::new(self, f)); + } + + /// Reactivates a process that has been suspended with wait(). + #[inline] + pub fn reactivate(self, process: Process<'s, G>) { + self.handle.schedule(process); + } + + /// Reactivates the currently active process after some time has passed. + #[inline] + pub async fn advance(self, dt: Time) { + self.handle.schedule_in(dt, self.handle.active()); + sleep().await + } + + /// Returns the current simulation time. + #[inline] + pub fn now(self) -> Time { + self.handle.now() + } + + /// Returns a shared reference to the global data. + #[inline] + pub fn global(self) -> &'s G { + self.handle.shared + } } /// A bare-bone process type that can also be used as a waker. @@ -313,137 +316,138 @@ struct StrongProcess<'s, G>(Rc<ProcessInner<'s, G>>); /// The private details of the [`Process`](struct.Process.html) type. struct ProcessInner<'s, G> { - /// The simulation context needed to implement the `Waker` interface. - context: Sim<'s, G>, - /// A boolean flag indicating whether this process is currently scheduled. - is_scheduled: Cell<bool>, - /// `Some` [`Future`] associated with this process or `None` if it has been - /// terminated externally. - /// - /// [`Future`]: https://doc.rust-lang.org/std/future/trait.Future.html - state: RefCell<Option<Pin<Box<dyn Future<Output = ()> + 's>>>>, + /// The simulation context needed to implement the `Waker` interface. + context: Sim<'s, G>, + /// A boolean flag indicating whether this process is currently scheduled. + is_scheduled: Cell<bool>, + /// `Some` [`Future`] associated with this process or `None` if it has been + /// terminated externally. + /// + /// [`Future`]: https://doc.rust-lang.org/std/future/trait.Future.html + state: RefCell<Option<Pin<Box<dyn Future<Output = ()> + 's>>>>, } impl<'s, G> Process<'s, G> { - /// Combines a future and a simulation context to a process. - #[inline] - pub fn new(sim: Sim<'s, G>, fut: impl Future<Output = ()> + 's) -> Self { - let strong_process = StrongProcess(Rc::new(ProcessInner { - context: sim, - is_scheduled: Cell::new(false), - state: RefCell::new(Some(Box::pin(fut))), - })); - - let process = strong_process.downgrade(); - - let registry = sim - .handle - .registry - .upgrade() - .expect("attempted to create process after simulation ended"); - registry.borrow_mut().insert(strong_process); - - process - } - - /// Releases the [`Future`] contained in this process. - /// - /// This will also execute all the destructors for the local variables - /// initialized by this process. - #[inline] - pub fn terminate(&self) { - let Some(strong_process) = self.upgrade() else { - // ok, we're already terminated - return; - }; - let sim = strong_process.0.context; - assert!( - sim.handle.calendar - .borrow() - .active - .as_ref() - .is_none_or(|active| active != self), - "attempted to terminate active process" - ); - if let Some(registry) = sim.handle.registry.upgrade() { - registry.borrow_mut().swap_remove(&strong_process); - } else { - // This can happen if we're dropping the `registry` and the - // destructor of another process tries to terminate this process. - // In this case, we need to manually drop our future now to ensure - // that there are no dangling references from our future to their - // future. - *strong_process.0.state.borrow_mut() = None; - }; - drop(strong_process); - assert!(self.is_terminated(), "failed to terminate process"); - } - - /// Returns a `Waker` for this process. - #[inline] - pub fn waker(self) -> task::Waker { - unsafe { task::Waker::from_raw(self.raw_waker()) } - } - - /// Returns whether this process has finished its life-cycle. - #[inline] - pub fn is_terminated(&self) -> bool { - self.upgrade() - .is_none_or(|strong_process| strong_process.0.state.borrow().is_none()) - } - - /// Gets private access to the process. - /// - /// # Safety - /// - /// The caller must ensure that the [`StrongProcess`] is not leaked. - /// In particular, it must not be passed to user code. - #[inline] - fn upgrade(&self) -> Option<StrongProcess<'s, G>> { - self.0.upgrade().map(StrongProcess) - } + /// Combines a future and a simulation context to a process. + #[inline] + pub fn new(sim: Sim<'s, G>, fut: impl Future<Output = ()> + 's) -> Self { + let strong_process = StrongProcess(Rc::new(ProcessInner { + context: sim, + is_scheduled: Cell::new(false), + state: RefCell::new(Some(Box::pin(fut))), + })); + + let process = strong_process.downgrade(); + + let registry = sim + .handle + .registry + .upgrade() + .expect("attempted to create process after simulation ended"); + registry.borrow_mut().insert(strong_process); + + process + } + + /// Releases the [`Future`] contained in this process. + /// + /// This will also execute all the destructors for the local variables + /// initialized by this process. + #[inline] + pub fn terminate(&self) { + let Some(strong_process) = self.upgrade() else { + // ok, we're already terminated + return; + }; + let sim = strong_process.0.context; + assert!( + sim.handle + .calendar + .borrow() + .active + .as_ref() + .is_none_or(|active| active != self), + "attempted to terminate active process" + ); + if let Some(registry) = sim.handle.registry.upgrade() { + registry.borrow_mut().swap_remove(&strong_process); + } else { + // This can happen if we're dropping the `registry` and the + // destructor of another process tries to terminate this process. + // In this case, we need to manually drop our future now to ensure + // that there are no dangling references from our future to their + // future. + *strong_process.0.state.borrow_mut() = None; + }; + drop(strong_process); + assert!(self.is_terminated(), "failed to terminate process"); + } + + /// Returns a `Waker` for this process. + #[inline] + pub fn waker(self) -> task::Waker { + unsafe { task::Waker::from_raw(self.raw_waker()) } + } + + /// Returns whether this process has finished its life-cycle. + #[inline] + pub fn is_terminated(&self) -> bool { + self.upgrade() + .is_none_or(|strong_process| strong_process.0.state.borrow().is_none()) + } + + /// Gets private access to the process. + /// + /// # Safety + /// + /// The caller must ensure that the [`StrongProcess`] is not leaked. + /// In particular, it must not be passed to user code. + #[inline] + fn upgrade(&self) -> Option<StrongProcess<'s, G>> { + self.0.upgrade().map(StrongProcess) + } } impl<'s, G> StrongProcess<'s, G> { - /// Private function for polling the process. - #[inline] - fn poll(&self, cx: &mut Context<'_>) -> Poll<()> { - self.0 - .state - .borrow_mut() - .as_mut() - .expect("attempted to poll terminated task") - .as_mut() - .poll(cx) - } - - /// Converts the owning process into a weaker referencing process. - #[inline] - fn downgrade(&self) -> Process<'s, G> { - Process(Rc::downgrade(&self.0)) - } - - /// Returns whether this process is currently scheduled. - #[inline] - fn is_scheduled(&self) -> bool { - self.0.is_scheduled.get() - } + /// Private function for polling the process. + #[inline] + fn poll(&self, cx: &mut Context<'_>) -> Poll<()> { + self.0 + .state + .borrow_mut() + .as_mut() + .expect("attempted to poll terminated task") + .as_mut() + .poll(cx) + } + + /// Converts the owning process into a weaker referencing process. + #[inline] + fn downgrade(&self) -> Process<'s, G> { + Process(Rc::downgrade(&self.0)) + } + + /// Returns whether this process is currently scheduled. + #[inline] + fn is_scheduled(&self) -> bool { + self.0.is_scheduled.get() + } } // Increases the reference counter of this process. impl<G> Clone for Process<'_, G> { - #[inline] - fn clone(&self) -> Self { - Process(self.0.clone()) - } + #[inline] + fn clone(&self) -> Self { + Process(self.0.clone()) + } } // allows processes to be compared for equality impl<G> PartialEq for Process<'_, G> { - #[inline] - fn eq(&self, other: &Self) -> bool { - Weak::ptr_eq(&self.0, &other.0) - } + #[inline] + fn eq(&self, other: &Self) -> bool { + Weak::ptr_eq(&self.0, &other.0) + } } // marks the equality-relation as total @@ -451,24 +455,24 @@ impl<G> Eq for Process<'_, G> {} // hash processes for pointer equality impl<G> Hash for Process<'_, G> { - fn hash<H: Hasher>(&self, state: &mut H) { - self.0.as_ptr().hash(state); - } + fn hash<H: Hasher>(&self, state: &mut H) { + self.0.as_ptr().hash(state); + } } // allows processes to be compared for equality impl<G> PartialEq for StrongProcess<'_, G> { - #[inline] - fn eq(&self, other: &Self) -> bool { - Rc::ptr_eq(&self.0, &other.0) - } + #[inline] + fn eq(&self, other: &Self) -> bool { + Rc::ptr_eq(&self.0, &other.0) + } } impl<'s, G> PartialEq<Process<'s, G>> for StrongProcess<'s, G> { - #[inline] - fn eq(&self, other: &Process<'s, G>) -> bool { - Rc::as_ptr(&self.0) == other.0.as_ptr() - } + #[inline] + fn eq(&self, other: &Process<'s, G>) -> bool { + Rc::as_ptr(&self.0) == other.0.as_ptr() + } } // marks the equality-relation as total @@ -476,108 +480,107 @@ impl<G> Eq for StrongProcess<'_, G> {} // hash processes for pointer equality impl<G> Hash for StrongProcess<'_, G> { - fn hash<H: Hasher>(&self, state: &mut H) { - Rc::as_ptr(&self.0).hash(state); - } + fn hash<H: Hasher>(&self, state: &mut H) { + Rc::as_ptr(&self.0).hash(state); + } } /// Time-process-pair that has a total order defined based on the time. struct NextEvent<'p, G> { - move_time: Time, - process: Process<'p, G>, + move_time: Time, + process: Process<'p, G>, } impl<G> PartialEq for NextEvent<'_, G> { - #[inline] - fn eq(&self, other: &Self) -> bool { - self.move_time == other.move_time - } + #[inline] + fn eq(&self, other: &Self) -> bool { + self.move_time == other.move_time + } } impl<G> Eq for NextEvent<'_, G> {} impl<G> PartialOrd for NextEvent<'_, G> { - #[inline] - fn partial_cmp(&self, other: &Self) -> Option<Ordering> { - Some(self.cmp(other)) - } + #[inline] + fn partial_cmp(&self, other: &Self) -> Option<Ordering> { + Some(self.cmp(other)) + } } impl<G> Ord for NextEvent<'_, G> { - #[inline] - fn cmp(&self, other: &Self) -> Ordering { - self.move_time - .partial_cmp(&other.move_time) - .expect("illegal event time NaN") - .reverse() - } + #[inline] + fn cmp(&self, other: &Self) -> Ordering { + self.move_time + .partial_cmp(&other.move_time) + .expect("illegal event time NaN") + .reverse() + } } /* **************************** specialized waker *************************** */ impl<G> Process<'_, G> { - /// Virtual function table for the waker. - const VTABLE: task::RawWakerVTable = - task::RawWakerVTable::new(Self::clone, Self::wake, Self::wake_by_ref, Self::drop); - - /// Constructs a raw waker from a simulation context and a process. - #[inline] - fn raw_waker(self) -> task::RawWaker { - task::RawWaker::new(Weak::into_raw(self.0) as *const (), &Self::VTABLE) - } - - unsafe fn clone(this: *const ()) -> task::RawWaker { - let waker_ref = - &*ManuallyDrop::new(Weak::from_raw(this as *const ProcessInner<'_, G>)); - - // increase the reference counter once - // this is technically unsafe because Wakers are Send + Sync and so this - // call might be executed from a different thread, creating a data race - // hazard; we leave preventing this as an exercise to the reader! - let waker = waker_ref.clone(); - - task::RawWaker::new(Weak::into_raw(waker) as *const (), &Self::VTABLE) - } - - unsafe fn wake(this: *const ()) { - let waker = Weak::from_raw(this as *const ProcessInner<'_, G>); - let process = Process(waker); - process.wake_impl(); - } - - unsafe fn wake_by_ref(this: *const ()) { - // keep the waker alive by incrementing the reference count - let waker = Weak::from_raw(this as *const ProcessInner<'_, G>); - let process = &*ManuallyDrop::new(Process(waker)); - - // this is technically unsafe because Wakers are Send + Sync and so this - // call might be executed from a different thread, creating a data race - // hazard; we leave preventing this as an exercise to the reader! - process.clone().wake_impl(); - } - - fn wake_impl(self) { - // this is technically unsafe because Wakers are Send + Sync and so this - // call might be executed from a different thread, creating a data race - // hazard; we leave preventing this as an exercise to the reader! - let Some(strong_process) = self.upgrade() else { - // this can happen if a synchronization structure forgets to clean - // up registered Waker objects on destruct; this would lead to - // hard-to-diagnose bugs if we were to ignore it - panic!("attempted to wake a terminated process"); - }; - - if !strong_process.is_scheduled() { - strong_process.0.context.reactivate(self); - } - } - - unsafe fn drop(this: *const ()) { - // this is technically unsafe because Wakers are Send + Sync and so this - // call might be executed from a different thread, creating a data race - // hazard; we leave preventing this as an exercise to the reader! - Weak::from_raw(this as *const RefCell<ProcessInner<'_, G>>); - } + /// Virtual function table for the waker. + const VTABLE: task::RawWakerVTable = + task::RawWakerVTable::new(Self::clone, Self::wake, Self::wake_by_ref, Self::drop); + + /// Constructs a raw waker from a simulation context and a process. + #[inline] + fn raw_waker(self) -> task::RawWaker { + task::RawWaker::new(Weak::into_raw(self.0) as *const (), &Self::VTABLE) + } + + unsafe fn clone(this: *const ()) -> task::RawWaker { + let waker_ref = &*ManuallyDrop::new(Weak::from_raw(this as *const ProcessInner<'_, G>)); + + // increase the reference counter once + // this is technically unsafe because Wakers are Send + Sync and so this + // call might be executed from a different thread, creating a data race + // hazard; we leave preventing this as an exercise to the reader! + let waker = waker_ref.clone(); + + task::RawWaker::new(Weak::into_raw(waker) as *const (), &Self::VTABLE) + } + + unsafe fn wake(this: *const ()) { + let waker = Weak::from_raw(this as *const ProcessInner<'_, G>); + let process = Process(waker); + process.wake_impl(); + } + + unsafe fn wake_by_ref(this: *const ()) { + // keep the waker alive by incrementing the reference count + let waker = Weak::from_raw(this as *const ProcessInner<'_, G>); + let process = &*ManuallyDrop::new(Process(waker)); + + // this is technically unsafe because Wakers are Send + Sync and so this + // call might be executed from a different thread, creating a data race + // hazard; we leave preventing this as an exercise to the reader! + process.clone().wake_impl(); + } + + fn wake_impl(self) { + // this is technically unsafe because Wakers are Send + Sync and so this + // call might be executed from a different thread, creating a data race + // hazard; we leave preventing this as an exercise to the reader! + let Some(strong_process) = self.upgrade() else { + // this can happen if a synchronization structure forgets to clean + // up registered Waker objects on destruct; this would lead to + // hard-to-diagnose bugs if we were to ignore it + panic!("attempted to wake a terminated process"); + }; + + if !strong_process.is_scheduled() { + strong_process.0.context.reactivate(self); + } + } + + unsafe fn drop(this: *const ()) { + // this is technically unsafe because Wakers are Send + Sync and so this + // call might be executed from a different thread, creating a data race + // hazard; we leave preventing this as an exercise to the reader! + Weak::from_raw(this as *const RefCell<ProcessInner<'_, G>>); + } } /// Complex waker that is used to implement state events. @@ -585,181 +588,183 @@ impl<G> Process<'_, G> { /// This is the shallow version that is created on the stack of the function /// running the event loop. It creates the deep version when it is cloned. struct StateEventWaker<'s, G> { - context: Sim<'s, G>, + context: Sim<'s, G>, } impl<'s, G> StateEventWaker<'s, G> { - /// Virtual function table for the waker. - const VTABLE: task::RawWakerVTable = - task::RawWakerVTable::new(Self::clone, Self::wake, Self::wake_by_ref, Self::drop); - - /// Creates a new (shallow) waker using only the simulation context. - #[inline] - fn new(sim: Sim<'s, G>) -> Self { - StateEventWaker { context: sim } - } - - /// Constructs a new waker using only a reference. - /// - /// # Safety - /// - /// This function is unsafe because it is up to the caller to ensure that - /// the waker doesn't outlive the reference. - #[inline] - unsafe fn as_waker(&self) -> task::Waker { - task::Waker::from_raw(task::RawWaker::new( - self as *const _ as *const (), - &Self::VTABLE, - )) - } - - unsafe fn clone(this: *const ()) -> task::RawWaker { - // return the currently active process as a raw waker - (*(this as *const Self)).context.active().raw_waker() - } - - unsafe fn wake(_this: *const ()) { - // waking the active process can safely be ignored - } - - unsafe fn wake_by_ref(_this: *const ()) { - // waking the active process can safely be ignored - } - - unsafe fn drop(_this: *const ()) { - // memory is released in the main event loop - } + /// Virtual function table for the waker. + const VTABLE: task::RawWakerVTable = + task::RawWakerVTable::new(Self::clone, Self::wake, Self::wake_by_ref, Self::drop); + + /// Creates a new (shallow) waker using only the simulation context. + #[inline] + fn new(sim: Sim<'s, G>) -> Self { + StateEventWaker { context: sim } + } + + /// Constructs a new waker using only a reference. + /// + /// # Safety + /// + /// This function is unsafe because it is up to the caller to ensure that + /// the waker doesn't outlive the reference. + #[inline] + unsafe fn as_waker(&self) -> task::Waker { + task::Waker::from_raw(task::RawWaker::new( + self as *const _ as *const (), + &Self::VTABLE, + )) + } + + unsafe fn clone(this: *const ()) -> task::RawWaker { + // return the currently active process as a raw waker + (*(this as *const Self)).context.active().raw_waker() + } + + unsafe fn wake(_this: *const ()) { + // waking the active process can safely be ignored + } + + unsafe fn wake_by_ref(_this: *const ()) { + // waking the active process can safely be ignored + } + + unsafe fn drop(_this: *const ()) { + // memory is released in the main event loop + } } /* *************************** specialized futures ************************** */ /// Returns a future that unconditionally puts the calling process to sleep. pub fn sleep() -> impl Future<Output = ()> { - Sleep { ready: false } + Sleep { ready: false } } /// Custom structure that implements [`Future`] by suspending on the first call /// and returning on the second one. -/// +/// /// # Note /// This can be expressed more succinctly using the `poll_fn` function, but it /// is easier to see what's happening in this version. -struct Sleep { ready: bool } +struct Sleep { + ready: bool, +} impl Future for Sleep { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { - if self.ready { - Poll::Ready(()) - } else { - self.ready = true; - Poll::Pending - } - } + type Output = (); + + fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { + if self.ready { + Poll::Ready(()) + } else { + self.ready = true; + Poll::Pending + } + } } /// Returns a future that can be awaited to produce the currently set waker. pub fn waker() -> impl Future<Output = task::Waker> { - poll_fn(|cx| Poll::Ready(cx.waker().clone())) + poll_fn(|cx| Poll::Ready(cx.waker().clone())) } #[cfg(test)] mod tests { - use super::*; - use std::panic::{catch_unwind, AssertUnwindSafe}; - - /// Test that waking a leaked waker does not cause UB. - #[test] - #[should_panic = "attempted to wake a terminated process"] - fn leak_waker_simple() { - let shared = RefCell::new(None); - let shared = simulation(shared, |sim| { - Process::new(sim, async move { - poll_fn(|cx| { - *sim.global().borrow_mut() = Some(cx.waker().clone()); - Poll::Ready(()) - }) - .await; - }) - }); - shared.take().unwrap().wake(); - } - - /// Test that all processes are dropped even if a destructor of a process unwinds. - /// This is required to properly invalidate all wakers that may be leaked. - #[test] - #[should_panic = "attempted to wake a terminated process"] - fn leak_waker_unwind() { - struct PanicOnDrop; - impl Drop for PanicOnDrop { - fn drop(&mut self) { - panic!("PanicOnDrop"); - } - } - - struct PushDropOrderOnDrop<'s> { - me: i32, - drop_order: &'s RefCell<Vec<i32>>, - } - - impl Drop for PushDropOrderOnDrop<'_> { - fn drop(&mut self) { - self.drop_order.borrow_mut().push(self.me); - } - } - - #[derive(Default)] - struct Shared { - channel: RefCell<Option<task::Waker>>, - drop_order: RefCell<Vec<i32>>, - } - - let shared = Shared::default(); - catch_unwind(AssertUnwindSafe(|| { - simulation(&shared, |sim| { - Process::new(sim, async move { - // This process is dropped 2nd and will begin the unwind. - sim.activate(async move { - let _guard = PushDropOrderOnDrop { - me: 2, - drop_order: &sim.global().drop_order, - }; - let _guard = PanicOnDrop; - sim.advance(2.0).await; - unreachable!(); - }); - - // This process is dropped 3rd, after the unwind started. - // We test that this process cannot be woken from a leaked waker. - sim.activate(async move { - let _guard = PushDropOrderOnDrop { - me: 3, - drop_order: &sim.global().drop_order, - }; - poll_fn(|cx| { - *sim.global().channel.borrow_mut() = Some(cx.waker().clone()); - Poll::Ready(()) - }) - .await; - sim.advance(2.0).await; - unreachable!(); - }); - - let _guard: PushDropOrderOnDrop = PushDropOrderOnDrop { - me: 1, - drop_order: &sim.global().drop_order, - }; - sim.advance(1.0).await; - // Finish the main process here, this will drop the other processes. - }) - }); - })) - .unwrap_err(); - - shared.drop_order.borrow_mut().sort(); - assert_eq!(*shared.drop_order.borrow(), [1, 2, 3]); - - shared.channel.take().unwrap().wake(); - } + use super::*; + use std::panic::{catch_unwind, AssertUnwindSafe}; + + /// Test that waking a leaked waker does not cause UB. + #[test] + #[should_panic = "attempted to wake a terminated process"] + fn leak_waker_simple() { + let shared = RefCell::new(None); + let shared = simulation(shared, |sim| { + Process::new(sim, async move { + poll_fn(|cx| { + *sim.global().borrow_mut() = Some(cx.waker().clone()); + Poll::Ready(()) + }) + .await; + }) + }); + shared.take().unwrap().wake(); + } + + /// Test that all processes are dropped even if a destructor of a process unwinds. + /// This is required to properly invalidate all wakers that may be leaked. + #[test] + #[should_panic = "attempted to wake a terminated process"] + fn leak_waker_unwind() { + struct PanicOnDrop; + impl Drop for PanicOnDrop { + fn drop(&mut self) { + panic!("PanicOnDrop"); + } + } + + struct PushDropOrderOnDrop<'s> { + me: i32, + drop_order: &'s RefCell<Vec<i32>>, + } + + impl Drop for PushDropOrderOnDrop<'_> { + fn drop(&mut self) { + self.drop_order.borrow_mut().push(self.me); + } + } + + #[derive(Default)] + struct Shared { + channel: RefCell<Option<task::Waker>>, + drop_order: RefCell<Vec<i32>>, + } + + let shared = Shared::default(); + catch_unwind(AssertUnwindSafe(|| { + simulation(&shared, |sim| { + Process::new(sim, async move { + // This process is dropped 2nd and will begin the unwind. + sim.activate(async move { + let _guard = PushDropOrderOnDrop { + me: 2, + drop_order: &sim.global().drop_order, + }; + let _guard = PanicOnDrop; + sim.advance(2.0).await; + unreachable!(); + }); + + // This process is dropped 3rd, after the unwind started. + // We test that this process cannot be woken from a leaked waker. + sim.activate(async move { + let _guard = PushDropOrderOnDrop { + me: 3, + drop_order: &sim.global().drop_order, + }; + poll_fn(|cx| { + *sim.global().channel.borrow_mut() = Some(cx.waker().clone()); + Poll::Ready(()) + }) + .await; + sim.advance(2.0).await; + unreachable!(); + }); + + let _guard: PushDropOrderOnDrop = PushDropOrderOnDrop { + me: 1, + drop_order: &sim.global().drop_order, + }; + sim.advance(1.0).await; + // Finish the main process here, this will drop the other processes. + }) + }); + })) + .unwrap_err(); + + shared.drop_order.borrow_mut().sort(); + assert_eq!(*shared.drop_order.borrow(), [1, 2, 3]); + + shared.channel.take().unwrap().wake(); + } } diff --git a/src/main.rs b/src/main.rs index a60c363b84280b46499c2f17358f833e43e2b27c..1321936c07595e8f19c9b02f8e9679a24f9c46fc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,17 @@ -use simcore_rs::*; use simcore_rs::util::channel; +use simcore_rs::*; fn main() { - simulation((), |sim| Process::new(sim, async move { - let (sx, rx) = channel(); - - sim.activate(async move { - rx.recv().await; - }); - - sim.advance(1.0).await; - sx.send(1).await.unwrap(); - })); + simulation((), |sim| { + Process::new(sim, async move { + let (sx, rx) = channel(); + + sim.activate(async move { + rx.recv().await; + }); + + sim.advance(1.0).await; + sx.send(1).await.unwrap(); + }) + }); } diff --git a/src/util.rs b/src/util.rs index 265124047aa33260292d512ee94ff21eed8e6268..eb68cb5cfc9ea3a1ba0a001f4d9ce64bf2dcd107 100644 --- a/src/util.rs +++ b/src/util.rs @@ -39,82 +39,82 @@ use crate::{sleep, waker, Process, Sim}; use std::{ - cell::{Cell, RefCell}, - collections::VecDeque, - fmt, - future::Future, - pin::Pin, - rc::{Rc, Weak}, - task::{self, Context, Poll}, + cell::{Cell, RefCell}, + collections::VecDeque, + fmt, + future::Future, + pin::Pin, + rc::{Rc, Weak}, + task::{self, Context, Poll}, }; /// GPSS-inspired facility that houses one exclusive resource to be used by /// arbitrary many processes. #[derive(Default)] pub struct Facility { - /// A queue of waiting-to-be-activated processes. - queue: RefCell<VecDeque<task::Waker>>, - /// A boolean indicating whether the facility is in use or not. - in_use: Cell<bool>, + /// A queue of waiting-to-be-activated processes. + queue: RefCell<VecDeque<task::Waker>>, + /// A boolean indicating whether the facility is in use or not. + in_use: Cell<bool>, } impl Facility { - /// Creates a new facility. - pub fn new() -> Self { - Facility { - queue: RefCell::new(VecDeque::new()), - in_use: Cell::new(false), - } - } - - /// Attempts to seize the facility, blocking until it's possible. - pub async fn seize(&self) { - if self.in_use.replace(true) { - let waker = waker().await; - self.queue.borrow_mut().push_back(waker); - sleep().await; - } - } - - /// Releases the facility and activates the next waiting process. - pub fn release(&self) { - if let Some(process) = self.queue.borrow_mut().pop_front() { - process.wake(); - } else { - self.in_use.set(false); - } - } + /// Creates a new facility. + pub fn new() -> Self { + Facility { + queue: RefCell::new(VecDeque::new()), + in_use: Cell::new(false), + } + } + + /// Attempts to seize the facility, blocking until it's possible. + pub async fn seize(&self) { + if self.in_use.replace(true) { + let waker = waker().await; + self.queue.borrow_mut().push_back(waker); + sleep().await; + } + } + + /// Releases the facility and activates the next waiting process. + pub fn release(&self) { + if let Some(process) = self.queue.borrow_mut().pop_front() { + process.wake(); + } else { + self.in_use.set(false); + } + } } /// A one-shot, writable container type that awakens a pre-registered process /// on write. pub struct Promise<T> { - /// The waker of the process to awaken on write. - caller: task::Waker, - /// The written value to be extracted by the caller. - result: Cell<Option<T>>, + /// The waker of the process to awaken on write. + caller: task::Waker, + /// The written value to be extracted by the caller. + result: Cell<Option<T>>, } impl<T> Promise<T> { - /// Creates a new promise with an unwritten result. - pub fn new(waker: task::Waker) -> Self { - Self { - caller: waker, - result: Cell::new(None), - } - } - - /// Writes the result value and reawakens the caller. - pub fn fulfill(&self, result: T) { - if self.result.replace(Some(result)).is_none() { - self.caller.wake_by_ref(); - } - } - - /// Extracts the written value and resets the state of the promise. - pub fn redeem(&self) -> Option<T> { - self.result.replace(None) - } + /// Creates a new promise with an unwritten result. + pub fn new(waker: task::Waker) -> Self { + Self { + caller: waker, + result: Cell::new(None), + } + } + + /// Writes the result value and reawakens the caller. + pub fn fulfill(&self, result: T) { + if self.result.replace(Some(result)).is_none() { + self.caller.wake_by_ref(); + } + } + + /// Extracts the written value and resets the state of the promise. + pub fn redeem(&self) -> Option<T> { + self.result.replace(None) + } } /// Returns a future that takes two other futures and completes as soon as @@ -126,56 +126,56 @@ impl<T> Promise<T> { /// compute a value, as long as the return type is identical in both cases. pub async fn select<'s, 'u, G, E, O, R>(sim: Sim<'s, G>, either: E, or: O) -> R where - E: Future<Output = R> + 'u, - O: Future<Output = R> + 'u, - R: 'u, + E: Future<Output = R> + 'u, + O: Future<Output = R> + 'u, + R: 'u, { - use std::mem::transmute; - - // create a one-shot channel that reactivates the caller on write - let promise = Promise::new(sim.active().waker()); - - // this unsafe block shortens the guaranteed lifetimes of the processes - // contained in the scheduler; it is safe because the constructed future - // ensures that both futures are terminated before the promise and - // itself are terminated, thereby preventing references to the lesser - // constrained processes to survive the call to select() - let sim = unsafe { transmute::<Sim<'s, G>, Sim<'_, G>>(sim) }; - - { - // create the two competing processes - // a future optimization would be to keep them on the stack - let p1 = Process::new(sim, async { - promise.fulfill(either.await); - }); - let p2 = Process::new(sim, async { - promise.fulfill(or.await); - }); - - struct TerminateOnDrop<'s, G>(Process<'s, G>); - impl<G> Drop for TerminateOnDrop<'_, G> { - fn drop(&mut self) { - self.0.terminate(); - } - } - - // `p1` and `p2` borrow from `promise` and must therefore be dropped - // before `promise` is dropped. In particular, we must ensure that the - // future returned by the outer `async fn` outlives the futures crated - // by the `async` blocks above. - let _g1 = TerminateOnDrop(p1.clone()); - let _g2 = TerminateOnDrop(p2.clone()); - - // activate them - sim.reactivate(p1); - sim.reactivate(p2); - - // wait for reactivation; the promise will wake us on fulfillment - sleep().await; - } - - // extract the promised value - promise.redeem().unwrap() + use std::mem::transmute; + + // create a one-shot channel that reactivates the caller on write + let promise = Promise::new(sim.active().waker()); + + // this unsafe block shortens the guaranteed lifetimes of the processes + // contained in the scheduler; it is safe because the constructed future + // ensures that both futures are terminated before the promise and + // itself are terminated, thereby preventing references to the lesser + // constrained processes to survive the call to select() + let sim = unsafe { transmute::<Sim<'s, G>, Sim<'_, G>>(sim) }; + + { + // create the two competing processes + // a future optimization would be to keep them on the stack + let p1 = Process::new(sim, async { + promise.fulfill(either.await); + }); + let p2 = Process::new(sim, async { + promise.fulfill(or.await); + }); + + struct TerminateOnDrop<'s, G>(Process<'s, G>); + impl<G> Drop for TerminateOnDrop<'_, G> { + fn drop(&mut self) { + self.0.terminate(); + } + } + + // `p1` and `p2` borrow from `promise` and must therefore be dropped + // before `promise` is dropped. In particular, we must ensure that the + // future returned by the outer `async fn` outlives the futures crated + // by the `async` blocks above. + let _g1 = TerminateOnDrop(p1.clone()); + let _g2 = TerminateOnDrop(p2.clone()); + + // activate them + sim.reactivate(p1); + sim.reactivate(p2); + + // wait for reactivation; the promise will wake us on fulfillment + sleep().await; + } + + // extract the promised value + promise.redeem().unwrap() } /// Complex channel with space for infinitely many elements of arbitrary type. @@ -183,22 +183,22 @@ where /// This channel supports arbitrary many readers and writers and uses wakers /// to reactivate suspended processes. struct Channel<T> { - /// A queue of messages. - store: VecDeque<T>, - /// A queue of processes waiting to receive a message. - waiting: VecDeque<(task::Waker, usize)>, - /// The number of unique wakers waiting so far. - ids: usize + /// A queue of messages. + store: VecDeque<T>, + /// A queue of processes waiting to receive a message. + waiting: VecDeque<(task::Waker, usize)>, + /// The number of unique wakers waiting so far. + ids: usize, } /// Creates a channel and returns a pair of read and write ends. pub fn channel<T>() -> (Sender<T>, Receiver<T>) { - let channel = Rc::new(RefCell::new(Channel { - store: VecDeque::new(), - waiting: VecDeque::new(), - ids: 0 - })); - (Sender(Rc::downgrade(&channel)), Receiver(channel)) + let channel = Rc::new(RefCell::new(Channel { + store: VecDeque::new(), + waiting: VecDeque::new(), + ids: 0, + })); + (Sender(Rc::downgrade(&channel)), Receiver(channel)) } /// Write-end of a channel. @@ -209,88 +209,93 @@ pub struct Receiver<T>(Rc<RefCell<Channel<T>>>); // Allow senders to be duplicated. impl<T> Clone for Sender<T> { - fn clone(&self) -> Self { - Self(self.0.clone()) - } + fn clone(&self) -> Self { + Self(self.0.clone()) + } } // Allow receivers to be duplicated. impl<T> Clone for Receiver<T> { - fn clone(&self) -> Self { - Self(self.0.clone()) - } + fn clone(&self) -> Self { + Self(self.0.clone()) + } } impl<T: Unpin> Sender<T> { - /// Returns a future that can be awaited to send a message. - pub fn send(&self, elem: T) -> SendFuture<'_, T> { - SendFuture(&self.0, Some(elem)) - } + /// Returns a future that can be awaited to send a message. + pub fn send(&self, elem: T) -> SendFuture<'_, T> { + SendFuture(&self.0, Some(elem)) + } } impl<T> Drop for Sender<T> { - #[inline] - fn drop(&mut self) { - // check if there are still receivers - if let Some(chan) = self.0.upgrade() { - // check if we're the last sender to drop - if Rc::weak_count(&chan) == 1 { - // awake all waiting receivers so that they get to return - for (process, _) in chan.borrow_mut().waiting.drain(..) { - process.wake(); - } - } - } - } + #[inline] + fn drop(&mut self) { + // check if there are still receivers + if let Some(chan) = self.0.upgrade() { + // check if we're the last sender to drop + if Rc::weak_count(&chan) == 1 { + // awake all waiting receivers so that they get to return + for (process, _) in chan.borrow_mut().waiting.drain(..) { + process.wake(); + } + } + } + } } impl<T: Unpin> Receiver<T> { - /// Returns a future that can be awaited to receive a message. - pub fn recv(&self) -> ReceiveFuture<'_, T> { - ReceiveFuture(&self.0, 0) - } - - /// Returns the number of elements that can be received before model time - /// has to be consumed. - pub fn len(&self) -> usize { - self.0.borrow().len() - } + /// Returns a future that can be awaited to receive a message. + pub fn recv(&self) -> ReceiveFuture<'_, T> { + ReceiveFuture(&self.0, 0) + } + + /// Returns the number of elements that can be received before model time + /// has to be consumed. + pub fn len(&self) -> usize { + self.0.borrow().len() + } + + /// Returns if the receiver is empty at the current model time. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } } impl<T> Channel<T> { - /// Private method returning the number of elements left in the channel. - fn len(&self) -> usize { - self.store.len() - } - - /// Private method enqueuing a process into the waiting list. - fn enqueue(&mut self, process: task::Waker) -> usize { - self.ids = self.ids.checked_add(1).expect("ID overflow"); - self.waiting.push_back((process, self.ids)); - self.ids - } - - /// Private method removing a process from the waiting list. - fn dequeue(&mut self) -> Option<task::Waker> { - self.waiting.pop_front().map(|(p,_)| p) - } - - /// Private method that unregisters a previously registered waker. - fn unregister(&mut self, id: usize) { - if let Some(pos) = self.waiting.iter().position(|(_, tid)| *tid == id) { - self.waiting.remove(pos); - } - } - - /// Private method inserting a message into the queue. - fn send(&mut self, value: T) { - self.store.push_back(value); - } - - /// Private method extracting a message from the queue non-blocking. - fn recv(&mut self) -> Option<T> { - self.store.pop_front() - } + /// Private method returning the number of elements left in the channel. + fn len(&self) -> usize { + self.store.len() + } + + /// Private method enqueuing a process into the waiting list. + fn enqueue(&mut self, process: task::Waker) -> usize { + self.ids = self.ids.checked_add(1).expect("ID overflow"); + self.waiting.push_back((process, self.ids)); + self.ids + } + + /// Private method removing a process from the waiting list. + fn dequeue(&mut self) -> Option<task::Waker> { + self.waiting.pop_front().map(|(p, _)| p) + } + + /// Private method that unregisters a previously registered waker. + fn unregister(&mut self, id: usize) { + if let Some(pos) = self.waiting.iter().position(|(_, tid)| *tid == id) { + self.waiting.remove(pos); + } + } + + /// Private method inserting a message into the queue. + fn send(&mut self, value: T) { + self.store.push_back(value); + } + + /// Private method extracting a message from the queue non-blocking. + fn recv(&mut self) -> Option<T> { + self.store.pop_front() + } } /// Future for the [`send()`] operation on a channel sender. @@ -304,61 +309,61 @@ pub struct SendFuture<'c, T>(&'c Weak<RefCell<Channel<T>>>, Option<T>); pub struct ReceiveFuture<'c, T>(&'c Rc<RefCell<Channel<T>>>, usize); impl<T: Unpin> Future for SendFuture<'_, T> { - type Output = Result<(), T>; + type Output = Result<(), T>; - fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> { - let elem = self.1.take().unwrap(); + fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> { + let elem = self.1.take().unwrap(); - // test if there are still readers left to listen - if let Some(channel) = self.0.upgrade() { - let mut channel = channel.borrow_mut(); - channel.send(elem); + // test if there are still readers left to listen + if let Some(channel) = self.0.upgrade() { + let mut channel = channel.borrow_mut(); + channel.send(elem); - // awake a waiting process - if let Some(process) = channel.dequeue() { - process.wake(); - } + // awake a waiting process + if let Some(process) = channel.dequeue() { + process.wake(); + } - Poll::Ready(Ok(())) - } else { - Poll::Ready(Err(elem)) - } - } + Poll::Ready(Ok(())) + } else { + Poll::Ready(Err(elem)) + } + } } impl<T: Unpin> Future for ReceiveFuture<'_, T> { - type Output = Option<T>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let mut channel = self.0.borrow_mut(); - - // clear the local copy of our waker to prevent unnecessary - // de-registration attempts in our destructor - self.1 = 0; - - if let Some(c) = channel.recv() { - // there are elements in the channel - Poll::Ready(Some(c)) - } else if Rc::weak_count(self.0) == 0 { - // no elements in the channel and no potential senders exist - Poll::Ready(None) - } else { - // no elements, but potential senders exist: check back later - let waker = cx.waker().clone(); - self.1 = channel.enqueue(waker); - Poll::Pending - } - } + type Output = Option<T>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut channel = self.0.borrow_mut(); + + // clear the local copy of our waker to prevent unnecessary + // de-registration attempts in our destructor + self.1 = 0; + + if let Some(c) = channel.recv() { + // there are elements in the channel + Poll::Ready(Some(c)) + } else if Rc::weak_count(self.0) == 0 { + // no elements in the channel and no potential senders exist + Poll::Ready(None) + } else { + // no elements, but potential senders exist: check back later + let waker = cx.waker().clone(); + self.1 = channel.enqueue(waker); + Poll::Pending + } + } } impl<T> Drop for ReceiveFuture<'_, T> { - #[inline] - fn drop(&mut self) { - // take care to unregister our waker from the channel - if self.1 > 0 { - self.0.borrow_mut().unregister(self.1); - } - } + #[inline] + fn drop(&mut self) { + // take care to unregister our waker from the channel + if self.1 > 0 { + self.0.borrow_mut().unregister(self.1); + } + } } /// A trait signifying that the type implementing it is able to broadcast state @@ -371,86 +376,86 @@ impl<T> Drop for ReceiveFuture<'_, T> { /// [`until()`]: fn.until.html /// [`Control`]: struct.Control.html pub trait Publisher { - /// Allows a [waker] to subscribe to the controlled expression to be - /// informed about any and all state changes. - /// - /// # Safety - /// This method is unsafe, because the controlled expression will call the - /// [`wake_by_ref()`] method on the waker during every change in state. The caller - /// is responsible to ensure the validity of all subscribed wakers at - /// the time of the state change. - /// - /// The [`span()`] method provides a safe alternative. - /// - /// [waker]: https://doc.rust-lang.org/std/task/struct.Waker.html - /// [`span()`]: trait.Controlled.html#method.span - /// [`wake_by_ref()`]: https://doc.rust-lang.org/std/task/struct.Waker.html#method.wake_by_ref - unsafe fn subscribe(&self, waker: &task::Waker); - - /// Unsubscribes a previously subscribed [waker] from the controlled - /// expression. - /// - /// # Safety - /// This method is unsafe, because it asserts that the waker that it - /// receives has been registered using [subscribe] previously, and has not - /// been unsubscribed from yet. - /// - /// [waker]: https://doc.rust-lang.org/std/task/struct.Waker.html - /// [subscribe]: Self::subscribe - unsafe fn unsubscribe(&self, waker: &task::Waker); + /// Allows a [waker] to subscribe to the controlled expression to be + /// informed about any and all state changes. + /// + /// # Safety + /// This method is unsafe, because the controlled expression will call the + /// [`wake_by_ref()`] method on the waker during every change in state. The caller + /// is responsible to ensure the validity of all subscribed wakers at + /// the time of the state change. + /// + /// The [`span()`] method provides a safe alternative. + /// + /// [waker]: https://doc.rust-lang.org/std/task/struct.Waker.html + /// [`span()`]: trait.Controlled.html#method.span + /// [`wake_by_ref()`]: https://doc.rust-lang.org/std/task/struct.Waker.html#method.wake_by_ref + unsafe fn subscribe(&self, waker: &task::Waker); + + /// Unsubscribes a previously subscribed [waker] from the controlled + /// expression. + /// + /// # Safety + /// This method is unsafe, because it asserts that the waker that it + /// receives has been registered using [subscribe] previously, and has not + /// been unsubscribed from yet. + /// + /// [waker]: https://doc.rust-lang.org/std/task/struct.Waker.html + /// [subscribe]: Self::subscribe + unsafe fn unsubscribe(&self, waker: &task::Waker); } /// Guarding structure that ensures that waker and controlled expression are /// valid and fixed in space. pub struct WakerSpan<'s, C: ?Sized + Publisher> { - /// The controlled expression. - cv: &'s C, - /// The waker. - waker: &'s task::Waker, + /// The controlled expression. + cv: &'s C, + /// The waker. + waker: &'s task::Waker, } // implement a constructor for the guard impl<'s, C: ?Sized + Publisher> WakerSpan<'s, C> { - /// Subscribes a [waker] to a controlled expression for a certain duration. - /// - /// This method subscribes the waker to the controlled expression and - /// returns an object that unsubscribes the waker from the same controlled - /// expression upon drop. The borrow-checker secures the correct lifetimes - /// of waker and controlled expression so that this method can be safe. - /// - /// [waker]: https://doc.rust-lang.org/std/task/struct.Waker.html - #[inline] - pub fn new(cv: &'s C, waker: &'s task::Waker) -> Self { - // this is safe because we bind the lifetime of the waker to the guard - unsafe { - cv.subscribe(waker); - } - WakerSpan { cv, waker } - } + /// Subscribes a [waker] to a controlled expression for a certain duration. + /// + /// This method subscribes the waker to the controlled expression and + /// returns an object that unsubscribes the waker from the same controlled + /// expression upon drop. The borrow-checker secures the correct lifetimes + /// of waker and controlled expression so that this method can be safe. + /// + /// [waker]: https://doc.rust-lang.org/std/task/struct.Waker.html + #[inline] + pub fn new(cv: &'s C, waker: &'s task::Waker) -> Self { + // this is safe because we bind the lifetime of the waker to the guard + unsafe { + cv.subscribe(waker); + } + WakerSpan { cv, waker } + } } // implement drop for the guard impl<C: ?Sized + Publisher> Drop for WakerSpan<'_, C> { - #[inline] - fn drop(&mut self) { - // this is safe because we subscribed in the only public constructor - unsafe { - self.cv.unsubscribe(self.waker); - } - } + #[inline] + fn drop(&mut self) { + // this is safe because we subscribed in the only public constructor + unsafe { + self.cv.unsubscribe(self.waker); + } + } } // a reference of a controlled expression is also a controlled expression impl<T: Publisher> Publisher for &'_ T { - #[inline] - unsafe fn subscribe(&self, waker: &task::Waker) { - Publisher::subscribe(*self, waker); - } + #[inline] + unsafe fn subscribe(&self, waker: &task::Waker) { + Publisher::subscribe(*self, waker); + } - #[inline] - unsafe fn unsubscribe(&self, waker: &task::Waker) { - Publisher::unsubscribe(*self, waker); - } + #[inline] + unsafe fn unsubscribe(&self, waker: &task::Waker) { + Publisher::unsubscribe(*self, waker); + } } /// Marks a variable as a state variable which can be used in conjunction with @@ -462,82 +467,82 @@ impl<T: Publisher> Publisher for &'_ T { /// /// [`until()`]: fn.until.html pub struct Control<T> { - /// The inner value. - value: Cell<T>, - /// A list of waiting processes, identified by their [wakers]. - /// - /// [wakers]: https://doc.rust-lang.org/std/task/struct.Waker.html - waiting: RefCell<Vec<task::Waker>>, + /// The inner value. + value: Cell<T>, + /// A list of waiting processes, identified by their [wakers]. + /// + /// [wakers]: https://doc.rust-lang.org/std/task/struct.Waker.html + waiting: RefCell<Vec<task::Waker>>, } impl<T> Control<T> { - /// Creates a new control variable and initializes its value. - #[inline] - pub const fn new(value: T) -> Self { - Self { - value: Cell::new(value), - waiting: RefCell::new(Vec::new()), - } - } - - /// Assigns a new value to the control variable. - /// - /// This can lead to potential activations of waiting processes. - #[inline] - pub fn set(&self, val: T) { - self.notify(); - self.value.set(val); - } - - /// Extracts the current value from the control variable. - #[inline] - pub fn get(&self) -> T - where - T: Copy, - { - self.value.get() - } - - /// Notifies all the waiting processes to re-check their state condition. - /// - /// This action is usually performed automatically when the value of the - /// control variable is changed through the [`set()`]-method but may be - /// triggered manually when this mechanism is bypassed somehow. - /// - /// [`set()`]: #method.set - pub fn notify(&self) { - for waker in self.waiting.borrow().iter() { - waker.wake_by_ref(); - } - } + /// Creates a new control variable and initializes its value. + #[inline] + pub const fn new(value: T) -> Self { + Self { + value: Cell::new(value), + waiting: RefCell::new(Vec::new()), + } + } + + /// Assigns a new value to the control variable. + /// + /// This can lead to potential activations of waiting processes. + #[inline] + pub fn set(&self, val: T) { + self.notify(); + self.value.set(val); + } + + /// Extracts the current value from the control variable. + #[inline] + pub fn get(&self) -> T + where + T: Copy, + { + self.value.get() + } + + /// Notifies all the waiting processes to re-check their state condition. + /// + /// This action is usually performed automatically when the value of the + /// control variable is changed through the [`set()`]-method but may be + /// triggered manually when this mechanism is bypassed somehow. + /// + /// [`set()`]: #method.set + pub fn notify(&self) { + for waker in self.waiting.borrow().iter() { + waker.wake_by_ref(); + } + } } // implement the trait marking control variables as controlled expressions impl<T: Copy> Publisher for Control<T> { - #[inline] - unsafe fn subscribe(&self, waker: &task::Waker) { - self.waiting.borrow_mut().push(waker.clone()); - } - - unsafe fn unsubscribe(&self, waker: &task::Waker) { - let mut waiting = self.waiting.borrow_mut(); - let pos = waiting.iter().position(|w| w.will_wake(waker)); - - if let Some(pos) = pos { - waiting.remove(pos); - } else { - panic!("attempt to unsubscribe waker that isn't subscribed to"); - } - } + #[inline] + unsafe fn subscribe(&self, waker: &task::Waker) { + self.waiting.borrow_mut().push(waker.clone()); + } + + unsafe fn unsubscribe(&self, waker: &task::Waker) { + let mut waiting = self.waiting.borrow_mut(); + let pos = waiting.iter().position(|w| w.will_wake(waker)); + + if let Some(pos) = pos { + waiting.remove(pos); + } else { + panic!("attempt to unsubscribe waker that isn't subscribed to"); + } + } } impl<T: Default> Default for Control<T> { - fn default() -> Self { - Self { - value: Cell::new(T::default()), - waiting: RefCell::new(Vec::new()), - } - } + fn default() -> Self { + Self { + value: Cell::new(T::default()), + waiting: RefCell::new(Vec::new()), + } + } } /// An internal macro to generate implementations of the [`Publisher`] trait @@ -567,116 +572,116 @@ macro_rules! controlled_tuple_impl { } controlled_tuple_impl! { - A -> 0, B -> 1, C -> 2, D -> 3, E -> 4, F -> 5, G -> 6, H -> 7, - I -> 8, J -> 9, K ->10, L ->11, M ->12, N ->13, O ->14, P ->15, + A -> 0, B -> 1, C -> 2, D -> 3, E -> 4, F -> 5, G -> 6, H -> 7, + I -> 8, J -> 9, K ->10, L ->11, M ->12, N ->13, O ->14, P ->15, } /// Returns a future that suspends until an arbitrary boolean condition /// involving control expressions evaluates to `true`. #[inline] pub async fn until<C: Publisher>(set: C, pred: impl Fn(&C) -> bool) { - if !pred(&set) { - let waker = waker().await; - let _span = WakerSpan::new(&set, &waker); - - loop { - sleep().await; - if pred(&set) { - break; - } - } - } + if !pred(&set) { + let waker = waker().await; + let _span = WakerSpan::new(&set, &waker); + + loop { + sleep().await; + if pred(&set) { + break; + } + } + } } /* ************************* statistical facilities ************************* */ /// A simple collector for statistical data, inspired by SLX's random_variable. -/// +/// /// # Warning /// This implementation is not numerically stable, since it keeps sums of the /// passed values and of the squared values. It would be better to use a /// numerically stable online algorithm instead. #[derive(Clone)] -pub struct RandomVar { - total: Cell<u32>, - sum: Cell<f64>, - sqr: Cell<f64>, - min: Cell<f64>, - max: Cell<f64>, -} - -impl RandomVar { - /// Creates a new random variable. - #[inline] - pub fn new() -> Self { - RandomVar::default() - } - - /// Resets all stored statistical data. - pub fn clear(&self) { - self.total.set(0); - self.sum.set(0.0); - self.sqr.set(0.0); - self.min.set(f64::INFINITY); - self.max.set(f64::NEG_INFINITY); - } - - /// Adds another value to the statistical collection. - pub fn tabulate<T: Into<f64>>(&self, val: T) { - let val: f64 = val.into(); - - self.total.set(self.total.get() + 1); - self.sum.set(self.sum.get() + val); - self.sqr.set(self.sqr.get() + val * val); - - if self.min.get() > val { - self.min.set(val); - } - if self.max.get() < val { - self.max.set(val); - } - } - - /// Combines the statistical collection of two random variables into one. - pub fn merge(&self, other: &Self) { - self.total.set(self.total.get() + other.total.get()); - self.sum.set(self.sum.get() + other.sum.get()); - self.sqr.set(self.sqr.get() + other.sqr.get()); - - if self.min.get() > other.min.get() { - self.min.set(other.min.get()); - } - if self.max.get() < other.max.get() { - self.max.set(other.max.get()); - } - } -} - -impl Default for RandomVar { - fn default() -> Self { - RandomVar { - total: Cell::default(), - sum: Cell::default(), - sqr: Cell::default(), - min: Cell::new(f64::INFINITY), - max: Cell::new(f64::NEG_INFINITY), - } - } -} - -impl fmt::Debug for RandomVar { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let total = self.total.get(); - let mean = self.sum.get() / f64::from(total); - let variance = self.sqr.get() / f64::from(total) - mean * mean; - let std_dev = variance.sqrt(); - - f.debug_struct("RandomVar") - .field("n", &total) - .field("min", &self.min.get()) - .field("max", &self.max.get()) - .field("mean", &mean) - .field("sdev", &std_dev) - .finish() - } +pub struct RandomVariable { + total: Cell<u32>, + sum: Cell<f64>, + sqr: Cell<f64>, + min: Cell<f64>, + max: Cell<f64>, +} + +impl RandomVariable { + /// Creates a new random variable. + #[inline] + pub fn new() -> Self { + RandomVariable::default() + } + + /// Resets all stored statistical data. + pub fn clear(&self) { + self.total.set(0); + self.sum.set(0.0); + self.sqr.set(0.0); + self.min.set(f64::INFINITY); + self.max.set(f64::NEG_INFINITY); + } + + /// Adds another value to the statistical collection. + pub fn tabulate<T: Into<f64>>(&self, val: T) { + let val: f64 = val.into(); + + self.total.set(self.total.get() + 1); + self.sum.set(self.sum.get() + val); + self.sqr.set(self.sqr.get() + val * val); + + if self.min.get() > val { + self.min.set(val); + } + if self.max.get() < val { + self.max.set(val); + } + } + + /// Combines the statistical collection of two random variables into one. + pub fn merge(&self, other: &Self) { + self.total.set(self.total.get() + other.total.get()); + self.sum.set(self.sum.get() + other.sum.get()); + self.sqr.set(self.sqr.get() + other.sqr.get()); + + if self.min.get() > other.min.get() { + self.min.set(other.min.get()); + } + if self.max.get() < other.max.get() { + self.max.set(other.max.get()); + } + } +} + +impl Default for RandomVariable { + fn default() -> Self { + RandomVariable { + total: Cell::default(), + sum: Cell::default(), + sqr: Cell::default(), + min: Cell::new(f64::INFINITY), + max: Cell::new(f64::NEG_INFINITY), + } + } +} + +impl fmt::Debug for RandomVariable { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let total = self.total.get(); + let mean = self.sum.get() / f64::from(total); + let variance = self.sqr.get() / f64::from(total) - mean * mean; + let std_dev = variance.sqrt(); + + f.debug_struct("RandomVar") + .field("n", &total) + .field("min", &self.min.get()) + .field("max", &self.max.get()) + .field("mean", &mean) + .field("sdev", &std_dev) + .finish() + } }