Source file durable_pipe_rpc.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
open Core
open Async_kernel
open Async_rpc_kernel
module Update = struct
type ('response, 'error) t =
| Attempting_new_connection
| Connection_success of Rpc.Pipe_rpc.Metadata.t
| Lost_connection
| Failed_to_connect of Error.t
| Rpc_error of 'error
| Update of 'response
end
let filter_map_update update =
let module L = Durable_state_rpc.Update in
let module R = Update in
match update with
| L.Attempting_new_connection -> Some R.Attempting_new_connection
| L.Connection_success metadata -> Some (R.Connection_success metadata)
| L.Lost_connection -> Some R.Lost_connection
| L.Failed_to_connect e -> Some (R.Failed_to_connect e)
| L.Rpc_error e -> Some (R.Rpc_error e)
| L.Update r -> Some (R.Update r)
| L.State () -> None
;;
let create' ?time_source connection ~dispatch ~resubscribe_delay =
let dispatch conn = dispatch conn >>|? Result.map ~f:(fun (pipe, id) -> (), pipe, id) in
Durable_state_rpc.create' ?time_source connection ~dispatch ~resubscribe_delay
|> Pipe.filter_map ~f:filter_map_update
;;
let create ?time_source connection rpc ~query ~resubscribe_delay =
let dispatch conn = Rpc.Pipe_rpc.dispatch rpc conn query in
create' ?time_source connection ~dispatch ~resubscribe_delay
;;
let create_versioned
(type query response error)
?time_source
connection
rpc_module
~(query : query)
~resubscribe_delay
=
let dispatch conn =
let module Pipe_rpc =
(val rpc_module
: Versioned_rpc.Both_convert.Pipe_rpc.S
with type caller_query = query
and type caller_response = response
and type caller_error = error)
in
Pipe_rpc.dispatch_multi conn query
in
create' ?time_source connection ~dispatch ~resubscribe_delay
;;
let create_versioned'
(type query response error)
?time_source
connection
rpc_module
~(query : query)
~resubscribe_delay
=
let dispatch conn =
let module Pipe_rpc =
(val rpc_module
: Versioned_rpc.Caller_converts.Pipe_rpc.S
with type query = query
and type response = response
and type error = error)
in
Pipe_rpc.dispatch_multi conn query
in
create' ?time_source connection ~dispatch ~resubscribe_delay
;;
let create_or_fail' ?time_source connection ~dispatch ~resubscribe_delay =
let dispatch conn = dispatch conn >>|? Result.map ~f:(fun (pipe, id) -> (), pipe, id) in
Durable_state_rpc.create_or_fail' ?time_source connection ~dispatch ~resubscribe_delay
>>|? Result.map ~f:(Pipe.filter_map ~f:filter_map_update)
;;
let create_or_fail ?time_source connection rpc ~query ~resubscribe_delay =
let dispatch conn = Rpc.Pipe_rpc.dispatch rpc conn query in
create_or_fail' ?time_source connection ~dispatch ~resubscribe_delay
;;
let create_or_fail_versioned
(type query response error)
?time_source
connection
rpc_module
~(query : query)
~resubscribe_delay
=
let dispatch conn =
let module Pipe_rpc =
(val rpc_module
: Versioned_rpc.Both_convert.Pipe_rpc.S
with type caller_query = query
and type caller_response = response
and type caller_error = error)
in
Pipe_rpc.dispatch_multi conn query
in
create_or_fail' ?time_source connection ~dispatch ~resubscribe_delay
;;
let create_or_fail_versioned'
(type query response error)
?time_source
connection
rpc_module
~(query : query)
~resubscribe_delay
=
let dispatch conn =
let module Pipe_rpc =
(val rpc_module
: Versioned_rpc.Caller_converts.Pipe_rpc.S
with type query = query
and type response = response
and type error = error)
in
Pipe_rpc.dispatch_multi conn query
in
create_or_fail' ?time_source connection ~dispatch ~resubscribe_delay
;;