diff --git a/bench/bench.ml b/bench/bench.ml index 88f67360..2d4c6863 100644 --- a/bench/bench.ml +++ b/bench/bench.ml @@ -30,37 +30,11 @@ module Times = struct results.(domain_i) done in - let prepare_for_await () = - let open struct - type state = Init | Released | Awaiting of { mutable released : bool } - end in - let state = Atomic.make Init in - let release () = - if Multicore_magic.fenceless_get state != Released then - match Atomic.exchange state Released with - | Awaiting r -> r.released <- true - | _ -> () - in - let await () = - if Multicore_magic.fenceless_get state != Released then - let awaiting = Awaiting { released = false } in - if Atomic.compare_and_set state Init awaiting then - match awaiting with - | Awaiting r -> - (* Avoid sleeping *) - while not r.released do - Domain.cpu_relax () - done - | _ -> () - in - Domain_local_await.{ release; await } + let domains = + Array.init n_domains @@ fun domain_i -> + Domain.spawn @@ fun () -> main domain_i in - Domain_local_await.using ~prepare_for_await ~while_running:(fun () -> - let domains = - Array.init n_domains @@ fun domain_i -> - Domain.spawn @@ fun () -> main domain_i - in - Array.iter Domain.join domains); + Array.iter Domain.join domains; let n = Stack.length results.(0) in let times = Array.create_float n in for run_i = 0 to n - 1 do diff --git a/bench/dune b/bench/dune index 06c316c6..a8e99540 100644 --- a/bench/dune +++ b/bench/dune @@ -3,7 +3,7 @@ (package kcas_data) (libraries kcas_data - domain-local-await + single-use-event multicore-magic yojson domain_shims diff --git a/doc/scheduler-interop.md b/doc/scheduler-interop.md index 827a4e96..a29f6341 100644 --- a/doc/scheduler-interop.md +++ b/doc/scheduler-interop.md @@ -21,6 +21,7 @@ implementations that are conveniently provided by ```ocaml # #thread # #require "kcas_data" +# #require "single-use-event" # open Kcas_data # open Kcas ``` @@ -36,45 +37,30 @@ module Scheduler : sig val fiber : t -> (unit -> 'a) -> 'a Promise.t end = struct open Effect.Deep - type _ Effect.t += - | Suspend : (('a, unit) continuation -> unit) -> 'a Effect.t type t = { queue: (unit -> unit) Queue.t; domain: unit Domain.t } let spawn () = - let queue = Queue.create () in + let queue: (unit -> unit) Queue.t = Queue.create () in let rec scheduler work = let effc (type a) : a Effect.t -> _ = function - | Suspend ef -> Some ef - | _ -> None in + | Single_use_event.Await sue -> + Some (fun (k: (a, _) continuation) -> + if not (Single_use_event.is_initial sue) || + not (Single_use_event.try_attach sue @@ fun () -> + Queue.add (continue k) queue) then + continue k ()) + | _ -> + None in try_with work () { effc }; match Queue.take_opt queue with | Some work -> scheduler work | None -> () in - let prepare_for_await _ = - let state = Atomic.make `Init in - let release () = - if Atomic.get state != `Released then - match Atomic.exchange state `Released with - | `Awaiting k -> - Queue.add (continue k) queue - | _ -> () in - let await () = - if Atomic.get state != `Released then - Effect.perform @@ Suspend (fun k -> - if not (Atomic.compare_and_set state `Init - (`Awaiting k)) then - continue k ()) - in - Domain_local_await.{ release; await } in let domain = Domain.spawn @@ fun () -> try while true do - let work = Queue.take_blocking queue in - Domain_local_await.using - ~prepare_for_await - ~while_running:(fun () -> scheduler work) + scheduler (Queue.take_blocking queue) done with Exit -> () in { queue; domain } diff --git a/dune-project b/dune-project index cacf25b7..3626f964 100644 --- a/dune-project +++ b/dune-project @@ -13,7 +13,7 @@ (depends (ocaml (>= 4.13.0)) (backoff (>= 0.1.0)) - (domain-local-await (>= 1.0.0)) + single-use-event (domain-local-timeout (>= 1.0.0)) (multicore-magic (>= 2.0.0)) (domain_shims (and (>= 0.1.0) :with-test)) @@ -26,7 +26,7 @@ (depends (kcas (= :version)) (multicore-magic (>= 2.0.0)) - (domain-local-await (and (>= 1.0.0) :with-test)) + single-use-event (domain_shims (and (>= 0.1.0) :with-test)) (mtime (and (>= 2.0.0) :with-test)) (alcotest (and (>= 1.7.0) :with-test)) diff --git a/kcas.opam b/kcas.opam index 14e35571..d2c9504a 100644 --- a/kcas.opam +++ b/kcas.opam @@ -16,7 +16,7 @@ depends: [ "dune" {>= "3.8"} "ocaml" {>= "4.13.0"} "backoff" {>= "0.1.0"} - "domain-local-await" {>= "1.0.0"} + "single-use-event" "domain-local-timeout" {>= "1.0.0"} "multicore-magic" {>= "2.0.0"} "domain_shims" {>= "0.1.0" & with-test} @@ -40,3 +40,6 @@ build: [ ] dev-repo: "git+https://github.com/ocaml-multicore/kcas.git" doc: "https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/" +pin-depends: [ + [ "single-use-event.dev" "git+https://github.com/ocaml-multicore/single-use-event#89a920f0d9bef9ff4b082b3021dea562dcf9c129"] +] diff --git a/kcas.opam.template b/kcas.opam.template index 0fd71d5e..4f4e9336 100644 --- a/kcas.opam.template +++ b/kcas.opam.template @@ -1 +1,4 @@ doc: "https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/" +pin-depends: [ + [ "single-use-event.dev" "git+https://github.com/ocaml-multicore/single-use-event#89a920f0d9bef9ff4b082b3021dea562dcf9c129"] +] diff --git a/kcas_data.opam b/kcas_data.opam index 30426f56..9aaa724b 100644 --- a/kcas_data.opam +++ b/kcas_data.opam @@ -16,7 +16,7 @@ depends: [ "dune" {>= "3.8"} "kcas" {= version} "multicore-magic" {>= "2.0.0"} - "domain-local-await" {>= "1.0.0" & with-test} + "single-use-event" "domain_shims" {>= "0.1.0" & with-test} "mtime" {>= "2.0.0" & with-test} "alcotest" {>= "1.7.0" & with-test} diff --git a/src/kcas/dune b/src/kcas/dune index 70b19af5..7dc89fec 100644 --- a/src/kcas/dune +++ b/src/kcas/dune @@ -1,7 +1,7 @@ (library (name kcas) (public_name kcas) - (libraries domain-local-await domain-local-timeout backoff multicore-magic)) + (libraries single-use-event domain-local-timeout backoff multicore-magic)) (mdx (package kcas) diff --git a/src/kcas/kcas.ml b/src/kcas/kcas.ml index d3bf12ce..13a95d7c 100644 --- a/src/kcas/kcas.ml +++ b/src/kcas/kcas.ml @@ -58,16 +58,19 @@ module Timeout = struct let[@inline] set_opt state seconds = if seconds != None then set_opt state seconds - let[@inline never] await state release = + let[@inline never] await state sue = match fenceless_get state with | Call _ as alive -> - if Atomic.compare_and_set state alive (Call release) then alive + if + Atomic.compare_and_set state alive + (Call (fun () -> Single_use_event.signal sue)) + then alive else timeout () | Unset | Elapsed -> timeout () - let[@inline] await state release = + let[@inline] await state sue = let alive = fenceless_get state in - if alive == Unset then Unset else await state release + if alive == Unset then Unset else await state sue let[@inline never] unawait state alive = match fenceless_get state with @@ -114,9 +117,9 @@ end = struct x end -type awaiter = unit -> unit +type awaiter = Single_use_event.t -let[@inline] resume_awaiter awaiter = awaiter () +let[@inline] resume_awaiter awaiter = Single_use_event.signal awaiter let[@inline] resume_awaiters = function | [] -> () @@ -408,7 +411,8 @@ let add_awaiter loc before awaiter = let awaiters = awaiter :: state_old.awaiters in { before; after = before; casn = casn_after; awaiters } in - before == eval state_old + Single_use_event.is_initial awaiter + && before == eval state_old && Atomic.compare_and_set (as_atomic loc) state_old state_new let[@tail_mod_cons] rec remove_first x' removed = function @@ -429,12 +433,12 @@ let rec remove_awaiter loc before awaiter = remove_awaiter loc before awaiter let block timeout loc before = - let t = Domain_local_await.prepare_for_await () in - let alive = Timeout.await timeout t.release in - if add_awaiter loc before t.release then begin - try t.await () + let t = Single_use_event.create () in + let alive = Timeout.await timeout t in + if add_awaiter loc before t then begin + try Single_use_event.await t with cancellation_exn -> - remove_awaiter loc before t.release; + remove_awaiter loc before t; Timeout.cancel_alive alive; raise cancellation_exn end; @@ -969,22 +973,22 @@ module Xt = struct commit (Backoff.once backoff) mode (reset_quick xt) tx | exception Retry.Later -> begin if xt.cass == NIL then invalid_retry (); - let t = Domain_local_await.prepare_for_await () in - let alive = Timeout.await (timeout_as_atomic xt) t.release in - match add_awaiters t.release xt.casn xt.cass with + let t = Single_use_event.create () in + let alive = Timeout.await (timeout_as_atomic xt) t in + match add_awaiters t xt.casn xt.cass with | NIL -> begin - match t.await () with + match Single_use_event.await t with | () -> - remove_awaiters t.release xt.casn NIL xt.cass; + remove_awaiters t xt.casn NIL xt.cass; Timeout.unawait (timeout_as_atomic xt) alive; commit (Backoff.reset backoff) mode (reset_quick xt) tx | exception cancellation_exn -> - remove_awaiters t.release xt.casn NIL xt.cass; + remove_awaiters t xt.casn NIL xt.cass; Timeout.cancel_alive alive; raise cancellation_exn end | CASN _ as stop -> - remove_awaiters t.release xt.casn stop xt.cass; + remove_awaiters t xt.casn stop xt.cass; Timeout.unawait (timeout_as_atomic xt) alive; commit (Backoff.once backoff) mode (reset_quick xt) tx end