Skip to content

Commit 5c15a66

Browse files
committed
Add Async OHTTP client
Co-authored-by Alex Lewin <[email protected]> [OHTTP](https://datatracker.ietf.org/doc/rfc9458/) lets a client send encrypted requests through a relay so the server can’t see who sent them and the relay can’t see what they contain. The following commit adds optional configurations to enable clients to proxy their requests through an OHTTP relay and gateway. OHTTP functionality is feature flagged off behind `async-ohttp`. If a client provided OHTTP config it will attempt to use the relay instead of the target resource directly.
1 parent 9305e41 commit 5c15a66

File tree

4 files changed

+173
-1
lines changed

4 files changed

+173
-1
lines changed

Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ reqwest = { version = "0.12", features = ["json"], default-features = false, op
2929

3030
# default async runtime
3131
tokio = { version = "1", features = ["time"], optional = true }
32+
bitcoin-ohttp = { version = "0.6.0", optional = true}
33+
url = {version = "2.5.7", optional = true}
34+
bhttp = { version = "0.6.1", optional = true}
35+
http = { version = "1.3.1", optional = true}
36+
3237

3338
[dev-dependencies]
3439
serde_json = "1.0"
@@ -43,6 +48,7 @@ blocking-https = ["blocking", "minreq/https"]
4348
blocking-https-rustls = ["blocking", "minreq/https-rustls"]
4449
blocking-https-native = ["blocking", "minreq/https-native"]
4550
blocking-https-bundled = ["blocking", "minreq/https-bundled"]
51+
async-ohttp = ["async", "bitcoin-ohttp", "bhttp", "reqwest", "tokio", "url", "http"]
4652

4753
tokio = ["dep:tokio"]
4854
async = ["reqwest", "reqwest/socks", "tokio?/time"]

src/async.rs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ use log::{debug, error, info, trace};
2626

2727
use reqwest::{header, Client, Response};
2828

29+
#[cfg(feature = "async-ohttp")]
30+
use crate::ohttp::OhttpClient;
2931
use crate::{
3032
AddressStats, BlockInfo, BlockStatus, BlockSummary, Builder, Error, MempoolRecentTx,
3133
MempoolStats, MerkleProof, OutputStatus, ScriptHashStats, Tx, TxStatus, Utxo,
@@ -43,6 +45,9 @@ pub struct AsyncClient<S = DefaultSleeper> {
4345

4446
/// Marker for the type of sleeper used
4547
marker: PhantomData<S>,
48+
/// Ohttp config
49+
#[cfg(feature = "async-ohttp")]
50+
ohttp_client: Option<OhttpClient>,
4651
}
4752

4853
impl<S: Sleeper> AsyncClient<S> {
@@ -77,6 +82,8 @@ impl<S: Sleeper> AsyncClient<S> {
7782
client: client_builder.build()?,
7883
max_retries: builder.max_retries,
7984
marker: PhantomData,
85+
#[cfg(feature = "async-ohttp")]
86+
ohttp_client: None,
8087
})
8188
}
8289

@@ -86,9 +93,17 @@ impl<S: Sleeper> AsyncClient<S> {
8693
client,
8794
max_retries: crate::DEFAULT_MAX_RETRIES,
8895
marker: PhantomData,
96+
#[cfg(feature = "async-ohttp")]
97+
ohttp_client: None,
8998
}
9099
}
91100

101+
#[cfg(feature = "async-ohttp")]
102+
pub(crate) fn set_ohttp_client(mut self, ohttp_client: OhttpClient) -> Self {
103+
self.ohttp_client = Some(ohttp_client);
104+
self
105+
}
106+
92107
/// Make an HTTP GET request to given URL, deserializing to any `T` that
93108
/// implement [`bitcoin::consensus::Decodable`].
94109
///
@@ -557,12 +572,32 @@ impl<S: Sleeper> AsyncClient<S> {
557572
let mut attempts = 0;
558573

559574
loop {
560-
match self.client.get(url).send().await? {
575+
let res = {
576+
#[cfg(feature = "async-ohttp")]
577+
if let Some(ohttp_client) = &self.ohttp_client {
578+
let (body, ctx) = ohttp_client.ohttp_encapsulate("get", url, None)?;
579+
let res = self
580+
.client
581+
.post(ohttp_client.relay_url().to_string())
582+
.header("Content-Type", "message/ohttp-req")
583+
.body(body)
584+
.send()
585+
.await?;
586+
let body = res.bytes().await?.to_vec();
587+
ohttp_client.ohttp_decapsulate(ctx, body)?.into()
588+
} else {
589+
self.client.get(url).send().await?
590+
}
591+
#[cfg(not(feature = "async-ohttp"))]
592+
self.client.get(url).send().await?
593+
};
594+
match res {
561595
resp if attempts < self.max_retries && is_status_retryable(resp.status()) => {
562596
S::sleep(delay).await;
563597
attempts += 1;
564598
delay *= 2;
565599
}
600+
566601
resp => return Ok(resp),
567602
}
568603
}

src/lib.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ pub mod r#async;
8383
#[cfg(feature = "blocking")]
8484
pub mod blocking;
8585

86+
#[cfg(feature = "async-ohttp")]
87+
pub(crate) mod ohttp;
88+
8689
pub use api::*;
8790
#[cfg(feature = "blocking")]
8891
pub use blocking::BlockingClient;
@@ -195,6 +198,20 @@ impl Builder {
195198
pub fn build_async_with_sleeper<S: Sleeper>(self) -> Result<AsyncClient<S>, Error> {
196199
AsyncClient::from_builder(self)
197200
}
201+
202+
#[cfg(feature = "async-ohttp")]
203+
pub async fn build_async_with_ohttp(
204+
self,
205+
ohttp_relay_url: &str,
206+
ohttp_gateway_url: &str,
207+
) -> Result<AsyncClient, Error> {
208+
use crate::ohttp::OhttpClient;
209+
210+
let ohttp_client = OhttpClient::new(ohttp_relay_url, ohttp_gateway_url).await?;
211+
Ok(self
212+
.build_async_with_sleeper()?
213+
.set_ohttp_client(ohttp_client))
214+
}
198215
}
199216

200217
/// Errors that can happen during a request to `Esplora` servers.
@@ -230,6 +247,18 @@ pub enum Error {
230247
InvalidHttpHeaderValue(String),
231248
/// The server sent an invalid response
232249
InvalidResponse,
250+
/// Error from Ohttp library
251+
#[cfg(feature = "async-ohttp")]
252+
Ohttp(bitcoin_ohttp::Error),
253+
/// Error when reading and writing to bhttp payloads
254+
#[cfg(feature = "async-ohttp")]
255+
Bhttp(bhttp::Error),
256+
/// Error when converting the http response to and from bhttp response
257+
#[cfg(feature = "async-ohttp")]
258+
Http(http::Error),
259+
/// Error when parsing the URL
260+
#[cfg(feature = "async-ohttp")]
261+
UrlParsing(url::ParseError),
233262
}
234263

235264
impl fmt::Display for Error {

src/ohttp.rs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
use crate::Error;
2+
use bitcoin_ohttp as ohttp;
3+
use reqwest::Client;
4+
use url::Url;
5+
6+
#[derive(Debug, Clone)]
7+
pub struct OhttpClient {
8+
key_config: ohttp::KeyConfig,
9+
relay_url: Url,
10+
}
11+
12+
impl OhttpClient {
13+
/// Will attempt to fetch the key config from the gateway and then create a new client.
14+
/// Keyconfig is fetched directly from the gateway thus revealing our network metadata.
15+
/// TODO: use the relay HTTP connect proxy to fetch to.
16+
pub(crate) async fn new(relay_url: &str, ohttp_gateway_url: &str) -> Result<Self, Error> {
17+
let gateway_url = Url::parse(ohttp_gateway_url).map_err(Error::UrlParsing)?;
18+
let res = Client::new()
19+
.get(gateway_url)
20+
.send()
21+
.await
22+
.map_err(Error::Reqwest)?;
23+
let body = res.bytes().await.map_err(Error::Reqwest)?;
24+
let key_config = ohttp::KeyConfig::decode(&body).map_err(Error::Ohttp)?;
25+
Ok(Self {
26+
key_config,
27+
relay_url: Url::parse(relay_url).map_err(Error::UrlParsing)?,
28+
})
29+
}
30+
31+
pub(crate) fn relay_url(&self) -> &Url {
32+
&self.relay_url
33+
}
34+
35+
pub(crate) fn ohttp_encapsulate(
36+
&self,
37+
method: &str,
38+
target_resource: &str,
39+
body: Option<&[u8]>,
40+
) -> Result<(Vec<u8>, ohttp::ClientResponse), Error> {
41+
use std::fmt::Write;
42+
43+
// Bitcoin-hpke takes keyconfig as mutable ref but it doesnt mutate it should fix it
44+
// upstream but for now we can clone it to avoid changing self to mutable self
45+
let mut key_config = self.key_config.clone();
46+
47+
let ctx = ohttp::ClientRequest::from_config(&mut key_config).map_err(Error::Ohttp)?;
48+
let url = url::Url::parse(target_resource).map_err(Error::UrlParsing)?;
49+
let authority_bytes = url.host().map_or_else(Vec::new, |host| {
50+
let mut authority = host.to_string();
51+
if let Some(port) = url.port() {
52+
write!(authority, ":{port}").unwrap();
53+
}
54+
authority.into_bytes()
55+
});
56+
let mut bhttp_message = bhttp::Message::request(
57+
method.as_bytes().to_vec(),
58+
url.scheme().as_bytes().to_vec(),
59+
authority_bytes,
60+
url.path().as_bytes().to_vec(),
61+
);
62+
// TODO: do we need to add headers?
63+
if let Some(body) = body {
64+
bhttp_message.write_content(body);
65+
}
66+
67+
let mut bhttp_req = Vec::new();
68+
bhttp_message
69+
.write_bhttp(bhttp::Mode::IndeterminateLength, &mut bhttp_req)
70+
.map_err(Error::Bhttp)?;
71+
let (encapsulated, ohttp_ctx) = ctx.encapsulate(&bhttp_req).map_err(Error::Ohttp)?;
72+
73+
Ok((encapsulated, ohttp_ctx))
74+
}
75+
76+
pub(crate) fn ohttp_decapsulate(
77+
&self,
78+
res_ctx: ohttp::ClientResponse,
79+
ohttp_body: Vec<u8>,
80+
) -> Result<http::Response<Vec<u8>>, Error> {
81+
let bhttp_body = res_ctx.decapsulate(&ohttp_body).map_err(Error::Ohttp)?;
82+
let mut r = std::io::Cursor::new(bhttp_body);
83+
let m: bhttp::Message = bhttp::Message::read_bhttp(&mut r).map_err(Error::Bhttp)?;
84+
let mut builder = http::Response::builder();
85+
for field in m.header().iter() {
86+
builder = builder.header(field.name(), field.value());
87+
}
88+
builder
89+
.status({
90+
let code = m
91+
.control()
92+
.status()
93+
.ok_or(bhttp::Error::InvalidStatus)
94+
.map_err(Error::Bhttp)?;
95+
http::StatusCode::from_u16(code.code())
96+
.map_err(|_| bhttp::Error::InvalidStatus)
97+
.map_err(Error::Bhttp)?
98+
})
99+
.body(m.content().to_vec())
100+
.map_err(Error::Http)
101+
}
102+
}

0 commit comments

Comments
 (0)