1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
open Async
(** Implementation of readers over Async pipes that are expected never to
    reach end-of-file. Every reading function below treats an [`Eof] answer
    from [Pipe] (and, for [read_exactly], a short [`Fewer] batch) as a broken
    invariant and fails with [assert false]. *)
module T = struct
  module Reader = struct
    (** Same representation as an Async pipe reader. *)
    type 'a t = 'a Pipe.Reader.t

    (** [create reader] is [reader] itself, viewed as a [t]. *)
    let create : 'a Pipe.Reader.t -> 'a t = fun reader -> reader
  end

  (** [read ?consumer t] waits on [Pipe.read] and resolves to the value read.
      @raise Assert_failure if [Pipe.read] answers [`Eof]. *)
  let read ?consumer t =
    Deferred.map (Pipe.read ?consumer t) ~f:(function
      | `Ok value -> value
      | `Eof -> assert false )

  (** [read_now ?consumer t] forwards the [`Ok _] and [`Nothing_available]
      answers of [Pipe.read_now] unchanged.
      @raise Assert_failure if [Pipe.read_now] answers [`Eof]. *)
  let read_now ?consumer t =
    match Pipe.read_now ?consumer t with
    | (`Ok _ | `Nothing_available) as answer ->
        answer
    | `Eof ->
        assert false

  (** [read_exactly ?consumer t ~num_values] resolves to the payload of the
      [`Exactly] answer of [Pipe.read_exactly].
      @raise Assert_failure on [`Fewer _] or [`Eof]. *)
  let read_exactly ?consumer t ~num_values =
    Deferred.map (Pipe.read_exactly ?consumer t ~num_values) ~f:(function
      | `Exactly values -> values
      | `Fewer _ | `Eof -> assert false )

  (** [to_pipe t] is [t] itself, viewed as a plain Async pipe reader. *)
  let to_pipe : 'a Reader.t -> 'a Pipe.Reader.t = fun t -> t

  (** [interleave readers] passes [readers] to [Pipe.interleave] and wraps the
      result with [Reader.create]. *)
  let interleave readers = Reader.create (Pipe.interleave readers)

  (** [unfold ~init ~f] builds a reader with [Pipe.unfold]. Each result of [f]
      is wrapped in [Some], so the step function handed to [Pipe.unfold] never
      returns [None]. *)
  let unfold ~init ~f : 'a Reader.t =
    Pipe.unfold ~init ~f:(fun (state : 's) ->
        Deferred.map (f state) ~f:(fun (value, next_state) ->
            Some (value, next_state) ) )

  (** Direct alias of [Pipe.map]. *)
  let map = Pipe.map

  (** Direct alias of [Pipe.filter_map]. *)
  let filter_map = Pipe.filter_map
end
(** Interface of a pipe reader whose reads return bare values rather than
    [`Eof]-carrying variants. In the implementation [T] of this file, an
    [`Eof] answer from the underlying [Pipe] call is turned into
    [assert false]. *)
module type S = sig
  module Reader : sig
    (** A reader; exposed as a [private] alias of an Async pipe reader, so a
        value of this type can only be obtained through this interface. *)
    type 'a t = private 'a Pipe.Reader.t

    (** [create reader] views an Async pipe reader as a [t]. *)
    val create : 'a Pipe.Reader.t -> 'a t
  end

  (** {2 Conversion} *)

  (** [to_pipe t] gives back the underlying Async pipe reader. *)
  val to_pipe : 'a Reader.t -> 'a Pipe.Reader.t

  (** {2 Reading} *)

  (** [read ?consumer t] is a deferred holding the value read from [t]. *)
  val read : ?consumer:Pipe.Consumer.t -> 'a Reader.t -> 'a Deferred.t

  (** [read_now ?consumer t] answers immediately with either [`Ok v] or
      [`Nothing_available]. *)
  val read_now :
       ?consumer:Pipe.Consumer.t
    -> 'a Reader.t
    -> [> `Nothing_available | `Ok of 'a ]

  (** [read_exactly ?consumer t ~num_values] is a deferred holding a queue of
      values read from [t]. *)
  val read_exactly :
       ?consumer:Pipe.Consumer.t
    -> 'a Reader.t
    -> num_values:int
    -> 'a Base.Queue.t Deferred.t

  (** {2 Construction and transformation} *)

  (** [unfold ~init ~f] builds a reader from the state [init] and a step
      function [f] that yields a value together with a new state. *)
  val unfold : init:'s -> f:('s -> ('a * 's) Deferred.t) -> 'a Reader.t

  (** [map t ~f] is a reader of [f] applied to the values of [t]. *)
  val map : 'a Reader.t -> f:('a -> 'b) -> 'b Reader.t

  (** [filter_map ?max_queue_length t ~f] is a reader of the [Some] results of
      [f] applied to the values of [t]. *)
  val filter_map :
       ?max_queue_length:int
    -> 'a Reader.t
    -> f:('a -> 'b option)
    -> 'b Reader.t

  (** [interleave readers] combines several readers into a single reader. *)
  val interleave : 'a Reader.t list -> 'a Reader.t
end
(* Re-export the contents of [T] at the top level, sealed by [S]: only the
   items declared in [S] are brought into scope here, and [Reader.t] is seen
   as a [private] alias of ['a Pipe.Reader.t]. *)
include (T : S)