1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
type t = Broadcast.t
let create () = Broadcast.create ()
let await_generic ?mutex t =
match
Suspend.enter_unchecked (fun ctx enqueue ->
match Fiber_context.get_error ctx with
| Some ex ->
Option.iter Eio_mutex.unlock mutex;
enqueue (Error ex)
| None ->
match Broadcast.suspend t (fun () -> enqueue (Ok ())) with
| None ->
Option.iter Eio_mutex.unlock mutex
| Some request ->
Option.iter Eio_mutex.unlock mutex;
Fiber_context.set_cancel_fn ctx (fun ex ->
if Broadcast.cancel request then enqueue (Error ex)
)
)
with
| () -> Option.iter Eio_mutex.lock mutex
| exception ex ->
let bt = Printexc.get_raw_backtrace () in
Option.iter Eio_mutex.lock mutex;
Printexc.raise_with_backtrace ex bt
let await t mutex = await_generic ~mutex t
let await_no_mutex t = await_generic t
let broadcast = Broadcast.resume_all