Source file container_writer.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
(* State of a container-file writer for values of type ['a]. *)
type 'a t = {
(* Binary output channel that the header and all blocks are written to. *)
channel: out_channel;
(* Codec providing the schema written in the header and the [encode]
   function used to serialize each buffered value. *)
codec: 'a Codec.t;
(* Name of the compression codec, as recorded in the header metadata. *)
compression: string;
(* Function applied to the serialized bytes of a block before writing. *)
compress: bytes -> bytes;
(* Marker written once after the header and again after every block. *)
sync_marker: bytes;
(* Number of buffered values at which [write] flushes a block. *)
sync_interval: int;
(* Values waiting to be written, most recently added first. *)
mutable buffer: 'a list;
(* Running total of values that have been flushed to [channel]. *)
mutable objects_written: int;
}
(** [generate_sync_marker ()] returns a fresh 16-byte sequence filled with
    pseudo-random bytes. The global [Random] generator is re-seeded via
    [Random.self_init] on every call. *)
let generate_sync_marker () =
  Random.self_init ();
  (* [Bytes.init] fills indices in increasing order, one random draw each. *)
  Bytes.init 16 (fun _ -> Char.chr (Random.int 256))
(** [write_header channel schema compression metadata sync_marker] writes the
    container header to [channel].

    The header consists of, in order:
    - the 4 magic bytes ["Obj\x01"];
    - a metadata map: the entry count, each key/value pair, and a terminating
      zero count. The map always starts with ["avro.schema"] (the canonical
      JSON of [schema]) and ["avro.codec"] ([compression]), followed by the
      caller-supplied [metadata] pairs;
    - the [sync_marker] bytes.

    @param channel output channel to write to
    @param schema schema rendered with [Fingerprint.to_canonical_json]
    @param compression codec name stored under ["avro.codec"]
    @param metadata extra [(key, value)] pairs appended after the two
           built-in entries
    @param sync_marker marker bytes written at the end of the header *)
let write_header channel schema compression metadata sync_marker =
  output_string channel "Obj\x01";
  let meta_items =
    ("avro.schema", Fingerprint.to_canonical_json schema)
    :: ("avro.codec", compression)
    :: metadata
  in
  (* Serialize the whole metadata map in memory, then emit it in one write. *)
  let out = Output.create () in
  Output.write_long out (Int64.of_int (List.length meta_items));
  List.iter
    (fun (key, value) ->
      Output.write_string out key;
      Output.write_bytes out (Bytes.of_string value))
    meta_items;
  (* A zero count terminates the map. *)
  Output.write_long out 0L;
  output_bytes channel (Output.to_bytes out);
  output_bytes channel sync_marker
(** [create ~path ~codec ?compression ?metadata ?sync_interval ()] opens
    [path] for binary writing, writes the container header, and returns a
    writer with an empty buffer.

    @param path file to create (opened with [open_out_bin])
    @param codec codec whose schema goes into the header and whose [encode]
           is used for every value
    @param compression name looked up in [Codec_registry]; defaults to
           ["null"]
    @param metadata extra header [(key, value)] pairs; defaults to [[]]
    @param sync_interval number of buffered values that triggers a block
           flush in [write]; defaults to [4000]
    @raise Failure if [compression] is not a registered codec. The codec is
           resolved before the file is opened, so in that case no file is
           created or truncated.
    @raise Sys_error if [path] cannot be opened. *)
let create ~path ~codec ?(compression="null") ?(metadata=[]) ?(sync_interval=4000) () =
  (* Validate the codec first: failing here must not leave an open channel
     or an empty file behind. *)
  let compress =
    match Codec_registry.get compression with
    | Some (module C : Codec_registry.CODEC) ->
      let compressor = C.create () in
      C.compress compressor
    | None ->
      failwith (Printf.sprintf "Unknown compression codec: %s" compression)
  in
  let sync_marker = generate_sync_marker () in
  let channel = open_out_bin path in
  (* If the header cannot be written, release the channel before
     propagating the error. *)
  (match write_header channel codec.Codec.schema compression metadata sync_marker with
   | () -> ()
   | exception exn ->
     close_out_noerr channel;
     raise exn);
  {
    channel;
    codec;
    compression;
    compress;
    sync_marker;
    sync_interval;
    buffer = [];
    objects_written = 0;
  }
(** [flush_block t] writes all buffered values to [t.channel] as one block
    and empties the buffer. Does nothing when the buffer is empty.

    A block is written as: the value count, the byte length of the
    compressed payload, the compressed payload, then [t.sync_marker].
    [t.objects_written] is increased by the number of values written. *)
let flush_block t =
  match t.buffer with
  | [] -> ()
  | pending ->
    let n = List.length pending in
    (* The buffer holds newest-first; encode in insertion order. *)
    let payload =
      let enc = Output.create () in
      List.rev pending
      |> List.iter (fun item -> t.codec.Codec.encode item enc);
      t.compress (Output.to_bytes enc)
    in
    let header =
      let hdr = Output.create () in
      Output.write_long hdr (Int64.of_int n);
      Output.write_long hdr (Int64.of_int (Bytes.length payload));
      Output.to_bytes hdr
    in
    output_bytes t.channel header;
    output_bytes t.channel payload;
    output_bytes t.channel t.sync_marker;
    t.buffer <- [];
    t.objects_written <- t.objects_written + n
(** [write t value] appends [value] to the pending buffer and flushes a
    block once the buffer holds at least [t.sync_interval] values. *)
let write t value =
  let pending = value :: t.buffer in
  t.buffer <- pending;
  let block_is_full = List.length pending >= t.sync_interval in
  if block_is_full then flush_block t
(** [write_block t values] writes [values] as a block of their own: any
    values already buffered are flushed first, then [values] are buffered
    and flushed immediately. An empty array writes no block. *)
let write_block t values =
  flush_block t;
  (* The buffer is kept newest-first, so consing left to right yields the
     order [flush_block] expects. *)
  t.buffer <- Array.fold_left (fun newest_first v -> v :: newest_first) [] values;
  flush_block t
(** [flush t] writes any buffered values as a block and then flushes the
    underlying output channel. *)
let flush t =
  let () = flush_block t in
  (* This binding is not recursive, so [flush] here is the channel-flushing
     function already in scope, not this function. *)
  flush t.channel
(** [close t] writes any buffered values as a final block and closes the
    underlying channel.

    The channel is closed even when the final flush fails: in that case it
    is closed with [close_out_noerr] and the original exception is
    re-raised, so the file descriptor is never leaked.

    @raise Sys_error if writing or closing the channel fails; any exception
           raised while encoding or compressing the final block is also
           propagated. *)
let close t =
  match flush_block t with
  | () -> close_out t.channel
  | exception exn ->
    close_out_noerr t.channel;
    raise exn