Source file util.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
open! Values
open! Core
open Ppx_yojson_conv_lib.Yojson_conv.Primitives
open! Async

(** Helpers describing the local data that is about to be uploaded. *)
module Source = struct
  (** Chunk size used when the caller does not request one: 8 megabytes. *)
  let default_chunk_size = Byte_units.of_megabytes 8.

  module File = struct
    (** [slice ~total ~file_size ~chunk_size i] is the inclusive byte range
        [(start, end_)] inside the file that belongs to part index [i].

        The index is first shifted down by [(total - file_size) / chunk_size],
        i.e. by the number of whole chunks of the object that are not stored
        in this file. The end of the range is clamped to the last byte of the
        file.

        @raise Division_by_zero if [chunk_size] is zero. *)
    let slice ~total ~file_size ~chunk_size (i : int64) =
      let open Int64 in
      let i = i - ((total - file_size) / chunk_size) in
      i * chunk_size, min (chunk_size * (i + one)) file_size - one
    ;;

    (** [read_slice ~start ~end_ fn] reads the inclusive byte range
        [start .. end_] of the file [fn] and returns it as a string.

        The read is performed synchronously through [In_channel].

        @raise Failure if the file ends before the whole range was read.
        @raise Invalid_argument if the range has a negative length. *)
    let read_slice ~start ~end_ fn =
      let len = Int64.(end_ - start + one) |> Int64.to_int_exn in
      let buf = Bytes.create len in
      In_channel.with_file fn ~f:(fun ic ->
        In_channel.seek ic start;
        match In_channel.really_input ic ~buf ~pos:0 ~len with
        | Some () -> ()
        | None ->
          (* The file is shorter than the requested range, e.g. because it
             was truncated after its size was measured. *)
          failwithf
            "read_slice: %s ended before bytes %Ld-%Ld could be read"
            fn
            start
            end_
            ());
      Bytes.to_string buf
    ;;

    (** Files whose size is at most this many bytes (0xa00000 = 10 MiB) are
        described as a single partition. *)
    let min_part_size = 0xa00000L

    (** Upper bound on the number of partitions a file is split into. *)
    let default_num_parts = 10000L

    (** How a file is split up: the size of each chunk in bytes, the size of
        the file in bytes, and the number of partitions. *)
    type stat =
      { chunk_size : int64
      ; file_size : int64
      ; partitions : int64
      }
    [@@deriving yojson]

    (** [params_of_file_size ~chunk_size ~file_size] computes the partition
        layout for a file of [file_size] bytes.

        Files of at most [min_part_size] bytes get one partition. Larger files
        get [file_size / chunk_size + 1] partitions, capped at
        [default_num_parts]. The returned [chunk_size] is then recomputed as
        [file_size / partitions] rounded up, and is never smaller than one.

        A requested [chunk_size] below one byte is treated as one byte so
        that the partition count can always be computed. *)
    let params_of_file_size ~chunk_size ~file_size =
      let open Int64 in
      (* Clamp the requested size: dividing by zero below would raise. *)
      let requested = max one (Byte_units.bytes_int64 chunk_size) in
      let partitions =
        match file_size <= min_part_size with
        | true -> one
        | false -> min ((file_size / requested) + one) default_num_parts
      in
      (* Exact integer ceiling division; avoids the precision loss of a
         round trip through float for very large sizes. *)
      let chunk_size = max one ((file_size + partitions - one) / partitions) in
      { chunk_size; file_size; partitions }
    ;;

    let%expect_test "params_of_file_size" =
      let test ?(chunk_size = default_chunk_size) file_size =
        let r = params_of_file_size ~chunk_size ~file_size in
        print_endline (Yojson.Safe.to_string (yojson_of_stat r))
      in
      test 1024L;
      [%expect {| {"chunk_size":1024,"file_size":1024,"partitions":1} |}];
      test 2048L;
      [%expect {| {"chunk_size":2048,"file_size":2048,"partitions":1} |}];
      test Int64.(3L * min_part_size / 2L);
      [%expect {| {"chunk_size":7864320,"file_size":15728640,"partitions":2} |}];
      test ~chunk_size:(Byte_units.of_bytes_int 1) Int64.(2L * min_part_size);
      [%expect {| {"chunk_size":2098,"file_size":20971520,"partitions":10000} |}];
      test 0L;
      [%expect {| {"chunk_size":1,"file_size":0,"partitions":1} |}];
      test ~chunk_size:(Byte_units.of_bytes_int 0) 0L;
      [%expect {| {"chunk_size":1,"file_size":0,"partitions":1} |}];
      test ~chunk_size:(Byte_units.of_bytes_int 0) Int64.(2L * min_part_size);
      [%expect {| {"chunk_size":2098,"file_size":20971520,"partitions":10000} |}];
      return ()
    ;;

    (** [stat ?chunk_size file] looks up the size of [file] with [Unix.stat]
        and returns the partition layout computed by [params_of_file_size].
        [chunk_size] defaults to [default_chunk_size]. *)
    let stat ?(chunk_size = default_chunk_size) file =
      Unix.stat file
      >>| fun { Unix.Stats.size = file_size; _ } ->
      params_of_file_size ~chunk_size ~file_size
    ;;

    (** [load_all t] parses [t] as a URI, reads the whole file named by its
        path component, and returns the contents together with their length
        in bytes. The read is synchronous. *)
    let load_all t =
      let filename = Uri.path (Uri.of_string t) in
      let data = In_channel.with_file filename ~f:In_channel.input_all in
      let total = String.length data |> Int64.of_int in
      data, total
    ;;
  end
end

(** [put_object cfg ~bucket ~key body] uploads the string [body] to [key] in
    [bucket] with a single put-object request.

    Resolves to [Ok etag] on success, [Error (`Put_object e)] when the request
    fails, and [Error `Missing_etag] when the response carries no ETag. *)
let put_object cfg ~bucket ~key body =
  let object_key = ObjectKey.make key in
  let payload = Body.of_string body in
  let request =
    PutObjectRequest.make ~body:payload ~bucket ~key:object_key ()
  in
  match%map Io.put_object ~cfg request with
  | Error e -> Error (`Put_object e)
  | Ok response -> Result.of_option response.eTag ~error:`Missing_etag
;;

(** [delete_object cfg ~bucket ~key] issues a delete-object request for [key]
    in [bucket] and returns the result of [Io.delete_object] unchanged. *)
let delete_object cfg ~bucket ~key =
  let request =
    DeleteObjectRequest.make ~bucket ~key:(ObjectKey.make key) ()
  in
  Io.delete_object ~cfg request
;;

(** [put_file cfg ~bucket ~key file] reads the entire contents of [file] and
    uploads them to [key] in [bucket] via [put_object]. The reader stays open
    until the upload has finished. *)
let put_file cfg ~bucket ~key file =
  let upload contents = put_object cfg ~bucket ~key contents in
  Reader.with_file file ~f:(fun reader -> Reader.contents reader >>= upload)
;;

(** [get_object cfg ?range ~bucket ~key ()] fetches [key] from [bucket]. When
    [range] is given it is converted to its header value and sent with the
    request. Returns the result of [Io.get_object] unchanged. *)
let get_object cfg ?(range : Awso.Http.Range.t option) ~bucket ~key () =
  let object_key = ObjectKey.make key in
  let range_header = Option.map range ~f:Awso.Http.Range.to_header_value in
  GetObjectRequest.make ?range:range_header ~bucket ~key:object_key ()
  |> Io.get_object ~cfg
;;

(** Progress callback threaded through [multipart].

    The callback receives the current accumulator and returns the next one.
    [multipart] invokes it:
    - once with [`Initial upload_id] before any part is sent,
    - once with [`Partition etag] after each part has been uploaded,
    - once with [`Complete etag] after the upload has been completed.

    [total] is the size of the whole object in bytes, [loaded] the number of
    bytes accounted for so far, [key] the object key, [part] the number of the
    part just handled and [num_parts] the total number of parts.

    Returning [Error] stops the upload; [multipart] then reports it as a
    [`Callback_error]. *)
type ('acc, 'error) callback =
  'acc
  -> total:int64
  -> loaded:int64
  -> key:string
  -> part:int64
  -> num_parts:int64
  -> [ `Complete of ETag.t
     | `Initial of MultipartUploadId.t
     | `Partition of ETag.t
     ]
  -> ('acc, 'error) Deferred.Result.t

(** [initialize_multipart cfg ~bucket ~key] starts a multipart upload for
    [key] in [bucket]. The request is issued through
    [Awso_async.Import.with_retries].

    Resolves to [Ok (`Upload_id id)] on success, [Error `Missing_upload_id]
    when the response has no upload id, and
    [Error (`Create_multipart_upload e)] when the request itself fails. *)
let initialize_multipart cfg ~bucket ~key =
  let request =
    CreateMultipartUploadRequest.make ~bucket ~key:(ObjectKey.make key) ()
  in
  Awso_async.Import.with_retries (fun () ->
    match%map Io.create_multipart_upload ~cfg request with
    | Error e -> Error (`Create_multipart_upload e)
    | Ok { CreateMultipartUploadOutput.uploadId = None; _ } ->
      Error `Missing_upload_id
    | Ok { CreateMultipartUploadOutput.uploadId = Some id; _ } ->
      Ok (`Upload_id id))
;;

(** [multipart cfg ?chunk_size ?part ?file_offset ~bucket ~key ~init ~cb
    ~upload_id file] uploads [file] as the parts of the already created
    multipart upload [upload_id], then completes that upload.

    - [chunk_size] is forwarded to [Source.File.stat] to choose the part size.
    - [part] (default 0) is the zero-based index of the first part to send.
    - [file_offset] (default 0) is added to the size of [file] to obtain the
      total object size, and to every reported [loaded] value. NOTE(review):
      this looks like support for resuming with a file that holds only the
      tail of the object; confirm against callers.
    - [init] is the initial accumulator handed to [cb]; see [callback] for
      when [cb] is invoked.

    Parts are uploaded one after another. Each part is read synchronously
    with [Source.File.read_slice].

    Resolves to [Ok (acc, parts)] with the final accumulator and the completed
    parts in ascending part order, or to one of:
    - [Error (`Callback_error (acc, parts, e))] when [cb] returned [Error e];
      [acc] and [parts] (ascending part order) describe the state before the
      failing callback,
    - [Error (`Upload_part e)] when uploading a part failed,
    - [Error (`Complete_multipart_upload e)] when completing failed.

    Raises (via [Option.value_exn]) if a part response or the completion
    response carries no ETag. *)
let multipart
  cfg
  ?chunk_size
  ?(part = 0)
  ?(file_offset = Int64.zero)
  ~bucket
  ~key
  ~init
  ~(cb : ('acc, 'error) callback)
  ~upload_id
  file
  =
  Source.File.stat ?chunk_size file
  >>= fun { chunk_size; file_size; partitions = rem_parts } ->
  let key_obj = ObjectKey.make key in
  let total = Int64.(file_size + file_offset) in
  let loaded = Int64.(of_int part * chunk_size) in
  let part = Int64.of_int part in
  (* Parts covered by the offset plus the parts contained in this file. *)
  let num_parts = Int64.((file_offset / chunk_size) + rem_parts) in
  cb init ~total ~loaded ~key ~part ~num_parts (`Initial upload_id)
  >>| (function
        | Result.Ok acc -> `Ok (acc, upload_id)
        | Result.Error e -> `Callback_error (init, [], e))
  >>= function
  | `Callback_error _ as e -> return (Error e)
  | `Ok (acc, upload_id) -> (
    (* Uploads the part with zero-based index [i] and reports it to [cb]. *)
    let upload_part acc (i : int64) =
      let open Int64 in
      let start, end_ = Source.File.slice ~total ~file_size ~chunk_size i in
      let part = Source.File.read_slice ~start ~end_ file in
      (* Part numbers sent to the service are one-based. *)
      let part_number = i + one in
      let part_number_int = to_int_exn part_number in
      let contentLength = String.length part |> Int64.of_int in
      let contentMD5 = Awso.Client.content_md5_insecure part in
      let upload_part_request =
        UploadPartRequest.make
          ~bucket
          ~uploadId:upload_id
          ~partNumber:part_number_int
          ~body:(Body.of_string part)
          ~contentLength
          ~key:key_obj
          ~contentMD5
          ()
      in
      Io.upload_part ~cfg upload_part_request
      >>= function
      | Error e -> return (Error (`Upload_part e))
      | Ok uploadPartResp -> (
        let eTag = Option.value_exn uploadPartResp.UploadPartOutput.eTag in
        cb
          acc
          ~total
          ~loaded:(Int64.succ end_ + file_offset)
          ~key
          ~part:part_number
          ~num_parts
          (`Partition eTag)
        >>= function
        | Result.Error e -> return (Error (`Callback_error e))
        | Result.Ok acc ->
          return
          @@ Ok (acc, CompletedPart.make ~eTag ~partNumber:part_number_int ()))
    in
    num_parts
    |> Int64.to_int_exn
    |> List.range (part |> Int64.to_int_exn)
    |> List.map ~f:Int64.of_int
    |> Deferred.List.fold
         ~init:(Ok (acc, []))
         ~f:(fun acc_etags part ->
           match acc_etags with
           | Error _ as e -> return e
           | Ok (acc, rev_etags) -> (
             upload_part acc part
             >>= function
             | Ok (acc', etag) -> return @@ Ok (acc', etag :: rev_etags)
             | Error (`Callback_error e) ->
               (* The parts are accumulated most recent first; hand them back
                  in ascending order, as every other result of this function
                  does. *)
               return @@ Error (`Callback_error (acc, List.rev rev_etags, e))
             | Error (`Upload_part e) -> return @@ Error (`Upload_part e)))
    >>= function
    | Error e -> return (Error e)
    | Ok (acc, rev_etags) -> (
      let parts = List.rev rev_etags in
      let multipartUpload = CompletedMultipartUpload.make ~parts () in
      let req =
        CompleteMultipartUploadRequest.make
          ~multipartUpload
          ~bucket
          ~key:key_obj
          ~uploadId:upload_id
          ()
      in
      Io.complete_multipart_upload ~cfg req
      >>= function
      | Error e -> return (Error (`Complete_multipart_upload e))
      | Ok completedUpload -> (
        cb
          acc
          ~total
          ~loaded:total
          ~key
          ~part:num_parts
          ~num_parts
          (`Complete (Option.value_exn completedUpload.eTag))
        >>| function
        | Result.Ok acc' -> Ok (acc', parts)
        | Result.Error e -> Error (`Callback_error (acc, parts, e)))))
;;

(** [map_bucket cfg ~bucket ~f] lists the objects of [bucket] page by page
    and applies [f] to each listed object, one at a time and in listing
    order.

    Resolves to [Ok results] with the results of [f] in listing order, or to
    the first listing error. A page without contents ends the traversal with
    an empty result for that page; otherwise the next page is requested while
    the response is flagged as truncated. *)
let map_bucket cfg ~bucket ~f =
  let rec fetch_page ?continuationToken () =
    let request = ListObjectsV2Request.make ?continuationToken ~bucket () in
    Io.list_objects_v2 ~cfg request
    >>= function
    | Error e -> return (Error e)
    | Ok { isTruncated; contents; nextContinuationToken; _ } ->
      (match contents with
       | None -> return (Ok [])
       | Some objects ->
         let%bind mapped = Deferred.List.map ~how:`Sequential objects ~f in
         if Option.value isTruncated ~default:false
         then
           (* More pages follow: fetch them and append their results. *)
           fetch_page ?continuationToken:nextContinuationToken ()
           >>| Result.map ~f:(fun rest -> mapped @ rest)
         else return (Ok mapped))
  in
  fetch_page ()
;;

(** [iter_bucket cfg ~bucket ~f] runs the unit-returning [f] on every object
    listed by [map_bucket] and discards the collected units. Errors from
    [map_bucket] are passed through. *)
let iter_bucket cfg ~bucket ~f =
  map_bucket cfg ~bucket ~f >>| Result.map ~f:(fun (_ : unit list) -> ())
;;