Source file durable_pipe_rpc.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
open Core
open Async_kernel
open Async_rpc_kernel

module Update = struct
  type ('response, 'error) t =
    | Attempting_new_connection
    | Connection_success of Rpc.Pipe_rpc.Metadata.t
    | Lost_connection
    | Failed_to_connect of Error.t
    | Rpc_error of 'error
    | Update of 'response
end

let filter_map_update update =
  let module L = Durable_state_rpc.Update in
  let module R = Update in
  match update with
  | L.Attempting_new_connection -> Some R.Attempting_new_connection
  | L.Connection_success metadata -> Some (R.Connection_success metadata)
  | L.Lost_connection -> Some R.Lost_connection
  | L.Failed_to_connect e -> Some (R.Failed_to_connect e)
  | L.Rpc_error e -> Some (R.Rpc_error e)
  | L.Update r -> Some (R.Update r)
  | L.State () -> None
;;

let create' ?time_source connection ~dispatch ~resubscribe_delay =
  let dispatch conn = dispatch conn >>|? Result.map ~f:(fun (pipe, id) -> (), pipe, id) in
  Durable_state_rpc.create' ?time_source connection ~dispatch ~resubscribe_delay
  |> Pipe.filter_map ~f:filter_map_update
;;

let create ?time_source connection rpc ~query ~resubscribe_delay =
  let dispatch conn = Rpc.Pipe_rpc.dispatch rpc conn query in
  create' ?time_source connection ~dispatch ~resubscribe_delay
;;

let create_versioned
  (type query response error)
  ?time_source
  connection
  rpc_module
  ~(query : query)
  ~resubscribe_delay
  =
  let dispatch conn =
    let module Pipe_rpc =
      (val rpc_module
          : Versioned_rpc.Both_convert.Pipe_rpc.S
          with type caller_query = query
           and type caller_response = response
           and type caller_error = error)
    in
    Pipe_rpc.dispatch_multi conn query
  in
  create' ?time_source connection ~dispatch ~resubscribe_delay
;;

let create_versioned'
  (type query response error)
  ?time_source
  connection
  rpc_module
  ~(query : query)
  ~resubscribe_delay
  =
  let dispatch conn =
    let module Pipe_rpc =
      (val rpc_module
          : Versioned_rpc.Caller_converts.Pipe_rpc.S
          with type query = query
           and type response = response
           and type error = error)
    in
    Pipe_rpc.dispatch_multi conn query
  in
  create' ?time_source connection ~dispatch ~resubscribe_delay
;;

let create_or_fail' ?time_source connection ~dispatch ~resubscribe_delay =
  let dispatch conn = dispatch conn >>|? Result.map ~f:(fun (pipe, id) -> (), pipe, id) in
  Durable_state_rpc.create_or_fail' ?time_source connection ~dispatch ~resubscribe_delay
  >>|? Result.map ~f:(Pipe.filter_map ~f:filter_map_update)
;;

let create_or_fail ?time_source connection rpc ~query ~resubscribe_delay =
  let dispatch conn = Rpc.Pipe_rpc.dispatch rpc conn query in
  create_or_fail' ?time_source connection ~dispatch ~resubscribe_delay
;;

let create_or_fail_versioned
  (type query response error)
  ?time_source
  connection
  rpc_module
  ~(query : query)
  ~resubscribe_delay
  =
  let dispatch conn =
    let module Pipe_rpc =
      (val rpc_module
          : Versioned_rpc.Both_convert.Pipe_rpc.S
          with type caller_query = query
           and type caller_response = response
           and type caller_error = error)
    in
    Pipe_rpc.dispatch_multi conn query
  in
  create_or_fail' ?time_source connection ~dispatch ~resubscribe_delay
;;

let create_or_fail_versioned'
  (type query response error)
  ?time_source
  connection
  rpc_module
  ~(query : query)
  ~resubscribe_delay
  =
  let dispatch conn =
    let module Pipe_rpc =
      (val rpc_module
          : Versioned_rpc.Caller_converts.Pipe_rpc.S
          with type query = query
           and type response = response
           and type error = error)
    in
    Pipe_rpc.dispatch_multi conn query
  in
  create_or_fail' ?time_source connection ~dispatch ~resubscribe_delay
;;