(* Source file: sequencer_table.ml *)
open! Core
open! Async_kernel
open! Import
(* Hook invoked by [run_jobs_until_none_remain] (inside [Make]) immediately after
   it has looked up the state of a key and before it runs the job at the head of
   the queue. It starts out as [ignore], i.e. a no-op, and is re-exported at the
   end of this file as [For_testing.debug_on_find_state]. *)
let debug_on_find_state = ref ignore
(* [Make (Key)] implements a table indexed by [Key.t] in which every key has an
   optional state and its own FIFO queue of jobs. The jobs of one key are run one
   after the other, in the order in which they were enqueued, and each job is
   given the current state of its key at the moment it starts. *)
module Make (Key : sig
    type t [@@deriving sexp_of, hash, compare]
  end) =
struct
  (* Label attached to every queued job: either a job submitted through
     [enqueue] (with the optional tag supplied by the caller) or a marker job
     inserted by [prior_jobs_done]. *)
  module Tag = struct
    type 'job_tag t =
      | User_job of 'job_tag option
      | Prior_jobs_done
    [@@deriving sexp]
  end

  module Job = struct
    (* [run] receives the state of the key (if any) and returns a deferred that
       becomes determined when the job is finished. *)
    type ('state, 'job_tag) t =
      { tag : 'job_tag Tag.t
      ; run : 'state option -> unit Deferred.t
      }

    (* Only the tag is rendered; the converter for the state type is ignored and
       the [run] closure is not shown. *)
    let sexp_of_t _ sexp_of_job_tag t = t.tag |> [%sexp_of: job_tag Tag.t]
  end

  type ('state, 'job_tag) t =
    { (* Current state of each key. A key without a state has no entry. *)
      states : (Key.t, 'state) Hashtbl.t
    ; (* Queued jobs of each key, oldest first. The job that is currently
         running stays at the head of its queue until it finishes, and the
         entry of a key is removed once its queue is found empty. *)
      jobs : (Key.t, ('state, 'job_tag) Job.t Queue.t) Hashtbl.t
    }
  [@@deriving sexp_of]

  (* [create ()] returns a table with no states and no jobs. *)
  let create () =
    { states = Hashtbl.create (module Key); jobs = Hashtbl.create (module Key) }
  ;;

  (* Runs the jobs of [queue] one at a time until the queue is empty, then
     removes the entry of [key] from [t.jobs]. Jobs appended to [queue] while
     this loop is active are picked up by the same loop. *)
  let rec run_jobs_until_none_remain t ~key (queue : (_, _) Job.t Queue.t) =
    match Queue.peek queue with
    | None -> Hashtbl.remove t.jobs key
    | Some job ->
      (* The state is looked up and passed to [job.run] in the same synchronous
         step, so the job sees the state as it is right now. *)
      let state = Hashtbl.find t.states key in
      !debug_on_find_state ();
      job.run state
      >>> fun () ->
      (* The dequeue is performed outside of [assert] on purpose: an assertion
         is not evaluated when the program is compiled with [-noassert], and
         the finished job would then never leave the queue and be run again
         and again. *)
      let finished = Queue.dequeue_exn queue in
      assert (phys_equal finished job);
      run_jobs_until_none_remain t ~key queue
  ;;

  (* [set_state t ~key state_opt] stores [state] as the state of [key] when
     given [Some state] and removes any stored state when given [None]. The
     job queues are left untouched. *)
  let set_state t ~key = function
    | None -> Hashtbl.remove t.states key
    | Some state -> Hashtbl.set t.states ~key ~data:state
  ;;

  (* [enqueue t ~key ?tag f] appends a job to the queue of [key]. When the job
     reaches the head of the queue, [f] is called with the state of [key] at
     that time. The returned deferred yields the result of [f]; if [f] raises,
     the exception is re-raised (after [Monitor.extract_exn]) when that result
     is consumed below. [tag] is only used to label the job, see [Tag]. *)
  let enqueue t ~key ?tag f =
    Deferred.create (fun ivar ->
      (* [f] runs under [Monitor.try_with], so [run] itself always returns
         normally and the outcome, [Ok] or [Error], is delivered via [ivar]. *)
      let run state_opt =
        Monitor.try_with ~rest:`Log ~run:`Now (fun () -> f state_opt)
        >>| Ivar.fill_exn ivar
      in
      let job = { Job.tag = Tag.User_job tag; run } in
      match Hashtbl.find t.jobs key with
      | Some queue ->
        (* A runner is already active for this key and will reach the job. *)
        Queue.enqueue queue job
      | None ->
        let queue = Queue.create () in
        Queue.enqueue queue job;
        Hashtbl.set t.jobs ~key ~data:queue;
        (* The runner is started through [upon Deferred.unit] instead of being
           called directly, so that no job runs synchronously inside
           [enqueue]. *)
        upon Deferred.unit (fun () -> run_jobs_until_none_remain t ~key queue))
    >>| function
    | Error exn -> raise (Monitor.extract_exn exn)
    | Ok res -> res
  ;;

  (* [find_state t key] is the state currently stored for [key], if any. *)
  let find_state t key = Hashtbl.find t.states key

  (* [num_unfinished_jobs t key] is the length of the queue of [key], or [0]
     when [key] has no queue. The count includes the job that is currently
     running as well as markers added by [prior_jobs_done]. *)
  let num_unfinished_jobs t key =
    match Hashtbl.find t.jobs key with
    | None -> 0
    | Some queue -> Queue.length queue
  ;;

  (* [mem t key] holds when [key] has a stored state or a job queue. *)
  let mem t key = Hashtbl.mem t.states key || Hashtbl.mem t.jobs key

  (* Freshly allocated set of every key that has a state, a job queue, or
     both. *)
  let all_keys t =
    let all_keys =
      Hash_set.create (module Key) ~size:(Hashtbl.length t.jobs + Hashtbl.length t.states)
    in
    Hashtbl.iter_keys t.jobs ~f:(Hash_set.add all_keys);
    Hashtbl.iter_keys t.states ~f:(Hash_set.add all_keys);
    all_keys
  ;;

  (* [fold t ~init ~f] folds [f] over every key returned by [all_keys],
     passing the state of the key, which is [None] for a key that only has
     jobs. The set of keys is computed once, before the first call to [f]. *)
  let fold t ~init ~f =
    Hash_set.fold (all_keys t) ~init ~f:(fun acc key ->
      f acc ~key (Hashtbl.find t.states key))
  ;;

  (* [exists t ~f] holds when [f key state_opt] holds for at least one key
     returned by [all_keys]. *)
  let exists t ~f =
    Hash_set.exists (all_keys t) ~f:(fun key -> f key (Hashtbl.find t.states key))
  ;;

  (* [prior_jobs_done t] appends a [Prior_jobs_done] marker job to every queue
     that exists at the time of the call and returns a deferred that becomes
     determined once all of these markers have run, that is, once every job
     that was queued before them has finished. Keys without a queue do not
     contribute anything. *)
  let prior_jobs_done t =
    Hashtbl.fold t.jobs ~init:[] ~f:(fun ~key:_ ~data:queue acc ->
      let this_key_done =
        Deferred.create (fun ivar ->
          Queue.enqueue
            queue
            { tag = Tag.Prior_jobs_done
            ; run =
                (fun _ ->
                  Ivar.fill_exn ivar ();
                  Deferred.unit)
            })
      in
      this_key_done :: acc)
    |> Deferred.all_unit
  ;;
end
(* Re-exports the [debug_on_find_state] hook defined at the top of this file
   under a name that makes its purpose explicit. Both names denote the same
   reference cell. *)
module For_testing = struct
  let debug_on_find_state : (unit -> unit) ref = debug_on_find_state
end