1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
type waker = unit -> unit
let[@inline] suspend ~before_suspend =
Effect.perform @@ Effects.Suspend { before_suspend }
type 'a t = {
max_size: int;
q: 'a Queue.t;
receivers: waker Queue.t;
senders: waker Queue.t;
mutable closed: bool;
}
let create ?(max_size = max_int) () : _ t =
{
max_size;
q = Queue.create ();
receivers = Queue.create ();
senders = Queue.create ();
closed = false;
}
exception Closed
let[@inline] size self = Queue.length self.q
let[@inline] is_empty self = Queue.is_empty self.q
let[@inline] is_full_ self : bool = Queue.length self.q >= self.max_size
let close self : unit =
self.closed <- true;
Queue.iter (fun f -> f ()) self.senders;
Queue.iter (fun f -> f ()) self.receivers;
()
let wakeup_receivers self =
while not (Queue.is_empty self.receivers) do
let r = Queue.pop self.receivers in
r ()
done
let send self x : unit =
let continue = ref true in
while !continue do
if self.closed then raise Closed;
if is_full_ self then
suspend ~before_suspend:(fun ~wakeup -> Queue.push wakeup self.senders)
else (
continue := false;
Queue.push x self.q;
wakeup_receivers self
)
done
let try_send self x : bool =
if self.closed then raise Closed;
if is_full_ self then
false
else (
Queue.push x self.q;
wakeup_receivers self;
true
)
let on_send_ready self cb =
if is_full_ self then
Queue.push cb self.senders
else
cb ()
let wakeup_senders self =
while not (Queue.is_empty self.senders) do
let w = Queue.pop self.senders in
w ()
done
let rec receive_exn (self : 'a t) : 'a =
match Queue.pop self.q with
| x ->
wakeup_senders self;
x
| exception Queue.Empty ->
if self.closed then raise Closed;
suspend ~before_suspend:(fun ~wakeup -> Queue.push wakeup self.receivers);
receive_exn self
let receive (self : 'a t) : 'a option =
try Some (receive_exn self) with Closed -> None
let try_receive (self : 'a t) : 'a option =
if Queue.is_empty self.q then (
if self.closed then raise Closed;
None
) else (
let x = Queue.pop self.q in
wakeup_senders self;
Some x
)
let on_receive_ready (self : _ t) cb : unit =
if Queue.is_empty self.q then
Queue.push cb self.receivers
else
cb ()
let ev_send c x : unit Event.t =
let poll () =
if try_send c x then
Some (Ok ())
else
None
in
let wait cb =
on_send_ready c cb;
Cancel_handle.dummy
in
{ poll; wait }
let ev_receive (self : 'a t) : 'a Event.t =
let poll () =
match try_receive self with
| Some x -> Some (Ok x)
| None -> None
| exception (Closed as exn) ->
let ebt = Exn_bt.get exn in
Some (Error ebt)
in
let wait cb =
on_receive_ready self cb;
Cancel_handle.dummy
in
{ poll; wait }