-
-
Notifications
You must be signed in to change notification settings - Fork 449
Make use of the max-message-size SDP attribute in datachannels
#722
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
cb7913b
b67f443
9b0d40b
7c64b80
03adf5c
fe10149
3da802a
49781f7
8303917
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -54,6 +54,30 @@ pub struct ReplayProtection { | |
| pub srtcp: usize, | ||
| } | ||
|
|
||
| #[derive(Clone)] | ||
| pub enum SctpMaxMessageSize { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am personally not a huge fan of how the crate makes use of |
||
| Bounded(u32), | ||
| Unbounded, | ||
| } | ||
|
|
||
| impl SctpMaxMessageSize { | ||
| pub const DEFAULT_MESSAGE_SIZE: u32 = 65536; | ||
| pub fn as_u32(&self) -> u32 { | ||
| match self { | ||
| Self::Bounded(result) => *result, | ||
| Self::Unbounded => 0, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl Default for SctpMaxMessageSize { | ||
| fn default() -> Self { | ||
| // https://datatracker.ietf.org/doc/html/rfc8841#section-6.1-4 | ||
| // > If the SDP "max-message-size" attribute is not present, the default value is 64K. | ||
| Self::Bounded(Self::DEFAULT_MESSAGE_SIZE) | ||
| } | ||
| } | ||
|
|
||
| /// SettingEngine allows influencing behavior in ways that are not | ||
| /// supported by the WebRTC API. This allows us to support additional | ||
| /// use-cases without deviating from the WebRTC API elsewhere. | ||
|
|
@@ -79,6 +103,8 @@ pub struct SettingEngine { | |
| pub(crate) receive_mtu: usize, | ||
| pub(crate) mid_generator: Option<Arc<dyn Fn(isize) -> String + Send + Sync>>, | ||
| pub(crate) enable_sender_rtx: bool, | ||
| /// Determines the max size of any message that may be sent through an SCTP transport. | ||
| pub(crate) sctp_max_message_size_can_send: SctpMaxMessageSize, | ||
| } | ||
|
|
||
| impl SettingEngine { | ||
|
|
@@ -342,4 +368,11 @@ impl SettingEngine { | |
| pub fn enable_sender_rtx(&mut self, is_enabled: bool) { | ||
| self.enable_sender_rtx = is_enabled; | ||
| } | ||
|
|
||
| pub fn set_sctp_max_message_size_can_send( | ||
| &mut self, | ||
| max_message_size_can_send: SctpMaxMessageSize, | ||
| ) { | ||
| self.sctp_max_message_size_can_send = max_message_size_can_send | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,7 @@ use waitgroup::WaitGroup; | |
|
|
||
| use super::*; | ||
| use crate::api::media_engine::MediaEngine; | ||
| use crate::api::setting_engine::SctpMaxMessageSize; | ||
| use crate::api::{APIBuilder, API}; | ||
| use crate::data_channel::data_channel_init::RTCDataChannelInit; | ||
| //use log::LevelFilter; | ||
|
|
@@ -1375,6 +1376,203 @@ async fn test_data_channel_non_standard_session_description() -> Result<()> { | |
| Ok(()) | ||
| } | ||
|
|
||
| async fn create_data_channel_with_max_message_size( | ||
| remote_max_message_size: Option<u32>, | ||
| can_send_max_message_size: Option<SctpMaxMessageSize>, | ||
| ) -> Result<Arc<RTCDataChannel>> { | ||
| let mut m = MediaEngine::default(); | ||
| let mut s: SettingEngine = SettingEngine::default(); | ||
| s.detach_data_channels(); | ||
| m.register_default_codecs()?; | ||
| let api_builder = APIBuilder::new().with_media_engine(m); | ||
|
|
||
| if let Some(can_send_max_message_size) = can_send_max_message_size { | ||
| s.set_sctp_max_message_size_can_send(can_send_max_message_size); | ||
| } | ||
|
|
||
| let api = api_builder.with_setting_engine(s).build(); | ||
|
|
||
| let (offer_pc, answer_pc) = new_pair(&api).await?; | ||
| let (data_channel_tx, mut data_channel_rx) = mpsc::channel::<Arc<RTCDataChannel>>(1); | ||
| let data_channel_tx = Arc::new(data_channel_tx); | ||
| answer_pc.on_data_channel(Box::new(move |dc: Arc<RTCDataChannel>| { | ||
| let data_channel_tx2 = Arc::clone(&data_channel_tx); | ||
| Box::pin(async move { | ||
| data_channel_tx2.send(dc).await.unwrap(); | ||
| }) | ||
| })); | ||
|
|
||
| let _ = offer_pc.create_data_channel("foo", None).await?; | ||
|
|
||
| let offer = offer_pc.create_offer(None).await?; | ||
| let mut offer_gathering_complete = offer_pc.gathering_complete_promise().await; | ||
| offer_pc.set_local_description(offer).await?; | ||
| let _ = offer_gathering_complete.recv().await; | ||
| let mut offer = offer_pc.local_description().await.unwrap(); | ||
|
|
||
| if let Some(remote_max_message_size) = remote_max_message_size { | ||
| offer | ||
| .sdp | ||
| .push_str(format!("a=max-message-size:{}\r\n", remote_max_message_size).as_str()); | ||
| } | ||
|
|
||
| answer_pc.set_remote_description(offer).await?; | ||
|
|
||
| let answer = answer_pc.create_answer(None).await?; | ||
|
|
||
| let mut answer_gathering_complete = answer_pc.gathering_complete_promise().await; | ||
| answer_pc.set_local_description(answer).await?; | ||
| let _ = answer_gathering_complete.recv().await; | ||
|
|
||
| let answer = answer_pc.local_description().await.unwrap(); | ||
| offer_pc.set_remote_description(answer).await?; | ||
|
|
||
| Ok(data_channel_rx.recv().await.unwrap()) | ||
| } | ||
|
|
||
| // 128 KB | ||
| const EXPECTED_MAX_MESSAGE_SIZE: u32 = 131072; | ||
|
|
||
| #[tokio::test] | ||
| async fn test_data_channel_max_message_size_respected_on_send() -> Result<()> { | ||
| let data_channel = create_data_channel_with_max_message_size( | ||
| Some(EXPECTED_MAX_MESSAGE_SIZE), | ||
| Some(SctpMaxMessageSize::Unbounded), | ||
| ) | ||
| .await?; | ||
|
|
||
| // A buffer with a size greater than the default size of 64KB. | ||
| let buffer = vec![0; 68000]; | ||
| let bytes = bytes::Bytes::copy_from_slice(buffer.as_slice()); | ||
| data_channel.send(&bytes).await.unwrap(); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_given_remote_max_message_size_is_none_when_data_channel_can_send_max_message_size_respected_on_send( | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like using the given, when, then naming pattern for certain tests. If you're not a fan of this, feel free to suggest alternative test names.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The following tests whether the |
||
| ) -> Result<()> { | ||
| const EXPECTED_CAN_SEND_MAX_MESSAGE_SIZE: u32 = 1024; | ||
| let data_channel = create_data_channel_with_max_message_size( | ||
| None, | ||
| Some(SctpMaxMessageSize::Bounded( | ||
| EXPECTED_CAN_SEND_MAX_MESSAGE_SIZE, | ||
| )), | ||
| ) | ||
| .await?; | ||
|
|
||
| let buffer = vec![0; 65536]; | ||
| let bytes = bytes::Bytes::copy_from_slice(buffer.as_slice()); | ||
|
|
||
| let actual = data_channel.send(&bytes).await; | ||
|
|
||
| assert!(matches!( | ||
| actual, | ||
| Err(Error::Data(data::Error::Sctp( | ||
| sctp::Error::ErrOutboundPacketTooLarge | ||
| ))) | ||
| )); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| async fn run_data_channel_config_max_message_size( | ||
| remote_max_message_size: Option<u32>, | ||
| can_send_max_message_size: Option<SctpMaxMessageSize>, | ||
| ) -> Result<u32> { | ||
| let data_channel = create_data_channel_with_max_message_size( | ||
| remote_max_message_size, | ||
| can_send_max_message_size, | ||
| ) | ||
| .await?; | ||
| let data_channel = data_channel.detach().await?; | ||
| Ok(data_channel.config.max_message_size) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_data_channel_max_message_size_reflected_on_data_channel_config() -> Result<()> { | ||
| assert_eq!( | ||
| run_data_channel_config_max_message_size( | ||
| Some(EXPECTED_MAX_MESSAGE_SIZE), | ||
| Some(SctpMaxMessageSize::Unbounded) | ||
| ) | ||
| .await?, | ||
| EXPECTED_MAX_MESSAGE_SIZE | ||
| ); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_can_send_max_message_size_unspecified_then_remote_default_value_is_respected( | ||
| ) -> Result<()> { | ||
| assert_eq!( | ||
| run_data_channel_config_max_message_size(Some(EXPECTED_MAX_MESSAGE_SIZE), None).await?, | ||
| SctpMaxMessageSize::DEFAULT_MESSAGE_SIZE | ||
| ); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_given_can_send_channel_max_message_size_less_than_remote_max_message_size_respect_send_channel_max_message_size( | ||
| ) -> Result<()> { | ||
| let remote_max_message_size = 1024; | ||
| let can_send_channel_max_message_size = 256; | ||
| assert_eq!( | ||
| run_data_channel_config_max_message_size( | ||
| Some(remote_max_message_size), | ||
| Some(SctpMaxMessageSize::Bounded( | ||
| can_send_channel_max_message_size | ||
| )) | ||
| ) | ||
| .await?, | ||
| can_send_channel_max_message_size | ||
| ); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_can_send_max_message_size_respected_on_data_channel_config() -> Result<()> { | ||
| let can_send_channel_max_message_size = 1024; | ||
| assert_eq!( | ||
| run_data_channel_config_max_message_size( | ||
| None, | ||
| Some(SctpMaxMessageSize::Bounded( | ||
| can_send_channel_max_message_size | ||
| )) | ||
| ) | ||
| .await?, | ||
| can_send_channel_max_message_size | ||
| ); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_given_no_remote_message_size_or_can_send_max_message_size_max_size_is_65536( | ||
| ) -> Result<()> { | ||
| assert_eq!( | ||
| run_data_channel_config_max_message_size(None, None).await?, | ||
| SctpMaxMessageSize::DEFAULT_MESSAGE_SIZE | ||
| ); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_respect_default_remote_max_message_size_when_can_send_max_message_size_is_greater_than_default( | ||
| ) -> Result<()> { | ||
| assert_eq!( | ||
| run_data_channel_config_max_message_size(None, Some(SctpMaxMessageSize::Bounded(70000))) | ||
| .await?, | ||
| SctpMaxMessageSize::DEFAULT_MESSAGE_SIZE | ||
| ); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| struct TestOrtcStack { | ||
| //api *API | ||
| gatherer: Arc<RTCIceGatherer>, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ use std::collections::VecDeque; | |
| use std::sync::Weak; | ||
|
|
||
| use super::*; | ||
| use crate::api::setting_engine::SctpMaxMessageSize; | ||
| use crate::rtp_transceiver::{create_stream_info, PayloadType}; | ||
| use crate::stats::stats_collector::StatsCollector; | ||
| use crate::stats::{ | ||
|
|
@@ -290,7 +291,16 @@ impl PeerConnectionInternal { | |
| if let Some(remote_port) = get_application_media_section_sctp_port(parsed_remote) { | ||
| if let Some(local_port) = get_application_media_section_sctp_port(parsed_local) | ||
| { | ||
| self.start_sctp(local_port, remote_port).await; | ||
| // TODO: Reuse the MediaDescription retrieved when looking for the message size. | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A small nitpick of the code from my end, if you agree with the proposal, which is retrieving the MediaDescription once, and reusing it, I can implement this in another PR. |
||
| let max_message_size = | ||
| get_application_media_section_max_message_size(parsed_remote) | ||
| .unwrap_or(SctpMaxMessageSize::DEFAULT_MESSAGE_SIZE); | ||
| self.start_sctp( | ||
| local_port, | ||
| remote_port, | ||
| SCTPTransportCapabilities { max_message_size }, | ||
| ) | ||
| .await; | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -460,17 +470,16 @@ impl PeerConnectionInternal { | |
| } | ||
|
|
||
| /// Start SCTP subsystem | ||
| async fn start_sctp(&self, local_port: u16, remote_port: u16) { | ||
| async fn start_sctp( | ||
| &self, | ||
| local_port: u16, | ||
| remote_port: u16, | ||
| sctp_transport_capabilities: SCTPTransportCapabilities, | ||
| ) { | ||
| // Start sctp | ||
| if let Err(err) = self | ||
| .sctp_transport | ||
| .start( | ||
| SCTPTransportCapabilities { | ||
| max_message_size: 0, | ||
| }, | ||
| local_port, | ||
| remote_port, | ||
| ) | ||
| .start(sctp_transport_capabilities, local_port, remote_port) | ||
| .await | ||
| { | ||
| log::warn!("Failed to start SCTP: {err}"); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1049,6 +1049,15 @@ pub(crate) fn get_application_media_section_sctp_port(desc: &SessionDescription) | |
| None | ||
| } | ||
|
|
||
| pub(crate) fn get_application_media_section_max_message_size( | ||
| desc: &SessionDescription, | ||
| ) -> Option<u32> { | ||
| get_application_media(desc)? | ||
| .attribute(ATTR_KEY_MAX_MESSAGE_SIZE)?? | ||
| .parse() | ||
| .ok() | ||
| } | ||
|
|
||
| pub(crate) fn get_by_mid<'a>( | ||
| search_mid: &str, | ||
| desc: &'a session_description::RTCSessionDescription, | ||
|
|
@@ -1065,18 +1074,17 @@ pub(crate) fn get_by_mid<'a>( | |
| None | ||
| } | ||
|
|
||
| pub(crate) fn get_application_media(desc: &SessionDescription) -> Option<&MediaDescription> { | ||
| desc.media_descriptions | ||
| .iter() | ||
| .find(|media_description| media_description.media_name.media == MEDIA_SECTION_APPLICATION) | ||
| } | ||
|
|
||
| /// have_data_channel return MediaDescription with MediaName equal application | ||
| pub(crate) fn have_data_channel( | ||
| desc: &session_description::RTCSessionDescription, | ||
| ) -> Option<&MediaDescription> { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've rewritten this function in particular to use If you agree with the refactor, I can do it in another PR. |
||
| if let Some(parsed) = &desc.parsed { | ||
| for d in &parsed.media_descriptions { | ||
| if d.media_name.media == MEDIA_SECTION_APPLICATION { | ||
| return Some(d); | ||
| } | ||
| } | ||
| } | ||
| None | ||
| get_application_media(desc.parsed.as_ref()?) | ||
| } | ||
|
|
||
| pub(crate) fn codecs_from_media_description( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am trying to keep refactors to the minimum in order to keep this PR as small as possible. If you do agree with this comment, I can tackle it in another PR, otherwise, I will remove said comment.