Skip to content

Commit

Permalink
Merge pull request #4370 from wasmerio/wasix-http-client-chunk-timeout
Browse files Browse the repository at this point in the history
feat(wasix): Add optional chunk timeout to ReqwestHttpClient
  • Loading branch information
syrusakbary authored Dec 22, 2023
2 parents e6622f1 + 08cb9a7 commit 9964f49
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 4 deletions.
4 changes: 2 additions & 2 deletions lib/wasix/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ web-sys = { version = "0.3.64", features = ["Request", "RequestInit", "Window",
[target.'cfg(not(target_arch = "riscv64"))'.dependencies.reqwest]
version = "0.11"
default-features = false
features = ["rustls-tls", "json"]
features = ["rustls-tls", "json", "stream"]
optional = true

[target.'cfg(target_arch = "riscv64")'.dependencies.reqwest]
version = "0.11"
default-features = false
features = ["native-tls", "json"]
features = ["native-tls", "json", "stream"]
optional = true

[target.'cfg(unix)'.dependencies]
Expand Down
20 changes: 18 additions & 2 deletions lib/wasix/src/http/reqwest.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::Duration;

use anyhow::Context;
use futures::future::BoxFuture;
use futures::{future::BoxFuture, TryStreamExt};
use std::convert::TryFrom;
use tokio::runtime::Handle;

Expand All @@ -11,13 +11,15 @@ use super::{HttpRequest, HttpResponse};
pub struct ReqwestHttpClient {
handle: Handle,
connect_timeout: Duration,
response_body_chunk_timeout: Option<std::time::Duration>,
}

impl Default for ReqwestHttpClient {
fn default() -> Self {
Self {
handle: Handle::current(),
connect_timeout: Self::DEFAULT_CONNECT_TIMEOUT,
response_body_chunk_timeout: None,
}
}
}
Expand All @@ -30,6 +32,11 @@ impl ReqwestHttpClient {
self
}

pub fn with_response_body_chunk_timeout(mut self, timeout: std::time::Duration) -> Self {
self.response_body_chunk_timeout = Some(timeout);
self
}

async fn request(&self, request: HttpRequest) -> Result<HttpResponse, anyhow::Error> {
let method = reqwest::Method::try_from(request.method.as_str())
.with_context(|| format!("Invalid http method {}", request.method))?;
Expand Down Expand Up @@ -60,7 +67,16 @@ impl ReqwestHttpClient {
let headers = std::mem::take(response.headers_mut());

let status = response.status();
let data = response.bytes().await?.to_vec();
let data = if let Some(timeout) = self.response_body_chunk_timeout {
let mut stream = response.bytes_stream();
let mut buf = Vec::new();
while let Some(chunk) = tokio::time::timeout(timeout, stream.try_next()).await?? {
buf.extend_from_slice(&chunk);
}
buf
} else {
response.bytes().await?.to_vec()
};

Ok(HttpResponse {
status,
Expand Down

0 comments on commit 9964f49

Please sign in to comment.