Source file ts_container.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type 'a t = { q : 'a Picos_aux_mpscq.t; size : int Atomic.t; batch_limit : int }
let create ?(batch_limit = max_int) () =
{ q = Picos_aux_mpscq.create (); size = Atomic.make 0; batch_limit }
let add t elt =
Picos_aux_mpscq.push t.q elt;
Atomic.incr t.size
let get t =
let batch_size = Atomic.exchange t.size 0 in
let limit = min batch_size t.batch_limit in
let topup = batch_size - limit in
let _ = Atomic.fetch_and_add t.size topup in
Array.init batch_size (fun _ -> Picos_aux_mpscq.pop_exn t.q)
let size t = Atomic.get t.size