Source file mpsc_queue.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
(* A lock-free multi-producer, single-consumer, thread-safe queue without support for cancellation.
   This makes a good data structure for a scheduler's run queue.

   See: "Implementing lock-free queues"
   https://people.cs.pitt.edu/~jacklange/teaching/cs2510-f12/papers/implementing_lock_free.pdf

   It is simplified slightly because we don't need multiple consumers.
   Therefore [head] is not atomic. *)

exception Closed

module Node : sig
  type 'a t = { next : 'a opt Atomic.t; mutable value : 'a }
  and +'a opt

  val make : next:'a opt -> 'a -> 'a t

  val none : 'a opt
  (** [t.next = none] means that [t] is currently the last node. *)

  val closed : 'a opt
  (** [t.next = closed] means that [t] will always be the last node. *)

  val some : 'a t -> 'a opt
  val fold : 'a opt -> none:(unit -> 'b) -> some:('a t -> 'b) -> 'b
end = struct
  (* https://github.com/ocaml/RFCs/pull/14 should remove the need for magic here *)

  type +'a opt (* special | 'a t *)
  type 'a t = { next : 'a opt Atomic.t; mutable value : 'a }
  type special = Nothing | Closed

  let none : 'a. 'a opt = Obj.magic Nothing
  let closed : 'a. 'a opt = Obj.magic Closed
  let some (t : 'a t) : 'a opt = Obj.magic t

  let fold (opt : 'a opt) ~none:n ~some =
    if opt == none then n ()
    else if opt == closed then raise Closed
    else some (Obj.magic opt : 'a t)

  let make ~next value = { value; next = Atomic.make next }
end

type 'a t = { tail : 'a Node.t Atomic.t; mutable head : 'a Node.t }
(* [head] is the last node dequeued (or a dummy node, initially).
   [head.next] gives the real first node, if not [Node.none].
   If [tail.next] is [none] then it is the last node in the queue.
   Otherwise, [tail.next] is a node that is closer to the tail. *)

let push t x =
  let node = Node.(make ~next:none) x in
  let rec aux () =
    let p = Atomic.get t.tail in
    (* While [p.next == none], [p] is the last node in the queue. *)
    if Atomic.compare_and_set p.next Node.none (Node.some node) then
      (* [node] has now been added to the queue (and possibly even consumed).
         Update [tail], unless someone else already did it for us. *)
      ignore (Atomic.compare_and_set t.tail p node : bool)
    else
      (* Someone else added a different node first ([p.next] is not [none]).
         Make [t.tail] more up-to-date, if it hasn't already changed, and try again. *)
      Node.fold (Atomic.get p.next)
        ~none:(fun () -> assert false)
        ~some:(fun p_next ->
          ignore (Atomic.compare_and_set t.tail p p_next : bool);
          aux ())
  in
  aux ()

let rec push_head t x =
  let p = t.head in
  let next = Atomic.get p.next in
  if next == Node.closed then raise Closed;
  let node = Node.make ~next x in
  if Atomic.compare_and_set p.next next (Node.some node) then
    if
      (* We don't want to let [tail] get too far behind, so if the queue was empty, move it to the new node. *)
      next == Node.none
    then ignore (Atomic.compare_and_set t.tail p node : bool)
    else
      ( (* If the queue wasn't empty, there's nothing to do.
           Either tail isn't at head or there is some [push] thread working to update it.
           Either [push] will update it directly to the new tail, or will update it to [node]
           and then retry. Either way, it ends up at the real tail. *) )
  else (
    (* Someone else changed it first. This can only happen if the queue was empty. *)
    assert (next == Node.none);
    push_head t x)

let rec close (t : 'a t) =
  (* Mark the tail node as final. *)
  let p = Atomic.get t.tail in
  if not (Atomic.compare_and_set p.next Node.none Node.closed) then
    (* CAS failed because [p] is no longer the tail (or is already closed). *)
    Node.fold (Atomic.get p.next)
      ~none:(fun () -> assert false)
        (* Can't switch from another state to [none] *)
      ~some:(fun p_next ->
        (* Make [tail] more up-to-date if it hasn't changed already *)
        ignore (Atomic.compare_and_set t.tail p p_next : bool);
        (* Retry *)
        close t)

let pop t =
  let p = t.head in
  (* [p] is the previously-popped item. *)
  let node = Atomic.get p.next in
  Node.fold node
    ~none:(fun () -> None)
    ~some:(fun node ->
      t.head <- node;
      let v = node.value in
      node.value <- Obj.magic ();
      (* So it can be GC'd *)
      Some v)

let is_empty t =
  Node.fold (Atomic.get t.head.next)
    ~none:(fun () -> true)
    ~some:(fun _ -> false)

let create () =
  let dummy = { Node.value = Obj.magic (); next = Atomic.make Node.none } in
  { tail = Atomic.make dummy; head = dummy }