Source file emitter_batch.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
open Opentelemetry_emitter

(** Emit current batch, if the conditions are met *)
let maybe_emit_ (b : _ Batch.t) ~(e : _ Emitter.t) ~mtime : unit =
  match Batch.pop_if_ready b ~force:false ~mtime with
  | None -> ()
  | Some l -> Emitter.emit e l

let wrap_emitter_with_batch (self : _ Batch.t) (e : _ Emitter.t) : _ Emitter.t =
  (* we need to be able to close this emitter before we close [e]. This
     will become [true] when we close, then we call [Emitter.flush_and_close e],
     then [e] itself will be closed. *)
  let closed_here = Atomic.make false in

  let signal_name = e.signal_name in
  let enabled () = (not (Atomic.get closed_here)) && e.enabled () in
  let closed () = Atomic.get closed_here || e.closed () in

  let dropped_name = Printf.sprintf "otel.sdk.%s.batch.dropped" signal_name in
  let self_metrics ~now () =
    let m =
      Opentelemetry_core.Metrics.(
        sum ~name:dropped_name [ int ~now (Batch.n_dropped self) ])
    in
    m :: e.self_metrics ~now ()
  in
  let flush_and_close () =
    if not (Atomic.exchange closed_here true) then (
      (* NOTE: we need to close this wrapping emitter first, to prevent
         further pushes; then write the content to [e]; then
         flusn and close [e]. In this order. *)
      (match
         Batch.pop_if_ready self ~force:true ~mtime:Batch.Internal_.mtime_dummy_
       with
      | None -> ()
      | Some l -> Emitter.emit e l);

      (* now we can close [e], nothing remains in [self] *)
      Emitter.flush_and_close e
    )
  in

  let tick ~mtime =
    if not (Atomic.get closed_here) then (
      (* first, check if batch has timed out *)
      maybe_emit_ self ~e ~mtime;

      (* only then, tick the underlying emitter *)
      Emitter.tick e ~mtime
    )
  in

  let emit l =
    if l <> [] && not (Atomic.get closed_here) then (
      let old_n_dropped = Batch.n_dropped self in
      (match Batch.push self l with
      | `Ok -> ()
      | `Dropped ->
        let n_dropped = Batch.n_dropped self in
        if n_dropped / 100_000 <> old_n_dropped / 100_000 then
          Self_debug.log Debug (fun () ->
              Printf.sprintf "otel: batch %s dropped %d items in total"
                signal_name n_dropped));
      maybe_emit_ self ~e ~mtime:Batch.Internal_.mtime_dummy_
    )
  in

  {
    Emitter.closed;
    signal_name;
    self_metrics;
    enabled;
    flush_and_close;
    tick;
    emit;
  }

let add_batching ~timeout ~batch_size (emitter : 'a Emitter.t) : 'a Emitter.t =
  let b = Batch.make ~batch:batch_size ~timeout () in
  wrap_emitter_with_batch b emitter

let add_batching_opt ~timeout ~batch_size:(b : int option) e =
  match b with
  | None -> e
  | Some b -> add_batching ~timeout ~batch_size:b e