nats-client-async provides a Jane Street Async client for NATS. It builds on the nats-client protocol package and adds TCP connection management, publish/subscribe, request/reply, keepalive pings, and reconnect handling.

    open Core
    open Async

    let connect_local () =
      Nats_client_async.connect
        (Some (Uri.of_string "nats://127.0.0.1:4222"))

Passing None creates a disabled client. This is useful when a service should keep the same call path with or without NATS configured.
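
One way to keep that single call path is to derive the optional URI from configuration. The sketch below is only an illustration: the helper names and the NATS_URL variable name are assumptions and are not part of nats-client-async. It uses the OCaml standard library, fully qualified so that it also compiles under open Core.

```ocaml
(* Hypothetical helper, not part of nats-client-async: treat a missing or
   blank setting as "NATS not configured". *)
let normalize_url (raw : string option) : string option =
  match raw with
  | None -> None
  | Some s ->
    let trimmed = Stdlib.String.trim s in
    if Stdlib.String.length trimmed = 0 then None else Some trimmed

(* NATS_URL is an assumed variable name; use whatever your service defines. *)
let nats_url_of_env () : string option =
  normalize_url (Stdlib.Sys.getenv_opt "NATS_URL")
```

Mapping the result through Uri.of_string gives the optional URI that connect_local passes to Nats_client_async.connect, so a missing or blank variable selects the disabled client.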

    let publish_created client =
      Nats_client_async.publish
        client
        ~subject:"events.created"
        "created"

publish and publish_json are fire-and-forget. Use publish_result when the caller needs to know whether the message was queued or dropped.

    let read_one client =
      Nats_client_async.subscribe client ~subject:"events.*" ()
      >>= function
      | Error error -> Deferred.return (Error error)
      | Ok subscription ->
        Pipe.read subscription.messages
        >>| function
        | `Eof -> Or_error.error_string "subscription closed"
        | `Ok message -> Ok message.Nats_client.Protocol.payload

Subscriptions are replayed after reconnect. Incoming messages carry the subject, subscription id, optional reply subject, payload, and optional headers.
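
The "events.*" subject in the subscription example is a NATS wildcard. In NATS, subjects are dot-separated tokens, "*" matches exactly one token, and ">" matches one or more trailing tokens. The function below is a small standalone illustration of those rules; subject_matches is a hypothetical name, the matching itself is done by the NATS server rather than by this code, and the sketch assumes well-formed subjects with no empty tokens.

```ocaml
(* Illustration of NATS subject wildcards, not library code.
   "*" matches exactly one token; ">" matches one or more trailing tokens. *)
let subject_matches ~(pattern : string) (subject : string) : bool =
  let rec go pattern_tokens subject_tokens =
    match pattern_tokens, subject_tokens with
    | [], [] -> true
    | [ ">" ], _ :: _ -> true
    | "*" :: ps, _ :: ss -> go ps ss
    | p :: ps, s :: ss -> Stdlib.String.equal p s && go ps ss
    | _, _ -> false
  in
  go
    (Stdlib.String.split_on_char '.' pattern)
    (Stdlib.String.split_on_char '.' subject)
```

Under these rules "events.*" receives messages published to "events.created" but not to "events.created.v2"; a subscription on "events.>" would receive both.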

    let request_status client =
      Nats_client_async.request
        client
        ~subject:"service.status"
        ~timeout:(Time_ns.Span.of_sec 1.)
        "status"

The request API creates a private inbox subscription, publishes with that inbox as the reply subject, waits for one response, and then unsubscribes.
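
The four steps of a request can be pictured at the level of the NATS text protocol. The sketch below is a rough model, not the library's implementation: the step type, the request_steps function, the inbox name, and the subscription id are all made up for illustration, headers are ignored, and the real client may order or combine frames differently.

```ocaml
(* Rough model of one request/reply exchange in the NATS text protocol. *)
type step =
  | Send of string (* frame written to the server *)
  | Await_reply (* wait for one MSG on the inbox, or give up at the timeout *)

(* Hypothetical helper: inbox and sid are chosen by the caller here, whereas
   a real client would generate them itself. *)
let request_steps ~inbox ~sid ~subject ~payload =
  [ (* 1. subscribe to the private inbox *)
    Send (Stdlib.Printf.sprintf "SUB %s %s\r\n" inbox sid)
  ; (* 2. publish with the inbox as the reply subject *)
    Send
      (Stdlib.Printf.sprintf
         "PUB %s %s %d\r\n%s\r\n"
         subject
         inbox
         (Stdlib.String.length payload)
         payload)
  ; (* 3. wait for a single response *)
    Await_reply
  ; (* 4. drop the inbox subscription *)
    Send (Stdlib.Printf.sprintf "UNSUB %s\r\n" sid)
  ]
```

For the request_status example, the second frame would carry "service.status" as the subject and the 6-byte payload "status".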

Nats_client_async exposes the Async client API.
Nats_client.Protocol.message is the message type delivered by subscriptions.
Nats_client.Headers can be passed to publish and request calls.