Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion examples/rust/hello-quic-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ mod bindings {
#[command(author, version, about, long_about = None)]
struct Args {
/// Address to invoke `wrpc-examples:hello/handler.hello` on
#[arg(default_value = "127.0.0.1:4433")]
#[arg(default_value = "[::1]:4433")]
addr: SocketAddr,
}

Expand Down
44 changes: 44 additions & 0 deletions examples/rust/streams-quic-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
[package]
name = "streams-quic-client"
version = "0.1.0"

authors.workspace = true
categories.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true

[dependencies]
anyhow = { workspace = true, features = ["std"] }
bytes = { workspace = true }
clap = { workspace = true, features = [
"color",
"derive",
"error-context",
"help",
"std",
"suggestions",
"usage",
] }
futures = { workspace = true }
quinn = { workspace = true, features = [
"log",
"platform-verifier",
"ring",
"runtime-tokio",
"rustls",
] }
rcgen = { workspace = true, features = ["crypto", "ring", "zeroize"] }
rustls = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "signal"] }
tokio-stream = { workspace = true, features = ["time"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = [
"ansi",
"env-filter",
"fmt",
] }
url = { workspace = true }
wit-bindgen-wrpc = { workspace = true }
wrpc-transport = { workspace = true }
wrpc-transport-quic = { workspace = true }
141 changes: 141 additions & 0 deletions examples/rust/streams-quic-client/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
use core::time::Duration;
use std::sync::Arc;

use anyhow::Context as _;
use bytes::Bytes;
use clap::Parser;
use core::net::SocketAddr;
use futures::{stream, StreamExt as _};
use quinn::{crypto::rustls::QuicClientConfig, ClientConfig, Endpoint};
use rustls::{
client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier},
pki_types::{CertificateDer, ServerName, UnixTime},
version::TLS13,
DigitallySignedStruct, SignatureScheme,
};
use tokio::{time, try_join};
use tokio_stream::wrappers::IntervalStream;
use tracing::debug;

mod bindings {
wit_bindgen_wrpc::generate!({
with: {
"wrpc-examples:streams/handler": generate
}
});
}

use bindings::wrpc_examples::streams::handler::{echo, Req};

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Address to invoke `wrpc-examples:streams/handler.echo` on
#[arg(default_value = "[::1]:4433")]
addr: SocketAddr,
}

#[derive(Debug)]
struct Insecure;

impl ServerCertVerifier for Insecure {
fn verify_server_cert(
&self,
_end_entity: &CertificateDer<'_>,
_intermediates: &[CertificateDer<'_>],
_server_name: &ServerName<'_>,
_ocsp_response: &[u8],
_now: UnixTime,
) -> Result<ServerCertVerified, rustls::Error> {
Ok(ServerCertVerified::assertion())
}

fn verify_tls12_signature(
&self,
_message: &[u8],
_cert: &CertificateDer<'_>,
_dss: &DigitallySignedStruct,
) -> Result<HandshakeSignatureValid, rustls::Error> {
Ok(HandshakeSignatureValid::assertion())
}

fn verify_tls13_signature(
&self,
_message: &[u8],
_cert: &CertificateDer<'_>,
_dss: &DigitallySignedStruct,
) -> Result<HandshakeSignatureValid, rustls::Error> {
Ok(HandshakeSignatureValid::assertion())
}

fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
vec![SignatureScheme::ECDSA_NISTP256_SHA256]
}
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt().init();

let Args { addr } = Args::parse();

let conf = rustls::ClientConfig::builder_with_protocol_versions(&[&TLS13])
.dangerous()
.with_custom_certificate_verifier(Arc::new(Insecure))
.with_no_client_auth();

let conf: ClientConfig = ClientConfig::new(Arc::new(QuicClientConfig::try_from(conf)?));
// Bind to any IPv4 or IPv6 address (dual stack, if supported).
let ep = Endpoint::client((std::net::Ipv6Addr::UNSPECIFIED, 0).into())?;

// Connect using the rustls client configuration, addr, and server name
let connection = ep
.connect_with(conf, addr, "localhost")?
.await
.context("failed to connect to server")?;
let wrpc = wrpc_transport_quic::Client::from(connection);

let numbers = Box::pin(
stream::iter(1..)
.take(10)
.zip(IntervalStream::new(time::interval(Duration::from_secs(1))))
.map(|(i, _)| i)
.ready_chunks(10),
);

// `stream<u8>` items are chunked using [`Bytes`]
let bytes = Box::pin(
stream::iter(b"foo bar baz")
.zip(IntervalStream::new(time::interval(Duration::from_secs(1))))
.map(|(i, _)| *i)
.ready_chunks(10)
.map(Bytes::from),
);

let (mut numbers, mut bytes, io) = echo(&wrpc, (), Req { numbers, bytes })
.await
.context("failed to invoke `wrpc-examples:streams/handler.echo`")?;
try_join!(
async {
if let Some(io) = io {
debug!("performing async I/O");
io.await.context("failed to complete async I/O")
} else {
Ok(())
}
},
async {
while let Some(item) = numbers.next().await {
eprintln!("numbers: {item:?}");
}
Ok(())
},
async {
while let Some(item) = bytes.next().await {
eprintln!("bytes: {item:?}");
}
Ok(())
}
)?;
Ok(())
}
4 changes: 4 additions & 0 deletions examples/rust/streams-quic-client/wit/deps.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[streams]
path = "../../../wit/streams"
sha256 = "5064bee90ebea73f1695987191fbbfea71ed2dbb69839814009490b4fbe8e96f"
sha512 = "dfca3844d91c6c8e83fefd7b9511a366b464cf69d017c61b671409cb26dc9490a0e59a8e60ef15b77fdeb4fc1b8d9e6efa11c2fb1a1dabd0141e5e6afe8a59b9"
1 change: 1 addition & 0 deletions examples/rust/streams-quic-client/wit/deps.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
streams = "../../../wit/streams"
17 changes: 17 additions & 0 deletions examples/rust/streams-quic-client/wit/deps/streams/streams.wit
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package wrpc-examples:streams;

interface handler {
record req {
numbers: stream<u64>,
bytes: stream<u8>,
}
echo: func(r: req) -> (numbers: stream<u64>, bytes: stream<u8>);
}

world client {
import handler;
}

world server {
export handler;
}
5 changes: 5 additions & 0 deletions examples/rust/streams-quic-client/wit/world.wit
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package wrpc-examples:streams-rust-client;

world client {
include wrpc-examples:streams/client;
}
43 changes: 43 additions & 0 deletions examples/rust/streams-quic-server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
[package]
name = "streams-quic-server"
version = "0.1.0"

authors.workspace = true
categories.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true

[dependencies]
anyhow = { workspace = true, features = ["std"] }
bytes = { workspace = true }
clap = { workspace = true, features = [
"color",
"derive",
"error-context",
"help",
"std",
"suggestions",
"usage",
] }
futures = { workspace = true }
quinn = { workspace = true, features = [
"log",
"platform-verifier",
"ring",
"runtime-tokio",
"rustls",
] }
rcgen = { workspace = true, features = ["crypto", "ring", "zeroize"] }
rustls = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "signal"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = [
"ansi",
"env-filter",
"fmt",
] }
url = { workspace = true }
wit-bindgen-wrpc = { workspace = true }
wrpc-transport = { workspace = true }
wrpc-transport-quic = { workspace = true }
Loading
Loading