@@ -57,7 +57,7 @@ struct SizedChannelBuffer {
5757 max_rows : usize ,
5858 page_rows : usize ,
5959 pages : VecDeque < Page > ,
60-
60+
6161 /// The current_page gets moved outside the lock to spilling and then moved back in, or cleared on close.
6262 /// There's a lot of unwrap here to make sure there's no unintended behavior that's not by design.
6363 current_page : Option < PageBuilder > ,
@@ -613,6 +613,53 @@ mod tests {
613613 assert_eq ! ( serializer. num_rows( ) , 3 ) ;
614614 }
615615
616+ #[ tokio:: test( flavor = "multi_thread" ) ]
617+ async fn test_spsc_abort ( ) {
618+ let ( mut sender, mut receiver) = sized_spsc :: < MockSpiller > ( 5 , 4 ) ;
619+
620+ let test_data = vec ! [
621+ DataBlock :: new_from_columns( vec![ Int32Type :: from_data( vec![ 1 , 2 , 3 ] ) ] ) ,
622+ DataBlock :: new_from_columns( vec![ Int32Type :: from_data( vec![ 1 , 2 ] ) ] ) ,
623+ DataBlock :: new_from_columns( vec![ Int32Type :: from_data( vec![ ] ) ] ) ,
624+ DataBlock :: new_from_columns( vec![ Int32Type :: from_data( vec![ 4 , 5 , 6 , 7 , 8 ] ) ] ) ,
625+ DataBlock :: new_from_columns( vec![ Int32Type :: from_data( vec![ 7 , 8 , 9 ] ) ] ) ,
626+ ] ;
627+
628+ let sender_data = test_data. clone ( ) ;
629+ let send_task = databend_common_base:: runtime:: spawn ( async move {
630+ let format_settings = FormatSettings :: default ( ) ;
631+ sender. plan_ready ( format_settings, None ) ;
632+
633+ for ( i, block) in sender_data. into_iter ( ) . enumerate ( ) {
634+ sender. send ( block) . await . unwrap ( ) ;
635+ if i == 3 {
636+ sender. abort ( ) ;
637+ return ;
638+ }
639+ }
640+ } ) ;
641+
642+ let mut received_blocks_size = Vec :: new ( ) ;
643+ loop {
644+ let ( serializer, is_end) = receiver
645+ . next_page ( & Wait :: Deadline ( Instant :: now ( ) + Duration :: from_secs ( 1 ) ) )
646+ . await
647+ . unwrap ( ) ;
648+
649+ if serializer. num_rows ( ) > 0 {
650+ received_blocks_size. push ( serializer. num_rows ( ) ) ;
651+ }
652+
653+ if is_end {
654+ break ;
655+ }
656+ }
657+
658+ send_task. await . unwrap ( ) ;
659+
660+ assert_eq ! ( received_blocks_size, vec![ 4 , 4 ] )
661+ }
662+
616663 fn data_block_strategy < N > ( len : usize ) -> impl Strategy < Value = DataBlock >
617664 where
618665 N : Number + Arbitrary + ' static ,
0 commit comments