@@ -20,19 +20,21 @@ use std::{
2020} ;
2121use tracing:: info;
2222
23- fn create_stream_client ( ) -> ClientTokio < ClientProvider , NoopSubscriber > {
23+ fn create_stream_client ( ) -> ( ClientTokio < ClientProvider , NoopSubscriber > , Map ) {
2424 let tls_materials_provider = TestTlsProvider { } ;
2525 let test_event_subscriber = NoopSubscriber { } ;
2626
27+ let client_map = Map :: new (
28+ Signer :: new ( b"default" ) ,
29+ 100 ,
30+ StdClock :: default ( ) ,
31+ test_event_subscriber. clone ( ) ,
32+ ) ;
33+
2734 let handshake_client = ClientProvider :: builder ( )
2835 . start (
2936 "127.0.0.1:0" . parse ( ) . unwrap ( ) ,
30- Map :: new (
31- Signer :: new ( b"default" ) ,
32- 100 ,
33- StdClock :: default ( ) ,
34- test_event_subscriber. clone ( ) ,
35- ) ,
37+ client_map. clone ( ) ,
3638 tls_materials_provider. clone ( ) ,
3739 test_event_subscriber. clone ( ) ,
3840 query_event,
@@ -49,7 +51,7 @@ fn create_stream_client() -> ClientTokio<ClientProvider, NoopSubscriber> {
4951 . unwrap ( ) ;
5052
5153 info ! ( "Client created" ) ;
52- stream_client
54+ ( stream_client, client_map )
5355}
5456
5557async fn create_handshake_server ( ) -> ServerProvider {
@@ -102,7 +104,7 @@ async fn setup_servers() {
102104 let unix_socket_path1 = PathBuf :: from ( "/tmp/shared1.sock" ) ;
103105 let unix_socket_path2 = PathBuf :: from ( "/tmp/shared2.sock" ) ;
104106
105- let stream_client = create_stream_client ( ) ;
107+ let ( stream_client, _ ) = create_stream_client ( ) ;
106108 let handshake_server = create_handshake_server ( ) . await ;
107109
108110 let handshake_addr = handshake_server. local_addr ( ) ;
@@ -219,7 +221,7 @@ async fn test_kernel_queue_full() {
219221 let test_event_subscriber = NoopSubscriber { } ;
220222 let unix_socket_path = PathBuf :: from ( "/tmp/kernel_queue_test.sock" ) ;
221223
222- let stream_client = create_stream_client ( ) ;
224+ let ( stream_client, _ ) = create_stream_client ( ) ;
223225 let handshake_server = create_handshake_server ( ) . await ;
224226
225227 let handshake_addr = handshake_server. local_addr ( ) ;
@@ -302,7 +304,7 @@ async fn test_kernel_queue_full_application_crash() {
302304 let test_event_subscriber = NoopSubscriber { } ;
303305 let unix_socket_path = PathBuf :: from ( "/tmp/kernel_queue_crash.sock" ) ;
304306
305- let stream_client = create_stream_client ( ) ;
307+ let ( stream_client, _ ) = create_stream_client ( ) ;
306308 let handshake_server = create_handshake_server ( ) . await ;
307309
308310 let handshake_addr = handshake_server. local_addr ( ) ;
@@ -363,3 +365,94 @@ async fn test_kernel_queue_full_application_crash() {
363365 assert_eq ! ( error. kind( ) , std:: io:: ErrorKind :: UnexpectedEof ) ;
364366 }
365367}
368+
369+ #[ tokio:: test]
370+ async fn test_dedup_check ( ) {
371+ init_tracing ( ) ;
372+ let test_event_subscriber = NoopSubscriber { } ;
373+ let unix_socket_path1 = PathBuf :: from ( "/tmp/dedup1.sock" ) ;
374+ let unix_socket_path2 = PathBuf :: from ( "/tmp/dedup2.sock" ) ;
375+
376+ let ( client, client_map) = create_stream_client ( ) ;
377+
378+ let handshake_server = create_handshake_server ( ) . await ;
379+ let handshake_addr = handshake_server. local_addr ( ) ;
380+ let res = client
381+ . handshake_with ( handshake_addr, server_name ( ) )
382+ . await
383+ . unwrap ( ) ;
384+ info ! ( "Handshake completed, {:?}" , res) ;
385+
386+ let manager_server1 = manager:: Server :: < ServerProvider , NoopSubscriber > :: builder ( )
387+ . with_address ( "127.0.0.1:0" . parse ( ) . unwrap ( ) )
388+ . with_protocol ( Protocol :: Tcp )
389+ . with_udp ( false )
390+ . with_workers ( NonZeroUsize :: new ( 1 ) . unwrap ( ) )
391+ . with_socket_path ( & unix_socket_path1)
392+ . build ( handshake_server. clone ( ) , test_event_subscriber. clone ( ) )
393+ . unwrap ( ) ;
394+
395+ info ! (
396+ "Manager server created at: {:?}" ,
397+ manager_server1. acceptor_addr( )
398+ ) ;
399+
400+ let manager_server2 = manager:: Server :: < ServerProvider , NoopSubscriber > :: builder ( )
401+ . with_address ( "127.0.0.1:0" . parse ( ) . unwrap ( ) )
402+ . with_protocol ( Protocol :: Tcp )
403+ . with_udp ( false )
404+ . with_workers ( NonZeroUsize :: new ( 1 ) . unwrap ( ) )
405+ . with_socket_path ( & unix_socket_path2)
406+ . build ( handshake_server. clone ( ) , test_event_subscriber. clone ( ) )
407+ . unwrap ( ) ;
408+
409+ info ! (
410+ "Manager server created at: {:?}" ,
411+ manager_server2. acceptor_addr( )
412+ ) ;
413+
414+ let app_server1 = create_application_server ( & unix_socket_path1, test_event_subscriber. clone ( ) ) ;
415+ let _app_server2 = create_application_server ( & unix_socket_path2, test_event_subscriber) ;
416+
417+ let acceptor_addr1 = manager_server1. acceptor_addr ( ) . unwrap ( ) ;
418+ let mut client_stream = client
419+ . connect ( handshake_addr, acceptor_addr1, server_name ( ) )
420+ . await
421+ . unwrap ( ) ;
422+ let ( mut server_stream, _addr) = app_server1. accept ( ) . await . unwrap ( ) ;
423+
424+ let test_message = b"Hello from server!" ;
425+ let data_exchange_result = tokio:: try_join!(
426+ async {
427+ let mut buffer = Vec :: <u8 >:: new( ) ;
428+ let bytes_read = client_stream. read_into( & mut buffer) . await ?;
429+ assert_eq!( & buffer[ ..bytes_read] , test_message) ;
430+ Ok :: <( ) , Box <dyn std:: error:: Error + Send + Sync >>( ( ) )
431+ } ,
432+ async {
433+ let mut message_slice = & test_message[ ..] ;
434+ server_stream. write_from( & mut message_slice) . await ?;
435+ Ok :: <( ) , Box <dyn std:: error:: Error + Send + Sync >>( ( ) )
436+ }
437+ ) ;
438+
439+ assert ! ( data_exchange_result. is_ok( ) ) ;
440+
441+ client_map. reset_all_senders ( ) ;
442+
443+ let acceptor_addr2 = manager_server2. acceptor_addr ( ) . unwrap ( ) ;
444+ let mut client_stream2 = client
445+ . connect ( handshake_addr, acceptor_addr2, server_name ( ) )
446+ . await
447+ . unwrap ( ) ;
448+
449+ let mut buffer: Vec < u8 > = Vec :: new ( ) ;
450+ let read_result = client_stream2. read_into ( & mut buffer) . await ;
451+ assert ! ( read_result. is_err( ) ) ;
452+ let error = read_result. unwrap_err ( ) ;
453+ info ! ( "Read error {:?}" , error) ;
454+ // FIXME should the server be sending a control packet on ReplayDefinitelyDetected?
455+ assert_eq ! ( error. kind( ) , std:: io:: ErrorKind :: UnexpectedEof ) ;
456+
457+ tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
458+ }
0 commit comments