123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125typehandlertypetopictypequeuetypepartition=inttypeoffset=int64typemessage=|Messageoftopic*partition*offset*string*stringoption(* topic, partition, offset, payload, optional key *)|PartitionEndoftopic*partition*offset(* topic, partition, offset *)typemsg_id=inttypeerror=(* Internal errors to rdkafka: *)|BAD_MSG(* Received message is incorrect *)|BAD_COMPRESSION(* Bad/unknown compression *)|DESTROY(* Broker is going away *)|FAIL(* Generic failure *)|TRANSPORT(* Broker transport error *)|CRIT_SYS_RESOURCE(* Critical system resource failure *)|RESOLVE(* Failed to resolve broker. *)|MSG_TIMED_OUT(* Produced message timed out. *)|UNKNOWN_PARTITION(* Permanent: Partition does not exist in cluster. *)|FS(* File or filesystem error *)|UNKNOWN_TOPIC(* Permanent: Topic does not exist in cluster. *)|ALL_BROKERS_DOWN(* All broker connections are down. *)|INVALID_ARG(* Invalid argument, or invalid configuration *)|TIMED_OUT(* Operation timed out *)|QUEUE_FULL(* Queue is full *)|ISR_INSUFF(* ISR count < required.acks *)(* Standard Kafka errors: *)|UNKNOWN|OFFSET_OUT_OF_RANGE|INVALID_MSG|UNKNOWN_TOPIC_OR_PART|INVALID_MSG_SIZE|LEADER_NOT_AVAILABLE|NOT_LEADER_FOR_PARTITION|REQUEST_TIMED_OUT|BROKER_NOT_AVAILABLE|REPLICA_NOT_AVAILABLE|MSG_SIZE_TOO_LARGE|STALE_CTRL_EPOCH|OFFSET_METADATA_TOO_LARGE(* Configuration errors *)|CONF_UNKNOWN(* Unknown configuration name. *)|CONF_INVALID(* Invalid configuration value. *)exceptionErroroferror*stringlet_=Callback.register_exception"kafka.error"(Error(UNKNOWN,"msg string"));externalnew_consumer:(string*string)list->handler="ocaml_kafka_new_consumer"externalnew_producer:?delivery_callback:(msg_id->erroroption->unit)->(string*string)list->handler="ocaml_kafka_new_producer"externaldestroy_handler:handler->unit="ocaml_kafka_destroy_handler"externalhandler_name:handler->string="ocaml_kafka_handler_name"externalnew_topic:?partitioner_callback:(int->string->partition)->handler->string->(string*string)list->topic="ocaml_kafka_new_topic"externaldestroy_topic:topic->unit="ocaml_kafka_destroy_topic"externaltopic_name:topic->string="ocaml_kafka_topic_name"[@@noalloc](*
Note that the id is restricted to be some int value.
While the underlying library, librdkafka, allows any void* msg_opaque data.
This is to avoid issues with the garbage collector
*)externalproduce_idmsg:topic->partition->?key:string->msg_id->string->unit="ocaml_kafka_produce"letproducetopicpartition?key?(msg_id=0)msg=produce_idmsgtopicpartition?keymsg_idmsgexternaloutq_len:handler->int="ocaml_kafka_outq_len"externalpoll:handler->int->int="ocaml_kafka_poll"letpoll_events?(timeout_ms=1000)handler=pollhandlertimeout_msletwait_delivery?(timeout_ms=100)?(max_outq_len=0)handler=letrecloop()=ifoutq_lenhandler>max_outq_lenthen(ignore(poll_events~timeout_mshandler);loop())else()inloop()externalconsume_start:topic->partition->offset->unit="ocaml_kafka_consume_start"externalconsume_stop:topic->partition->unit="ocaml_kafka_consume_stop"letpartition_unassigned=-1letoffset_beginning=-2Lletoffset_end=-1Lletoffset_stored=-1000Lletoffset_tailn=Int64.sub(-2000L)(Int64.of_intn)externalconsume:?timeout_ms:int->topic->partition->message="ocaml_kafka_consume"externalconsume_batch:?timeout_ms:int->?msg_count:int->topic->partition->messagelist="ocaml_kafka_consume_batch"externalstore_offset:topic->partition->offset->unit="ocaml_kafka_store_offset"externalnew_queue:handler->queue="ocaml_kafka_new_queue"externaldestroy_queue:queue->unit="ocaml_kafka_destroy_queue"externalconsume_start_queue:queue->topic->partition->offset->unit="ocaml_kafka_consume_start_queue"externalconsume_queue:?timeout_ms:int->queue->message="ocaml_kafka_consume_queue"externalconsume_batch_queue:?timeout_ms:int->?msg_count:int->queue->messagelist="ocaml_kafka_consume_batch_queue"moduleMetadata=structtypetopic_metadata={topic_name:string;topic_partitions:partitionlist;}endexternalget_topic_metadata:handler->topic->int->Metadata.topic_metadata="ocaml_kafka_get_topic_metadata"externalget_topics_metadata:handler->bool->int->Metadata.topic_metadatalist="ocaml_kafka_get_topics_metadata"lettopic_metadata?(timeout_ms=1000)handlertopic=get_topic_metadatahandlertopictimeout_msletlocal_topics_metadata?(timeout_ms=1000)handler=get_topics_metadatahandlerfalsetimeout_msletall_topics_metadata?(timeout_ms=1000)handler=get_topics_metadatahandlertruetimeout_ms