|
| 1 | +use crate::get_content_length; |
1 | 2 | use crate::p3::bindings::http::types::ErrorCode; |
2 | | -use crate::p3::body::Body; |
| 3 | +use crate::p3::body::{Body, GuestBody}; |
| 4 | +use crate::p3::{WasiHttpCtxView, WasiHttpView}; |
3 | 5 | use bytes::Bytes; |
4 | 6 | use core::time::Duration; |
| 7 | +use http::header::HOST; |
5 | 8 | use http::uri::{Authority, PathAndQuery, Scheme}; |
6 | | -use http::{HeaderMap, Method}; |
| 9 | +use http::{HeaderMap, HeaderValue, Method, Uri}; |
7 | 10 | use http_body_util::BodyExt as _; |
8 | 11 | use http_body_util::combinators::UnsyncBoxBody; |
9 | 12 | use std::sync::Arc; |
10 | 13 | use tokio::sync::oneshot; |
| 14 | +use tracing::debug; |
| 15 | +use wasmtime::AsContextMut; |
11 | 16 |
|
12 | 17 | /// The concrete type behind a `wasi:http/types.request-options` resource. |
13 | 18 | #[derive(Copy, Clone, Debug, Default)] |
@@ -119,6 +124,114 @@ impl Request { |
119 | 124 | body.map_err(Into::into).boxed_unsync(), |
120 | 125 | ) |
121 | 126 | } |
| 127 | + |
| 128 | + /// Convert this [`Request`] into an [`http::Request<BoxBody<Bytes, ErrorCode>>`]. |
| 129 | + /// |
| 130 | + /// The specified future `fut` can be used to communicate a request processing |
| 131 | + /// error, if any, back to the caller (e.g., if this request was constructed |
| 132 | + /// through `wasi:http/types.request#new`). |
| 133 | + pub fn into_http<T: WasiHttpView + 'static>( |
| 134 | + self, |
| 135 | + store: impl AsContextMut<Data = T>, |
| 136 | + fut: impl Future<Output = Result<(), ErrorCode>> + Send + 'static, |
| 137 | + ) -> wasmtime::Result<http::Request<UnsyncBoxBody<Bytes, ErrorCode>>> { |
| 138 | + self.into_http_with_getter(store, fut, T::http) |
| 139 | + } |
| 140 | + |
| 141 | + /// Like [`Self::into_http`], but uses a custom getter for obtaining the [`WasiHttpCtxView`]. |
| 142 | + pub fn into_http_with_getter<T: 'static>( |
| 143 | + self, |
| 144 | + mut store: impl AsContextMut<Data = T>, |
| 145 | + fut: impl Future<Output = Result<(), ErrorCode>> + Send + 'static, |
| 146 | + getter: fn(&mut T) -> WasiHttpCtxView<'_>, |
| 147 | + ) -> wasmtime::Result<http::Request<UnsyncBoxBody<Bytes, ErrorCode>>> { |
| 148 | + let Request { |
| 149 | + method, |
| 150 | + scheme, |
| 151 | + authority, |
| 152 | + path_with_query, |
| 153 | + headers, |
| 154 | + options: _, |
| 155 | + body, |
| 156 | + } = self; |
| 157 | + let content_length = match get_content_length(&headers) { |
| 158 | + Ok(content_length) => content_length, |
| 159 | + Err(err) => { |
| 160 | + body.drop(&mut store); |
| 161 | + return Err(ErrorCode::InternalError(Some(format!("{err:#}"))).into()); |
| 162 | + } |
| 163 | + }; |
| 164 | + // This match must appear before any potential errors handled with '?' |
| 165 | + // (or errors have to explicitly be addressed and drop the body, as above), |
| 166 | + // as otherwise the Body::Guest resources will not be cleaned up when dropped. |
| 167 | + // see: https://github.com/bytecodealliance/wasmtime/pull/11440#discussion_r2326139381 |
| 168 | + // for additional context. |
| 169 | + let body = match body { |
| 170 | + Body::Guest { |
| 171 | + contents_rx, |
| 172 | + trailers_rx, |
| 173 | + result_tx, |
| 174 | + } => GuestBody::new( |
| 175 | + &mut store, |
| 176 | + contents_rx, |
| 177 | + trailers_rx, |
| 178 | + result_tx, |
| 179 | + fut, |
| 180 | + content_length, |
| 181 | + ErrorCode::HttpRequestBodySize, |
| 182 | + getter, |
| 183 | + ) |
| 184 | + .boxed_unsync(), |
| 185 | + Body::Host { body, result_tx } => { |
| 186 | + _ = result_tx.send(Box::new(fut)); |
| 187 | + body |
| 188 | + } |
| 189 | + }; |
| 190 | + let mut headers = Arc::unwrap_or_clone(headers); |
| 191 | + let mut store_ctx = store.as_context_mut(); |
| 192 | + let WasiHttpCtxView { ctx, table: _ } = getter(store_ctx.data_mut()); |
| 193 | + if ctx.set_host_header() { |
| 194 | + let host = if let Some(authority) = authority.as_ref() { |
| 195 | + HeaderValue::try_from(authority.as_str()) |
| 196 | + .map_err(|err| ErrorCode::InternalError(Some(err.to_string())))? |
| 197 | + } else { |
| 198 | + HeaderValue::from_static("") |
| 199 | + }; |
| 200 | + headers.insert(HOST, host); |
| 201 | + } |
| 202 | + let scheme = match scheme { |
| 203 | + None => ctx.default_scheme().ok_or(ErrorCode::HttpProtocolError)?, |
| 204 | + Some(scheme) if ctx.is_supported_scheme(&scheme) => scheme, |
| 205 | + Some(..) => return Err(ErrorCode::HttpProtocolError.into()), |
| 206 | + }; |
| 207 | + let mut uri = Uri::builder().scheme(scheme); |
| 208 | + if let Some(authority) = authority { |
| 209 | + uri = uri.authority(authority) |
| 210 | + }; |
| 211 | + if let Some(path_with_query) = path_with_query { |
| 212 | + uri = uri.path_and_query(path_with_query) |
| 213 | + }; |
| 214 | + let uri = uri.build().map_err(|err| { |
| 215 | + debug!(?err, "failed to build request URI"); |
| 216 | + ErrorCode::HttpRequestUriInvalid |
| 217 | + })?; |
| 218 | + let mut req = http::Request::builder(); |
| 219 | + if let Some(headers_mut) = req.headers_mut() { |
| 220 | + *headers_mut = headers; |
| 221 | + } else { |
| 222 | + return Err(ErrorCode::InternalError(Some( |
| 223 | + "failed to get mutable headers from request builder".to_string(), |
| 224 | + )) |
| 225 | + .into()); |
| 226 | + } |
| 227 | + let req = req |
| 228 | + .method(method) |
| 229 | + .uri(uri) |
| 230 | + .body(body) |
| 231 | + .map_err(|err| ErrorCode::InternalError(Some(err.to_string())))?; |
| 232 | + let (req, body) = req.into_parts(); |
| 233 | + Ok(http::Request::from_parts(req, body)) |
| 234 | + } |
122 | 235 | } |
123 | 236 |
|
124 | 237 | /// The default implementation of how an outgoing request is sent. |
@@ -348,3 +461,123 @@ pub async fn default_send_request( |
348 | 461 | conn.await.map_err(ErrorCode::from_hyper_response_error) |
349 | 462 | })) |
350 | 463 | } |
| 464 | + |
| 465 | +#[cfg(test)] |
| 466 | +mod tests { |
| 467 | + use super::*; |
| 468 | + use crate::p3::WasiHttpCtx; |
| 469 | + use anyhow::Result; |
| 470 | + use http_body_util::{BodyExt, Empty, Full}; |
| 471 | + use std::future::Future; |
| 472 | + use std::str::FromStr; |
| 473 | + use std::task::{Context, Waker}; |
| 474 | + use wasmtime::{Engine, Store}; |
| 475 | + use wasmtime_wasi::{ResourceTable, WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView}; |
| 476 | + |
| 477 | + struct TestHttpCtx; |
| 478 | + struct TestCtx { |
| 479 | + table: ResourceTable, |
| 480 | + wasi: WasiCtx, |
| 481 | + http: TestHttpCtx, |
| 482 | + } |
| 483 | + |
| 484 | + impl TestCtx { |
| 485 | + fn new() -> Self { |
| 486 | + Self { |
| 487 | + table: ResourceTable::default(), |
| 488 | + wasi: WasiCtxBuilder::new().build(), |
| 489 | + http: TestHttpCtx, |
| 490 | + } |
| 491 | + } |
| 492 | + } |
| 493 | + |
| 494 | + impl WasiView for TestCtx { |
| 495 | + fn ctx(&mut self) -> WasiCtxView<'_> { |
| 496 | + WasiCtxView { |
| 497 | + ctx: &mut self.wasi, |
| 498 | + table: &mut self.table, |
| 499 | + } |
| 500 | + } |
| 501 | + } |
| 502 | + |
| 503 | + impl WasiHttpCtx for TestHttpCtx {} |
| 504 | + |
| 505 | + impl WasiHttpView for TestCtx { |
| 506 | + fn http(&mut self) -> WasiHttpCtxView<'_> { |
| 507 | + WasiHttpCtxView { |
| 508 | + ctx: &mut self.http, |
| 509 | + table: &mut self.table, |
| 510 | + } |
| 511 | + } |
| 512 | + } |
| 513 | + |
| 514 | + #[tokio::test] |
| 515 | + async fn test_request_into_http_schemes() -> Result<()> { |
| 516 | + let schemes = vec![Some(Scheme::HTTP), Some(Scheme::HTTPS), None]; |
| 517 | + let engine = Engine::default(); |
| 518 | + |
| 519 | + for scheme in schemes { |
| 520 | + let (req, fut) = Request::new( |
| 521 | + Method::POST, |
| 522 | + scheme.clone(), |
| 523 | + Some(Authority::from_static("example.com")), |
| 524 | + Some(PathAndQuery::from_static("/path?query=1")), |
| 525 | + HeaderMap::new(), |
| 526 | + None, |
| 527 | + Full::new(Bytes::from_static(b"body")) |
| 528 | + .map_err(|x| match x {}) |
| 529 | + .boxed_unsync(), |
| 530 | + ); |
| 531 | + let mut store = Store::new(&engine, TestCtx::new()); |
| 532 | + let http_req = req.into_http(&mut store, async { Ok(()) }).unwrap(); |
| 533 | + assert_eq!(http_req.method(), Method::POST); |
| 534 | + let expected_scheme = scheme.unwrap_or(Scheme::HTTPS); // default scheme |
| 535 | + assert_eq!( |
| 536 | + http_req.uri(), |
| 537 | + &http::Uri::from_str(&format!( |
| 538 | + "{}://example.com/path?query=1", |
| 539 | + expected_scheme.as_str() |
| 540 | + )) |
| 541 | + .unwrap() |
| 542 | + ); |
| 543 | + let body_bytes = http_req.into_body().collect().await?; |
| 544 | + assert_eq!(*body_bytes.to_bytes(), *b"body"); |
| 545 | + let mut cx = Context::from_waker(Waker::noop()); |
| 546 | + let mut fut = Box::pin(fut); |
| 547 | + let result = fut.as_mut().poll(&mut cx); |
| 548 | + assert!(matches!(result, futures::task::Poll::Ready(Ok(())))); |
| 549 | + } |
| 550 | + |
| 551 | + Ok(()) |
| 552 | + } |
| 553 | + |
| 554 | + #[tokio::test] |
| 555 | + async fn test_request_into_http_uri_error() -> Result<()> { |
| 556 | + let (req, fut) = Request::new( |
| 557 | + Method::GET, |
| 558 | + Some(Scheme::HTTP), |
| 559 | + Some(Authority::from_static("example.com")), |
| 560 | + None, // <-- should fail, must be Some(_) when authority is set |
| 561 | + HeaderMap::new(), |
| 562 | + None, |
| 563 | + Empty::new().map_err(|x| match x {}).boxed_unsync(), |
| 564 | + ); |
| 565 | + let mut store = Store::new(&Engine::default(), TestCtx::new()); |
| 566 | + let result = req.into_http(&mut store, async { |
| 567 | + Err(ErrorCode::InternalError(Some("uh oh".to_string()))) |
| 568 | + }); |
| 569 | + assert!(result.is_err()); |
| 570 | + assert!(matches!( |
| 571 | + result.unwrap_err().downcast_ref::<ErrorCode>(), |
| 572 | + Some(ErrorCode::HttpRequestUriInvalid) |
| 573 | + )); |
| 574 | + let mut cx = Context::from_waker(Waker::noop()); |
| 575 | + let result = Box::pin(fut).as_mut().poll(&mut cx); |
| 576 | + assert!(matches!( |
| 577 | + result, |
| 578 | + futures::task::Poll::Ready(Err(ErrorCode::InternalError(Some(_)))) |
| 579 | + )); |
| 580 | + |
| 581 | + Ok(()) |
| 582 | + } |
| 583 | +} |
0 commit comments