Source file synchronizer.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
(** Shared state of a synchronizer: a user-supplied source of elements
    ([getter]) and sink ([writer]) whose accesses are serialised by [mutex],
    together with the bookkeeping used by [get] to decide between waiting and
    giving up. *)
type ('get, 'write) t =
{ mutex : Mutex.t (* held around every access to the fields below *)
; cond : Condition.t (* notified on pledge changes, close, get and write *)
; getter : unit -> 'get option (* next element, or [None] if none for now *)
; writer : 'write -> unit (* called by [write] with the value to store *)
; pledges_count : int ref (* outstanding pledges; [get] only waits if non-zero *)
; closed : bool ref (* set by [close]; makes [get] return [None] *)
}
(** [init getter writer] creates a synchronizer around the given [getter] and
    [writer] functions, with a fresh mutex and condition variable, a pledge
    count of zero, and the closed flag unset. *)
let init getter writer =
  let mutex = Mutex.create () in
  let cond = Condition.create () in
  let pledges_count = ref 0 in
  let closed = ref false in
  { mutex; cond; getter; writer; pledges_count; closed }
(** [new_pledge synchro] increments the pledge counter of [synchro] while
    holding its mutex, then broadcasts on its condition variable. *)
let new_pledge synchro =
  let { mutex; cond; pledges_count; _ } = synchro in
  let register () =
    incr pledges_count;
    Condition.broadcast cond
  in
  Mutex.protect mutex register
(** [end_pledge synchro] decrements the pledge counter of [synchro] while
    holding its mutex, then broadcasts on its condition variable so that
    threads blocked in [get] re-check their condition. *)
let end_pledge synchro =
  let release () =
    decr synchro.pledges_count;
    Condition.broadcast synchro.cond
  in
  Mutex.protect synchro.mutex release
(** [close synchro] sets the closed flag of [synchro] while holding its mutex,
    then broadcasts on its condition variable. After this, [get] returns
    [None] and [write] no longer calls the writer. *)
let close { mutex; cond; closed; _ } =
  Mutex.protect mutex (fun () ->
    closed := true;
    Condition.broadcast cond )
(** [get ~pledge synchro] fetches the next element through the getter of
    [synchro], with the mutex held for the whole operation.

    - If the synchronizer is closed, the result is [None].
    - If the getter yields an element, the pledge counter is incremented when
      [pledge] is [true], the condition variable is broadcast, and the element
      is returned.
    - If the getter yields [None] and no pledge is outstanding, the condition
      variable is broadcast and the result is [None].
    - Otherwise the call waits on the condition variable and starts over. *)
let get ~pledge synchro =
  let { mutex; cond; getter; pledges_count; closed; _ } = synchro in
  let rec await () =
    if !closed then None
    else
      match getter () with
      | Some _ as item ->
        if pledge then incr pledges_count;
        Condition.broadcast cond;
        item
      | None ->
        if !pledges_count = 0 then begin
          (* Nothing available and nobody pledged to produce more. *)
          Condition.broadcast cond;
          None
        end
        else begin
          (* Releases the mutex while blocked and re-acquires it on wake-up. *)
          Condition.wait cond mutex;
          await ()
        end
  in
  Mutex.protect mutex await
(** [write synchro v] passes [v] to the writer of [synchro] unless the
    synchronizer is closed, then signals the condition variable. Both steps
    run with the mutex held; the signal is sent even when the write was
    skipped. *)
let write synchro v =
  let push_and_notify () =
    if not !(synchro.closed) then synchro.writer v;
    Condition.signal synchro.cond
  in
  Mutex.protect synchro.mutex push_and_notify
(** [work_while f q] repeatedly takes an element [v] from [q] with
    [get ~pledge:true] and runs [f v (write q)], until [get] returns [None].
    The pledge taken by [get] is ended with [end_pledge] after each call to
    [f], including when [f] raises. *)
let work_while f q =
  let release () = end_pledge q in
  let rec loop () =
    match get ~pledge:true q with
    | Some v ->
      Fun.protect ~finally:release (fun () -> f v (write q));
      loop ()
    | None -> ()
  in
  loop ()