@@ -539,7 +539,7 @@ mod tests {
539539 }
540540
541541 #[ tokio:: test( flavor = "multi_thread" ) ]
542- async fn test_sized_spsc_channel ( ) {
542+ async fn test_spsc_channel ( ) {
543543 let ( mut sender, mut receiver) = sized_spsc :: < MockSpiller > ( 5 , 4 ) ;
544544
545545 let test_data = vec ! [
@@ -579,6 +579,48 @@ mod tests {
579579 assert_eq ! ( received_blocks_size, vec![ 4 , 4 , 1 ] ) ;
580580 }
581581
582+ #[ tokio:: test( flavor = "multi_thread" ) ]
583+ async fn test_spsc_slow ( ) {
584+ let ( mut sender, mut receiver) = sized_spsc :: < MockSpiller > ( 5 , 4 ) ;
585+
586+ let test_data = vec ! [ DataBlock :: new_from_columns( vec![ Int32Type :: from_data(
587+ vec![ 1 , 2 , 3 ] ,
588+ ) ] ) ] ;
589+
590+ let wait = Arc :: new ( Notify :: new ( ) ) ;
591+
592+ let sender_wait = wait. clone ( ) ;
593+ let sender_data = test_data. clone ( ) ;
594+ let send_task = databend_common_base:: runtime:: spawn ( async move {
595+ let format_settings = FormatSettings :: default ( ) ;
596+ sender. plan_ready ( format_settings, None ) ;
597+
598+ sender_wait. notified ( ) . await ;
599+
600+ for block in sender_data {
601+ sender. send ( block) . await . unwrap ( ) ;
602+ }
603+ sender. finish ( ) ;
604+ } ) ;
605+
606+ for _ in 0 ..10 {
607+ let deadline = Instant :: now ( ) + Duration :: from_millis ( 1 ) ;
608+ let ( serializer, _) = receiver. next_page ( & Wait :: Deadline ( deadline) ) . await . unwrap ( ) ;
609+ assert_eq ! ( serializer. num_rows( ) , 0 ) ;
610+ }
611+
612+ wait. notify_one ( ) ;
613+
614+ let ( serializer, _) = receiver
615+ . next_page ( & Wait :: Deadline ( Instant :: now ( ) + Duration :: from_secs ( 1 ) ) )
616+ . await
617+ . unwrap ( ) ;
618+ let _ = receiver. close ( ) ;
619+ send_task. await . unwrap ( ) ;
620+
621+ assert_eq ! ( serializer. num_rows( ) , 3 ) ;
622+ }
623+
582624 fn data_block_strategy < N > ( len : usize ) -> impl Strategy < Value = DataBlock >
583625 where
584626 N : Number + Arbitrary + ' static ,
@@ -604,7 +646,7 @@ mod tests {
604646 }
605647
606648 #[ tokio:: test( flavor = "multi_thread" ) ]
607- async fn test_sized_spsc_channel_fuzz ( ) {
649+ async fn test_spsc_channel_fuzz ( ) {
608650 let mut runner = TestRunner :: default ( ) ;
609651 for _ in 0 ..100 {
610652 let ( has_spiller, max_size, page_size, test_data) = (
0 commit comments