123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596(*
* Copyright (c) 2014-2015 Anil Madhavapeddy <anil@recoil.org>
* Copyright (c) 2015 Thomas Gazagnaire <thomas@gazagnaire.org>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*
*)openSexplib0.Sexp_convtypedirect=[`Directofint*Vchan.Port.t]let(>>=)=Lwt.(>>=)let(/)=Filename.concatletfailfmt=Printf.ksprintffailwithfmtleterr_peer_not_found=fail"Conduit_xenstore: %s peer not found"leterr_no_entry_found()=fail"No /conduit Xenstore entry found. Run `xenstore-conduit-init`"leterr_port=fail"%s: invalid port"moduleMake(Xs:Xs_client_lwt.S)=structtypet={xs:(Xs.client[@sexp.opaque]);name:string}[@@derivingsexp_of]letget_my_idxs=Xs.(immediatexs(funh->readh"domid"))letxenstore_registerxsmyname=get_my_idxs>>=fundomid->Xs.(immediatexs(funh->writeh("/conduit"/myname)domid))letget_peer_idxsname=Lwt.catch(fun()->Xs.(immediatexs(funh->readh("/conduit"/name))))(fun_->err_peer_not_foundname)letreaddirhd=Xs.(directoryhd)>>=fundirs->letdirs=List.filter(funp->p<>"")dirsinmatchdirswith[]->raiseXs_protocol.Eagain|hd::_->Lwt.returnhdletregistername=Xs.make()>>=funxs->(* Check that a /conduit directory exists *)Lwt.catch(fun()->Xs.(immediatexs(funh->readh"/conduit"))>>=fun_->Lwt.return_unit)(fun_->err_no_entry_found())>>=fun()->xenstore_registerxsname>>=fun()->Lwt.return{xs;name}letaccept{xs;name}=letwaitfnh=readdirh("/conduit"/name)>>=funremote_name->readdirh("/conduit"/name/remote_name)>>=funport->Xs.readh("/conduit"/remote_name)>>=funremote_domid->letremote_domid=int_of_stringremote_domidinXs.rmh("/conduit"/name/remote_name)>>=fun()->matchVchan.Port.of_stringportwith|Error(`Msge)->err_porte|Okport->Lwt.return(`Direct(remote_domid,port))inXs.waitxswaitfnletlisten({name;_}asv)=(* TODO cancellation *)letconn,push_conn=Lwt_stream.create()inPrintf.printf"Conduit_xenstore: listen on %s\n%!"name;letrecloop()=acceptv>>=func->push_conn(Somec);loop()inLwt.ignore_result(loop());Lwt.returnconnletconnect{xs;name}~remote_name~port=letport_str=Vchan.Port.to_stringportinget_peer_idxsremote_name>>=funremote_domid->letremote_domid=int_of_stringremote_domidinletpath="/conduit"/remote_name/name/port_strinXs.(immediatexs(funh->writehpathport_str))>>=fun()->Lwt.return(`Direct(remote_domid,port))end