Source file emitter_batch.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
open Opentelemetry_emitter
(** Emit current batch, if the conditions are met *)
let maybe_emit_ (b : _ Batch.t) ~(e : _ Emitter.t) ~mtime : unit =
match Batch.pop_if_ready b ~force:false ~mtime with
| None -> ()
| Some l -> Emitter.emit e l
let wrap_emitter_with_batch (self : _ Batch.t) (e : _ Emitter.t) : _ Emitter.t =
let closed_here = Atomic.make false in
let signal_name = e.signal_name in
let enabled () = (not (Atomic.get closed_here)) && e.enabled () in
let closed () = Atomic.get closed_here || e.closed () in
let dropped_name = Printf.sprintf "otel.sdk.%s.batch.dropped" signal_name in
let self_metrics ~now () =
let m =
Opentelemetry_core.Metrics.(
sum ~name:dropped_name [ int ~now (Batch.n_dropped self) ])
in
m :: e.self_metrics ~now ()
in
let flush_and_close () =
if not (Atomic.exchange closed_here true) then (
(match
Batch.pop_if_ready self ~force:true ~mtime:Batch.Internal_.mtime_dummy_
with
| None -> ()
| Some l -> Emitter.emit e l);
Emitter.flush_and_close e
)
in
let tick ~mtime =
if not (Atomic.get closed_here) then (
maybe_emit_ self ~e ~mtime;
Emitter.tick e ~mtime
)
in
let emit l =
if l <> [] && not (Atomic.get closed_here) then (
let old_n_dropped = Batch.n_dropped self in
(match Batch.push self l with
| `Ok -> ()
| `Dropped ->
let n_dropped = Batch.n_dropped self in
if n_dropped / 100_000 <> old_n_dropped / 100_000 then
Self_debug.log Debug (fun () ->
Printf.sprintf "otel: batch %s dropped %d items in total"
signal_name n_dropped));
maybe_emit_ self ~e ~mtime:Batch.Internal_.mtime_dummy_
)
in
{
Emitter.closed;
signal_name;
self_metrics;
enabled;
flush_and_close;
tick;
emit;
}
let add_batching ~timeout ~batch_size (emitter : 'a Emitter.t) : 'a Emitter.t =
let b = Batch.make ~batch:batch_size ~timeout () in
wrap_emitter_with_batch b emitter
let add_batching_opt ~timeout ~batch_size:(b : int option) e =
match b with
| None -> e
| Some b -> add_batching ~timeout ~batch_size:b e