Skip to content

Commit 7b964fc

Browse files
committed
Implements streams-quic-client
1 parent 77be5df commit 7b964fc

File tree

7 files changed

+253
-0
lines changed

7 files changed

+253
-0
lines changed

Cargo.lock

Lines changed: 41 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
[package]
2+
name = "streams-quic-client"
3+
version = "0.1.0"
4+
5+
authors.workspace = true
6+
categories.workspace = true
7+
edition.workspace = true
8+
license.workspace = true
9+
repository.workspace = true
10+
11+
[dependencies]
12+
anyhow = { workspace = true, features = ["std"] }
13+
bytes = { workspace = true }
14+
clap = { workspace = true, features = [
15+
"color",
16+
"derive",
17+
"error-context",
18+
"help",
19+
"std",
20+
"suggestions",
21+
"usage",
22+
] }
23+
futures = { workspace = true }
24+
quinn = { workspace = true, features = [
25+
"log",
26+
"platform-verifier",
27+
"ring",
28+
"runtime-tokio",
29+
"rustls",
30+
] }
31+
rcgen = { workspace = true, features = ["crypto", "ring", "zeroize"] }
32+
rustls = { workspace = true }
33+
tokio = { workspace = true, features = ["rt-multi-thread", "signal"] }
34+
tokio-stream = { workspace = true, features = ["time"] }
35+
tracing = { workspace = true }
36+
tracing-subscriber = { workspace = true, features = [
37+
"ansi",
38+
"env-filter",
39+
"fmt",
40+
] }
41+
url = { workspace = true }
42+
wit-bindgen-wrpc = { workspace = true }
43+
wrpc-transport = { workspace = true }
44+
wrpc-transport-quic = { workspace = true }
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
use core::time::Duration;
2+
use std::sync::Arc;
3+
4+
use anyhow::Context as _;
5+
use bytes::Bytes;
6+
use clap::Parser;
7+
use core::net::SocketAddr;
8+
use futures::{stream, StreamExt as _};
9+
use quinn::{crypto::rustls::QuicClientConfig, ClientConfig, Endpoint};
10+
use rustls::{
11+
client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier},
12+
pki_types::{CertificateDer, ServerName, UnixTime},
13+
version::TLS13,
14+
DigitallySignedStruct, SignatureScheme,
15+
};
16+
use tokio::{time, try_join};
17+
use tokio_stream::wrappers::IntervalStream;
18+
use tracing::debug;
19+
20+
mod bindings {
21+
wit_bindgen_wrpc::generate!({
22+
with: {
23+
"wrpc-examples:streams/handler": generate
24+
}
25+
});
26+
}
27+
28+
use bindings::wrpc_examples::streams::handler::{echo, Req};
29+
30+
#[derive(Parser, Debug)]
31+
#[command(author, version, about, long_about = None)]
32+
struct Args {
33+
/// Address to invoke `wrpc-examples:streams/handler.echo` on
34+
#[arg(default_value = "127.0.0.1:4433")]
35+
addr: SocketAddr,
36+
}
37+
38+
#[derive(Debug)]
39+
struct Insecure;
40+
41+
impl ServerCertVerifier for Insecure {
42+
fn verify_server_cert(
43+
&self,
44+
_end_entity: &CertificateDer<'_>,
45+
_intermediates: &[CertificateDer<'_>],
46+
_server_name: &ServerName<'_>,
47+
_ocsp_response: &[u8],
48+
_now: UnixTime,
49+
) -> Result<ServerCertVerified, rustls::Error> {
50+
Ok(ServerCertVerified::assertion())
51+
}
52+
53+
fn verify_tls12_signature(
54+
&self,
55+
_message: &[u8],
56+
_cert: &CertificateDer<'_>,
57+
_dss: &DigitallySignedStruct,
58+
) -> Result<HandshakeSignatureValid, rustls::Error> {
59+
Ok(HandshakeSignatureValid::assertion())
60+
}
61+
62+
fn verify_tls13_signature(
63+
&self,
64+
_message: &[u8],
65+
_cert: &CertificateDer<'_>,
66+
_dss: &DigitallySignedStruct,
67+
) -> Result<HandshakeSignatureValid, rustls::Error> {
68+
Ok(HandshakeSignatureValid::assertion())
69+
}
70+
71+
fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
72+
vec![SignatureScheme::ECDSA_NISTP256_SHA256]
73+
}
74+
}
75+
76+
#[tokio::main]
77+
async fn main() -> anyhow::Result<()> {
78+
tracing_subscriber::fmt().init();
79+
80+
let Args { addr } = Args::parse();
81+
82+
let conf = rustls::ClientConfig::builder_with_protocol_versions(&[&TLS13])
83+
.dangerous()
84+
.with_custom_certificate_verifier(Arc::new(Insecure))
85+
.with_no_client_auth();
86+
87+
let conf: ClientConfig = ClientConfig::new(Arc::new(QuicClientConfig::try_from(conf)?));
88+
// Bind to any IPv4 or IPv6 address (dual stack, if supported).
89+
let ep = Endpoint::client((std::net::Ipv6Addr::UNSPECIFIED, 0).into())?;
90+
91+
// Connect using the rustls client configuration, addr, and server name
92+
let connection = ep
93+
.connect_with(conf, addr, "localhost")?
94+
.await
95+
.context("failed to connect to server")?;
96+
let wrpc = wrpc_transport_quic::Client::from(connection);
97+
98+
let numbers = Box::pin(
99+
stream::iter(1..)
100+
.take(10)
101+
.zip(IntervalStream::new(time::interval(Duration::from_secs(1))))
102+
.map(|(i, _)| i)
103+
.ready_chunks(10),
104+
);
105+
106+
// `stream<u8>` items are chunked using [`Bytes`]
107+
let bytes = Box::pin(
108+
stream::iter(b"foo bar baz")
109+
.zip(IntervalStream::new(time::interval(Duration::from_secs(1))))
110+
.map(|(i, _)| *i)
111+
.ready_chunks(10)
112+
.map(Bytes::from),
113+
);
114+
115+
let (mut numbers, mut bytes, io) = echo(&wrpc, (), Req { numbers, bytes })
116+
.await
117+
.context("failed to invoke `wrpc-examples:streams/handler.echo`")?;
118+
try_join!(
119+
async {
120+
if let Some(io) = io {
121+
debug!("performing async I/O");
122+
io.await.context("failed to complete async I/O")
123+
} else {
124+
Ok(())
125+
}
126+
},
127+
async {
128+
while let Some(item) = numbers.next().await {
129+
eprintln!("numbers: {item:?}");
130+
}
131+
Ok(())
132+
},
133+
async {
134+
while let Some(item) = bytes.next().await {
135+
eprintln!("bytes: {item:?}");
136+
}
137+
Ok(())
138+
}
139+
)?;
140+
Ok(())
141+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
[streams]
2+
path = "../../../wit/streams"
3+
sha256 = "5064bee90ebea73f1695987191fbbfea71ed2dbb69839814009490b4fbe8e96f"
4+
sha512 = "dfca3844d91c6c8e83fefd7b9511a366b464cf69d017c61b671409cb26dc9490a0e59a8e60ef15b77fdeb4fc1b8d9e6efa11c2fb1a1dabd0141e5e6afe8a59b9"
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
streams = "../../../wit/streams"
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package wrpc-examples:streams;
2+
3+
interface handler {
4+
record req {
5+
numbers: stream<u64>,
6+
bytes: stream<u8>,
7+
}
8+
echo: func(r: req) -> (numbers: stream<u64>, bytes: stream<u8>);
9+
}
10+
11+
world client {
12+
import handler;
13+
}
14+
15+
world server {
16+
export handler;
17+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package wrpc-examples:streams-rust-client;
2+
3+
world client {
4+
include wrpc-examples:streams/client;
5+
}

0 commit comments

Comments
 (0)