Source file inf_pipe.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
open Async

module T = struct
  module Reader = struct
    type 'a t = 'a Pipe.Reader.t

    let create (reader : 'a Pipe.Reader.t) : 'a t = reader
  end

  let read ?consumer t =
    Pipe.read ?consumer t >>| function
    | `Ok x -> x
    | `Eof -> assert false

  let read_now ?consumer t =
    Pipe.read_now ?consumer t |> function
    | `Ok _ as ok -> ok
    | `Nothing_available as na -> na
    | `Eof -> assert false

  let read_exactly ?consumer t ~num_values =
    Pipe.read_exactly ?consumer t ~num_values >>| function
    | `Exactly e -> e
    | `Fewer _
    | `Eof ->
      assert false

  let to_pipe t : 'a Pipe.Reader.t = t

  let interleave l = Pipe.interleave l |> Reader.create

  let unfold ~init ~f : 'a Reader.t =
    Pipe.unfold ~init ~f:(fun (acc : 's) ->
        f acc >>| fun (acc, s) -> Some (acc, s) )

  let map = Pipe.map

  let filter_map = Pipe.filter_map
end

module type S = sig
  module Reader : sig
    type 'a t = private 'a Pipe.Reader.t

    val create : 'a Pipe.Reader.t -> 'a t
  end

  val read : ?consumer:Pipe.Consumer.t -> 'a Reader.t -> 'a Deferred.t

  val read_now :
    ?consumer:Pipe.Consumer.t ->
    'a Reader.t ->
    [> `Nothing_available | `Ok of 'a ]

  val read_exactly :
    ?consumer:Pipe.Consumer.t ->
    'a Reader.t ->
    num_values:int ->
    'a Base.Queue.t Deferred.t

  val unfold : init:'s -> f:('s -> ('a * 's) Deferred.t) -> 'a Reader.t

  val map : 'a Reader.t -> f:('a -> 'b) -> 'b Reader.t

  val filter_map :
    ?max_queue_length:int -> 'a Reader.t -> f:('a -> 'b option) -> 'b Reader.t

  val interleave : 'a Reader.t list -> 'a Reader.t

  val to_pipe : 'a Reader.t -> 'a Pipe.Reader.t
end

include (T : S)