11use std:: {
2+ future:: Future ,
23 ptr:: { copy, copy_nonoverlapping} ,
34 rc:: Rc ,
45 sync:: Arc ,
@@ -10,6 +11,7 @@ use monoio::{
1011 buf:: IoBufMut ,
1112 io:: { AsyncReadRent , AsyncReadRentExt , AsyncWriteRent , AsyncWriteRentExt , Splitable } ,
1213 net:: { TcpConnectOpts , TcpStream } ,
14+ BufResult ,
1315} ;
1416use monoio_rustls_fork_shadow_tls:: TlsConnector ;
1517use rand:: { prelude:: Distribution , seq:: SliceRandom , Rng } ;
@@ -159,11 +161,11 @@ impl ShadowTlsClient {
159161 v3 : V3Mode ,
160162 ) -> anyhow:: Result < Self > {
161163 let mut root_store = RootCertStore :: empty ( ) ;
162- root_store. add_server_trust_anchors ( webpki_roots:: TLS_SERVER_ROOTS . 0 . iter ( ) . map ( |ta| {
164+ root_store. add_server_trust_anchors ( webpki_roots:: TLS_SERVER_ROOTS . iter ( ) . map ( |ta| {
163165 OwnedTrustAnchor :: from_subject_spki_name_constraints (
164- ta. subject ,
165- ta. spki ,
166- ta. name_constraints ,
166+ ta. subject . as_ref ( ) ,
167+ ta. subject_public_key_info . as_ref ( ) ,
168+ ta. name_constraints . as_ref ( ) . map ( |n| n . as_ref ( ) ) ,
167169 )
168170 } ) ) ;
169171 // TLS 1.2 and TLS 1.3 is enabled.
@@ -217,12 +219,12 @@ impl ShadowTlsClient {
217219 }
218220
219221 /// Main relay for V2 protocol.
220- async fn relay_v2 ( & self , mut in_stream : TcpStream ) -> anyhow:: Result < ( ) > {
221- let ( mut out_stream, hash, session) = self . connect_v2 ( ) . await ?;
222+ async fn relay_v2 ( & self , in_stream : TcpStream ) -> anyhow:: Result < ( ) > {
223+ let ( out_stream, hash, session) = self . connect_v2 ( ) . await ?;
222224 let mut hash_8b = [ 0 ; 8 ] ;
223225 unsafe { std:: ptr:: copy_nonoverlapping ( hash. as_ptr ( ) , hash_8b. as_mut_ptr ( ) , 8 ) } ;
224- let ( out_r, mut out_w) = out_stream. split ( ) ;
225- let ( mut in_r, mut in_w) = in_stream. split ( ) ;
226+ let ( out_r, mut out_w) = out_stream. into_split ( ) ;
227+ let ( mut in_r, mut in_w) = in_stream. into_split ( ) ;
226228 let mut session_filtered_out_r = crate :: helper_v2:: SessionFilterStream :: new ( session, out_r) ;
227229 let ( a, b) = monoio:: join!(
228230 copy_without_application_data( & mut session_filtered_out_r, & mut in_w) ,
@@ -469,78 +471,72 @@ impl<S: AsyncReadRent> StreamWrapper<S> {
469471}
470472
471473impl < S : AsyncWriteRent > AsyncWriteRent for StreamWrapper < S > {
472- type WriteFuture < ' a , T > = S :: WriteFuture < ' a , T > where
473- T : monoio:: buf:: IoBuf + ' a , Self : ' a ;
474- type WritevFuture < ' a , T > = S :: WritevFuture < ' a , T > where
475- T : monoio:: buf:: IoVecBuf + ' a , Self : ' a ;
476- type FlushFuture < ' a > = S :: FlushFuture < ' a > where Self : ' a ;
477- type ShutdownFuture < ' a > = S :: ShutdownFuture < ' a > where Self : ' a ;
478-
479- fn write < T : monoio:: buf:: IoBuf > ( & mut self , buf : T ) -> Self :: WriteFuture < ' _ , T > {
474+ #[ inline]
475+ fn write < T : monoio:: buf:: IoBuf > (
476+ & mut self ,
477+ buf : T ,
478+ ) -> impl Future < Output = BufResult < usize , T > > {
480479 self . raw . write ( buf)
481480 }
482- fn writev < T : monoio:: buf:: IoVecBuf > ( & mut self , buf_vec : T ) -> Self :: WritevFuture < ' _ , T > {
481+ #[ inline]
482+ fn writev < T : monoio:: buf:: IoVecBuf > (
483+ & mut self ,
484+ buf_vec : T ,
485+ ) -> impl Future < Output = BufResult < usize , T > > {
483486 self . raw . writev ( buf_vec)
484487 }
485- fn flush ( & mut self ) -> Self :: FlushFuture < ' _ > {
488+ #[ inline]
489+ fn flush ( & mut self ) -> impl Future < Output = std:: io:: Result < ( ) > > {
486490 self . raw . flush ( )
487491 }
488- fn shutdown ( & mut self ) -> Self :: ShutdownFuture < ' _ > {
492+ #[ inline]
493+ fn shutdown ( & mut self ) -> impl Future < Output = std:: io:: Result < ( ) > > {
489494 self . raw . shutdown ( )
490495 }
491496}
492497
493498impl < S : AsyncReadRent > AsyncReadRent for StreamWrapper < S > {
494- type ReadFuture < ' a , B > = impl std:: future:: Future < Output = monoio:: BufResult < usize , B > > +' a where
495- B : monoio:: buf:: IoBufMut + ' a , S : ' a ;
496- type ReadvFuture < ' a , B > = impl std:: future:: Future < Output = monoio:: BufResult < usize , B > > +' a where
497- B : monoio:: buf:: IoVecBufMut + ' a , S : ' a ;
498-
499499 // uncancelable
500- fn read < T : monoio:: buf:: IoBufMut > ( & mut self , mut buf : T ) -> Self :: ReadFuture < ' _ , T > {
501- async move {
502- loop {
503- let owned_buf = self . read_buf . as_mut ( ) . unwrap ( ) ;
504- let data_len = owned_buf. len ( ) - self . read_pos ;
505- // there is enough data to copy
506- if data_len > 0 {
507- let to_copy = buf. bytes_total ( ) . min ( data_len) ;
508- unsafe {
509- copy_nonoverlapping (
510- owned_buf. as_ptr ( ) . add ( self . read_pos ) ,
511- buf. write_ptr ( ) ,
512- to_copy,
513- ) ;
514- buf. set_init ( to_copy) ;
515- } ;
516- self . read_pos += to_copy;
517- return ( Ok ( to_copy) , buf) ;
518- }
500+ async fn read < T : monoio:: buf:: IoBufMut > ( & mut self , mut buf : T ) -> BufResult < usize , T > {
501+ loop {
502+ let owned_buf = self . read_buf . as_mut ( ) . unwrap ( ) ;
503+ let data_len = owned_buf. len ( ) - self . read_pos ;
504+ // there is enough data to copy
505+ if data_len > 0 {
506+ let to_copy = buf. bytes_total ( ) . min ( data_len) ;
507+ unsafe {
508+ copy_nonoverlapping (
509+ owned_buf. as_ptr ( ) . add ( self . read_pos ) ,
510+ buf. write_ptr ( ) ,
511+ to_copy,
512+ ) ;
513+ buf. set_init ( to_copy) ;
514+ } ;
515+ self . read_pos += to_copy;
516+ return ( Ok ( to_copy) , buf) ;
517+ }
519518
520- // no data now
521- match self . feed_data ( ) . await {
522- Ok ( 0 ) => return ( Ok ( 0 ) , buf) ,
523- Ok ( _) => continue ,
524- Err ( e) => return ( Err ( e) , buf) ,
525- }
519+ // no data now
520+ match self . feed_data ( ) . await {
521+ Ok ( 0 ) => return ( Ok ( 0 ) , buf) ,
522+ Ok ( _) => continue ,
523+ Err ( e) => return ( Err ( e) , buf) ,
526524 }
527525 }
528526 }
529527
530- fn readv < T : monoio:: buf:: IoVecBufMut > ( & mut self , mut buf : T ) -> Self :: ReadvFuture < ' _ , T > {
531- async move {
532- let slice = match monoio:: buf:: IoVecWrapperMut :: new ( buf) {
533- Ok ( slice) => slice,
534- Err ( buf) => return ( Ok ( 0 ) , buf) ,
535- } ;
536-
537- let ( result, slice) = self . read ( slice) . await ;
538- buf = slice. into_inner ( ) ;
539- if let Ok ( n) = result {
540- unsafe { buf. set_init ( n) } ;
541- }
542- ( result, buf)
528+ async fn readv < T : monoio:: buf:: IoVecBufMut > ( & mut self , mut buf : T ) -> BufResult < usize , T > {
529+ let slice = match monoio:: buf:: IoVecWrapperMut :: new ( buf) {
530+ Ok ( slice) => slice,
531+ Err ( buf) => return ( Ok ( 0 ) , buf) ,
532+ } ;
533+
534+ let ( result, slice) = self . read ( slice) . await ;
535+ buf = slice. into_inner ( ) ;
536+ if let Ok ( n) = result {
537+ unsafe { buf. set_init ( n) } ;
543538 }
539+ ( result, buf)
544540 }
545541}
546542
0 commit comments