NATS Client Async

nats-client-async provides a Jane Street Async client for NATS. It builds on the nats-client protocol package and adds TCP connection management, publish/subscribe, request/reply, keepalive pings, and reconnect handling.

Connecting

open Core
open Async

let connect_local () =
  Nats_client_async.connect
    (Some (Uri.of_string "nats://127.0.0.1:4222"))

Passing None creates a disabled client. This is useful when a service should keep the same call path with or without NATS configured.
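
One way to choose between Some and None is to read the server address from the environment. The sketch below is a minimal illustration using only the OCaml standard library. The variable name NATS_URL, the helper nats_url_of_env, and its getenv parameter are all assumptions made for this example; the library is not known to read any environment variable itself.

```ocaml
(* Hypothetical helper: returns [Some url] when NATS_URL is set to a
   non-blank value, and [None] otherwise. NATS_URL is an assumed name.
   [getenv] is injectable so the lookup can be replaced in tests. *)
let nats_url_of_env ?(getenv = Sys.getenv_opt) () =
  match getenv "NATS_URL" with
  | None -> None
  | Some raw ->
    (* Treat an empty or whitespace-only value as "not configured". *)
    let url = String.trim raw in
    if String.equal url "" then None else Some url
```

The result could then be mapped through Uri.of_string and passed to Nats_client_async.connect, as in connect_local above, so that an unset variable yields the disabled client.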

Publishing

let publish_created client =
  Nats_client_async.publish
    client
    ~subject:"events.created"
    "created"

publish and publish_json are fire-and-forget. Use publish_result when the caller needs to know whether the message was queued or dropped.

Subscribing

let read_one client =
  (* "events.*" matches subjects exactly one token below "events". *)
  Nats_client_async.subscribe client ~subject:"events.*" ()
  >>= function
  | Error error -> Deferred.return (Error error)
  | Ok subscription ->
      (* Wait for the first message, or for the pipe to close. *)
      Pipe.read subscription.messages
      >>| function
      | `Eof -> Or_error.error_string "subscription closed"
      | `Ok message -> Ok message.Nats_client.Protocol.payload

Active subscriptions are replayed to the server after a reconnect. Each incoming message carries the subject, subscription id, optional reply subject, payload, and optional headers.
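
The subject "events.*" in read_one uses a NATS wildcard: "*" matches exactly one dot-separated token, and ">" matches one or more trailing tokens. The server performs this matching, not the client. The standalone sketch below only illustrates the rule; subject_matches is a hypothetical helper, not part of this library, and it does not validate subjects (for example, it does not reject empty tokens).

```ocaml
(* Split a subject into its dot-separated tokens. *)
let tokens subject = String.split_on_char '.' subject

(* [subject_matches ~pattern subject] applies the NATS wildcard rules:
   "*" matches exactly one token; a final ">" matches one or more
   remaining tokens; any other token must match literally. *)
let subject_matches ~pattern subject =
  let rec go pattern subject =
    match pattern, subject with
    | [], [] -> true
    | [ ">" ], _ :: _ -> true
    | "*" :: rest_pattern, _ :: rest_subject -> go rest_pattern rest_subject
    | p :: rest_pattern, s :: rest_subject ->
      String.equal p s && go rest_pattern rest_subject
    | _, _ -> false
  in
  go (tokens pattern) (tokens subject)
```

Under these rules "events.*" matches "events.created" but not "events.user.created", while "events.>" matches both.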

Request/Reply

let request_status client =
  (* Send "status" and wait at most one second for a reply. *)
  Nats_client_async.request
    client
    ~subject:"service.status"
    ~timeout:(Time_ns.Span.of_sec 1.)
    "status"

The request API creates a private inbox subscription, publishes with that inbox as the reply subject, waits for one response, and then unsubscribes.
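
The four steps above can be modelled with a toy, synchronous, in-memory broker. This is a sketch of the pattern only, not the library's implementation: the broker type, its functions, and the "_INBOX.<n>" naming scheme are assumptions made for illustration, and a missing reply stands in for a timeout because nothing here is asynchronous.

```ocaml
(* Toy model: one handler per exact subject, delivered synchronously. *)
type message = { subject : string; reply : string option; payload : string }

type broker = {
  handlers : (string, message -> unit) Hashtbl.t;
  mutable next_inbox : int;
}

let create () = { handlers = Hashtbl.create 8; next_inbox = 0 }

let subscribe broker ~subject handler =
  Hashtbl.replace broker.handlers subject handler

let unsubscribe broker ~subject = Hashtbl.remove broker.handlers subject

(* Deliver to the subscriber of [subject], if any; otherwise drop. *)
let publish broker ?reply ~subject payload =
  match Hashtbl.find_opt broker.handlers subject with
  | None -> ()
  | Some handler -> handler { subject; reply; payload }

let request broker ~subject payload =
  (* 1. Create a private inbox subject (naming scheme is illustrative). *)
  broker.next_inbox <- broker.next_inbox + 1;
  let inbox = Printf.sprintf "_INBOX.%d" broker.next_inbox in
  let response = ref None in
  (* 2. Subscribe to the inbox, keeping only the first response. *)
  subscribe broker ~subject:inbox (fun m ->
    if !response = None then response := Some m.payload);
  (* 3. Publish the request with the inbox as its reply subject. *)
  publish broker ~reply:inbox ~subject payload;
  (* 4. Unsubscribe from the inbox whether or not a reply arrived. *)
  unsubscribe broker ~subject:inbox;
  !response
```

A responder in this model subscribes to "service.status" and publishes its answer to the reply subject of each message it receives; request then returns Some answer, or None when nobody is listening.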

Modules