12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485letsrc=Logs.Src.create"mnet.fragments"moduleLog=(valLogs.src_logsrc:Logs.LOG)moduleSBstr=Slice_bstrtypepayload=SliceofSBstr.t|StringofstringmoduleMake(Key:Hashtbl.HashedType)=structmoduleValue=structtypet={to_expire:int;fragment:Fragment.t;count:int}letweight{fragment;_}=Fragment.weightfragmentendmoduleCache=Lru.M.Make(Key)(Value)typet={cache:Cache.t;to_expire:int}letmax_expiration=Int64.to_int(Duration.of_sec10)letcreate?(to_expire=max_expiration)()={cache=Cache.create(1024*256);to_expire}letcatch~on_exnfn=tryfn()withexn->on_exnexnletinsert~nowtkey?(last=false)~off~lenslice=match(off,last,Cache.findkeyt.cache)with|0,true,None->Log.debug(funm->m"receive unfragmented packet");Some(key,Slice(SBstr.subslice~off:0~len))(* unfragmented packet *)|_,_,None->Log.debug(funm->m"receive new fragment");(* NOTE(dinosaure): we have an new fragment which is not recorded
into our cache. We [add] this new fragment and [trim] our
cache to avoid an OOM. *)letfragment=Fragment.singleton~off~len~limit:lastsliceinletto_expire=now+t.to_expireinletvalue={Value.to_expire;count=1;fragment}inCache.addkeyvaluet.cache;Cache.trimt.cache;None|_,_,Some{count;_}whencount>16->Log.debug(funm->m"Too many fragments received");(* NOTE(dinosaure): from @hannesm, if we have more than 16
fragments, we just delete our entry from our cache. *)Cache.removekeyt.cache;None|_,_,Some{to_expire;_}whento_expire<now->Log.debug(funm->m"Too old fragment");(* NOTE(dinosaure): from @hannesm, if we found an entry and get a new
fragment [max_expiration]ns (10secs), we delete the old entry
and create a new one. *)letfragment=Fragment.singleton~off~len~limit:lastsliceinletto_expire=now+t.to_expireinletvalue={Value.to_expire;count=1;fragment}inCache.addkeyvaluet.cache;None|_,_,Some{fragment;count;to_expire}->Log.debug(funm->m"receive a fragment which completes an existing packet");(* NOTE(dinosaure): the basic execution path. If the fragment does not
fit into our entry, we remove it. Otherwise, we insert the new
incoming fragment. If the resulted entry is fullfilled, we returns
the result. Otherwise, we update our cache with our new entry and
[trim] our cache to avoid an OOM.
NOTE(dinosaure): [Cache.add] does a promotion of our entry into our
cache also. *)leton_exn_exn=Cache.removekeyt.cache;Noneincatch~on_exn@@fun()->letstr=SBstr.sub_string~off:0~lensliceinletfragment=Fragment.insertfragment~off~limit:laststrinifFragment.is_completefragmentthenbeginletstr=Fragment.reassemble_exnfragmentinCache.removekeyt.cache;Some(key,Stringstr)endelsebeginletvalue={Value.fragment;count=count+1;to_expire}inCache.addkeyvaluet.cache;Cache.trimt.cache;Noneendend