diff --git a/Cargo.lock b/Cargo.lock index 4b2bb58b4..65598c8fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3246,6 +3246,47 @@ dependencies = [ "wrpc-transport-nats", ] +[[package]] +name = "streams-quic-client" +version = "0.1.0" +dependencies = [ + "anyhow", + "bytes", + "clap", + "futures", + "quinn", + "rcgen", + "rustls 0.23.35", + "tokio", + "tokio-stream", + "tracing", + "tracing-subscriber", + "url", + "wit-bindgen-wrpc", + "wrpc-transport", + "wrpc-transport-quic", +] + +[[package]] +name = "streams-quic-server" +version = "0.1.0" +dependencies = [ + "anyhow", + "bytes", + "clap", + "futures", + "quinn", + "rcgen", + "rustls 0.23.35", + "tokio", + "tracing", + "tracing-subscriber", + "url", + "wit-bindgen-wrpc", + "wrpc-transport", + "wrpc-transport-quic", +] + [[package]] name = "strsim" version = "0.11.1" diff --git a/examples/rust/hello-quic-client/src/main.rs b/examples/rust/hello-quic-client/src/main.rs index 262c82441..92e5dc075 100644 --- a/examples/rust/hello-quic-client/src/main.rs +++ b/examples/rust/hello-quic-client/src/main.rs @@ -24,7 +24,7 @@ mod bindings { #[command(author, version, about, long_about = None)] struct Args { /// Address to invoke `wrpc-examples:hello/handler.hello` on - #[arg(default_value = "127.0.0.1:4433")] + #[arg(default_value = "[::1]:4433")] addr: SocketAddr, } diff --git a/examples/rust/streams-quic-client/Cargo.toml b/examples/rust/streams-quic-client/Cargo.toml new file mode 100644 index 000000000..890262414 --- /dev/null +++ b/examples/rust/streams-quic-client/Cargo.toml @@ -0,0 +1,44 @@ +[package] +name = "streams-quic-client" +version = "0.1.0" + +authors.workspace = true +categories.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true + +[dependencies] +anyhow = { workspace = true, features = ["std"] } +bytes = { workspace = true } +clap = { workspace = true, features = [ + "color", + "derive", + "error-context", + "help", + "std", + "suggestions", + "usage", +] } +futures = { workspace = true } +quinn = { workspace = true, features = [ + "log", + "platform-verifier", + "ring", + "runtime-tokio", + "rustls", +] } +rcgen = { workspace = true, features = ["crypto", "ring", "zeroize"] } +rustls = { workspace = true } +tokio = { workspace = true, features = ["rt-multi-thread", "signal"] } +tokio-stream = { workspace = true, features = ["time"] } +tracing = { workspace = true } +tracing-subscriber = { workspace = true, features = [ + "ansi", + "env-filter", + "fmt", +] } +url = { workspace = true } +wit-bindgen-wrpc = { workspace = true } +wrpc-transport = { workspace = true } +wrpc-transport-quic = { workspace = true } diff --git a/examples/rust/streams-quic-client/src/main.rs b/examples/rust/streams-quic-client/src/main.rs new file mode 100644 index 000000000..006a5610e --- /dev/null +++ b/examples/rust/streams-quic-client/src/main.rs @@ -0,0 +1,141 @@ +use core::time::Duration; +use std::sync::Arc; + +use anyhow::Context as _; +use bytes::Bytes; +use clap::Parser; +use core::net::SocketAddr; +use futures::{stream, StreamExt as _}; +use quinn::{crypto::rustls::QuicClientConfig, ClientConfig, Endpoint}; +use rustls::{ + client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier}, + pki_types::{CertificateDer, ServerName, UnixTime}, + version::TLS13, + DigitallySignedStruct, SignatureScheme, +}; +use tokio::{time, try_join}; +use tokio_stream::wrappers::IntervalStream; +use tracing::debug; + +mod bindings { + wit_bindgen_wrpc::generate!({ + with: { + "wrpc-examples:streams/handler": generate + } + }); +} + +use bindings::wrpc_examples::streams::handler::{echo, Req}; + +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// Address to invoke `wrpc-examples:streams/handler.echo` on + #[arg(default_value = "[::1]:4433")] + addr: SocketAddr, +} + +#[derive(Debug)] +struct Insecure; + +impl ServerCertVerifier for Insecure { + fn verify_server_cert( + &self, + _end_entity: &CertificateDer<'_>, + _intermediates: &[CertificateDer<'_>], + _server_name: &ServerName<'_>, + _ocsp_response: &[u8], + _now: UnixTime, + ) -> Result { + Ok(ServerCertVerified::assertion()) + } + + fn verify_tls12_signature( + &self, + _message: &[u8], + _cert: &CertificateDer<'_>, + _dss: &DigitallySignedStruct, + ) -> Result { + Ok(HandshakeSignatureValid::assertion()) + } + + fn verify_tls13_signature( + &self, + _message: &[u8], + _cert: &CertificateDer<'_>, + _dss: &DigitallySignedStruct, + ) -> Result { + Ok(HandshakeSignatureValid::assertion()) + } + + fn supported_verify_schemes(&self) -> Vec { + vec![SignatureScheme::ECDSA_NISTP256_SHA256] + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt().init(); + + let Args { addr } = Args::parse(); + + let conf = rustls::ClientConfig::builder_with_protocol_versions(&[&TLS13]) + .dangerous() + .with_custom_certificate_verifier(Arc::new(Insecure)) + .with_no_client_auth(); + + let conf: ClientConfig = ClientConfig::new(Arc::new(QuicClientConfig::try_from(conf)?)); + // Bind to any IPv4 or IPv6 address (dual stack, if supported). + let ep = Endpoint::client((std::net::Ipv6Addr::UNSPECIFIED, 0).into())?; + + // Connect using the rustls client configuration, addr, and server name + let connection = ep + .connect_with(conf, addr, "localhost")? + .await + .context("failed to connect to server")?; + let wrpc = wrpc_transport_quic::Client::from(connection); + + let numbers = Box::pin( + stream::iter(1..) + .take(10) + .zip(IntervalStream::new(time::interval(Duration::from_secs(1)))) + .map(|(i, _)| i) + .ready_chunks(10), + ); + + // `stream` items are chunked using [`Bytes`] + let bytes = Box::pin( + stream::iter(b"foo bar baz") + .zip(IntervalStream::new(time::interval(Duration::from_secs(1)))) + .map(|(i, _)| *i) + .ready_chunks(10) + .map(Bytes::from), + ); + + let (mut numbers, mut bytes, io) = echo(&wrpc, (), Req { numbers, bytes }) + .await + .context("failed to invoke `wrpc-examples:streams/handler.echo`")?; + try_join!( + async { + if let Some(io) = io { + debug!("performing async I/O"); + io.await.context("failed to complete async I/O") + } else { + Ok(()) + } + }, + async { + while let Some(item) = numbers.next().await { + eprintln!("numbers: {item:?}"); + } + Ok(()) + }, + async { + while let Some(item) = bytes.next().await { + eprintln!("bytes: {item:?}"); + } + Ok(()) + } + )?; + Ok(()) +} diff --git a/examples/rust/streams-quic-client/wit/deps.lock b/examples/rust/streams-quic-client/wit/deps.lock new file mode 100644 index 000000000..640642729 --- /dev/null +++ b/examples/rust/streams-quic-client/wit/deps.lock @@ -0,0 +1,4 @@ +[streams] +path = "../../../wit/streams" +sha256 = "5064bee90ebea73f1695987191fbbfea71ed2dbb69839814009490b4fbe8e96f" +sha512 = "dfca3844d91c6c8e83fefd7b9511a366b464cf69d017c61b671409cb26dc9490a0e59a8e60ef15b77fdeb4fc1b8d9e6efa11c2fb1a1dabd0141e5e6afe8a59b9" diff --git a/examples/rust/streams-quic-client/wit/deps.toml b/examples/rust/streams-quic-client/wit/deps.toml new file mode 100644 index 000000000..7ad7d00c7 --- /dev/null +++ b/examples/rust/streams-quic-client/wit/deps.toml @@ -0,0 +1 @@ +streams = "../../../wit/streams" diff --git a/examples/rust/streams-quic-client/wit/deps/streams/streams.wit b/examples/rust/streams-quic-client/wit/deps/streams/streams.wit new file mode 100644 index 000000000..8520741c2 --- /dev/null +++ b/examples/rust/streams-quic-client/wit/deps/streams/streams.wit @@ -0,0 +1,17 @@ +package wrpc-examples:streams; + +interface handler { + record req { + numbers: stream, + bytes: stream, + } + echo: func(r: req) -> (numbers: stream, bytes: stream); +} + +world client { + import handler; +} + +world server { + export handler; +} diff --git a/examples/rust/streams-quic-client/wit/world.wit b/examples/rust/streams-quic-client/wit/world.wit new file mode 100644 index 000000000..5f6997fdc --- /dev/null +++ b/examples/rust/streams-quic-client/wit/world.wit @@ -0,0 +1,5 @@ +package wrpc-examples:streams-rust-client; + +world client { + include wrpc-examples:streams/client; +} diff --git a/examples/rust/streams-quic-server/Cargo.toml b/examples/rust/streams-quic-server/Cargo.toml new file mode 100644 index 000000000..acef7fbd3 --- /dev/null +++ b/examples/rust/streams-quic-server/Cargo.toml @@ -0,0 +1,43 @@ +[package] +name = "streams-quic-server" +version = "0.1.0" + +authors.workspace = true +categories.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true + +[dependencies] +anyhow = { workspace = true, features = ["std"] } +bytes = { workspace = true } +clap = { workspace = true, features = [ + "color", + "derive", + "error-context", + "help", + "std", + "suggestions", + "usage", +] } +futures = { workspace = true } +quinn = { workspace = true, features = [ + "log", + "platform-verifier", + "ring", + "runtime-tokio", + "rustls", +] } +rcgen = { workspace = true, features = ["crypto", "ring", "zeroize"] } +rustls = { workspace = true } +tokio = { workspace = true, features = ["rt-multi-thread", "signal"] } +tracing = { workspace = true } +tracing-subscriber = { workspace = true, features = [ + "ansi", + "env-filter", + "fmt", +] } +url = { workspace = true } +wit-bindgen-wrpc = { workspace = true } +wrpc-transport = { workspace = true } +wrpc-transport-quic = { workspace = true } diff --git a/examples/rust/streams-quic-server/src/main.rs b/examples/rust/streams-quic-server/src/main.rs new file mode 100644 index 000000000..3c8a30afb --- /dev/null +++ b/examples/rust/streams-quic-server/src/main.rs @@ -0,0 +1,173 @@ +use core::net::SocketAddr; +use core::pin::{pin, Pin}; + +use std::sync::Arc; + +use anyhow::Context as _; +use bytes::Bytes; +use clap::Parser; +use futures::stream::select_all; +use futures::{Stream, StreamExt as _}; +use quinn::crypto::rustls::QuicServerConfig; +use quinn::{Endpoint, ServerConfig}; +use rcgen::{generate_simple_self_signed, CertifiedKey}; +use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; +use rustls::version::TLS13; +use tokio::task::JoinSet; +use tokio::{select, signal}; +use tracing::{debug, error, info, warn}; + +mod bindings { + wit_bindgen_wrpc::generate!({ + with: { + "wrpc-examples:streams/handler": generate, + } + }); +} + +use bindings::exports::wrpc_examples::streams::handler::Req; + +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// Address to serve `wrpc-examples:streams/handler.echo` on + #[arg(default_value = "[::1]:4433")] + addr: SocketAddr, +} + +#[derive(Clone, Copy)] +struct Server; + +impl bindings::exports::wrpc_examples::streams::handler::Handler<()> for Server { + async fn echo( + &self, + _ctx: (), + Req { numbers, bytes }: Req, + ) -> anyhow::Result<( + Pin> + Send>>, + Pin + Send>>, + )> { + Ok((numbers, bytes)) + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt().init(); + + let Args { addr } = Args::parse(); + + let CertifiedKey { cert, key_pair } = generate_simple_self_signed([ + "localhost".to_string(), + "::1".to_string(), + "127.0.0.1".to_string(), + ]) + .context("failed to generate server certificate")?; + let cert = CertificateDer::from(cert); + + let conf = rustls::ServerConfig::builder_with_protocol_versions(&[&TLS13]) + .with_no_client_auth() // TODO: verify client cert + .with_single_cert( + vec![cert], + PrivatePkcs8KeyDer::from(key_pair.serialize_der()).into(), + ) + .context("failed to create server config")?; + + let conf: QuicServerConfig = conf + .try_into() + .context("failed to convert rustls client config to QUIC server config")?; + + let ep = Endpoint::server(ServerConfig::with_crypto(Arc::new(conf)), addr) + .context("failed to create server endpoint")?; + + let srv = Arc::new(wrpc_transport_quic::Server::new()); + let accept = tokio::spawn({ + let mut tasks = JoinSet::>::new(); + let srv = Arc::clone(&srv); + async move { + loop { + select! { + Some(conn) = ep.accept() => { + let srv = Arc::clone(&srv); + tasks.spawn(async move { + let conn = conn + .accept() + .context("failed to accept QUIC connection")?; + let conn = conn.await.context("failed to establish QUIC connection")?; + let wrpc = wrpc_transport_quic::Client::from(conn); + loop { + srv.accept(&wrpc) + .await + .context("failed to accept wRPC connection")?; + } + }); + } + Some(res) = tasks.join_next() => { + match res { + Ok(Ok(())) => {} + Ok(Err(err)) => { + warn!(?err, "failed to serve connection") + } + Err(err) => { + error!(?err, "failed to join task") + } + } + } + else => { + return; + } + } + } + } + }); + + let invocations = bindings::serve(srv.as_ref(), Server) + .await + .context("failed to serve `wrpc-examples:streams/handler.echo`")?; + // NOTE: This will conflate all invocation streams into a single stream via `futures::stream::SelectAll`, + // to customize this, iterate over the returned `invocations` and set up custom handling per export + let mut invocations = select_all( + invocations + .into_iter() + .map(|(instance, name, invocations)| invocations.map(move |res| (instance, name, res))), + ); + let shutdown = signal::ctrl_c(); + let mut shutdown = pin!(shutdown); + let mut tasks = JoinSet::new(); + loop { + select! { + Some((instance, name, res)) = invocations.next() => { + match res { + Ok(fut) => { + debug!(instance, name, "invocation accepted"); + tasks.spawn(async move { + if let Err(err) = fut.await { + warn!(?err, "failed to handle invocation"); + } else { + info!(instance, name, "invocation successfully handled"); + } + }); + } + Err(err) => { + warn!(?err, instance, name, "failed to accept invocation"); + } + } + } + Some(res) = tasks.join_next() => { + if let Err(err) = res { + error!(?err, "failed to join task") + } + } + res = &mut shutdown => { + accept.abort(); + // wait for all invocations to complete + while let Some(res) = tasks.join_next().await { + if let Err(err) = res { + error!(?err, "failed to join task") + } + } + return res.context("failed to listen for ^C") + } + } + } +} diff --git a/examples/rust/streams-quic-server/wit/deps.lock b/examples/rust/streams-quic-server/wit/deps.lock new file mode 100644 index 000000000..640642729 --- /dev/null +++ b/examples/rust/streams-quic-server/wit/deps.lock @@ -0,0 +1,4 @@ +[streams] +path = "../../../wit/streams" +sha256 = "5064bee90ebea73f1695987191fbbfea71ed2dbb69839814009490b4fbe8e96f" +sha512 = "dfca3844d91c6c8e83fefd7b9511a366b464cf69d017c61b671409cb26dc9490a0e59a8e60ef15b77fdeb4fc1b8d9e6efa11c2fb1a1dabd0141e5e6afe8a59b9" diff --git a/examples/rust/streams-quic-server/wit/deps.toml b/examples/rust/streams-quic-server/wit/deps.toml new file mode 100644 index 000000000..7ad7d00c7 --- /dev/null +++ b/examples/rust/streams-quic-server/wit/deps.toml @@ -0,0 +1 @@ +streams = "../../../wit/streams" diff --git a/examples/rust/streams-quic-server/wit/deps/streams/streams.wit b/examples/rust/streams-quic-server/wit/deps/streams/streams.wit new file mode 100644 index 000000000..8520741c2 --- /dev/null +++ b/examples/rust/streams-quic-server/wit/deps/streams/streams.wit @@ -0,0 +1,17 @@ +package wrpc-examples:streams; + +interface handler { + record req { + numbers: stream, + bytes: stream, + } + echo: func(r: req) -> (numbers: stream, bytes: stream); +} + +world client { + import handler; +} + +world server { + export handler; +} diff --git a/examples/rust/streams-quic-server/wit/world.wit b/examples/rust/streams-quic-server/wit/world.wit new file mode 100644 index 000000000..ba3ffa1b6 --- /dev/null +++ b/examples/rust/streams-quic-server/wit/world.wit @@ -0,0 +1,5 @@ +package wrpc-examples:streams-rust-server; + +world server { + include wrpc-examples:streams/server; +}