Skip to content

Commit

Permalink
Change to use single-use-event instead of domain-local-await
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Sep 20, 2023
1 parent 4c16f6b commit 69b6b0e
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 80 deletions.
34 changes: 4 additions & 30 deletions bench/bench.ml
Original file line number Diff line number Diff line change
Expand Up @@ -30,37 +30,11 @@ module Times = struct
results.(domain_i)
done
in
let prepare_for_await () =
let open struct
type state = Init | Released | Awaiting of { mutable released : bool }
end in
let state = Atomic.make Init in
let release () =
if Multicore_magic.fenceless_get state != Released then
match Atomic.exchange state Released with
| Awaiting r -> r.released <- true
| _ -> ()
in
let await () =
if Multicore_magic.fenceless_get state != Released then
let awaiting = Awaiting { released = false } in
if Atomic.compare_and_set state Init awaiting then
match awaiting with
| Awaiting r ->
(* Avoid sleeping *)
while not r.released do
Domain.cpu_relax ()
done
| _ -> ()
in
Domain_local_await.{ release; await }
let domains =
Array.init n_domains @@ fun domain_i ->
Domain.spawn @@ fun () -> main domain_i
in
Domain_local_await.using ~prepare_for_await ~while_running:(fun () ->
let domains =
Array.init n_domains @@ fun domain_i ->
Domain.spawn @@ fun () -> main domain_i
in
Array.iter Domain.join domains);
Array.iter Domain.join domains;
let n = Stack.length results.(0) in
let times = Array.create_float n in
for run_i = 0 to n - 1 do
Expand Down
2 changes: 1 addition & 1 deletion bench/dune
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
(package kcas_data)
(libraries
kcas_data
domain-local-await
single-use-event
multicore-magic
yojson
domain_shims
Expand Down
36 changes: 11 additions & 25 deletions doc/scheduler-interop.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ implementations that are conveniently provided by
```ocaml
# #thread
# #require "kcas_data"
# #require "single-use-event"
# open Kcas_data
# open Kcas
```
Expand All @@ -36,45 +37,30 @@ module Scheduler : sig
val fiber : t -> (unit -> 'a) -> 'a Promise.t
end = struct
open Effect.Deep
type _ Effect.t +=
| Suspend : (('a, unit) continuation -> unit) -> 'a Effect.t
type t = {
queue: (unit -> unit) Queue.t;
domain: unit Domain.t
}
let spawn () =
let queue = Queue.create () in
let queue: (unit -> unit) Queue.t = Queue.create () in
let rec scheduler work =
let effc (type a) : a Effect.t -> _ = function
| Suspend ef -> Some ef
| _ -> None in
| Single_use_event.Await sue ->
Some (fun (k: (a, _) continuation) ->
if not (Single_use_event.is_initial sue) ||
not (Single_use_event.try_attach sue @@ fun () ->
Queue.add (continue k) queue) then
continue k ())
| _ ->
None in
try_with work () { effc };
match Queue.take_opt queue with
| Some work -> scheduler work
| None -> () in
let prepare_for_await _ =
let state = Atomic.make `Init in
let release () =
if Atomic.get state != `Released then
match Atomic.exchange state `Released with
| `Awaiting k ->
Queue.add (continue k) queue
| _ -> () in
let await () =
if Atomic.get state != `Released then
Effect.perform @@ Suspend (fun k ->
if not (Atomic.compare_and_set state `Init
(`Awaiting k)) then
continue k ())
in
Domain_local_await.{ release; await } in
let domain = Domain.spawn @@ fun () ->
try
while true do
let work = Queue.take_blocking queue in
Domain_local_await.using
~prepare_for_await
~while_running:(fun () -> scheduler work)
scheduler (Queue.take_blocking queue)
done
with Exit -> () in
{ queue; domain }
Expand Down
4 changes: 2 additions & 2 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
(depends
(ocaml (>= 4.13.0))
(backoff (>= 0.1.0))
(domain-local-await (>= 1.0.0))
single-use-event
(domain-local-timeout (>= 1.0.0))
(multicore-magic (>= 2.0.0))
(domain_shims (and (>= 0.1.0) :with-test))
Expand All @@ -26,7 +26,7 @@
(depends
(kcas (= :version))
(multicore-magic (>= 2.0.0))
(domain-local-await (and (>= 1.0.0) :with-test))
single-use-event
(domain_shims (and (>= 0.1.0) :with-test))
(mtime (and (>= 2.0.0) :with-test))
(alcotest (and (>= 1.7.0) :with-test))
Expand Down
5 changes: 4 additions & 1 deletion kcas.opam
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ depends: [
"dune" {>= "3.8"}
"ocaml" {>= "4.13.0"}
"backoff" {>= "0.1.0"}
"domain-local-await" {>= "1.0.0"}
"single-use-event"
"domain-local-timeout" {>= "1.0.0"}
"multicore-magic" {>= "2.0.0"}
"domain_shims" {>= "0.1.0" & with-test}
Expand All @@ -40,3 +40,6 @@ build: [
]
dev-repo: "git+https://github.com/ocaml-multicore/kcas.git"
doc: "https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/"
pin-depends: [
[ "single-use-event.dev" "git+https://github.com/ocaml-multicore/single-use-event#89a920f0d9bef9ff4b082b3021dea562dcf9c129"]
]
3 changes: 3 additions & 0 deletions kcas.opam.template
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
doc: "https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/"
pin-depends: [
[ "single-use-event.dev" "git+https://github.com/ocaml-multicore/single-use-event#89a920f0d9bef9ff4b082b3021dea562dcf9c129"]
]
2 changes: 1 addition & 1 deletion kcas_data.opam
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ depends: [
"dune" {>= "3.8"}
"kcas" {= version}
"multicore-magic" {>= "2.0.0"}
"domain-local-await" {>= "1.0.0" & with-test}
"single-use-event"
"domain_shims" {>= "0.1.0" & with-test}
"mtime" {>= "2.0.0" & with-test}
"alcotest" {>= "1.7.0" & with-test}
Expand Down
2 changes: 1 addition & 1 deletion src/kcas/dune
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(library
(name kcas)
(public_name kcas)
(libraries domain-local-await domain-local-timeout backoff multicore-magic))
(libraries single-use-event domain-local-timeout backoff multicore-magic))

(mdx
(package kcas)
Expand Down
42 changes: 23 additions & 19 deletions src/kcas/kcas.ml
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,19 @@ module Timeout = struct
let[@inline] set_opt state seconds =
if seconds != None then set_opt state seconds

let[@inline never] await state release =
let[@inline never] await state sue =
match fenceless_get state with
| Call _ as alive ->
if Atomic.compare_and_set state alive (Call release) then alive
if
Atomic.compare_and_set state alive
(Call (fun () -> Single_use_event.signal sue))
then alive
else timeout ()
| Unset | Elapsed -> timeout ()

let[@inline] await state release =
let[@inline] await state sue =
let alive = fenceless_get state in
if alive == Unset then Unset else await state release
if alive == Unset then Unset else await state sue

let[@inline never] unawait state alive =
match fenceless_get state with
Expand Down Expand Up @@ -114,9 +117,9 @@ end = struct
x
end

type awaiter = unit -> unit
type awaiter = Single_use_event.t

let[@inline] resume_awaiter awaiter = awaiter ()
let[@inline] resume_awaiter awaiter = Single_use_event.signal awaiter

let[@inline] resume_awaiters = function
| [] -> ()
Expand Down Expand Up @@ -408,7 +411,8 @@ let add_awaiter loc before awaiter =
let awaiters = awaiter :: state_old.awaiters in
{ before; after = before; casn = casn_after; awaiters }
in
before == eval state_old
Single_use_event.is_initial awaiter
&& before == eval state_old
&& Atomic.compare_and_set (as_atomic loc) state_old state_new

let[@tail_mod_cons] rec remove_first x' removed = function
Expand All @@ -429,12 +433,12 @@ let rec remove_awaiter loc before awaiter =
remove_awaiter loc before awaiter

let block timeout loc before =
let t = Domain_local_await.prepare_for_await () in
let alive = Timeout.await timeout t.release in
if add_awaiter loc before t.release then begin
try t.await ()
let t = Single_use_event.create () in
let alive = Timeout.await timeout t in
if add_awaiter loc before t then begin
try Single_use_event.await t
with cancellation_exn ->
remove_awaiter loc before t.release;
remove_awaiter loc before t;
Timeout.cancel_alive alive;
raise cancellation_exn
end;
Expand Down Expand Up @@ -969,22 +973,22 @@ module Xt = struct
commit (Backoff.once backoff) mode (reset_quick xt) tx
| exception Retry.Later -> begin
if xt.cass == NIL then invalid_retry ();
let t = Domain_local_await.prepare_for_await () in
let alive = Timeout.await (timeout_as_atomic xt) t.release in
match add_awaiters t.release xt.casn xt.cass with
let t = Single_use_event.create () in
let alive = Timeout.await (timeout_as_atomic xt) t in
match add_awaiters t xt.casn xt.cass with
| NIL -> begin
match t.await () with
match Single_use_event.await t with
| () ->
remove_awaiters t.release xt.casn NIL xt.cass;
remove_awaiters t xt.casn NIL xt.cass;
Timeout.unawait (timeout_as_atomic xt) alive;
commit (Backoff.reset backoff) mode (reset_quick xt) tx
| exception cancellation_exn ->
remove_awaiters t.release xt.casn NIL xt.cass;
remove_awaiters t xt.casn NIL xt.cass;
Timeout.cancel_alive alive;
raise cancellation_exn
end
| CASN _ as stop ->
remove_awaiters t.release xt.casn stop xt.cass;
remove_awaiters t xt.casn stop xt.cass;
Timeout.unawait (timeout_as_atomic xt) alive;
commit (Backoff.once backoff) mode (reset_quick xt) tx
end
Expand Down

0 comments on commit 69b6b0e

Please sign in to comment.