Source file domainpc.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
(* SPDX-License-Identifier: AGPL-3.0-or-later *)

module IntMap = Map.Make (Int)

(** Signalled by [free_cpus] whenever a set of cpus is returned to
    [cpus_per_core]; waited on by [get_cpus] when no set is free. *)
let cores_condition = Condition.create ()

(** Guards every access to [cpus_per_core]; it is also the mutex paired with
    [cores_condition]. *)
let cores_mutex = Mutex.create ()

(** When [true] (the initial value), asking for a core while none is free
    raises [Failure]; when [false], the requester waits for a core to be
    released instead. *)
let crash = Atomic.make true

(** Switches to waiting mode: from now on, requesting a core while none is
    free blocks until one is given back to the pool instead of raising. *)
let wait_on_unavailable () = Atomic.set crash false

(** Queue of the free sets of cpus, where each distinct set of CPUs is linked
    to a unique core. A set is popped when it is handed out to a domain and
    pushed back by [free_cpus]. *)
let cpus_per_core = Queue.create ()

(** Number of cores (sets of cpus) that are currently free. The queue is read
    while holding [cores_mutex], because other domains push to and pop from
    it concurrently. *)
let get_available_cores () =
  Mutex.protect cores_mutex (fun () -> Queue.length cpus_per_core)

(** Whether or not the queue of cpus_per_core was initialized. *)
let initialized = ref false

(** The mode in which the library is used. It is determined by the first call
    to `spawn`, `spawn_n` or `isolate_current`; every later call must use the
    same mode. *)
type mode = Unintialized | Isolated | NotIsolated

(** [check_mode isolated] records the requested mode on its first call. On
    every later call it does nothing when [isolated] agrees with the recorded
    mode, and raises [Failure] when it does not. The recorded mode is kept in
    a reference private to this closure. *)
let check_mode =
  let current = ref Unintialized in
  fun isolated ->
    match (!current, isolated) with
    | Unintialized, true -> current := Isolated
    | Unintialized, false -> current := NotIsolated
    | Isolated, true | NotIsolated, false -> ()
    | Isolated, false ->
        failwith
          "trying to run an unisolated domain when you previously ran an \
           isolated domain"
    | NotIsolated, true ->
        failwith
          "trying to run an isolated domain when you previously ran an \
           unisolated domain"

(** Initializes the pool of free cores. Every cpu of [Processor.Topology.t] is
    grouped by its [core] id, and one cpu list per core is enqueued into
    [cpus_per_core] in increasing core-id order. The library is marked as
    initialized before any of this work is done.
    @raise Failure if the topology contains no cpu at all. *)
let init_cpus () =
  initialized := true;
  let add_cpu by_core Processor.Cpu.{ id; core; _ } =
    let previous = Option.value ~default:[] (IntMap.find_opt core by_core) in
    IntMap.add core (id :: previous) by_core
  in
  let by_core = List.fold_left add_cpu IntMap.empty Processor.Topology.t in
  if IntMap.is_empty by_core then failwith "unexpected: no CPUS/Cores found"
  else IntMap.iter (fun _core cpus -> Queue.add cpus cpus_per_core) by_core

(** Takes one free set of cpus (one core) out of [cpus_per_core], holding
    [cores_mutex] for the whole operation.

    If no set is free:
    - by default ([crash] is [true]) it raises [Failure "No free cores"];
    - after [wait_on_unavailable ()] has been called, it waits on
      [cores_condition] until a set is given back, then checks again.

    @raise Failure ["No free cores"] when no set is free and waiting mode was
    not enabled. *)
let get_cpus () =
  let rec take () =
    if not (Queue.is_empty cpus_per_core) then Queue.pop cpus_per_core
    else if Atomic.get crash then failwith "No free cores"
    else (
      (* [Condition.wait] releases [cores_mutex] while sleeping and re-takes
         it before returning; the queue is re-checked because a wakeup does
         not guarantee that a set is still available. *)
      Condition.wait cores_condition cores_mutex;
      take ())
  in
  Mutex.protect cores_mutex take

(** Gives [cpus] back to the pool of free cores and wakes up one domain that
    may be blocked in [get_cpus] waiting for a core. Isolated domains register
    this through [Domain.at_exit], so it runs when such a domain stops. *)
let free_cpus cpus =
  let give_back () = Queue.push cpus cpus_per_core in
  Mutex.protect cores_mutex give_back;
  (* Signalling after releasing the mutex is fine: a woken waiter re-checks
     the queue while holding the mutex. *)
  Condition.signal cores_condition

(** Body of an isolated domain: pins the current domain to [cpus], then runs
    [f ()] and returns its result.

    The release of [cpus] is registered with [Domain.at_exit] *before* the
    affinity is changed. The set therefore goes back to the pool when the
    domain terminates, even if [Processor.Affinity.set_ids] itself raises.
    Previously the set was lost in that case. *)
let spawn_aux f cpus =
  Domain.at_exit (fun () -> free_cpus cpus);
  Processor.Affinity.set_ids cpus;
  f ()

(** Ensures that the current domain runs on a specific core. The domain takes
    one free set of cpus from the pool, initializing the pool on first use,
    and the library mode is fixed to isolated.

    The set is given back to the pool when the current domain exits; without
    this, the core was never released. The release is registered before the
    affinity is changed, so the set is not lost if
    [Processor.Affinity.set_ids] raises.

    @raise Failure if the library was previously used in non-isolated mode,
    if the topology has no cpu, or if no core is free and
    [wait_on_unavailable] was not called. *)
let isolate_current () =
  if not !initialized then init_cpus ();
  check_mode true;
  let cpus = get_cpus () in
  Domain.at_exit (fun () -> free_cpus cpus);
  Processor.Affinity.set_ids cpus

(** Spawns a domain running [f].

    With [~isolated:true] (the default), the new domain first takes a free
    core and stays pinned to it for its whole lifetime (see [spawn_aux]).
    Because the core is acquired inside the new domain, this call itself
    never waits for a core; a [Failure "No free cores"] surfaces when the
    domain is joined. With [~isolated:false], this is plain [Domain.spawn f].

    @raise Failure if [isolated] disagrees with the mode of earlier calls. *)
let spawn ?(isolated = true) f =
  if not !initialized then init_cpus ();
  check_mode isolated;
  let body = if isolated then fun () -> spawn_aux f (get_cpus ()) else f in
  Domain.spawn body

(** Spawns several domains running [f] and returns them in an array.

    - [~n] given: spawns exactly [n] domains ([n] must be positive). In
      isolated mode, [n] free cores are reserved up front; if fewer are free,
      [Failure] is raised and nothing is spawned.
    - [~n] omitted: in isolated mode, spawns one domain per currently free
      core (at least one must be free). Otherwise, spawns
      [Processor.Query.core_count] domains.

    Unlike [spawn], isolated cores are reserved eagerly by the caller, so this
    never waits for a core, even after [wait_on_unavailable ()].

    @raise Failure if [n <= 0], if not enough cores are free, or if
    [isolated] disagrees with the mode of earlier calls. *)
let spawn_n ?(isolated = true) ?n f =
  if not !initialized then init_cpus ();
  check_mode isolated;
  (* Reserve cpu sets while holding the lock. [pick] gets the number of free
     sets and returns how many to take, or raises. *)
  let reserve pick =
    Mutex.protect cores_mutex (fun () ->
        let count = pick (Queue.length cpus_per_core) in
        List.init count (fun _ -> Queue.pop cpus_per_core))
  in
  (* Spawn one pinned domain per reserved set, outside the lock. If a spawn
     fails (e.g. the domain limit is reached), the sets not yet handed to a
     domain are given back to the pool before re-raising, so they do not
     leak. *)
  let spawn_pinned cpul =
    let rec go spawned = function
      | [] -> Array.of_list (List.rev spawned)
      | cpus :: rest as pending -> (
          match Domain.spawn (fun () -> spawn_aux f cpus) with
          | d -> go (d :: spawned) rest
          | exception e ->
              let bt = Printexc.get_raw_backtrace () in
              List.iter free_cpus pending;
              Printexc.raise_with_backtrace e bt)
    in
    go [] cpul
  in
  match (n, isolated) with
  | Some n, _ when n <= 0 ->
      failwith (Format.sprintf "spawn_n: expected n > 0, got n = %d" n)
  | Some n, true ->
      spawn_pinned
        (reserve (fun nb_free_cores ->
             if n <= nb_free_cores then n
             else
               failwith
                 (Format.sprintf
                    "spawn_n: requested %d cores, but only %d are available" n
                    nb_free_cores)))
  | Some n, false -> Array.init n (fun _ -> Domain.spawn f)
  | None, true ->
      spawn_pinned
        (reserve (fun nb_free_cores ->
             if nb_free_cores > 0 then nb_free_cores
             else failwith "spawn_n: no free cores"))
  | None, false ->
      Array.init Processor.Query.core_count (fun _ -> Domain.spawn f)

(** Drop-in replacement for the standard [Domain] module. It re-exports all of
    [Stdlib.Domain], replaces [spawn] with this library's core-pinning
    [spawn], and adds [wait_on_unavailable], [get_available_cores],
    [isolate_current] and [spawn_n]. *)
module Domain = struct
  (* Capture this library's [spawn] before [include Domain] shadows it.
     NOTE(review): [spawn'] is therefore also exported from this module
     unless an interface file hides it. *)
  let spawn' = spawn

  (* [Domain] here still denotes [Stdlib.Domain]: the module being defined is
     not in scope inside its own body. *)
  include Domain

  let wait_on_unavailable = wait_on_unavailable
  let get_available_cores = get_available_cores
  let isolate_current = isolate_current
  (* Override [Stdlib.Domain.spawn] with the captured library version. *)
  let spawn = spawn'
  let spawn_n = spawn_n
end