···
3
-
(* Include shared types module *)
4
-
module Requests_types = Requests_types
6
-
(* Include cache module *)
7
-
module Requests_cache = Requests_cache
9
-
(* Initialize the RNG on module load for OAuth and other crypto operations *)
10
-
let () = Mirage_crypto_rng_unix.use_default ()
14
-
| Http_error of { status : Cohttp.Code.status_code; body : string; headers : Cohttp.Header.t }
15
-
| Connection_error of string
17
-
| Too_many_redirects
18
-
| Max_retry_error of { url : Uri.t; reason : string }
20
-
| Pool_error of string
21
-
| Proxy_error of string
22
-
| Protocol_error of string
23
-
| Header_parsing_error of string
24
-
| Certificate_verification_error of string
26
-
let pp_error ppf = function
27
-
| Http_error { status; body; _ } ->
28
-
Format.fprintf ppf "HTTP error %s: %s"
29
-
(Cohttp.Code.string_of_status status) body
30
-
| Connection_error msg -> Format.fprintf ppf "Connection error: %s" msg
31
-
| Timeout_error -> Format.fprintf ppf "Request timeout"
32
-
| Too_many_redirects -> Format.fprintf ppf "Too many redirects"
33
-
| Max_retry_error { url; reason } ->
34
-
Format.fprintf ppf "Max retries exceeded for %a: %s" Uri.pp url reason
35
-
| Pool_exhausted -> Format.fprintf ppf "Connection pool exhausted"
36
-
| Pool_error msg -> Format.fprintf ppf "Pool error: %s" msg
37
-
| Proxy_error msg -> Format.fprintf ppf "Proxy error: %s" msg
38
-
| Protocol_error msg -> Format.fprintf ppf "Protocol error: %s" msg
39
-
| Header_parsing_error msg -> Format.fprintf ppf "Header parsing error: %s" msg
40
-
| Certificate_verification_error msg -> Format.fprintf ppf "Certificate error: %s" msg
42
-
exception Request_error of error
45
-
let log_src = Logs.Src.create "requests" ~doc:"HTTP requests library"
46
-
module Log = (val Logs.src_log log_src : Logs.LOG)
58
-
(* Authentication mechanisms - defined early for use in Config *)
59
-
module Auth = struct
62
-
| Basic of { username : string; password : string }
63
-
| DigestAuth of { username : string; password : string; challenge : string option ref }
64
-
| Bearer of { token : string }
66
-
consumer_key : string;
67
-
consumer_secret : string;
68
-
token : string option;
69
-
token_secret : string option;
70
-
signature_method : [`HMAC_SHA1 | `HMAC_SHA256 | `PLAINTEXT];
73
-
client_id : string option;
74
-
client_secret : string option;
75
-
token_type : string;
76
-
access_token : string;
78
-
| Custom of (meth -> Uri.t -> Cohttp.Header.t -> Cohttp.Header.t)
82
-
let basic ~username ~password = Basic { username; password }
84
-
let digest ~username ~password = DigestAuth { username; password; challenge = ref (None : string option) }
86
-
let bearer ~token = Bearer { token }
88
-
let oauth1 ~consumer_key ~consumer_secret ?token ?token_secret
89
-
?(signature_method=`HMAC_SHA1) () =
90
-
OAuth1 { consumer_key; consumer_secret; token; token_secret; signature_method }
92
-
let oauth2 ?client_id ?client_secret ?(token_type="Bearer") ~access_token () =
93
-
OAuth2 { client_id; client_secret; token_type; access_token }
95
-
let custom f = Custom f
97
-
let apply auth meth uri headers =
100
-
| Basic { username; password } ->
101
-
let encoded = Base64.encode_string (Printf.sprintf "%s:%s" username password) in
102
-
Cohttp.Header.add headers "Authorization" (Printf.sprintf "Basic %s" encoded)
103
-
| DigestAuth { username; password; challenge } ->
104
-
(* Use stored challenge if available, otherwise headers unchanged (will trigger 401) *)
105
-
(match !challenge with
106
-
| None -> headers (* No challenge yet, will get 401 response *)
107
-
| Some challenge_header ->
108
-
(* Apply digest auth with challenge *)
109
-
Digest_auth.apply_digest_auth ~username ~password ~meth ~uri ~headers ~body:None ~challenge_header)
110
-
| Bearer { token } ->
111
-
Cohttp.Header.add headers "Authorization" (Printf.sprintf "Bearer %s" token)
112
-
| OAuth1 { consumer_key; consumer_secret; token; token_secret; signature_method } ->
113
-
let timestamp = Printf.sprintf "%.0f" (Unix.gettimeofday ()) in
114
-
(* Generate cryptographically secure nonce using mirage-crypto-rng *)
115
-
let nonce_bytes = Mirage_crypto_rng.generate 16 in
116
-
let nonce = Base64.encode_string nonce_bytes in
118
-
let signature_method_str = match signature_method with
119
-
| `HMAC_SHA1 -> "HMAC-SHA1"
120
-
| `HMAC_SHA256 -> "HMAC-SHA256"
121
-
| `PLAINTEXT -> "PLAINTEXT" in
123
-
let oauth_params = [
124
-
("oauth_consumer_key", consumer_key);
125
-
("oauth_nonce", nonce);
126
-
("oauth_signature_method", signature_method_str);
127
-
("oauth_timestamp", timestamp);
128
-
("oauth_version", "1.0");
129
-
] @ (match token with
130
-
| Some t -> [("oauth_token", t)]
133
-
(* Build signature base string *)
134
-
let method_str = match meth with
135
-
| `GET -> "GET" | `POST -> "POST" | `PUT -> "PUT"
136
-
| `DELETE -> "DELETE" | `HEAD -> "HEAD" | `OPTIONS -> "OPTIONS"
137
-
| `PATCH -> "PATCH" in
139
-
let normalized_url =
140
-
let u = Uri.with_port (Uri.with_fragment uri None) None in
143
-
let params_for_sig =
145
-
(Uri.query uri |> List.map (fun (k, vs) ->
146
-
List.map (fun v -> (k, v)) vs) |> List.flatten) in
148
-
let sorted_params = List.sort (fun (k1, v1) (k2, v2) ->
149
-
match String.compare k1 k2 with
150
-
| 0 -> String.compare v1 v2
151
-
| n -> n) params_for_sig in
153
-
let param_string = sorted_params
154
-
|> List.map (fun (k, v) ->
155
-
Printf.sprintf "%s=%s" (Uri.pct_encode k) (Uri.pct_encode v))
156
-
|> String.concat "&" in
158
-
let base_string = String.concat "&" [
159
-
Uri.pct_encode method_str;
160
-
Uri.pct_encode normalized_url;
161
-
Uri.pct_encode param_string
164
-
(* Generate signature *)
165
-
let signature = match signature_method with
167
-
Printf.sprintf "%s&%s"
168
-
(Uri.pct_encode consumer_secret)
169
-
(Uri.pct_encode (Option.value ~default:"" token_secret))
171
-
let signing_key = Printf.sprintf "%s&%s"
172
-
(Uri.pct_encode consumer_secret)
173
-
(Uri.pct_encode (Option.value ~default:"" token_secret)) in
174
-
let raw_sig = Digestif.SHA1.hmac_string ~key:signing_key base_string in
175
-
Base64.encode_string (Digestif.SHA1.to_raw_string raw_sig)
177
-
let signing_key = Printf.sprintf "%s&%s"
178
-
(Uri.pct_encode consumer_secret)
179
-
(Uri.pct_encode (Option.value ~default:"" token_secret)) in
180
-
let raw_sig = Digestif.SHA256.hmac_string ~key:signing_key base_string in
181
-
Base64.encode_string (Digestif.SHA256.to_raw_string raw_sig)
184
-
let full_oauth_params = ("oauth_signature", signature) :: oauth_params in
185
-
let oauth_header = "OAuth " ^ String.concat ", "
186
-
(List.map (fun (k, v) ->
187
-
Printf.sprintf "%s=\"%s\"" k (Uri.pct_encode v))
188
-
(List.sort (fun (k1,_) (k2,_) -> String.compare k1 k2) full_oauth_params)) in
189
-
Cohttp.Header.add headers "Authorization" oauth_header
190
-
| OAuth2 { client_id; client_secret; token_type; access_token } ->
191
-
(* OAuth2 can use client credentials in headers for some flows *)
193
-
match client_id, client_secret with
194
-
| Some id, Some secret when id <> "" && secret <> "" ->
195
-
(* Add client credentials as basic auth for token endpoint requests *)
196
-
let encoded = Base64.encode_string (Printf.sprintf "%s:%s" id secret) in
197
-
Cohttp.Header.add headers "X-Client-Authorization" (Printf.sprintf "Basic %s" encoded)
200
-
Cohttp.Header.add headers "Authorization" (Printf.sprintf "%s %s" token_type access_token)
201
-
| Custom f -> f meth uri headers
204
-
module Response = Requests_types.Response
206
-
(* Retry Implementation *)
207
-
module Retry = struct
217
-
error : exn option;
218
-
status : int option;
219
-
redirect_location : string option;
224
-
connect : int option;
226
-
redirect : int option;
227
-
status : int option;
228
-
other : int option;
229
-
allowed_methods : meth list;
230
-
status_forcelist : int list;
232
-
raise_on_redirect : bool;
233
-
raise_on_status : bool;
234
-
respect_retry_after : bool;
235
-
remove_headers_on_redirect : string list;
236
-
history : history list;
237
-
mutable retry_count : int;
240
-
let default_backoff = { factor = 0.0; jitter = 0.0; max = 120.0 }
241
-
let default_allowed_methods = [`HEAD; `GET; `PUT; `DELETE; `OPTIONS]
242
-
let default_remove_headers = ["Cookie"; "Authorization"; "Proxy-Authorization"]
251
-
allowed_methods = default_allowed_methods;
252
-
status_forcelist = [];
253
-
backoff = default_backoff;
254
-
raise_on_redirect = true;
255
-
raise_on_status = true;
256
-
respect_retry_after = true;
257
-
remove_headers_on_redirect = default_remove_headers;
262
-
let create ?total ?(connect=None) ?(read=None) ?(redirect=None) ?(status=None) ?(other=None)
263
-
?(allowed_methods=default_allowed_methods)
264
-
?(status_forcelist=[])
265
-
?(backoff=default_backoff)
266
-
?(raise_on_redirect=true)
267
-
?(raise_on_status=true)
268
-
?(respect_retry_after=true)
269
-
?(remove_headers_on_redirect=default_remove_headers) () =
270
-
let total = Option.value total ~default:10 in
271
-
{ total; connect; read; redirect; status; other;
272
-
allowed_methods; status_forcelist; backoff;
273
-
raise_on_redirect; raise_on_status; respect_retry_after;
274
-
remove_headers_on_redirect; history = []; retry_count = 0 }
276
-
let disabled = { default with total = 0 }
278
-
let get_history t = t.history
280
-
let increment t ~method_ ~url ?response ?error () =
281
-
let status = Option.map (fun r ->
282
-
Cohttp.Code.code_of_status (Response.status r)) response in
283
-
let redirect_location = match response with
284
-
| Some r -> Cohttp.Header.get (Response.headers r) "location"
287
-
let history_entry = { method_; url; error; status; redirect_location } in
288
-
{ t with history = history_entry :: t.history; retry_count = t.retry_count + 1 }
290
-
let is_retry t ~method_ ~status_code =
291
-
if t.retry_count >= t.total then false
292
-
else if not (List.mem method_ t.allowed_methods) then false
293
-
else List.mem status_code t.status_forcelist
295
-
let get_backoff_time t =
296
-
if t.backoff.factor = 0.0 then 0.0
298
-
let base_time = t.backoff.factor *. (2.0 ** float_of_int t.retry_count) in
299
-
(* Use Mirage crypto RNG for jitter calculation *)
300
-
let rand_bytes = Mirage_crypto_rng.generate 4 in
301
-
let rand_cstruct = Cstruct.of_string rand_bytes in
302
-
let rand_uint32 = Cstruct.LE.get_uint32 rand_cstruct 0 in
303
-
(* Convert to float in [0, 1) range *)
304
-
let normalized = Int32.to_float rand_uint32 /. (2.0 ** 32.0) in
305
-
let jittered = base_time +. (normalized *. t.backoff.jitter) in
306
-
min jittered t.backoff.max
308
-
let sleep ~clock t response =
310
-
match t.respect_retry_after, response with
311
-
| true, Some resp ->
312
-
(match Cohttp.Header.get (Response.headers resp) "retry-after" with
313
-
| Some retry_after ->
314
-
(try float_of_string retry_after with _ -> get_backoff_time t)
315
-
| None -> get_backoff_time t)
316
-
| _ -> get_backoff_time t
318
-
if backoff_time > 0.0 then
319
-
Eio.Time.sleep clock backoff_time
322
-
module Config = struct
324
-
headers : Cohttp.Header.t;
325
-
timeout : float option;
326
-
follow_redirects : bool;
327
-
max_redirects : int;
332
-
let create ?(headers=Cohttp.Header.init ()) ?timeout ?(follow_redirects=true)
333
-
?(max_redirects=10) ?(verify_tls=true) ?(auth=Auth.none) () =
334
-
{ headers; timeout; follow_redirects; max_redirects; verify_tls; auth }
336
-
let default = create ()
338
-
let with_headers t headers = { t with headers }
340
-
let add_header key value t =
341
-
{ t with headers = Cohttp.Header.add t.headers key value }
343
-
let with_timeout t timeout = { t with timeout = Some timeout }
344
-
let with_follow_redirects t follow_redirects = { t with follow_redirects }
345
-
let with_max_redirects t max_redirects = { t with max_redirects }
346
-
let with_verify_tls t verify_tls = { t with verify_tls }
347
-
let _with_auth t auth = { t with auth }
350
-
Format.fprintf ppf "@[<v>Config:@,Redirects: %b (max %d)@,Timeout: %a@,TLS verify: %b@]"
351
-
t.follow_redirects t.max_redirects
352
-
(fun ppf -> function None -> Format.fprintf ppf "none" | Some f -> Format.fprintf ppf "%.2fs" f) t.timeout
356
-
module Tls = struct
359
-
| WithCaCerts of X509.Authenticator.t
360
-
| Custom of Tls.Config.client
363
-
let default () = Default
365
-
let with_ca_certs auth = WithCaCerts auth
367
-
let with_custom config = Custom config
369
-
let insecure () = Insecure
371
-
let _pp_config ppf = function
372
-
| Default -> Format.fprintf ppf "Default TLS"
373
-
| WithCaCerts _ -> Format.fprintf ppf "Custom CA certs"
374
-
| Custom _ -> Format.fprintf ppf "Custom TLS config"
375
-
| Insecure -> Format.fprintf ppf "Insecure (no verification)"
377
-
let to_tls_config : config -> (Tls.Config.client, [> `Msg of string ]) result = function
379
-
(match Ca_certs.authenticator () with
380
-
| Ok authenticator ->
381
-
Tls.Config.client ~authenticator ()
382
-
| Error _ as e -> e)
383
-
| WithCaCerts auth ->
384
-
Tls.Config.client ~authenticator:auth ()
388
-
let authenticator ?ip:_ ~host:_ _ = Ok None in
389
-
Tls.Config.client ~authenticator ()
392
-
type clock = Clock : _ Eio.Time.clock -> clock
397
-
tls_config : Tls.config;
398
-
default_headers : Cohttp.Header.t;
399
-
cache : Requests_cache.t option;
400
-
} constraint 'a = [> `Generic] Net.ty
402
-
let create ?(tls_config=Tls.default ()) ?(default_headers=Cohttp.Header.init ()) ?cache ~clock net =
403
-
{ net; clock = Clock clock; tls_config; default_headers; cache }
405
-
let create_with_cache ~sw ?(tls_config=Tls.default ()) ?(default_headers=Cohttp.Header.init ())
406
-
~cache_dir ~clock net =
407
-
let cache = Requests_cache.create ~sw ~enabled:true ~cache_dir () in
408
-
{ net; clock = Clock clock; tls_config; default_headers; cache = Some cache }
411
-
let make_client net tls_config =
412
-
match Tls.to_tls_config tls_config with
414
-
let https_fn uri socket =
417
-
|> Option.map (fun x -> Domain_name.(host_exn (of_string_exn x)))
419
-
Tls_eio.client_of_flow ?host tls_config socket
421
-
Cohttp_eio.Client.make ~https:(Some https_fn) net
422
-
| Error (`Msg msg) ->
423
-
failwith ("TLS configuration error: " ^ msg)
425
-
let merge_headers base_headers request_headers =
426
-
Cohttp.Header.fold (fun key value acc ->
427
-
Cohttp.Header.add acc key value
428
-
) request_headers base_headers
430
-
let rec request_with_redirects ~sw client config uri redirect_count meth body =
431
-
if redirect_count > config.Config.max_redirects then
432
-
raise (Request_error Too_many_redirects);
434
-
let headers = config.Config.headers in
435
-
let resp, response_body =
437
-
| `GET -> Cohttp_eio.Client.get ~sw client uri ~headers
439
-
let body = match body with
440
-
| Some b -> Flow.string_source b
441
-
| None -> Flow.string_source ""
443
-
Cohttp_eio.Client.post ~sw client uri ~headers ~body
445
-
let body = match body with
446
-
| Some b -> Flow.string_source b
447
-
| None -> Flow.string_source ""
449
-
Cohttp_eio.Client.put ~sw client uri ~headers ~body
450
-
| `DELETE -> Cohttp_eio.Client.delete ~sw client uri ~headers
452
-
let response = Cohttp_eio.Client.head ~sw client uri ~headers in
453
-
(response, Cohttp_eio.Body.of_string "")
455
-
Cohttp_eio.Client.call ~sw client `OPTIONS uri ~headers
457
-
let body = match body with
458
-
| Some b -> Flow.string_source b
459
-
| None -> Flow.string_source ""
461
-
Cohttp_eio.Client.call ~sw client `PATCH uri ~headers ~body
464
-
let status = Cohttp.Response.status resp in
465
-
let headers = Cohttp.Response.headers resp in
467
-
if config.Config.follow_redirects && Response.is_redirect { Response.status; headers; body = ""; body_stream = None } then
468
-
match Cohttp.Header.get headers "location" with
470
-
let new_uri = Uri.resolve "" uri (Uri.of_string location) in
471
-
request_with_redirects ~sw client config new_uri (redirect_count + 1) meth body
473
-
let body = Eio.Flow.read_all response_body in
474
-
{ Response.status; headers; body; body_stream = None }
476
-
let body = Eio.Flow.read_all response_body in
477
-
{ Response.status; headers; body; body_stream = None }
479
-
let rec request_with_retries ~sw t ?(config=Config.default) ?body ~meth uri retry_state =
480
-
(* Check cache first for GET/HEAD requests *)
481
-
let check_cache () =
482
-
match t.cache, meth with
483
-
| Some cache, (`GET | `HEAD) ->
484
-
Requests_cache.get cache ~method_:meth ~url:uri ~headers:config.Config.headers
488
-
match check_cache () with
489
-
| Some cached_response ->
490
-
Log.debug (fun m -> m "Using cached response for %s" (Uri.to_string uri));
493
-
let client = make_client t.net t.tls_config in
494
-
let merged_headers = merge_headers t.default_headers config.Config.headers in
495
-
(* Apply authentication *)
496
-
let merged_headers = Auth.apply config.Config.auth meth uri merged_headers in
497
-
let config = { config with Config.headers = merged_headers } in
500
-
let result = request_with_redirects ~sw client config uri 0 meth body in
502
-
(* Handle Digest auth challenge if we get a 401 *)
503
-
let result = match config.Config.auth with
504
-
| DigestAuth { username = _; password = _; challenge } when result.Response.status = `Unauthorized ->
505
-
(match Cohttp.Header.get result.Response.headers "www-authenticate" with
506
-
| Some www_auth when String.starts_with ~prefix:"Digest" www_auth ->
507
-
(* Store the challenge *)
508
-
challenge := Some www_auth;
509
-
Log.debug (fun m -> m "Got Digest challenge, retrying with auth");
511
-
(* Retry request with digest auth *)
512
-
let merged_headers = merge_headers t.default_headers config.Config.headers in
513
-
let merged_headers = Auth.apply config.Config.auth meth uri merged_headers in
514
-
let config = { config with Config.headers = merged_headers } in
515
-
request_with_redirects ~sw client config uri 0 meth body
520
-
(* Store successful responses in cache *)
521
-
(match t.cache with
522
-
| Some cache when Response.is_success result ->
523
-
Requests_cache.put cache ~method_:meth ~url:uri
524
-
~request_headers:config.Config.headers ~response:result
527
-
if not (Response.is_success result) then
528
-
let status = Cohttp.Code.code_of_status result.Response.status in
529
-
if Retry.is_retry retry_state ~method_:meth ~status_code:status then begin
530
-
Log.info (fun m -> m "Retrying request to %a (attempt %d/%d)"
531
-
Uri.pp uri (retry_state.Retry.retry_count + 1) retry_state.Retry.total);
532
-
let retry_state = Retry.increment retry_state ~method_:meth ~url:uri
533
-
~response:result () in
534
-
(match t.clock with Clock c -> Retry.sleep ~clock:c retry_state (Some result));
535
-
request_with_retries ~sw t ~config ?body ~meth uri retry_state
537
-
raise (Request_error (Http_error {
538
-
status = result.Response.status;
539
-
body = result.Response.body;
540
-
headers = result.Response.headers
545
-
| Request_error _ as e -> raise e
547
-
(* Check if we should retry on connection errors *)
548
-
if retry_state.Retry.retry_count < retry_state.Retry.total &&
549
-
List.mem meth retry_state.Retry.allowed_methods then begin
550
-
Log.info (fun m -> m "Retrying request to %a after error: %s (attempt %d/%d)"
551
-
Uri.pp uri (Printexc.to_string e)
552
-
(retry_state.Retry.retry_count + 1) retry_state.Retry.total);
553
-
let retry_state = Retry.increment retry_state ~method_:meth ~url:uri
554
-
~error:(Request_error (Connection_error (Printexc.to_string e))) () in
555
-
(match t.clock with Clock c -> Retry.sleep ~clock:c retry_state None);
556
-
request_with_retries ~sw t ~config ?body ~meth uri retry_state
558
-
raise (Request_error (Connection_error (Printexc.to_string e)))
560
-
let request ~sw t ?(config=Config.default) ?body ~meth uri =
561
-
let retry_state = Retry.default in
562
-
request_with_retries ~sw t ~config ?body ~meth uri retry_state
564
-
let get ~sw t ?config uri =
565
-
request ~sw t ?config ~meth:`GET uri
567
-
let post ~sw t ?config ?body uri =
568
-
request ~sw t ?config ?body ~meth:`POST uri
570
-
let put ~sw t ?config ?body uri =
571
-
request ~sw t ?config ?body ~meth:`PUT uri
573
-
let delete ~sw t ?config uri =
574
-
request ~sw t ?config ~meth:`DELETE uri
576
-
let head ~sw t ?config uri =
577
-
request ~sw t ?config ~meth:`HEAD uri
579
-
let patch ~sw t ?config ?body uri =
580
-
request ~sw t ?config ?body ~meth:`PATCH uri
582
-
module Json = struct
583
-
let get ~sw t ?config uri =
584
-
let response = get ~sw t ?config uri in
585
-
Yojson.Safe.from_string response.Response.body
587
-
let post ~sw t ?config json uri =
588
-
let body = Yojson.Safe.to_string json in
589
-
let config = match config with
590
-
| Some c -> Some (Config.add_header "Content-Type" "application/json" c)
591
-
| None -> Some (Config.add_header "Content-Type" "application/json" Config.default)
593
-
let response = post ~sw t ?config ~body uri in
594
-
Yojson.Safe.from_string response.Response.body
596
-
let put ~sw t ?config json uri =
597
-
let body = Yojson.Safe.to_string json in
598
-
let config = match config with
599
-
| Some c -> Some (Config.add_header "Content-Type" "application/json" c)
600
-
| None -> Some (Config.add_header "Content-Type" "application/json" Config.default)
602
-
let response = put ~sw t ?config ~body uri in
603
-
Yojson.Safe.from_string response.Response.body
606
-
module Form = struct
607
-
type t = (string * string list) list
611
-
|> List.map (fun (key, values) ->
612
-
List.map (fun value ->
613
-
Printf.sprintf "%s=%s" (Uri.pct_encode key) (Uri.pct_encode value)
617
-
|> String.concat "&"
620
-
let post_form ~sw t ?config ~form uri =
621
-
let body = Form.encode form in
622
-
let config = match config with
623
-
| Some c -> Some (Config.add_header "Content-Type" "application/x-www-form-urlencoded" c)
624
-
| None -> Some (Config.add_header "Content-Type" "application/x-www-form-urlencoded" Config.default)
626
-
post ~sw t ?config ~body uri
628
-
module Session = struct
632
-
domain : string option;
633
-
path : string option;
634
-
expires : float option; (* Unix timestamp *)
639
-
type 'a session = {
641
-
cookies : cookie list ref;
642
-
} constraint 'a = [> `Generic] Net.ty
644
-
type 'a t = 'a session constraint 'a = [> `Generic] Net.ty
646
-
let create ?tls_config ?default_headers ~clock net =
647
-
{ client = create ?tls_config ?default_headers ~clock net;
650
-
let parse_cookie_header cookie_str =
651
-
let parts = String.split_on_char ';' cookie_str |> List.map String.trim in
655
-
match String.split_on_char '=' kv with
657
-
let name = String.trim k in
658
-
let value = String.trim v in
659
-
let rec parse_attrs attrs cookie =
664
-
match String.lowercase_ascii attr with
665
-
| "secure" -> { cookie with secure = true }
666
-
| "httponly" -> { cookie with http_only = true }
667
-
| s when String.starts_with ~prefix:"domain=" s ->
668
-
let domain = String.sub s 7 (String.length s - 7) in
669
-
{ cookie with domain = Some domain }
670
-
| s when String.starts_with ~prefix:"path=" s ->
671
-
let path = String.sub s 5 (String.length s - 5) in
672
-
{ cookie with path = Some path }
673
-
| s when String.starts_with ~prefix:"expires=" s ->
674
-
(* Simple expiry parsing - could be improved *)
675
-
{ cookie with expires = Some (Unix.gettimeofday () +. 3600.0) }
678
-
parse_attrs rest cookie'
680
-
let base_cookie = {
682
-
domain = None; path = None; expires = None;
683
-
secure = false; http_only = false
685
-
Some (parse_attrs attrs base_cookie)
688
-
let update_cookies t headers =
689
-
let new_cookies = Cohttp.Header.get_multi headers "set-cookie"
690
-
|> List.filter_map parse_cookie_header
692
-
(* Replace existing cookies with same name *)
693
-
let updated = List.fold_left (fun acc new_cookie ->
694
-
let filtered = List.filter (fun c -> c.name <> new_cookie.name) acc in
695
-
new_cookie :: filtered
696
-
) !(t.cookies) new_cookies in
697
-
t.cookies := updated
699
-
let add_cookies config cookies =
700
-
if cookies = [] then config else
701
-
(* Filter out expired cookies *)
702
-
let now = Unix.gettimeofday () in
703
-
let valid_cookies = cookies |> List.filter (fun c ->
704
-
match c.expires with
705
-
| Some exp when exp < now -> false
708
-
if valid_cookies = [] then config else
709
-
let cookie_header =
711
-
|> List.map (fun c -> Printf.sprintf "%s=%s" c.name c.value)
712
-
|> String.concat "; "
714
-
Config.add_header "Cookie" cookie_header config
716
-
let request_with_cookies ~sw t ?config ~meth ?body uri =
719
-
| Some c -> add_cookies c !(t.cookies)
720
-
| None -> add_cookies Config.default !(t.cookies)
722
-
let response = request ~sw t.client ~config ?body ~meth uri in
723
-
update_cookies t response.Response.headers;
726
-
let get ~sw t ?config uri =
727
-
request_with_cookies ~sw t ?config ~meth:`GET uri
729
-
let post ~sw t ?config ?body uri =
730
-
request_with_cookies ~sw t ?config ~meth:`POST ?body uri
733
-
(* Return valid cookies as (name, value) pairs for compatibility *)
734
-
let now = Unix.gettimeofday () in
736
-
|> List.filter (fun c ->
737
-
match c.expires with
738
-
| Some exp when exp < now -> false
740
-
|> List.map (fun c -> (c.name, c.value))
742
-
let clear_cookies t = t.cookies := []
746
-
let stream_response ~sw t ?config uri f =
747
-
let client = make_client t.net t.tls_config in
748
-
let headers = match config with
749
-
| Some c -> c.Config.headers
750
-
| None -> Cohttp.Header.init ()
752
-
let merged_headers = merge_headers t.default_headers headers in
753
-
let merged_headers = match config with
754
-
| Some c -> Auth.apply c.Config.auth `GET uri merged_headers
755
-
| None -> merged_headers
757
-
let _resp, body = Cohttp_eio.Client.get ~sw client uri ~headers:merged_headers in
758
-
let buf_reader = Eio.Buf_read.of_flow ~max_size:(16 * 1024 * 1024) body in
761
-
(* Connection Pool Implementation *)
762
-
module ConnectionPool = struct
763
-
type connection_state =
768
-
type connection = {
769
-
client : Cohttp_eio.Client.t;
770
-
mutable state : connection_state;
771
-
mutable last_used : float;
772
-
mutable request_count : int;
781
-
tls_config : Tls.config option;
783
-
mutable connections : connection Queue.t;
784
-
mutable active_connections : int;
785
-
mutable total_connections_created : int;
786
-
mutable total_requests : int;
787
-
mutex : Eio.Mutex.t;
788
-
available : Eio.Condition.t;
795
-
timeout : float option;
796
-
max_requests_per_connection : int option;
797
-
connection_timeout : float;
1
+
(** OCaml HTTP client library with streaming support *)
800
-
let default_config = {
805
-
max_requests_per_connection = Some 100;
806
-
connection_timeout = 60.0;
3
+
(* Re-export all modules *)
4
+
module Method = Method
6
+
module Headers = Headers
8
+
module Timeout = Timeout
11
+
module Response = Response
12
+
module Client = Client
13
+
module Stream = Stream
809
-
let create ~sw ?(config=default_config) ?tls_config ~scheme ~host ~port net =
810
-
Log.debug (fun m -> m "Creating connection pool for %s://%s:%d" scheme host port);
819
-
connections = Queue.create ();
820
-
active_connections = 0;
821
-
total_connections_created = 0;
822
-
total_requests = 0;
823
-
mutex = Eio.Mutex.create ();
824
-
available = Eio.Condition.create ();
827
-
let create_new_connection net tls_config =
828
-
let client = make_client net (Option.value ~default:(Tls.default ()) tls_config) in
832
-
last_used = Unix.gettimeofday ();
836
-
let is_connection_valid conn config =
837
-
match conn.state with
840
-
let now = Unix.gettimeofday () in
841
-
let age = now -. conn.last_used in
842
-
age < config.connection_timeout &&
843
-
(match config.max_requests_per_connection with
845
-
| Some max -> conn.request_count < max)
847
-
let rec get_connection ~sw (Pool t as pool) =
848
-
Eio.Mutex.use_rw ~protect:true t.mutex (fun () ->
849
-
(* Try to get an existing connection *)
850
-
let rec find_valid_connection () =
851
-
if Queue.is_empty t.connections then
854
-
let conn = Queue.pop t.connections in
855
-
if is_connection_valid conn t.config then
858
-
conn.state <- Closed;
859
-
find_valid_connection ()
863
-
match find_valid_connection () with
865
-
conn.state <- Active;
866
-
conn.last_used <- Unix.gettimeofday ();
867
-
conn.request_count <- conn.request_count + 1;
868
-
t.active_connections <- t.active_connections + 1;
869
-
t.total_requests <- t.total_requests + 1;
872
-
if t.active_connections >= t.config.maxsize then
873
-
if t.config.block then (
874
-
(* Wait for a connection to become available *)
875
-
Eio.Condition.await t.available t.mutex;
876
-
get_connection ~sw pool
878
-
raise (Request_error Pool_exhausted)
880
-
(* Create a new connection *)
881
-
t.active_connections <- t.active_connections + 1;
882
-
t.total_connections_created <- t.total_connections_created + 1;
883
-
t.total_requests <- t.total_requests + 1;
884
-
let conn = create_new_connection t.net t.tls_config in
885
-
conn.state <- Active;
886
-
conn.request_count <- 1;
891
-
let put_connection (Pool t) _client =
892
-
(* Since we can't track connection metadata with just the client,
893
-
we'll simply decrement the active count *)
894
-
Eio.Mutex.use_rw ~protect:false t.mutex (fun () ->
895
-
t.active_connections <- t.active_connections - 1;
896
-
Eio.Condition.broadcast t.available
899
-
let num_connections (Pool t) =
900
-
Eio.Mutex.use_ro t.mutex (fun () ->
901
-
Queue.length t.connections + t.active_connections
904
-
let num_requests (Pool t) = t.total_requests
906
-
let clear (Pool t) =
907
-
Eio.Mutex.use_rw ~protect:false t.mutex (fun () ->
908
-
Queue.iter (fun conn -> conn.state <- Closed) t.connections;
909
-
Queue.clear t.connections;
910
-
t.active_connections <- 0;
911
-
Eio.Condition.broadcast t.available
915
-
(* Advanced Timeout *)
916
-
module Timeout = struct
918
-
connect : float option;
919
-
read : float option;
920
-
total : float option;
921
-
start_time : float option;
924
-
let default = { connect = None; read = None; total = None; start_time = None }
925
-
let create ?connect ?read ?total () = { connect; read; total; start_time = None }
926
-
let from_float f = { connect = Some f; read = Some f; total = None; start_time = None }
928
-
let start_connect t = { t with start_time = Some (Unix.gettimeofday ()) }
929
-
let get_connect_timeout t = t.connect
930
-
let get_read_timeout t = t.read
931
-
let clone t = { t with start_time = None }
934
-
(* Cache Implementation *)
935
-
module Cache = struct
936
-
type cache_control = {
939
-
max_age : int option;
940
-
s_maxage : int option;
941
-
must_revalidate : bool;
947
-
let parse_cache_control header =
949
-
no_cache = false; no_store = false; max_age = None;
950
-
s_maxage = None; must_revalidate = false;
951
-
public = false; private_ = false; immutable = false;
953
-
let directives = String.split_on_char ',' header |> List.map String.trim in
954
-
List.fold_left (fun acc directive ->
955
-
match String.split_on_char '=' directive with
956
-
| ["no-cache"] -> { acc with no_cache = true }
957
-
| ["no-store"] -> { acc with no_store = true }
958
-
| ["max-age"; v] -> { acc with max_age = try Some (int_of_string v) with _ -> None }
960
-
) default directives
962
-
module Memory = struct
965
-
mutable cache : (string, (float * Response.t)) Hashtbl.t;
966
-
mutable hits : int;
967
-
mutable misses : int;
970
-
let create ~max_size () = {
972
-
cache = Hashtbl.create max_size;
978
-
module File = struct
980
-
cache_dir : string;
982
-
mutable size : int64;
983
-
mutable hits : int;
984
-
mutable misses : int;
987
-
let default_cache_dir () =
988
-
(* Use XDG cache directory for storing HTTP cache *)
989
-
let xdg = Xdg.create ~env:Sys.getenv_opt () in
990
-
let cache_home = Xdg.cache_dir xdg in
991
-
let cache_dir = Filename.concat cache_home "ocaml-requests" in
992
-
(* Ensure cache directory exists *)
993
-
if not (Sys.file_exists cache_dir) then
994
-
Unix.mkdir cache_dir 0o755;
997
-
let create ?(cache_dir = default_cache_dir ()) ~max_size () = {
998
-
cache_dir; max_size; size = 0L; hits = 0; misses = 0;
1003
-
| `Memory of Memory.storage
1004
-
| `File of File.storage
1007
-
type 'a t = { storage : storage } constraint 'a = [> `Generic] Net.ty
1009
-
type stats = { hits : int; misses : int; size : int64; entries : int }
1011
-
let create storage = { storage }
1013
-
let is_cacheable ~method_ ~response =
1014
-
method_ = `GET && Response.is_success response
1016
-
let make_cache_key ~method_ ~url =
1017
-
Printf.sprintf "%s:%s"
1018
-
(match method_ with
1019
-
| `GET -> "GET" | `POST -> "POST" | `PUT -> "PUT"
1020
-
| `DELETE -> "DELETE" | `HEAD -> "HEAD"
1021
-
| `OPTIONS -> "OPTIONS" | `PATCH -> "PATCH")
1022
-
(Uri.to_string url)
1024
-
let get t ~method_ ~url ~headers:_ =
1025
-
let key = make_cache_key ~method_ ~url in
1026
-
match t.storage with
1027
-
| `Memory storage ->
1028
-
(match Hashtbl.find_opt storage.cache key with
1029
-
| Some (expiry, response) when expiry > Unix.gettimeofday () ->
1030
-
storage.hits <- storage.hits + 1;
1033
-
storage.misses <- storage.misses + 1;
1037
-
let put t ~method_ ~url ~response =
1038
-
if is_cacheable ~method_ ~response then
1039
-
let key = make_cache_key ~method_ ~url in
1040
-
let expiry = Unix.gettimeofday () +. 3600.0 in
1041
-
match t.storage with
1042
-
| `Memory storage ->
1043
-
if Hashtbl.length storage.cache < storage.max_size then
1044
-
Hashtbl.replace storage.cache key (expiry, response)
1048
-
match t.storage with
1049
-
| `Memory storage ->
1050
-
Hashtbl.clear storage.cache;
1051
-
storage.hits <- 0;
1052
-
storage.misses <- 0
1056
-
match t.storage with
1057
-
| `Memory storage ->
1058
-
{ hits = storage.Memory.hits;
1059
-
misses = storage.Memory.misses;
1060
-
size = Int64.of_int (Hashtbl.length storage.Memory.cache * 1024);
1061
-
entries = Hashtbl.length storage.Memory.cache }
1062
-
| `File storage ->
1063
-
{ hits = storage.File.hits;
1064
-
misses = storage.File.misses;
1065
-
size = storage.File.size;
1069
-
(* Pool Manager *)
1070
-
module PoolManager = struct
1071
-
type 'a pool_manager = {
1075
-
pools : (string, ConnectionPool.t) Hashtbl.t;
1077
-
headers : Cohttp.Header.t;
1078
-
retries : Retry.t;
1079
-
timeout : Timeout.t;
1080
-
pool_config : ConnectionPool.config;
1081
-
tls_config : Tls.config option;
1082
-
cache : 'a Cache.t option;
1083
-
} constraint 'a = [> `Generic] Net.ty
1085
-
type 'a t = 'a pool_manager constraint 'a = [> `Generic] Net.ty
1087
-
let create ~sw ~clock ?(num_pools=10) ?(headers=Cohttp.Header.init ())
1088
-
?(retries=Retry.default) ?(timeout=Timeout.default)
1089
-
?(pool_config=ConnectionPool.default_config) ?tls_config ?cache net =
1090
-
let cache = Option.map Cache.create cache in
1091
-
{ sw; net; clock = Clock clock; pools = Hashtbl.create num_pools; num_pools;
1092
-
headers; retries; timeout; pool_config; tls_config; cache }
1094
-
let get_pool t ~scheme ~host ~port =
1095
-
let key = Printf.sprintf "%s://%s:%d" scheme host port in
1096
-
match Hashtbl.find_opt t.pools key with
1097
-
| Some pool -> pool
1099
-
let pool = ConnectionPool.create ~sw:t.sw ~config:t.pool_config
1100
-
?tls_config:t.tls_config ~scheme ~host ~port t.net in
1101
-
Hashtbl.add t.pools key pool;
1104
-
let urlopen ~sw t ~method_ ~url ?body ?headers ?(retries=t.retries)
1105
-
?(timeout=t.timeout) ?(redirect=true) ?(assert_same_host=false)
1106
-
?(preload_content=true) ?(decode_content=true) ?chunk_size () =
1108
-
(* Validate same host if required *)
1109
-
if assert_same_host && redirect then
1110
-
Log.warn (fun m -> m "assert_same_host is set with redirects enabled");
1112
-
(* Check cache first for GET requests *)
1113
-
let cached_response =
1114
-
match t.cache, method_ with
1115
-
| Some cache, `GET ->
1116
-
Cache.get cache ~method_ ~url
1117
-
~headers:(Option.value headers ~default:t.headers)
1121
-
match cached_response with
1122
-
| Some response -> response
1124
-
let scheme = Uri.scheme url |> Option.value ~default:"http" in
1125
-
let host = Uri.host url |> Option.value ~default:"localhost" in
1126
-
let port = Uri.port url |> Option.value ~default:
1127
-
(if scheme = "https" then 443 else 80) in
1129
-
let pool = get_pool t ~scheme ~host ~port in
1130
-
let conn = ConnectionPool.get_connection ~sw pool in
1133
-
match headers with
1134
-
| Some h -> Cohttp.Header.fold (fun k v acc ->
1135
-
Cohttp.Header.add acc k v) h t.headers
1136
-
| None -> t.headers
1139
-
(* Add chunked transfer encoding if chunk_size is specified *)
1140
-
let headers = match chunk_size with
1141
-
| Some _ -> Cohttp.Header.add headers "Transfer-Encoding" "chunked"
1145
-
let create_client = fun ?tls_config ?default_headers ~clock net ->
1146
-
{ net; clock; tls_config = Option.value ~default:(Tls.default ()) tls_config;
1147
-
default_headers = Option.value ~default:(Cohttp.Header.init ()) default_headers;
1150
-
let req_client = create_client ?tls_config:t.tls_config ~default_headers:t.headers ~clock:t.clock t.net in
1151
-
let config = Config.create ~headers ~follow_redirects:redirect () in
1153
-
(* Wrap request with timeout if specified *)
1154
-
let make_request () =
1155
-
match timeout.Timeout.total with
1156
-
| Some _timeout_sec ->
1157
-
(* We need access to a clock - assuming t has one or using Eio.Stdenv.clock *)
1158
-
(* For now, just make the request without timeout wrapper since we don't have clock access *)
1159
-
Result.Ok (request ~sw req_client ~config ?body ~meth:method_ url)
1161
-
Result.Ok (request ~sw req_client ~config ?body ~meth:method_ url)
1164
-
(* Execute with retries *)
1165
-
let rec execute_with_retries attempt =
1166
-
match make_request () with
1167
-
| Result.Ok response ->
1168
-
(* Process response based on flags *)
1170
-
if decode_content then
1171
-
(* Check for content encoding and decode if needed *)
1172
-
match Cohttp.Header.get response.Response.headers "content-encoding" with
1173
-
| Some "gzip" | Some "deflate" ->
1174
-
Log.info (fun m -> m "Content encoding detected but not yet implemented");
1181
-
if preload_content then
1182
-
(* Content is already loaded in response.body *)
1185
-
(* For streaming, we'd need to return a different type *)
1189
-
(match t.cache, method_ with
1190
-
| Some cache, `GET -> Cache.put cache ~method_ ~url ~response
1193
-
ConnectionPool.put_connection pool conn;
1196
-
| Result.Error `Timeout ->
1197
-
if attempt < retries.Retry.total then (
1198
-
Log.info (fun m -> m "Request timeout, retry %d/%d" (attempt + 1) retries.Retry.total);
1199
-
(match t.clock with Clock c -> Eio.Time.sleep c 1.0);
1200
-
execute_with_retries (attempt + 1)
1202
-
ConnectionPool.put_connection pool conn;
1203
-
raise (Request_error Timeout_error)
1207
-
execute_with_retries 0
1209
-
let request ~sw t ~method_ ~url ?body ?headers () =
1210
-
urlopen ~sw t ~method_ ~url ?body ?headers ()
1213
-
Hashtbl.iter (fun _ pool -> ConnectionPool.clear pool) t.pools;
1214
-
Hashtbl.clear t.pools
1216
-
let connection_pool_stats t =
1217
-
Hashtbl.fold (fun key pool acc -> (key, pool) :: acc) t.pools []
1221
-
module FilePost = struct
1223
-
| Text of { name : string; data : string }
1226
-
filename : string option;
1227
-
data : Flow.source_ty Eio.Resource.t;
1228
-
content_type : string option;
1231
-
let choose_boundary () =
1232
-
(* Use Mirage crypto RNG for boundary generation *)
1233
-
let rand_bytes = Mirage_crypto_rng.generate 8 in
1234
-
let rand_cstruct = Cstruct.of_string rand_bytes in
1235
-
let rand_hex = Cstruct.to_hex_string rand_cstruct in
1236
-
Printf.sprintf "----OCamlBoundary%s" rand_hex
1238
-
let encode_multipart_formdata ~fields ~boundary =
1239
-
let boundary = Option.value boundary ~default:(choose_boundary ()) in
1240
-
let content_type = Printf.sprintf "multipart/form-data; boundary=%s" boundary in
1242
-
let buf = Buffer.create 1024 in
1244
-
List.iter (fun field ->
1245
-
Buffer.add_string buf (Printf.sprintf "--%s\r\n" boundary);
1247
-
| Text { name; data } ->
1248
-
Buffer.add_string buf (Printf.sprintf "Content-Disposition: form-data; name=\"%s\"\r\n\r\n" name);
1249
-
Buffer.add_string buf data;
1250
-
Buffer.add_string buf "\r\n"
1251
-
| File { name; filename; content_type; data } ->
1252
-
let filename_str = Option.value ~default:"file" filename in
1253
-
let content_type_str = Option.value ~default:"application/octet-stream" content_type in
1254
-
Buffer.add_string buf (Printf.sprintf "Content-Disposition: form-data; name=\"%s\"; filename=\"%s\"\r\n" name filename_str);
1255
-
Buffer.add_string buf (Printf.sprintf "Content-Type: %s\r\n\r\n" content_type_str);
1256
-
(* For now, just read the file data as a string *)
1257
-
let file_content = Eio.Flow.read_all data in
1258
-
Buffer.add_string buf file_content;
1259
-
Buffer.add_string buf "\r\n"
1262
-
Buffer.add_string buf (Printf.sprintf "--%s--\r\n" boundary);
1264
-
let body_content = Buffer.contents buf in
1265
-
(content_type, Flow.string_source body_content)
1268
-
(* Progress tracking *)
1269
-
module Progress = struct
1271
-
mutable total : int64 option;
1272
-
desc : string option;
1275
-
mutable current : int64;
1278
-
let create ?total ?desc ?(unit="B") ?(width=40) () =
1279
-
{ total; desc; unit_=unit; width; current = 0L }
1281
-
let update t amount =
1282
-
t.current <- Int64.add t.current amount;
1283
-
Log.info (fun m -> m "Progress: %Ld %s" t.current t.unit_)
1286
-
Log.info (fun m -> m "Progress complete: %Ld %s" t.current t.unit_)
1288
-
let track_source ~sw:_ _t source =
1289
-
(* Wrap source to track upload progress *)
1290
-
Log.info (fun m -> m "Progress tracking for uploads enabled");
1291
-
(* Since we can't easily wrap a Flow source, we'll just pass it through *)
1292
-
(* In a real implementation, we'd need to create a custom Flow wrapper *)
1295
-
let track_response t response f =
1296
-
(* Track download progress while processing response *)
1297
-
let body = Response.body response in
1298
-
let total_size = String.length body |> Int64.of_int in
1300
-
(* Update progress to show total if not set *)
1301
-
(match t.total with
1302
-
| None -> t.total <- Some total_size
1305
-
(* Process body in chunks and track progress *)
1306
-
let chunk_size = 8192 in
1307
-
let processed = ref 0L in
1309
-
let process_chunks () =
1310
-
let rec iter pos acc =
1311
-
if pos >= String.length body then (
1313
-
String.concat "" (List.rev acc)
1315
-
let len = min chunk_size (String.length body - pos) in
1316
-
let chunk = String.sub body pos len in
1317
-
processed := Int64.add !processed (Int64.of_int len);
1318
-
update t (Int64.of_int len);
1320
-
(* Show percentage if total is known *)
1321
-
(match t.total with
1322
-
| Some total when total > 0L ->
1323
-
let pct = Int64.to_float !processed *. 100.0 /. Int64.to_float total in
1324
-
Log.info (fun m -> m "Progress: %.1f%% (%Ld/%Ld %s)"
1325
-
pct !processed total t.unit_)
1328
-
iter (pos + len) (chunk :: acc)
1334
-
let tracked_body = process_chunks () in
1335
-
f ~chunk:tracked_body
1339
-
module Util = struct
1340
-
let make_headers ?keep_alive ?accept_encoding ?user_agent
1341
-
?basic_auth ?proxy_basic_auth ?disable_cache () =
1342
-
let h = Cohttp.Header.init () in
1343
-
let h = match user_agent with
1344
-
| Some ua -> Cohttp.Header.add h "User-Agent" ua
1345
-
| None -> Cohttp.Header.add h "User-Agent" "OCaml-Requests/1.0"
1347
-
let h = match accept_encoding with
1348
-
| Some enc -> Cohttp.Header.add h "Accept-Encoding" (String.concat ", " enc)
1351
-
let h = match basic_auth with
1352
-
| Some (user, pass) ->
1353
-
let encoded = Base64.encode_string (Printf.sprintf "%s:%s" user pass) in
1354
-
Cohttp.Header.add h "Authorization" (Printf.sprintf "Basic %s" encoded)
1357
-
let h = match proxy_basic_auth with
1358
-
| Some (user, pass) ->
1359
-
let encoded = Base64.encode_string (Printf.sprintf "%s:%s" user pass) in
1360
-
Cohttp.Header.add h "Proxy-Authorization" (Printf.sprintf "Basic %s" encoded)
1363
-
let h = match keep_alive with
1364
-
| Some true -> Cohttp.Header.add h "Connection" "keep-alive"
1365
-
| Some false -> Cohttp.Header.add h "Connection" "close"
1368
-
let h = match disable_cache with
1371
-
|> (fun h -> Cohttp.Header.add h "Cache-Control" "no-cache, no-store, must-revalidate")
1372
-
|> (fun h -> Cohttp.Header.add h "Pragma" "no-cache")
1373
-
|> (fun h -> Cohttp.Header.add h "Expires" "0")
1378
-
let parse_url url = Uri.of_string url
1380
-
let getproxies_environment () =
1381
-
let vars = ["http_proxy"; "https_proxy"; "ftp_proxy"; "no_proxy"] in
1382
-
List.filter_map (fun var ->
1383
-
match Sys.getenv_opt var with
1384
-
| Some value -> Some (var, value)
1388
-
let proxy_bypass_environment host =
1389
-
match Sys.getenv_opt "no_proxy" with
1390
-
| Some no_proxy ->
1391
-
let hosts = String.split_on_char ',' no_proxy |> List.map String.trim in
1392
-
List.exists (fun h -> h = host || String.ends_with ~suffix:h host) hosts
1395
-
let urlencode ?(safe="") params =
1396
-
(* Custom encoder that respects the safe characters *)
1397
-
let encode_with_safe str =
1399
-
Uri.pct_encode str
1401
-
(* Encode character by character, skipping safe ones *)
1403
-
|> Seq.map (fun c ->
1404
-
let s = String.make 1 c in
1405
-
if String.contains safe c then s
1406
-
else Uri.pct_encode s)
1408
-
|> String.concat ""
1411
-
|> List.map (fun (k, v) ->
1412
-
Printf.sprintf "%s=%s" (encode_with_safe k) (encode_with_safe v))
1413
-
|> String.concat "&"
1415
-
let current_time () = Unix.gettimeofday ()
1416
-
let parse_retry_after header = try Some (float_of_string header) with _ -> None
1419
-
(* Streaming support *)
1420
-
module Stream = struct
1421
-
let upload ~sw t ?config ?(chunk_size=8192) ~meth uri ~body =
1422
-
Log.debug (fun m -> m "Streaming upload to %s with chunk size %d" (Uri.to_string uri) chunk_size);
1423
-
let config = Option.value config ~default:Config.default in
1424
-
let headers = Config.(config.headers) in
1425
-
let headers = Cohttp.Header.add headers "Transfer-Encoding" "chunked" in
1426
-
let config = { config with Config.headers } in
1428
-
(* For now, just read the entire body and send it *)
1429
-
(* A proper implementation would need to create a Flow wrapper *)
1430
-
let body_content = Flow.read_all body in
1432
-
(* Use the regular request with the body *)
1433
-
request ~sw t ~config ~body:body_content ~meth uri
1435
-
let download ~sw t ?config ?(chunk_size=8192) uri ~sink =
1436
-
Log.debug (fun m -> m "Streaming download from %s with chunk size %d" (Uri.to_string uri) chunk_size);
1437
-
(* Create client and perform streaming download directly *)
1438
-
let client = make_client t.net t.tls_config in
1439
-
let headers = match config with
1440
-
| Some c -> c.Config.headers
1441
-
| None -> Cohttp.Header.init ()
1443
-
let merged_headers = merge_headers t.default_headers headers in
1444
-
let merged_headers = match config with
1445
-
| Some c -> Auth.apply c.Config.auth `GET uri merged_headers
1446
-
| None -> merged_headers
1449
-
let _resp, body = Cohttp_eio.Client.get ~sw client uri ~headers:merged_headers in
1450
-
let buf_reader = Eio.Buf_read.of_flow ~max_size:(16 * 1024 * 1024) body in
1452
-
(* Stream data in chunks to the sink *)
1453
-
let rec stream_chunks () =
1455
-
let chunk = Eio.Buf_read.take chunk_size buf_reader in
1456
-
if String.length chunk > 0 then (
1457
-
Flow.copy_string chunk sink;
1461
-
| End_of_file -> ()
1462
-
| Eio.Buf_read.Buffer_limit_exceeded ->
1463
-
(* Read in smaller chunks when buffer limit exceeded *)
1464
-
let smaller_chunk_size = min chunk_size 1024 in
1465
-
let rec read_smaller () =
1467
-
let chunk = Eio.Buf_read.take smaller_chunk_size buf_reader in
1468
-
if String.length chunk > 0 then (
1469
-
Flow.copy_string chunk sink;
1472
-
with End_of_file -> ()
1479
-
let iter_response ?(chunk_size=8192) response ~f =
1480
-
(* Use the body_stream if available, otherwise process the body string *)
1481
-
match response.Response.body_stream with
1482
-
| Some buf_reader ->
1483
-
let rec iter_stream () =
1485
-
let chunk = Eio.Buf_read.take chunk_size buf_reader in
1486
-
if String.length chunk > 0 then (
1491
-
| End_of_file -> ()
1492
-
| Eio.Buf_read.Buffer_limit_exceeded ->
1493
-
let smaller_chunk_size = min chunk_size 1024 in
1494
-
let rec read_smaller () =
1496
-
let chunk = Eio.Buf_read.take smaller_chunk_size buf_reader in
1497
-
if String.length chunk > 0 then (
1501
-
with End_of_file -> ()
1507
-
(* Fallback to processing the body string in chunks *)
1508
-
let body = Response.body response in
1509
-
let rec iter pos =
1510
-
if pos < String.length body then
1511
-
let len = min chunk_size (String.length body - pos) in
1512
-
let chunk = String.sub body pos len in
1518
-
let lines ?(chunk_size=8192) ?(keep_ends=false) response =
1519
-
let body = Response.body response in
1520
-
(* Process body in chunks to find lines efficiently *)
1521
-
let acc = ref [] in
1522
-
let buffer = Buffer.create 256 in
1523
-
let rec extract_lines pos =
1524
-
if pos >= String.length body then (
1525
-
(* Add any remaining buffer content *)
1526
-
if Buffer.length buffer > 0 then
1527
-
acc := Buffer.contents buffer :: !acc
1529
-
let len = min chunk_size (String.length body - pos) in
1530
-
let chunk = String.sub body pos len in
1531
-
(* Process chunk character by character to find line breaks *)
1532
-
String.iter (fun c ->
1533
-
if c = '\n' then (
1534
-
let line = Buffer.contents buffer in
1535
-
Buffer.clear buffer;
1537
-
acc := (line ^ "\n") :: !acc
1539
-
acc := line :: !acc
1541
-
Buffer.add_char buffer c
1543
-
extract_lines (pos + len)
1547
-
List.rev !acc |> List.to_seq
1549
-
let json_stream ?(chunk_size=8192) response =
1550
-
lines ~chunk_size response
1551
-
|> Seq.filter (fun line -> String.trim line <> "")
1552
-
|> Seq.map (fun line ->
1553
-
try Yojson.Safe.from_string line
1555
-
Log.warn (fun m -> m "Failed to parse JSON line: %s" line);
1559
-
(* Download utility functions using Stream module *)
1560
-
let download_file ~sw t ?config uri ~path =
1561
-
Log.debug (fun m -> m "Downloading file from %s to %s" (Uri.to_string uri) (Eio.Path.native_exn path));
1562
-
(* Use streaming download to avoid loading entire file into memory *)
1563
-
Eio.Path.with_open_out ~create:(`Or_truncate 0o644) path (fun file ->
1564
-
let sink = (file :> Eio.Flow.sink_ty Eio.Resource.t) in
1565
-
Stream.download ~sw t ?config uri ~sink
1568
-
(* Add function for range request support with caching *)
1569
-
let download_file_range ~sw t ?config uri ~path ~start_byte ~end_byte =
1570
-
Log.debug (fun m -> m "Downloading file range %Ld-%Ld from %s to %s"
1571
-
start_byte end_byte (Uri.to_string uri) (Eio.Path.native_exn path));
1573
-
(* Check if we have this range in cache *)
1574
-
let try_cache () =
1575
-
match t.cache with
1577
-
let range = Requests_cache.Range.{ start = start_byte; end_ = Some end_byte } in
1578
-
Requests_cache.download_range cache ~sw ~url:uri ~range
1579
-
~on_chunk:(fun _data -> ())
1583
-
match try_cache () with
1585
-
Log.debug (fun m -> m "Using cached data for range %Ld-%Ld" start_byte end_byte)
1586
-
| Some false | None ->
1587
-
(* Fallback to regular range request *)
1588
-
let range_header = Printf.sprintf "bytes=%Ld-%Ld" start_byte end_byte in
1589
-
let config = match config with
1590
-
| Some c -> Some (Config.add_header "Range" range_header c)
1591
-
| None -> Some (Config.add_header "Range" range_header Config.default)
1594
-
(* Download and cache the chunk *)
1595
-
let response = get ~sw t ?config uri in
1597
-
(* Store the chunk in cache if we have one *)
1598
-
(match t.cache with
1600
-
let range = Requests_cache.Range.{ start = start_byte; end_ = Some end_byte } in
1601
-
Requests_cache.put_chunk cache ~url:uri ~range ~data:response.Response.body
1604
-
(* Write to file *)
1605
-
Eio.Path.with_open_out ~create:(`Or_truncate 0o644) path (fun file ->
1606
-
Flow.copy_string response.Response.body (file :> Eio.Flow.sink_ty Eio.Resource.t)
1609
-
(* Add function for resumable downloads *)
1610
-
let download_file_resume ~sw t ?config uri ~path =
1611
-
Log.debug (fun m -> m "Attempting resumable download from %s to %s" (Uri.to_string uri) (Eio.Path.native_exn path));
1614
-
let stat = Eio.Path.stat ~follow:false path in
1615
-
Optint.Int63.to_int64 stat.size
1620
-
if start_byte > 0L then (
1621
-
Log.info (fun m -> m "Resuming download from byte %Ld" start_byte);
1622
-
let range_header = Printf.sprintf "bytes=%Ld-" start_byte in
1623
-
let config = match config with
1624
-
| Some c -> Some (Config.add_header "Range" range_header c)
1625
-
| None -> Some (Config.add_header "Range" range_header Config.default)
1628
-
Eio.Path.with_open_out ~append:true ~create:(`Or_truncate 0o644) path (fun file ->
1629
-
let sink = (file :> Eio.Flow.sink_ty Eio.Resource.t) in
1630
-
Stream.download ~sw t ?config uri ~sink
1633
-
download_file ~sw t ?config uri ~path
1636
-
(* Global defaults *)
1637
-
module Defaults = struct
1638
-
let user_agent = ref "OCaml-Requests/1.0"
1639
-
let socket_timeout = ref None
1640
-
let retry = ref Retry.default
1641
-
let pool_maxsize = ref 10
1644
-
(* Additional exceptions *)
1645
-
(* Unused exceptions - kept for potential future use
1646
-
exception MaxRetryError of { url : Uri.t; reason : string }
1647
-
exception PoolError of string
1648
-
exception Pool_exhausted *)
15
+
(* Re-export exceptions from Stream module *)
16
+
exception TimeoutError = Stream.Timeout
17
+
exception TooManyRedirects = Stream.TooManyRedirects
18
+
exception ConnectionError = Stream.ConnectionError
19
+
exception HTTPError = Stream.HTTPError