(** Async compatibility layer.

    Thin wrappers around Jane Street [Async] primitives (deferreds, readers,
    writers, TCP, logging, pipes, scheduler) exposed under the names the rest
    of the library uses. *)

open Amqp_client_lib
open Async

(** Deferred values and helpers operating on them. *)
module Deferred = struct
  type 'a t = 'a Deferred.t

  let all_unit = Deferred.all_unit

  (** [try_with f] runs [f] under [Monitor.try_with ~extract_exn:true] and
      re-tags the outcome as the polymorphic variants [`Ok v] / [`Error exn]. *)
  let try_with f =
    Monitor.try_with ~extract_exn:true f >>= function
    | Core.Result.Ok v -> return (`Ok v)
    | Core.Result.Error exn -> return (`Error exn)

  module List = struct
    (** [init ~f n] delegates to [Deferred.List.init]. *)
    let init ~f n = Deferred.List.init ~f n

    (** [iter ?how ~f l] delegates to [Deferred.List.iter]; [how] defaults to
        [`Parallel] and is coerced to Async's wider [how] type. *)
    let iter ?(how : [ `Sequential | `Parallel ] = `Parallel) ~f l =
      Deferred.List.iter ~how:(how :> Async_kernel.Monad_sequence.how) ~f l
  end
end

(* Re-export the monadic operators so that users of this module get them
   without opening Async themselves. *)
let ( >>= ) = ( >>= )
let ( >>| ) = ( >>| )
let return a = return a

(** [after ms] becomes determined after [ms] milliseconds ([ms] is a float,
    as it is passed straight to [Core.Time.Span.of_ms]). *)
let after ms = after (Core.Time.Span.of_ms ms)

(** [spawn ?exn_handler t] lets [t] run in the background via
    [don't_wait_for]. When [exn_handler] is given, [t] is wrapped in
    [Monitor.try_with] and an [Error exn] result is passed to the handler.

    NOTE(review): [t] is already a deferred when it is wrapped in
    [fun () -> t]; confirm that exceptions raised by the computation behind
    [t] actually reach [handler]. *)
let spawn ?exn_handler t =
  don't_wait_for
    (match exn_handler with
     | Some handler ->
       begin
         Monitor.try_with (fun () -> t) >>= function
         | Ok () -> return ()
         | Error exn -> handler exn
       end
     | None -> t)

(** [with_timeout milliseconds deferred] passes [deferred] to
    [Clock.with_timeout] with a span of [milliseconds] (an int) and returns
    whatever [Clock.with_timeout] yields. *)
let with_timeout milliseconds deferred =
  let duration = Core.Time.Span.of_ms (float_of_int milliseconds) in
  Clock.with_timeout duration deferred

module Ivar = struct
  include Ivar
end

(** Input channel wrapper. *)
module Reader = struct
  type t = Reader.t

  let close = Reader.close

  (** [read t buf] delegates to [Reader.really_read]. *)
  let read t buf = Reader.really_read t buf
end

(** Output channel wrapper. *)
module Writer = struct
  type t = Writer.t

  let write t buf = Writer.write t buf
  let close t = Writer.close t

  (** [flush t] is [Writer.flushed t]. *)
  let flush t = Writer.flushed t
end

module Tcp = struct
  (** [connect ~exn_handler ?nodelay host port] opens a TCP connection to
      [host]:[port] inside a dedicated monitor named ["Network"] and returns
      the [(reader, writer)] pair.

      - [exn_handler] receives the next error detached from that monitor.
      - [nodelay] is a [unit option]; [Some ()] sets [Socket.Opt.nodelay] to
        [true] on the connected socket. *)
  let connect ~exn_handler ?nodelay host port =
    let addr =
      Core.Host_and_port.create ~host ~port
      |> Tcp.Where_to_connect.of_host_and_port
    in
    let monitor = Monitor.create ~name:"Network" () in
    Monitor.Exported_for_scheduler.within' ~monitor (fun () ->
        Tcp.connect ~buffer_age_limit:`Unlimited addr)
    >>= fun (s, r, w) ->
    (* Forward the first error seen on the network monitor to the caller. *)
    spawn (Monitor.detach_and_get_next_error monitor >>= exn_handler);
    (match nodelay with
     | Some () -> Socket.setopt s Socket.Opt.nodelay true
     | None -> ());
    return (r, w)
end

module Log = struct
  (* Use of a predefined tag allows the caller to disable logging if needed *)
  let tags = ["library", "amqp_client"]

  let debug fmt = Log.Global.debug ~tags fmt
  let info fmt = Log.Global.info ~tags fmt
  let error fmt = Log.Global.error ~tags fmt
end

(* Pipes *)
module Pipe = struct
  module Writer = struct
    type 'a t = 'a Pipe.Writer.t
  end

  module Reader = struct
    type 'a t = 'a Pipe.Reader.t
  end

  let create () = Pipe.create ()
  let set_size_budget t = Pipe.set_size_budget t

  (** [flush t] waits on [Pipe.downstream_flushed t] and discards its result. *)
  let flush t = Pipe.downstream_flushed t >>= fun _ -> return ()

  let interleave_pipe t = Pipe.interleave_pipe t
  let write r elm = Pipe.write r elm
  let write_without_pushback r elm = Pipe.write_without_pushback r elm

  (** [transfer_in ~from t] writes every element of the queue [from] to [t]
      without pushback, then returns an already-determined unit deferred. *)
  let transfer_in ~from t =
    Ocaml_lib.Queue.iter (write_without_pushback t) from;
    return ()

  let close_without_pushback t = Pipe.close t

  (** [close t] closes the pipe and then waits for [flush t]. *)
  let close t =
    Pipe.close t;
    (* [flush] already yields unit, so no further bind is needed. *)
    flush t

  let read r = Pipe.read r
  let iter r ~f = Pipe.iter r ~f
  let iter_without_pushback r ~f = Pipe.iter_without_pushback r ~f
end

module Scheduler = struct
  (** [go ()] calls Async's [Scheduler.go ()] and ignores its result. *)
  let go () = Scheduler.go () |> ignore

  (** [shutdown n] is [Shutdown.shutdown n]. *)
  let shutdown n = Shutdown.shutdown n
end