1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
module Fmt = CCFormat
type cancel = Lwt_switch.t
type 'a m = 'a Lwt.t
type 'a t = {
descr: string option;
cancel: cancel option;
mutable n_child: int;
cond: unit Lwt_condition.t;
parent: parent;
run: 'a t -> 'a m;
}
and parent =
| NoParent
| Parent : 'a t -> parent
let cancel self =
let cancel_ s = Lwt.async (fun () -> Lwt_switch.turn_off s) in
CCOpt.iter cancel_ self.cancel
let is_cancelled self = CCOpt.exists Lwt_switch.is_on self.cancel
let return x : _ m = Lwt.return x
let descr self = self.descr
let pause (self:_ t) =
Lwt_switch.check self.cancel;
Lwt.pause()
module Infix = struct
let (>|=) = Lwt.(>|=)
let (>>= ) = Lwt.(>>=)
let (let+) = (>|=)
let (let* ) = (>>=)
let (and+ ) = Lwt.both
end
include Infix
let wait_all l =
let+ l = Lwt.all l in
match CCList.find_map (function Error e -> Some e | Ok () -> None) l with
| None -> Ok ()
| Some e -> Error e
let unwrap = function
| Ok x -> return x
| Error e -> Lwt.fail e
(** An exception caught and re-launched from a task *)
module Wrapped_error = struct
type 'a task = 'a t
type t = E : {
task: 'a task;
e: exn;
} -> t
exception Wrapped of t
let rec pp out (E {task;e}) =
let descr = CCOpt.get_or ~default:"<no descr>" task.descr in
let pp_e out e = match e with
| Wrapped e -> pp out e
| e -> Fmt.string out (Printexc.to_string e)
in
Fmt.fprintf out "@[<v>error in task '%s':@ %a@]" descr pp_e e
let to_string = Fmt.to_string pp
end
let run_ (self:'a t) : ('a, exn) result m =
let rec wait_children() : unit m =
if self.n_child = 0 then Lwt.return ()
else (
let* () = Lwt_condition.wait self.cond in
wait_children()
)
in
let res () =
Lwt.catch
(fun () ->
let* x = self.run self in
let+ () = wait_children() in
Ok x)
(fun e ->
return @@ Error (Wrapped_error.Wrapped (Wrapped_error.E {task=self; e})))
in
match self.parent with
| NoParent -> res()
| Parent p ->
p.n_child <- 1 + p.n_child;
let+ r = res() in
p.n_child <- p.n_child - 1;
Lwt_condition.signal p.cond ();
r
let run self = Lwt_main.run (run_ self)
let start ?descr ?cancel run : _ t =
{descr; cancel; run; parent=NoParent; cond=Lwt_condition.create (); n_child=0; }
let run_sub ~parent ?descr ?cancel run : _ m =
let t = {
descr; cancel; run; parent=Parent parent;
cond=Lwt_condition.create (); n_child=0;
} in
run_ t
let () =
Printexc.register_printer
(function
| Wrapped_error.Wrapped e ->
let s = Wrapped_error.to_string e in
Some s
| _ -> None)