Source file database_pools.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
open CCFun.Infix
exception Exception of string
module type Sig = Database_pools_sig.Sig
let src = Logs.Src.create "guardian.pools"
let find_pool_name = CCList.assoc_opt ~eq:CCString.equal "pool"
module LogTag = struct
let add_label : string Logs.Tag.def =
Logs.Tag.def "database_label" ~doc:"Database Label" CCString.pp
;;
let create database = Logs.Tag.(empty |> add add_label database)
let ctx_opt ?ctx () =
let open CCOption.Infix in
ctx >>= find_pool_name >|= fun db -> Logs.Tag.(empty |> add add_label db)
;;
end
module type ConfigSig = sig
val database : string * string
val database_pool_size : int
val expected_databases : int
end
module DefaultConfig : ConfigSig = struct
let database = "main", "mariadb://root@database:3306/test"
let database_pool_size = 5
let expected_databases = 1
end
module Make (Config : ConfigSig) = struct
module Config = Config
type connection =
| Close
| Open of (Caqti_lwt.connection, Caqti_error.t) Caqti_lwt_unix.Pool.t
| Fail of Caqti_error.t
module Pool = struct
type t =
{ database_label : string
; database_url : string
; required : bool
; connection : connection [@opaque]
; n_retries : int
}
let database_label { database_label; _ } = database_label
let connection { connection; _ } = connection
let create ?(required = false) database_label database_url =
{ database_label
; database_url
; required
; connection = Close
; n_retries = 0
}
;;
let reset_retry pool = { pool with n_retries = 0 }
let increment_retry pool = { pool with n_retries = pool.n_retries + 1 }
let connect_pool =
Uri.of_string
%> Caqti_lwt_unix.connect_pool
~pool_config:
(Caqti_pool_config.create ~max_size:Config.database_pool_size ())
;;
let connect
?(retries = 2)
({ database_label; database_url; required; _ } as pool)
=
let tags = database_label |> LogTag.create in
CCResult.retry retries (fun () -> database_url |> connect_pool)
|> (function
| Error [] -> raise (Exception "Failed to connect: empty error")
| Error (err :: _) when required -> raise (Caqti_error.Exn err)
| Error (err :: _ as errors) ->
Logs.warn ~src (fun m ->
m
~tags
"Failed to connect: %s (%s)"
database_label
([%show: Caqti_error.t list] errors));
Fail err
| Ok con -> Open con)
|> fun connection -> { pool with connection }
;;
end
module Cache = struct
module Hashtbl = CCHashtbl.Make (CCString)
let pools : Pool.t Hashtbl.t =
Hashtbl.create (max 1 Config.expected_databases)
;;
let add = Hashtbl.add pools
let remove = Hashtbl.remove pools
let find_opt = Hashtbl.find_opt pools
let replace pool = Hashtbl.replace pools (Pool.database_label pool) pool
end
let print_pool_usage ?tags =
Pool.connection
%> function
| Open pool ->
let n_connections = Caqti_lwt_unix.Pool.size pool in
Logs.debug ~src (fun m ->
m ?tags "Pool usage: %i/%i" n_connections Config.database_pool_size)
| Close | Fail _ ->
Logs.debug ~src (fun m -> m ?tags "Pool usage: No connection found")
;;
let drain_opt =
Pool.connection
%> function
| Open pool -> Caqti_lwt_unix.Pool.drain pool
| Close | Fail _ -> Lwt.return_unit
;;
let add_pool ?required database_label database_url =
match Cache.find_opt database_label with
| Some _ ->
let msg =
[%string "Failed to add pool: Pool already exists %{database_label}"]
in
Logs.err ~src (fun m ->
m ~tags:(database_label |> LogTag.create) "%s" msg);
failwith msg
| None ->
Pool.create ?required database_label database_url
|> Cache.add database_label
;;
let drop_pool name =
match Cache.find_opt name with
| None ->
let msg =
[%string "Failed to drop pool: connection to '%{name}' doesn't exist"]
in
Logs.info ~src (fun m -> m ~tags:(LogTag.create name) "%s" msg);
Lwt.return_unit
| Some pool ->
let%lwt () = drain_opt pool in
Cache.remove name |> Lwt.return
;;
let initialize ?(additional_pools : (string * string) list = []) () : unit =
Config.database :: additional_pools
|> CCList.filter (fst %> Cache.find_opt %> CCOption.is_none)
|> CCList.iter (CCFun.uncurry (Pool.create ~required:true) %> Cache.replace)
;;
let connect =
Cache.find_opt
%> function
| Some pool ->
let rec connect pool =
match pool.Pool.connection with
| Fail err -> Error (Caqti_error.show err)
| Close -> Pool.connect pool |> connect
| Open _ -> Ok ()
in
connect pool
| None -> Error "Database not found"
;;
let disconnect ?error =
Cache.find_opt
%> function
| Some pool ->
let%lwt () = drain_opt pool in
Cache.replace
{ pool with
Pool.connection =
CCOption.map_or ~default:Close (fun err -> Fail err) error
}
|> Lwt.return
| None -> Lwt.return_unit
;;
let raise_caqti_error label input =
let open Caqti_error in
match%lwt input with
| Ok resp -> Lwt.return resp
| Error `Unsupported -> raise (Exception "Caqti error")
| Error (#load_or_connect as err) ->
let%lwt () = disconnect ~error:err label in
raise (Exn err)
| Error (#t as err) -> raise (Exn err)
;;
let rec fetch_pool ?(ctx = []) ?(retries = 2) () =
match ctx |> find_pool_name |> CCFun.flip CCOption.bind Cache.find_opt with
| Some pool ->
(match Pool.connection pool with
| Fail err when pool.Pool.n_retries >= retries ->
raise_caqti_error
(Pool.database_label pool)
(Error err |> Lwt_result.lift)
| Fail _ ->
let () = Pool.connect pool |> Pool.increment_retry |> Cache.replace in
fetch_pool ~ctx ~retries ()
| Close ->
let () = Pool.connect pool |> Cache.replace in
fetch_pool ~ctx ~retries ()
| Open connection when pool.Pool.n_retries > 0 ->
let () = Pool.reset_retry pool |> Cache.replace in
print_pool_usage ?tags:(LogTag.ctx_opt ~ctx ()) pool;
Lwt.return connection
| Open connection ->
print_pool_usage ?tags:(LogTag.ctx_opt ~ctx ()) pool;
Lwt.return connection)
| None ->
Exception
(Format.asprintf
"Unknown Pool: Please 'add_pool' first! (%s)"
CCOption.(find_pool_name ctx |> value ~default:"-"))
|> raise
;;
let map_fetched ?ctx ?retries (fcn : 'a -> ('b, 'e) Lwt_result.t) =
let label =
CCOption.(bind ctx find_pool_name |> get_exn_or "Unknown pool")
in
let%lwt connection = fetch_pool ?ctx ?retries () in
fcn connection |> raise_caqti_error label
;;
let query ?ctx f =
Caqti_lwt_unix.Pool.use (fun connection -> f connection) |> map_fetched ?ctx
;;
let find_opt ?ctx request input =
query ?ctx (fun connection ->
let module Connection = (val connection : Caqti_lwt.CONNECTION) in
Connection.find_opt request input)
;;
let find ?ctx request input =
query ?ctx (fun connection ->
let module Connection = (val connection : Caqti_lwt.CONNECTION) in
Connection.find request input)
;;
let collect ?ctx request input =
query ?ctx (fun connection ->
let module Connection = (val connection : Caqti_lwt.CONNECTION) in
Connection.collect_list request input)
;;
let exec ?ctx request input =
query ?ctx (fun connection ->
let module Connection = (val connection : Caqti_lwt.CONNECTION) in
Connection.exec request input)
;;
let populate ?ctx table columns request input =
query ?ctx (fun connection ->
let module Connection = (val connection : Caqti_lwt.CONNECTION) in
Connection.populate
~table
~columns
request
(Caqti_lwt.Stream.of_list input)
|> Lwt.map Caqti_error.uncongested)
;;
let exec_each connection =
let open CCFun.Infix in
Lwt_list.map_s (fun request -> request connection)
%> Lwt.map CCResult.flatten_l
%> Lwt_result.map (fun (_ : unit list) -> ())
;;
let rollback ?ctx connection error =
let (module Connection : Caqti_lwt.CONNECTION) = connection in
let label =
CCOption.(bind ctx find_pool_name |> get_exn_or "Unknown pool")
in
let%lwt () =
Connection.rollback ()
|> Lwt_result.map
(CCFun.tap (fun _ ->
Logs.debug ~src (fun m ->
m "Successfully rolled back transaction")))
|> raise_caqti_error label
in
Lwt.fail error
;;
let transaction
?ctx
?(setup :
(Caqti_lwt.connection -> (unit, Caqti_error.t) Lwt_result.t) list =
[])
?(cleanup :
(Caqti_lwt.connection -> (unit, Caqti_error.t) Lwt_result.t) list =
[])
(f : Caqti_lwt.connection -> ('a, Caqti_error.t) Lwt_result.t)
: 'a Lwt.t
=
let open Lwt_result.Syntax in
Caqti_lwt_unix.Pool.use (fun connection ->
let (module Connection : Caqti_lwt.CONNECTION) = connection in
let* () = Connection.start () in
Lwt.catch
(fun () ->
let* () = exec_each connection setup in
let* result = f connection in
let* () = exec_each connection cleanup in
match%lwt Connection.commit () with
| Ok () -> Lwt.return_ok result
| Error error -> Lwt.return_error error)
(rollback ?ctx connection))
|> map_fetched ?ctx
;;
let transaction_iter ?ctx queries =
let open Lwt_result.Syntax in
Caqti_lwt_unix.Pool.use (fun connection ->
let (module Connection : Caqti_lwt.CONNECTION) = connection in
let* () = Connection.start () in
Lwt.catch
(fun () ->
let* () = exec_each connection queries in
Connection.commit ())
(rollback ?ctx connection))
|> map_fetched ?ctx
;;
end