1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
(** Signature of a service whose operations are submitted in arrays
    (batches) through {!val-run}. *)
module type Service = sig
(** State of the service. *)
type t
(** Optional configuration accepted by {!val-init}. *)
type cfg
(** An operation on the service; the type parameter ['a] is the type of the
    value carried by the computation the operation is paired with in
    {!wrapped_op}. *)
type 'a op
(** Existential wrapper pairing an ['a op] with an ['a Picos.Computation.t],
    hiding ['a] so that operations of different result types can be stored in
    the same array. *)
type wrapped_op = Mk : 'a op * 'a Picos.Computation.t -> wrapped_op
(** [init ?cfg ()] creates a service state, optionally from [cfg]. *)
val init : ?cfg:cfg -> unit -> t
(** [run t batch] processes the wrapped operations in [batch] against [t].
    NOTE(review): presumably expected to complete the computation attached to
    every operation in [batch], since submitters wait on it — confirm with the
    implementations. *)
val run : t -> wrapped_op array -> unit
end
(** [Make (S)] puts a submission front-end on top of the service [S]:
    callers hand in single operations with {!exec}; pending operations are
    collected in a shared container and handed to [S.run] as an array by
    whichever caller manages to claim the [running] flag. *)
module Make (S : Service) = struct
  type t = {
    internal : S.t;  (** Service state passed to [S.run]. *)
    running : bool Atomic.t;
        (** [true] while some caller is inside [S.run]; claimed with a
            compare-and-set so at most one caller runs a batch at a time. *)
    container : S.wrapped_op Ts_container.t;
        (** Operations submitted by {!exec} that have not been handed to
            [S.run] yet. *)
  }

  (** [init ?cfg ()] creates a front-end around a fresh [S.init ?cfg ()],
      with the [running] flag cleared and a new container from
      [Ts_container.create]. *)
  let init ?cfg () =
    {
      internal = S.init ?cfg ();
      running = Atomic.make false;
      container = Ts_container.create ();
    }

  (** [exec t op] submits [op] and returns the result of the computation
      paired with it (via [Picos.Computation.await]).

      While that computation has no result yet, the caller either claims the
      [running] flag and hands the current contents of the container to
      [S.run], or yields to the scheduler with [Picos.Fiber.yield].

      If [S.run] raises, the exception propagates out of the caller that was
      running the batch; the [running] flag is still released so that other
      callers can run later batches. *)
  let exec t op =
    let open Picos in
    let comp = Computation.create () in
    Ts_container.add t.container (S.Mk (op, comp));
    (* [Option.is_none] rather than [= None]: no polymorphic structural
       equality on the peeked result. *)
    while Option.is_none (Computation.peek comp) do
      if
        Ts_container.size t.container > 0
        && Atomic.compare_and_set t.running false true
      then (
        (* NOTE(review): [Ts_container.get] presumably removes and returns
           all pending operations — confirm against [Ts_container]. *)
        let batch = Ts_container.get t.container in
        (* Always release the flag, even when [S.run] raises; otherwise
           [running] would stay [true] and no caller could ever run a batch
           again. *)
        Fun.protect
          ~finally:(fun () -> Atomic.set t.running false)
          (fun () -> S.run t.internal batch))
      else Fiber.yield ()
    done;
    Computation.await comp

  (** [get_internal t] returns the underlying service state of [t]. *)
  let get_internal t = t.internal
end