1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
open! Core
open! Async
module Monitor = struct
let errors () =
let seqnum = ref 0 in
let error_stream =
Bus.create_exn
[%here]
Arity1
~on_subscription_after_first_write:Allow
~on_callback_raise:Error.raise
in
Clock.every (sec 10.) (fun () -> Bus.write error_stream (!seqnum, None));
let send_errors =
Log.Output.create
~flush:(fun () -> return ())
(fun messages ->
Queue.iter messages ~f:(fun message ->
match Mail_log.Message.level message with
| `Error ->
let error = Log.Message.message message |> Error.of_string in
incr seqnum;
Bus.write error_stream (!seqnum, Some error)
| _ -> ());
Deferred.unit)
in
Log.Global.set_output (send_errors :: Log.Global.get_output ());
Rpc.Pipe_rpc.implement Rpc_intf.Monitor.errors (fun () () ->
[%log.global.debug_string "received error stream subscription"];
let pipe = Async_bus.pipe1_exn error_stream [%here] in
return (Ok pipe))
;;
let rpcs () = [ errors () ]
end
module Spool = struct
module Cache = struct
let status =
Rpc.Pipe_rpc.implement Rpc_intf.Spool.Cache.status (fun spool update_interval ->
let cache = Spool.client_cache spool in
let r, w = Pipe.create () in
Clock.every' ~stop:(Pipe.closed w) update_interval (fun () ->
Pipe.write w (Client_cache.status cache));
return (Ok r))
;;
let config =
Rpc.Rpc.implement Rpc_intf.Spool.Cache.config (fun spool () ->
return
(Client_cache.config (Spool.client_cache spool)
|> Resource_cache.Address_config.Stable.V2.of_v3
|> Resource_cache.Address_config.Stable.V1.of_v2))
;;
let rpcs = [ status; config ]
end
let status =
Rpc.Rpc.implement Rpc_intf.Spool.status (fun spool () -> return (Spool.status spool))
;;
let freeze =
Rpc.Rpc.implement Rpc_intf.Spool.freeze (fun spool msgids ->
Spool.freeze spool msgids)
;;
let send =
Rpc.Rpc.implement Rpc_intf.Spool.send (fun spool (retry_intervals, send_info) ->
Spool.send ~retry_intervals spool send_info)
;;
let remove =
Rpc.Rpc.implement Rpc_intf.Spool.remove (fun spool msgids ->
Spool.remove spool msgids)
;;
let recover =
Rpc.Rpc.implement Rpc_intf.Spool.recover (fun spool info -> Spool.recover spool info)
;;
let events =
Rpc.Pipe_rpc.implement Rpc_intf.Spool.events (fun spool () ->
let pipe = Spool.event_stream spool in
return (Ok pipe))
;;
let rpcs = [ status; freeze; send; remove; recover; events ] @ Cache.rpcs
end
module Smtp_events = struct
let events =
Rpc.Pipe_rpc.implement Rpc_intf.Smtp_events.events (fun server_events () ->
let pipe = Smtp_events.event_stream server_events in
return (Ok pipe))
;;
let rpcs = [ events ]
end
module Gc = struct
let stat = Rpc.Rpc.implement Rpc_intf.Gc.stat (fun () () -> Gc.stat () |> return)
let quick_stat =
Rpc.Rpc.implement Rpc_intf.Gc.quick_stat (fun () () -> Gc.quick_stat () |> return)
;;
let full_major =
Rpc.Rpc.implement Rpc_intf.Gc.full_major (fun () () -> Gc.full_major () |> return)
;;
let major = Rpc.Rpc.implement Rpc_intf.Gc.major (fun () () -> Gc.major () |> return)
let minor = Rpc.Rpc.implement Rpc_intf.Gc.minor (fun () () -> Gc.minor () |> return)
let compact =
Rpc.Rpc.implement Rpc_intf.Gc.compact (fun () () -> Gc.compact () |> return)
;;
let stat_pipe =
Rpc.Pipe_rpc.implement Rpc_intf.Gc.stat_pipe (fun () () ->
let r, w = Pipe.create () in
Clock.every' ~stop:(Pipe.closed w) (Time_float.Span.of_sec 15.) (fun () ->
Pipe.write w (Gc.quick_stat ()));
return (Ok r))
;;
let rpcs = [ stat; quick_stat; full_major; major; minor; compact; stat_pipe ]
end
module Process = struct
let pid = Rpc.Rpc.implement Rpc_intf.Process.pid (fun () () -> Unix.getpid () |> return)
let rpcs = [ pid ]
end