1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
(** Eio based bindings for eio *)
exception Closed
type 'a t = {
socket : 'a Zmq.Socket.t;
fd : Unix.file_descr;
senders : (unit -> unit) Queue.t;
receivers : (unit -> unit) Queue.t;
condition : Eio.Condition.t;
mutex : Eio.Mutex.t;
ready_condition: Eio.Condition.t;
mutable thread : unit Eio.Promise.or_exn option;
}
type 'a of_socket_args = sw:Eio.Switch.t -> 'a
type 'a deferred = 'a
(** invoke the first function on the queue, but only pop it if it does not raise EAGAIN *)
let process queue =
match (Queue.peek queue) () with
| () ->
let (_: unit -> unit) = Queue.pop queue in
()
| exception Unix.Unix_error (Unix.EAGAIN, _, _) ->
()
let with_lock lock f =
Eio.Mutex.lock lock;
try
let v = f () in
Eio.Mutex.unlock lock;
v
with
| e ->
Eio.Mutex.unlock lock;
raise e
let rec fd_monitor t =
Eio.Condition.await_no_mutex t.ready_condition;
Eio_unix.await_readable t.fd;
with_lock t.mutex (fun () -> Eio.Condition.broadcast t.condition);
fd_monitor t
let rec event_loop t =
let inner () =
match Zmq.Socket.events t.socket with
| Zmq.Socket.Poll_error ->
failwith "Cannot poll socket"
| (Poll_in_out | Poll_in) when not (Queue.is_empty t.receivers) ->
process t.receivers
| (Poll_in_out | Poll_out) when not (Queue.is_empty t.senders) ->
process t.senders
| _ ->
Eio.Condition.broadcast t.ready_condition;
Eio.Condition.await t.condition t.mutex;
in
with_lock t.mutex (fun () -> inner ());
match t.thread with
| None when Queue.is_empty t.senders && Queue.is_empty t.receivers ->
()
| _ ->
event_loop t
let of_socket: ('a Zmq.Socket.t -> 'a t) of_socket_args = fun ~sw socket ->
let fd = Zmq.Socket.get_fd socket in
let t =
{ socket;
fd;
senders = Queue.create ();
receivers = Queue.create ();
mutex = Eio.Mutex.create ();
condition = Eio.Condition.create ();
ready_condition = Eio.Condition.create ();
thread = None;
}
in
let thread = Eio.Fiber.fork_promise ~sw (fun () ->
Eio.Switch.run (fun sw ->
Eio.Fiber.fork ~sw (fun () -> event_loop t);
Eio.Fiber.fork_daemon ~sw (fun () -> fd_monitor t);
()
);
)
in
t.thread <- Some thread;
t
let to_socket t =
t.socket
(** Stop the deamon thread, and ensure that all sends and receives has been handled *)
let close t =
let thread = match t.thread with
| None -> failwith "Socket already closed"
| Some t -> t
in
with_lock t.mutex (fun () -> t.thread <- None; Eio.Condition.broadcast t.condition);
let _e = Eio.Promise.await_exn thread in
Zmq.Socket.close t.socket;
()
let request t queue f =
let () =
match t.thread with
| None -> raise Closed
| Some _ -> ()
in
let (pt, pu) = Eio.Promise.create ~label:"Zmq" () in
let f () =
let v = f () in
Eio.Promise.resolve pu v
in
with_lock t.mutex (fun () -> Queue.push f queue; Eio.Condition.broadcast t.condition);
Eio.Promise.await pt
let send t message =
request t t.senders (fun () -> Zmq.Socket.send ~block:false t.socket message)
let send_msg t message =
request t t.senders (fun () -> Zmq.Socket.send_msg ~block:false t.socket message)
let send_all t =
request t t.receivers (fun () -> Zmq.Socket.send_all ~block:false t.socket)
let send_msg_all t =
request t t.receivers (fun () -> Zmq.Socket.send_msg_all ~block:false t.socket)
let recv t =
request t t.receivers (fun () -> Zmq.Socket.recv ~block:false t.socket)
let recv_msg t =
request t t.receivers (fun () -> Zmq.Socket.recv_msg ~block:false t.socket)
let recv_all t =
request t t.receivers (fun () -> Zmq.Socket.recv_all ~block:false t.socket)
let recv_msg_all t =
request t t.receivers (fun () -> Zmq.Socket.recv_msg_all ~block:false t.socket)
module Router = struct
type id_t = string
let id_of_string t = t
let recv t =
match recv_all t with
| id :: message -> (id, message)
| _ -> assert false
let send t id message =
send_all t (id :: message)
end
module Monitor = struct
let recv t = request t t.receivers (fun () -> Zmq.Monitor.recv ~block:false t.socket)
end