open Core
open Async

(* A small push-based dataflow graph.

   Each node holds a current value ([now]).  When a node's value changes it
   pushes the new value to every registered consumer; a derived node reacts by
   recomputing its own value from all of its registered producers and then
   propagating further downstream. *)

(* A callback that accepts a freshly pushed value. *)
type 'a consumer = 'a -> unit

(* A thunk that yields a node's current value on demand. *)
type 'a producer = unit -> 'a

(* Phantom input type for nodes that have no upstream inputs; this file never
   constructs a value of this type. *)
type cannot_receive

(* [('b, 'a) t] is a node whose inputs have type ['b] and whose own value has
   type ['a]. *)
type ('b, 'a) t = {
  mutable now : 'a;                          (* the value at this node *)
  producer : unit -> 'a;                     (* produces the value at this node *)
  mutable to_list : 'a consumer list;        (* push updates to these consumers *)
  mutable from_list : 'b producer list;      (* combine the values from these producers *)
  receive : 'b -> unit                       (* recalculate at this node when this function is applied *)
}

(* Push [node]'s current value to every consumer in [node.to_list], in list
   order. *)
let propagate (node : ('b, 'a) t) : unit =
  let v = node.now in
  List.iter node.to_list ~f:(fun consumer -> consumer v)

(* [create init f] builds a derived node whose initial value is [init].

   Whenever the node's [receive] is applied, the pushed value itself is
   ignored: the node instead samples every producer in [from_list], applies
   [f] to the resulting list, stores the result in [now], and propagates it.
   Producers are consed onto [from_list] by [attach], so [f] sees the most
   recently attached input first. *)
let create (init : 'a) (f : 'b list -> 'a) : ('b, 'a) t =
  let rec node = {
    now = init;
    producer = (fun () -> node.now);
    to_list = [];
    from_list = [];
    receive = (fun _ ->
      node.now <- f (List.map node.from_list ~f:(fun produce -> produce ()));
      propagate node)
  } in
  node

(* [create_source init] builds an input node holding [init].  Its value is
   changed with [push]; applying its [receive] raises [Failure]. *)
let create_source (init : 'a) : (cannot_receive, 'a) t =
  let rec node = {
    now = init;
    producer = (fun () -> node.now);
    to_list = [];
    from_list = [];
    receive = (fun _ -> failwith "impossible: create_source node received a value")
  } in
  node

(* [push x node] sets [node]'s value to [x] and notifies its consumers. *)
let push (x : 'a) (node : ('b, 'a) t) : unit =
  node.now <- x;
  propagate node

(* [attach src dst] wires [src] into [dst]: [dst.receive] becomes a consumer of
   [src], and [src.producer] becomes one of [dst]'s inputs.  This does not
   itself trigger a recalculation of [dst]; that happens on the next update
   pushed through [src]. *)
let attach (src : ('a, 'b) t) (dst : ('b, 'c) t) : unit =
  src.to_list <- dst.receive :: src.to_list;
  dst.from_list <- src.producer :: dst.from_list

(* [to_pipe node] returns [node]'s current value together with a pipe reader
   that receives every value subsequently propagated from [node].

   Values are written without pushback.  If the pipe has been closed (for
   example because the caller closed the reader), updates are silently dropped
   instead of raising, so that a finished subscriber cannot break propagation
   to the node's other consumers. *)
let to_pipe (node : ('b, 'a) t) : 'a * 'a Pipe.Reader.t =
  let (r, w) = Pipe.create () in
  let consume b =
    (* Writing to a closed pipe raises, so check first. *)
    if not (Pipe.is_closed w) then Pipe.write_without_pushback w b
  in
  node.to_list <- consume :: node.to_list;
  (node.producer (), r)

(* [from_pipe init reader] builds an input node that starts at [init] and takes
   on each value read from [reader], propagating it to the node's consumers.
   Applying the node's [receive] raises [Failure]. *)
let from_pipe (init : 'a) (reader : 'a Pipe.Reader.t) : (cannot_receive, 'a) t =
  let rec node = {
    now = init;
    producer = (fun () -> node.now);
    to_list = [];
    from_list = [];
    receive = (fun _ -> failwith "impossible: from_pipe node received a value")
  } in
  (* The iteration runs in the background for as long as [reader] yields
     values; its completion is deliberately not awaited. *)
  don't_wait_for
    (Pipe.iter_without_pushback reader ~f:(fun x -> push x node));
  node