Source file rpc_impl.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
open! Core
open! Async

module Monitor = struct
  let errors () =
    let seqnum = ref 0 in
    let error_stream =
      Bus.create_exn
        [%here]
        Arity1
        ~on_subscription_after_first_write:Allow
        ~on_callback_raise:Error.raise
    in
    (* Hearbeats *)
    Clock.every (sec 10.) (fun () -> Bus.write error_stream (!seqnum, None));
    (* Actual errors *)
    let send_errors =
      Log.Output.create
        ~flush:(fun () -> return ())
        (fun messages ->
          Queue.iter messages ~f:(fun message ->
            match Mail_log.Message.level message with
            | `Error ->
              let error = Log.Message.message message |> Error.of_string in
              incr seqnum;
              Bus.write error_stream (!seqnum, Some error)
            | _ -> ());
          Deferred.unit)
    in
    Log.Global.set_output (send_errors :: Log.Global.get_output ());
    Rpc.Pipe_rpc.implement Rpc_intf.Monitor.errors (fun () () ->
      [%log.global.debug_string "received error stream subscription"];
      let pipe = Async_bus.pipe1_exn error_stream [%here] in
      return (Ok pipe))
  ;;

  let rpcs () = [ errors () ]
end

module Spool = struct
  module Cache = struct
    let status =
      Rpc.Pipe_rpc.implement Rpc_intf.Spool.Cache.status (fun spool update_interval ->
        let cache = Spool.client_cache spool in
        let r, w = Pipe.create () in
        Clock.every' ~stop:(Pipe.closed w) update_interval (fun () ->
          Pipe.write w (Client_cache.status cache));
        return (Ok r))
    ;;

    let config =
      Rpc.Rpc.implement Rpc_intf.Spool.Cache.config (fun spool () ->
        return
          (Client_cache.config (Spool.client_cache spool)
           |> Resource_cache.Address_config.Stable.V2.of_v3
           |> Resource_cache.Address_config.Stable.V1.of_v2))
    ;;

    let rpcs = [ status; config ]
  end

  let status =
    Rpc.Rpc.implement Rpc_intf.Spool.status (fun spool () -> return (Spool.status spool))
  ;;

  let freeze =
    Rpc.Rpc.implement Rpc_intf.Spool.freeze (fun spool msgids ->
      Spool.freeze spool msgids)
  ;;

  let send =
    Rpc.Rpc.implement Rpc_intf.Spool.send (fun spool (retry_intervals, send_info) ->
      Spool.send ~retry_intervals spool send_info)
  ;;

  let remove =
    Rpc.Rpc.implement Rpc_intf.Spool.remove (fun spool msgids ->
      Spool.remove spool msgids)
  ;;

  let recover =
    Rpc.Rpc.implement Rpc_intf.Spool.recover (fun spool info -> Spool.recover spool info)
  ;;

  let events =
    Rpc.Pipe_rpc.implement Rpc_intf.Spool.events (fun spool () ->
      let pipe = Spool.event_stream spool in
      return (Ok pipe))
  ;;

  let rpcs = [ status; freeze; send; remove; recover; events ] @ Cache.rpcs
end

module Smtp_events = struct
  let events =
    Rpc.Pipe_rpc.implement Rpc_intf.Smtp_events.events (fun server_events () ->
      let pipe = Smtp_events.event_stream server_events in
      return (Ok pipe))
  ;;

  let rpcs = [ events ]
end

module Gc = struct
  let stat = Rpc.Rpc.implement Rpc_intf.Gc.stat (fun () () -> Gc.stat () |> return)

  let quick_stat =
    Rpc.Rpc.implement Rpc_intf.Gc.quick_stat (fun () () -> Gc.quick_stat () |> return)
  ;;

  let full_major =
    Rpc.Rpc.implement Rpc_intf.Gc.full_major (fun () () -> Gc.full_major () |> return)
  ;;

  let major = Rpc.Rpc.implement Rpc_intf.Gc.major (fun () () -> Gc.major () |> return)
  let minor = Rpc.Rpc.implement Rpc_intf.Gc.minor (fun () () -> Gc.minor () |> return)

  let compact =
    Rpc.Rpc.implement Rpc_intf.Gc.compact (fun () () -> Gc.compact () |> return)
  ;;

  let stat_pipe =
    Rpc.Pipe_rpc.implement Rpc_intf.Gc.stat_pipe (fun () () ->
      let r, w = Pipe.create () in
      Clock.every' ~stop:(Pipe.closed w) (Time_float.Span.of_sec 15.) (fun () ->
        Pipe.write w (Gc.quick_stat ()));
      return (Ok r))
  ;;

  let rpcs = [ stat; quick_stat; full_major; major; minor; compact; stat_pipe ]
end

module Process = struct
  let pid = Rpc.Rpc.implement Rpc_intf.Process.pid (fun () () -> Unix.getpid () |> return)
  let rpcs = [ pid ]
end