Skip to content

Commit 2535e9a

Browse files
committed
Implement streams-quic-server
1 parent 7b964fc commit 2535e9a

File tree

8 files changed

+245
-2
lines changed

8 files changed

+245
-2
lines changed

examples/rust/hello-quic-client/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ mod bindings {
2424
#[command(author, version, about, long_about = None)]
2525
struct Args {
2626
/// Address to invoke `wrpc-examples:hello/handler.hello` on
27-
#[arg(default_value = "127.0.0.1:4433")]
27+
#[arg(default_value = "[::1]:4433")]
2828
addr: SocketAddr,
2929
}
3030

examples/rust/streams-quic-client/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use bindings::wrpc_examples::streams::handler::{echo, Req};
3131
#[command(author, version, about, long_about = None)]
3232
struct Args {
3333
/// Address to invoke `wrpc-examples:streams/handler.echo` on
34-
#[arg(default_value = "127.0.0.1:4433")]
34+
#[arg(default_value = "[::1]:4433")]
3535
addr: SocketAddr,
3636
}
3737

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
[package]
2+
name = "streams-quic-server"
3+
version = "0.1.0"
4+
5+
authors.workspace = true
6+
categories.workspace = true
7+
edition.workspace = true
8+
license.workspace = true
9+
repository.workspace = true
10+
11+
[dependencies]
12+
anyhow = { workspace = true, features = ["std"] }
13+
bytes = { workspace = true }
14+
clap = { workspace = true, features = [
15+
"color",
16+
"derive",
17+
"error-context",
18+
"help",
19+
"std",
20+
"suggestions",
21+
"usage",
22+
] }
23+
futures = { workspace = true }
24+
quinn = { workspace = true, features = [
25+
"log",
26+
"platform-verifier",
27+
"ring",
28+
"runtime-tokio",
29+
"rustls",
30+
] }
31+
rcgen = { workspace = true, features = ["crypto", "ring", "zeroize"] }
32+
rustls = { workspace = true }
33+
tokio = { workspace = true, features = ["rt-multi-thread", "signal"] }
34+
tracing = { workspace = true }
35+
tracing-subscriber = { workspace = true, features = [
36+
"ansi",
37+
"env-filter",
38+
"fmt",
39+
] }
40+
url = { workspace = true }
41+
wit-bindgen-wrpc = { workspace = true }
42+
wrpc-transport = { workspace = true }
43+
wrpc-transport-quic = { workspace = true }
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
use core::net::SocketAddr;
2+
use core::pin::{pin, Pin};
3+
4+
use std::sync::Arc;
5+
6+
use anyhow::Context as _;
7+
use bytes::Bytes;
8+
use clap::Parser;
9+
use futures::stream::select_all;
10+
use futures::{Stream, StreamExt as _};
11+
use quinn::crypto::rustls::QuicServerConfig;
12+
use quinn::{Endpoint, ServerConfig};
13+
use rcgen::{generate_simple_self_signed, CertifiedKey};
14+
use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
15+
use rustls::version::TLS13;
16+
use tokio::task::JoinSet;
17+
use tokio::{select, signal};
18+
use tracing::{debug, error, info, warn};
19+
20+
mod bindings {
21+
wit_bindgen_wrpc::generate!({
22+
with: {
23+
"wrpc-examples:streams/handler": generate,
24+
}
25+
});
26+
}
27+
28+
use bindings::exports::wrpc_examples::streams::handler::Req;
29+
30+
#[derive(Parser, Debug)]
31+
#[command(author, version, about, long_about = None)]
32+
struct Args {
33+
/// Address to serve `wrpc-examples:streams/handler.echo` on
34+
#[arg(default_value = "[::1]:4433")]
35+
addr: SocketAddr,
36+
}
37+
38+
#[derive(Clone, Copy)]
39+
struct Server;
40+
41+
impl bindings::exports::wrpc_examples::streams::handler::Handler<()> for Server {
42+
async fn echo(
43+
&self,
44+
_ctx: (),
45+
Req { numbers, bytes }: Req,
46+
) -> anyhow::Result<(
47+
Pin<Box<dyn Stream<Item = Vec<u64>> + Send>>,
48+
Pin<Box<dyn Stream<Item = Bytes> + Send>>,
49+
)> {
50+
Ok((numbers, bytes))
51+
}
52+
}
53+
54+
#[tokio::main]
55+
async fn main() -> anyhow::Result<()> {
56+
tracing_subscriber::fmt().init();
57+
58+
let Args { addr } = Args::parse();
59+
60+
let CertifiedKey { cert, key_pair } = generate_simple_self_signed([
61+
"localhost".to_string(),
62+
"::1".to_string(),
63+
"127.0.0.1".to_string(),
64+
])
65+
.context("failed to generate server certificate")?;
66+
let cert = CertificateDer::from(cert);
67+
68+
let conf = rustls::ServerConfig::builder_with_protocol_versions(&[&TLS13])
69+
.with_no_client_auth() // TODO: verify client cert
70+
.with_single_cert(
71+
vec![cert],
72+
PrivatePkcs8KeyDer::from(key_pair.serialize_der()).into(),
73+
)
74+
.context("failed to create server config")?;
75+
76+
let conf: QuicServerConfig = conf
77+
.try_into()
78+
.context("failed to convert rustls client config to QUIC server config")?;
79+
80+
let ep = Endpoint::server(ServerConfig::with_crypto(Arc::new(conf)), addr)
81+
.context("failed to create server endpoint")?;
82+
83+
let srv = Arc::new(wrpc_transport_quic::Server::new());
84+
let accept = tokio::spawn({
85+
let mut tasks = JoinSet::<anyhow::Result<()>>::new();
86+
let srv = Arc::clone(&srv);
87+
async move {
88+
loop {
89+
select! {
90+
Some(conn) = ep.accept() => {
91+
let srv = Arc::clone(&srv);
92+
tasks.spawn(async move {
93+
let conn = conn
94+
.accept()
95+
.context("failed to accept QUIC connection")?;
96+
let conn = conn.await.context("failed to establish QUIC connection")?;
97+
let wrpc = wrpc_transport_quic::Client::from(conn);
98+
loop {
99+
srv.accept(&wrpc)
100+
.await
101+
.context("failed to accept wRPC connection")?;
102+
}
103+
});
104+
}
105+
Some(res) = tasks.join_next() => {
106+
match res {
107+
Ok(Ok(())) => {}
108+
Ok(Err(err)) => {
109+
warn!(?err, "failed to serve connection")
110+
}
111+
Err(err) => {
112+
error!(?err, "failed to join task")
113+
}
114+
}
115+
}
116+
else => {
117+
return;
118+
}
119+
}
120+
}
121+
}
122+
});
123+
124+
let invocations = bindings::serve(srv.as_ref(), Server)
125+
.await
126+
.context("failed to serve `wasi:keyvalue`")?;
127+
// NOTE: This will conflate all invocation streams into a single stream via `futures::stream::SelectAll`,
128+
// to customize this, iterate over the returned `invocations` and set up custom handling per export
129+
let mut invocations = select_all(
130+
invocations
131+
.into_iter()
132+
.map(|(instance, name, invocations)| invocations.map(move |res| (instance, name, res))),
133+
);
134+
let shutdown = signal::ctrl_c();
135+
let mut shutdown = pin!(shutdown);
136+
let mut tasks = JoinSet::new();
137+
loop {
138+
select! {
139+
Some((instance, name, res)) = invocations.next() => {
140+
match res {
141+
Ok(fut) => {
142+
debug!(instance, name, "invocation accepted");
143+
tasks.spawn(async move {
144+
if let Err(err) = fut.await {
145+
warn!(?err, "failed to handle invocation");
146+
} else {
147+
info!(instance, name, "invocation successfully handled");
148+
}
149+
});
150+
}
151+
Err(err) => {
152+
warn!(?err, instance, name, "failed to accept invocation");
153+
}
154+
}
155+
}
156+
Some(res) = tasks.join_next() => {
157+
if let Err(err) = res {
158+
error!(?err, "failed to join task")
159+
}
160+
}
161+
res = &mut shutdown => {
162+
accept.abort();
163+
// wait for all invocations to complete
164+
while let Some(res) = tasks.join_next().await {
165+
if let Err(err) = res {
166+
error!(?err, "failed to join task")
167+
}
168+
}
169+
return res.context("failed to listen for ^C")
170+
}
171+
}
172+
}
173+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
[streams]
2+
path = "../../../wit/streams"
3+
sha256 = "5064bee90ebea73f1695987191fbbfea71ed2dbb69839814009490b4fbe8e96f"
4+
sha512 = "dfca3844d91c6c8e83fefd7b9511a366b464cf69d017c61b671409cb26dc9490a0e59a8e60ef15b77fdeb4fc1b8d9e6efa11c2fb1a1dabd0141e5e6afe8a59b9"
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
streams = "../../../wit/streams"
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package wrpc-examples:streams;
2+
3+
interface handler {
4+
record req {
5+
numbers: stream<u64>,
6+
bytes: stream<u8>,
7+
}
8+
echo: func(r: req) -> (numbers: stream<u64>, bytes: stream<u8>);
9+
}
10+
11+
world client {
12+
import handler;
13+
}
14+
15+
world server {
16+
export handler;
17+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package wrpc-examples:streams-rust-server;
2+
3+
world server {
4+
include wrpc-examples:streams/server;
5+
}

0 commit comments

Comments
 (0)