1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
(* Module-level slots holding the environment-dependent implementations.
   Each one starts out as [None]; [with_env] fills them for the duration of
   its callback and restores the previous contents afterwards. *)

(* Currently installed sleep function (argument: a duration as a float). *)
let _sleep_fn : (float -> unit) option ref = ref None
(* Record wrapper around a timeout runner. The explicitly polymorphic field
   ['a.] lets a single stored function be used at any result type. The
   implementation installed by [with_env] returns [None] when the timeout
   fires and [Some result] otherwise. *)
type with_timeout_fn = { f : 'a. float -> (unit -> 'a) -> 'a option }
(* Currently installed timeout runner. *)
let _with_timeout_fn : with_timeout_fn option ref = ref None
(* Currently installed HTTP client. *)
let _client : Cohttp_eio.Client.t option ref = ref None
(** [env_error ()] signals that an operation was attempted while no
    environment is installed.

    @raise Failure always, with a message pointing at [Databases.with_env]. *)
let env_error () =
  raise
    (Failure
       "Cosmos_eio: call Databases.with_env before using Database operations")
(** [get_sleep ()] returns the installed sleep function.

    @raise Failure (via [env_error]) when none is installed. *)
let get_sleep () =
  !_sleep_fn |> function
  | None -> env_error ()
  | Some sleep -> sleep
(** [get_with_timeout ()] returns the installed timeout runner, unwrapped
    from its record.

    @raise Failure (via [env_error]) when none is installed. *)
let get_with_timeout () =
  !_with_timeout_fn |> function
  | None -> env_error ()
  | Some runner -> runner.f
(** [get_client ()] returns the installed HTTP client.

    @raise Failure (via [env_error]) when none is installed. *)
let get_client () =
  !_client |> function
  | None -> env_error ()
  | Some client -> client
(** [with_env ~sw env f] installs the sleep function, timeout runner and
    HTTPS-capable HTTP client built from [env] into the module-level slots,
    runs [f ()], and then puts the previous slot contents back, whether [f]
    returns normally or raises.

    The [~sw] argument is accepted but not used.

    @raise Invalid_argument
      if the CA certificate authenticator or the TLS client configuration
      cannot be created; the message includes the underlying error text. *)
let with_env ~sw:_ env f =
  (* Unwrap a [`Msg]-carrying result. Unlike [Result.get_ok], the failure
     keeps the library's own explanation instead of "result is Error _",
     while still raising [Invalid_argument]. *)
  let require what = function
    | Ok v -> v
    | Error (`Msg reason) ->
        invalid_arg (Printf.sprintf "Cosmos_eio: %s: %s" what reason)
  in
  Mirage_crypto_rng_unix.use_default ();
  let net = Eio.Stdenv.net env in
  let clock = Eio.Stdenv.clock env in
  let authenticator =
    require "cannot load CA certificates" (Ca_certs.authenticator ())
  in
  let tls_config =
    require "cannot build TLS client configuration"
      (Tls.Config.client ~authenticator ())
  in
  (* Wrap the raw socket in a TLS client flow for https requests. *)
  let https _uri socket =
    (Tls_eio.client_of_flow tls_config socket :> _ Eio.Flow.two_way)
  in
  let client = Cohttp_eio.Client.make ~https:(Some https) net in
  (* Remember what was installed so nested calls restore correctly. *)
  let old_sleep = !_sleep_fn in
  let old_wt = !_with_timeout_fn in
  let old_client = !_client in
  _sleep_fn := Some (fun secs -> Eio.Time.sleep clock secs);
  _with_timeout_fn :=
    Some
      {
        f =
          (fun t thunk ->
            (* Wrap the result in [Ok] so it fits the result type that
               [Eio.Time.with_timeout] returns, then map timeout to [None]. *)
            match Eio.Time.with_timeout clock t (fun () -> Ok (thunk ())) with
            | Ok x -> Some x
            | Error `Timeout -> None);
      };
  _client := Some client;
  Fun.protect
    ~finally:(fun () ->
      _sleep_fn := old_sleep;
      _with_timeout_fn := old_wt;
      _client := old_client)
    f
(** IO implementation in which a computation is a thunk: nothing happens
    until the value is applied to [()]. *)
module Eio_io : Cosmos.Databases_intf.IO with type 'a t = unit -> 'a = struct
  type +'a t = unit -> 'a

  (* Force a suspended computation. *)
  let run (io : 'a t) : 'a = io ()

  (* A computation that yields [x] without doing anything. *)
  let return x = fun () -> x

  (* Run [x], feed its result to [f], then run what [f] produced. *)
  let bind x f =
   fun () ->
    let v = run x in
    run (f v)

  (* Build and run [f ()]; any exception raised while doing so is passed to
     [handler], whose computation is then run instead.
     NOTE(review): the handler sees every exception, presumably including
     Eio cancellation — confirm callers re-raise where appropriate. *)
  let catch f handler = fun () -> try run (f ()) with exn -> run (handler exn)

  (* The sleep function is looked up when the computation runs, not when it
     is built. *)
  let sleep secs = fun () -> get_sleep () secs

  (* Run [cmd] under the installed timeout runner with limit [t]. *)
  let with_timeout t cmd = fun () -> get_with_timeout () t cmd

  (* Apply [f] to every element and run the results, using at most 10
     fibers at a time. *)
  let parallel_map f xs =
   fun () -> Eio.Fiber.List.map ~max_fibers:10 (fun x -> run (f x)) xs
end
(** HTTP client implementation on top of the [Cohttp_eio] client installed
    by [with_env]. Every request returns a suspended computation yielding
    [Ok (response, body_string)] or [Error http_error]. *)
module Eio_http :
  Cosmos.Databases_intf.Http_client with type 'a io := 'a Eio_io.t = struct
  type http_error = Connection_refused | Other_error of exn

  (* Run [f] inside a fresh switch, turning exceptions into [http_error]
     values: a refused connection is reported as [Connection_refused],
     any other exception is wrapped in [Other_error]. *)
  let perform_request f () =
    Eio.Switch.run @@ fun sw ->
    try Ok (f sw) with
    | Unix.Unix_error (Unix.ECONNREFUSED, _, _) -> Error Connection_refused
    | exn -> Error (Other_error exn)

  (* Read the whole response body into a string. This is called inside the
     function given to [perform_request], i.e. before its switch finishes. *)
  let read_response (resp, body) = (resp, Eio.Flow.read_all body)

  (** [get ~headers uri] performs a GET request. *)
  let get ~headers uri =
    perform_request (fun sw ->
        read_response (Cohttp_eio.Client.get (get_client ()) ~sw ~headers uri))

  (** [post ~headers ~body uri] performs a POST request sending the string
      [body]. *)
  let post ~headers ~body uri =
    perform_request (fun sw ->
        let body = Cohttp_eio.Body.of_string body in
        read_response
          (Cohttp_eio.Client.post (get_client ()) ~sw ~body ~headers uri))

  (** [put ~headers ~body uri] performs a PUT request sending the string
      [body]. *)
  let put ~headers ~body uri =
    perform_request (fun sw ->
        let body = Cohttp_eio.Body.of_string body in
        read_response
          (Cohttp_eio.Client.put (get_client ()) ~sw ~body ~headers uri))

  (** [delete ~headers uri] performs a DELETE request. *)
  let delete ~headers uri =
    perform_request (fun sw ->
        read_response
          (Cohttp_eio.Client.delete (get_client ()) ~sw ~headers uri))
end
(** Computation type of this backend; an alias for ['a Eio_io.t]. *)
type 'a io = 'a Eio_io.t
(** Alias for [Cosmos.Databases_intf.Auth_key], the signature expected of
    the [Database] functor's argument. *)
module type Auth_key = Cosmos.Databases_intf.Auth_key
(** Re-export of [Cosmos.Databases_core.cosmos_error], so its constructors
    can be used through this module. The definition must stay identical to
    the one it re-exports. *)
type cosmos_error = Cosmos.Databases_core.cosmos_error =
| Timeout_error
| Connection_error
| Azure_error of int * Response_headers.t
(* NOTE(review): the [int] is presumably the HTTP status code — confirm
   against [Cosmos.Databases_core]. *)
(** [Database (Auth)] instantiates [Cosmos.Databases_core.Make] with the
    Eio-based IO and HTTP implementations defined in this file and the given
    [Auth] module. *)
module Database (Auth : Auth_key) =
Cosmos.Databases_core.Make (Eio_io) (Eio_http) (Auth)