1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
let error_msgf fmt = Format.kasprintf (fun msg -> Error (`Msg msg)) fmt
module type FLOW = Caqti_platform.System_sig.SOCKET_OPS with type 'a fiber = 'a
type ocaml = | and system = |
type 'a impl =
| OCaml : (module FLOW with type t = 'a) * 'a -> ocaml impl
| System : Buffer.t * Miou_unix.file_descr -> system impl
type socket = Socket : 'a impl -> socket [@@unboxed]
type Caqti_error.msg +=
| Msg_unix of Unix.error * string * string
let () =
let pp ppf = function
| Msg_unix (err, f, v) ->
Format.fprintf ppf "%s(%s): %s" f v (Unix.error_message err)
| _ -> assert false
in
Caqti_error.define_msg ~pp [%extension_constructor Msg_unix]
external reraise : exn -> 'a = "%reraise"
module System_core = struct
include Caqti_miou.System_core
type stdenv = unit
end
include System_core
module Alarm = struct
type t = Miou.Condition.t * Miou.Mutex.t
let schedule ~sw ~stdenv:_ t fn =
let t_now = Mtime_clock.now () in
let mutex = Miou.Mutex.create () and condition = Miou.Condition.create () in
let delay =
if Mtime.is_later t ~than:t_now then 0.0
else Mtime.Span.to_float_ns (Mtime.span t t_now) *. 1e-9
in
Logs.debug (fun m -> m "schedule an alarm");
let _ =
async ~sw @@ fun () ->
Logs.debug (fun m -> m "really schedule an alarm");
let sleeper = Miou.async @@ fun () ->
Logs.debug (fun m -> m "Sleep %fs" delay);
Miou_unix.sleep delay;
Logs.debug (fun m -> m "Ring the alarm");
`Continue in
let canceller =
Miou.async @@ fun () ->
Miou.Condition.wait condition mutex;
`Cancel
in
match Miou.await_first [ sleeper; canceller ] with
| Ok `Continue -> fn ()
| Ok `Cancel -> ()
| Error _exn -> ()
in
(condition, mutex)
let unschedule (condition, mutex) =
Miou.Mutex.protect mutex @@ fun () -> Miou.Condition.signal condition
end
module Stream = Caqti_miou.Stream
module Pool = Caqti_platform.Pool.Make (System_core) (Alarm)
module Net = struct
module Sockaddr = struct
type t = Unix.sockaddr
let unix v = Unix.ADDR_UNIX v
let tcp (addr, port) = Unix.ADDR_INET (Ipaddr_unix.to_inet_addr addr, port)
end
let getaddrinfo ~stdenv:() host port =
let opts = Unix.[ AI_SOCKTYPE SOCK_STREAM ] in
match
Unix.getaddrinfo (Domain_name.to_string host) (string_of_int port) opts
with
| lst -> Ok (List.map (fun ai -> ai.Unix.ai_addr) lst)
| exception Not_found -> Ok []
| exception Unix.Unix_error (err, f, v) ->
error_msgf "%s(%s): %s" f v (Unix.error_message err)
let convert_io_exception = function
| Unix.Unix_error (err, f, v) -> Some (Msg_unix (err, f, v))
| _ -> None
type tcp_flow = Miou_unix.file_descr
type tls_flow = ocaml impl
module Socket = struct
type t = socket
let output_char (Socket impl) chr = match impl with
| System (buf, _) -> Buffer.add_char buf chr
| OCaml ((module Flow), fd) -> Flow.output_char fd chr
let output_string (Socket impl) str = match impl with
| System (buf, _) -> Buffer.add_string buf str
| OCaml ((module Flow), fd) -> Flow.output_string fd str
let flush (Socket impl) = match impl with
| System (buf, fd) ->
let str = Buffer.contents buf in
Buffer.clear buf;
if String.length str > 0 then Miou_unix.write fd str
| OCaml ((module Flow), fd) -> Flow.flush fd
let input_char (Socket impl) = match impl with
| System (_, fd) ->
let buf = Bytes.make 1 '\000' in
let len = Miou_unix.read fd buf in
if len = 0 then raise End_of_file else Bytes.get buf 0
| OCaml ((module Flow), fd) ->
Flow.input_char fd
let really_input (Socket impl) buf off len =
match impl with
| System (_, fd) ->
let rec go off len =
if len > 0 then
let len' = Miou_unix.read fd buf ~off ~len in
go (off + len') (len - len')
in
go off len
| OCaml ((module Flow), fd) ->
Flow.really_input fd buf off len
let close = function
| Socket (System (_, fd)) -> Miou_unix.close fd
| Socket (OCaml ((module Flow), fd)) -> Flow.close fd
end
let socket = function
| Unix.ADDR_UNIX _ ->
let fd = Unix.socket ~cloexec:true Unix.PF_UNIX Unix.SOCK_STREAM 0 in
Ok (Miou_unix.of_file_descr ~non_blocking:true fd)
| Unix.ADDR_INET (inet_addr, _) when Unix.is_inet6_addr inet_addr ->
Ok (Miou_unix.tcpv6 ())
| _ -> Ok (Miou_unix.tcpv4 ())
let connect_tcp ~sw:_ ~stdenv:_ sockaddr =
let ( >>= ) = Result.bind in
socket sockaddr >>= fun socket ->
match Miou_unix.connect socket sockaddr with
| () -> Ok (Socket (System (Buffer.create 0x7ff, socket)))
| exception Unix.Unix_error (err, f, v) ->
Miou_unix.close socket;
Error (Msg_unix (err, f, v))
| exception exn -> Miou_unix.close socket; raise exn
let tcp_flow_of_socket (Socket impl) = match impl with
| System (_, fd) -> Some fd
| OCaml _ -> None
let socket_of_tls_flow : sw:_ -> tls_flow -> Socket.t =
fun ~sw:_ -> function
| OCaml _ as impl -> Socket impl
module type TLS_PROVIDER =
Caqti_platform.System_sig.TLS_PROVIDER
with type 'a fiber := 'a
and type tcp_flow := tcp_flow
and type tls_flow := tls_flow
let tls_providers_r : (module TLS_PROVIDER) list ref = ref []
let register_tls_provider p = tls_providers_r := p :: !tls_providers_r
let tls_providers config =
if Caqti_connect_config.mem_name "tls" config then begin
match Caqti_platform.Connector.load_library "caqti-tls-miou" with
| Ok () -> ()
| Error msg -> Log.warn (fun m -> m "TLS configured, but missing caqti-tls-miou: %s" msg)
end;
!tls_providers_r
end