protocol.ml1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285type connect = { verbose : bool; pedantic : bool; tls_required : bool; auth_token : string option; user : string option; pass : string option; name : string option; lang : string option; version : string option; protocol : int; echo : bool; sig_ : string option; jwt : string option; no_responders : bool; headers : bool; nkey : string option; } type info = { server_id : string; server_name : string option; version : string; go : string option; host : string; port : int; headers : bool; max_payload : int; proto : int; client_id : int option; auth_required : bool; tls_required : bool; tls_verify : bool; tls_available : bool; connect_urls : string list; ws_connect_urls : string list; ldm : bool; git_commit : string option; jetstream : bool; ip : string option; client_ip : string option; nonce : string option; cluster : string option; domain : string option; } type msg_meta = { subject : string; sid : string; reply_to : string option; payload_size : int; } type hmsg_meta = { subject : string; sid : string; reply_to : string option; header_size : int; total_size : int; } type message = { subject : string; sid : string; reply_to : string option; payload : string; headers : Headers.t option; } type parsed_line = | Ping | Pong | Ok | Err of string | Info of info | Msg_meta of msg_meta | Hmsg_meta of hmsg_meta let default_connect = { verbose = false; pedantic = false; tls_required = false; auth_token = None; user = None; pass = None; name = None; lang = None; version = None; protocol = 0; echo = false; sig_ = None; jwt = None; no_responders = false; headers = false; nkey = None; } let encode_connect ?(connect = default_connect) () = let add_some name value fields = match value with | None -> fields | Some value -> (name, value) :: fields in let add_bool name enabled fields = if enabled then (name, `Bool true) :: fields else fields in let add_int name value fields = if value <> 0 then (name, `Int value) :: fields else fields in let fields = [] |> add_some "nkey" (Option.map (fun value -> `String value) connect.nkey) |> add_bool "headers" connect.headers |> add_bool "no_responders" connect.no_responders |> add_some "jwt" (Option.map (fun value -> `String value) connect.jwt) |> add_some "sig" (Option.map (fun value -> `String value) connect.sig_) |> add_bool "echo" connect.echo |> add_int "protocol" connect.protocol |> add_some "version" (Option.map (fun value -> `String value) connect.version) |> add_some "lang" (Option.map (fun value -> `String value) connect.lang) |> add_some "name" (Option.map (fun value -> `String value) connect.name) |> add_some "pass" (Option.map (fun value -> `String value) connect.pass) |> add_some "user" (Option.map (fun value -> `String value) connect.user) |> add_some "auth_token" (Option.map (fun value -> `String value) connect.auth_token) |> add_bool "tls_required" connect.tls_required |> fun fields -> ("pedantic", `Bool connect.pedantic) :: fields |> fun fields -> ("verbose", `Bool connect.verbose) :: fields in "CONNECT " ^ Yojson.Safe.to_string (`Assoc fields) ^ "\r\n" let header_block headers = let buffer = Buffer.create 64 in Buffer.add_string buffer "NATS/1.0\r\n"; List.iter (fun (name, value) -> Buffer.add_string buffer name; Buffer.add_string buffer ": "; Buffer.add_string buffer value; Buffer.add_string buffer "\r\n") (Headers.to_list headers); Buffer.add_string buffer "\r\n"; Buffer.contents buffer let encode_pub ~subject ?reply_to payload = Printf.sprintf "PUB %s%s %d\r\n%s\r\n" subject (match reply_to with None -> "" | Some reply_to -> " " ^ reply_to) (String.length payload) payload let encode_sub ~subject ?queue_group ~sid () = Printf.sprintf "SUB %s%s %s\r\n" subject (match queue_group with None -> "" | Some queue_group -> " " ^ queue_group) sid let encode_unsub ~sid ?max_msgs () = Printf.sprintf "UNSUB %s%s\r\n" sid (match max_msgs with None -> "" | Some max_msgs -> " " ^ string_of_int max_msgs) let encode_ping () = "PING\r\n" let encode_pong () = "PONG\r\n" let encode_hpub ~subject ?reply_to ~headers payload = let headers = header_block headers in Printf.sprintf "HPUB %s%s %d %d\r\n%s%s\r\n" subject (match reply_to with None -> "" | Some reply_to -> " " ^ reply_to) (String.length headers) (String.length headers + String.length payload) headers payload let invalid what = invalid_arg what let parse_err line = let len = String.length line in if len >= 8 && String.sub line 0 6 = "-ERR '" && String.get line (len - 1) = '\'' then String.sub line 6 (len - 7) else invalid "invalid -ERR line" let parse_info line = let prefix = "INFO " in let json = String.sub line (String.length prefix) (String.length line - String.length prefix) |> Yojson.Safe.from_string in let fields = match json with | `Assoc fields -> fields | _ -> invalid "invalid INFO json" in let member name = List.assoc_opt name fields in let require name convert = match member name with | Some value -> convert value | None -> invalid ("missing INFO field: " ^ name) in let optional name convert = match member name with | None -> None | Some value -> Some (convert value) in let default name convert default = match member name with | None -> default | Some value -> convert value in let string = function `String value -> value | _ -> invalid "invalid INFO string" in let int = function | `Int value -> value | `Intlit value -> int_of_string value | _ -> invalid "invalid INFO int" in let bool = function `Bool value -> value | _ -> invalid "invalid INFO bool" in let string_list = function | `List values -> List.map string values | _ -> invalid "invalid INFO string list" in { server_id = require "server_id" string; server_name = optional "server_name" string; version = require "version" string; go = optional "go" string; host = require "host" string; port = require "port" int; headers = default "headers" bool false; max_payload = require "max_payload" int; proto = default "proto" int 0; client_id = optional "client_id" int; auth_required = default "auth_required" bool false; tls_required = default "tls_required" bool false; tls_verify = default "tls_verify" bool false; tls_available = default "tls_available" bool false; connect_urls = default "connect_urls" string_list []; ws_connect_urls = default "ws_connect_urls" string_list []; ldm = default "ldm" bool false; git_commit = optional "git_commit" string; jetstream = default "jetstream" bool false; ip = optional "ip" string; client_ip = optional "client_ip" string; nonce = optional "nonce" string; cluster = optional "cluster" string; domain = optional "domain" string; } let parse_msg_meta line = match String.split_on_char ' ' line with | [ "MSG"; subject; sid; payload_size ] -> { subject; sid; reply_to = None; payload_size = int_of_string payload_size } | [ "MSG"; subject; sid; reply_to; payload_size ] -> { subject; sid; reply_to = Some reply_to; payload_size = int_of_string payload_size; } | _ -> invalid "invalid MSG line" let parse_hmsg_meta line = match String.split_on_char ' ' line with | [ "HMSG"; subject; sid; header_size; total_size ] -> { subject; sid; reply_to = None; header_size = int_of_string header_size; total_size = int_of_string total_size; } | [ "HMSG"; subject; sid; reply_to; header_size; total_size ] -> { subject; sid; reply_to = Some reply_to; header_size = int_of_string header_size; total_size = int_of_string total_size; } | _ -> invalid "invalid HMSG line" let parse_server_line = function | "PING" -> Ping | "PONG" -> Pong | "+OK" -> Ok | line when String.starts_with ~prefix:"-ERR " line -> Err (parse_err line) | line when String.starts_with ~prefix:"INFO " line -> Info (parse_info line) | line when String.starts_with ~prefix:"HMSG " line -> Hmsg_meta (parse_hmsg_meta line) | line when String.starts_with ~prefix:"MSG " line -> Msg_meta (parse_msg_meta line) | _ -> invalid "unknown server line"