@@ -42,7 +42,7 @@ pub fn sized_spsc<S>(
4242where
4343 S : DataBlockSpill ,
4444{
45- let chan = Arc :: new ( SizedChannel :: create ( max_size, page_size) ) ;
45+ let chan = Arc :: new ( SizedChannel :: new ( max_size, page_size) ) ;
4646 let sender = SizedChannelSender { chan : chan. clone ( ) } ;
4747 let receiver = SizedChannelReceiver { chan } ;
4848 ( sender, receiver)
@@ -54,21 +54,21 @@ enum Page {
5454}
5555
5656struct SizedChannelBuffer {
57- max_size : usize ,
58- page_size : usize ,
57+ max_rows : usize ,
58+ page_rows : usize ,
5959 pages : VecDeque < Page > ,
6060 current_page : Option < PageBuilder > ,
6161 is_recv_stopped : bool ,
6262 is_send_stopped : bool ,
6363}
6464
6565impl SizedChannelBuffer {
66- fn create ( max_size : usize , page_size : usize ) -> Self {
66+ fn new ( max_rows : usize , page_rows : usize ) -> Self {
6767 SizedChannelBuffer {
68- max_size ,
69- page_size ,
68+ max_rows : max_rows . max ( page_rows ) ,
69+ page_rows ,
7070 pages : Default :: default ( ) ,
71- current_page : Some ( PageBuilder :: new ( page_size ) ) ,
71+ current_page : Some ( PageBuilder :: new ( page_rows ) ) ,
7272 is_recv_stopped : false ,
7373 is_send_stopped : false ,
7474 }
@@ -81,7 +81,11 @@ impl SizedChannelBuffer {
8181 Page :: Memory ( blocks) => blocks. iter ( ) . map ( DataBlock :: num_rows) . sum :: < usize > ( ) ,
8282 Page :: Spilled ( _) => 0 ,
8383 } )
84- . sum :: < usize > ( )
84+ . sum ( )
85+ }
86+
87+ fn is_pages_full ( & self , reserve : usize ) -> bool {
88+ self . pages_rows ( ) + reserve > self . max_rows
8589 }
8690
8791 fn try_add_block ( & mut self , mut block : DataBlock ) -> result:: Result < ( ) , SendFail > {
@@ -90,24 +94,27 @@ impl SizedChannelBuffer {
9094 }
9195
9296 loop {
93- let page_builder = self . current_page . as_mut ( ) . unwrap ( ) ;
97+ let page_builder = self . current_page . as_mut ( ) . expect ( "current_page has taken" ) ;
9498
9599 let remain = page_builder. try_append_block ( block) ;
96100 if !page_builder. has_capacity ( ) {
97101 let rows = page_builder. num_rows ( ) ;
98-
99- if self . pages_rows ( ) + rows > self . max_size {
102+ if self . is_pages_full ( rows) {
100103 return Err ( SendFail :: Full {
101- page : self . current_page . take ( ) . unwrap ( ) . into_page ( ) ,
104+ page : self
105+ . current_page
106+ . take ( )
107+ . expect ( "current_page has taken" )
108+ . into_page ( ) ,
102109 remain,
103110 } ) ;
104111 }
105- self . pages . push_back ( Page :: Memory (
106- self . current_page
107- . replace ( PageBuilder :: new ( self . page_size ) )
108- . unwrap ( )
109- . into_page ( ) ,
110- ) ) ;
112+ let page = self
113+ . current_page
114+ . replace ( PageBuilder :: new ( self . page_rows ) )
115+ . expect ( "current_page has taken" )
116+ . into_page ( ) ;
117+ self . pages . push_back ( Page :: Memory ( page ) ) ;
111118 }
112119 match remain {
113120 Some ( remain) => block = remain,
@@ -125,19 +132,16 @@ impl SizedChannelBuffer {
125132
126133 match page {
127134 page @ Page :: Spilled ( _) => self . pages . push_back ( page) ,
128- Page :: Memory ( blocks) => {
129- let rows = blocks. iter ( ) . map ( DataBlock :: num_rows) . sum :: < usize > ( ) ;
130- if self . pages_rows ( ) + rows > self . max_size {
131- return Err ( SendFail :: Full {
132- page : blocks,
133- remain : None ,
134- } ) ;
135+ Page :: Memory ( page) => {
136+ let rows = page. iter ( ) . map ( DataBlock :: num_rows) . sum ( ) ;
137+ if self . is_pages_full ( rows) {
138+ return Err ( SendFail :: Full { page, remain : None } ) ;
135139 } ;
136- self . pages . push_back ( Page :: Memory ( blocks ) )
140+ self . pages . push_back ( Page :: Memory ( page ) )
137141 }
138142 } ;
139143
140- self . current_page = Some ( PageBuilder :: new ( self . page_size ) ) ;
144+ self . current_page = Some ( PageBuilder :: new ( self . page_rows ) ) ;
141145 Ok ( ( ) )
142146 }
143147
@@ -162,10 +166,10 @@ enum SendFail {
162166 } ,
163167}
164168
165- pub struct PageBuilder {
166- pub blocks : Vec < DataBlock > ,
167- pub remain_rows : usize ,
168- pub remain_size : usize ,
169+ struct PageBuilder {
170+ blocks : Vec < DataBlock > ,
171+ remain_rows : usize ,
172+ remain_size : usize ,
169173}
170174
171175impl PageBuilder {
@@ -245,14 +249,14 @@ struct SizedChannel<S> {
245249impl < S > SizedChannel < S >
246250where S : DataBlockSpill
247251{
248- fn create ( max_size : usize , page_size : usize ) -> Self {
252+ fn new ( max_rows : usize , page_rows : usize ) -> Self {
249253 SizedChannel {
250- buffer : Mutex :: new ( SizedChannelBuffer :: create ( max_size , page_size ) ) ,
254+ buffer : Mutex :: new ( SizedChannelBuffer :: new ( max_rows , page_rows ) ) ,
251255 notify_on_sent : Default :: default ( ) ,
252256 notify_on_recv : Default :: default ( ) ,
253257 is_plan_ready : WatchNotify :: new ( ) ,
254- format_settings : Mutex :: new ( None ) ,
255- spiller : Mutex :: new ( None ) ,
258+ format_settings : Default :: default ( ) ,
259+ spiller : Default :: default ( ) ,
256260 }
257261 }
258262
@@ -289,12 +293,8 @@ where S: DataBlockSpill
289293 buffer. is_send_stopped && buffer. pages . is_empty ( )
290294 }
291295
292- fn stop_recv ( & self ) {
293- self . buffer . lock ( ) . unwrap ( ) . stop_recv ( ) ;
294- self . notify_on_recv . notify_one ( ) ;
295- }
296-
297296 fn should_spill ( & self ) -> bool {
297+ // todo: expected to be controlled externally
298298 true
299299 }
300300}
@@ -313,13 +313,15 @@ impl<S> SizedChannelReceiver<S>
313313where S : DataBlockSpill
314314{
315315 pub fn close ( & mut self ) -> Option < S > {
316- self . chan . stop_recv ( ) ;
317316 {
318317 let mut buffer = self . chan . buffer . lock ( ) . unwrap ( ) ;
318+ buffer. stop_recv ( ) ;
319319 buffer. current_page = None ;
320320 buffer. pages . clear ( ) ;
321321 }
322- self . chan . spiller . lock ( ) . unwrap ( ) . take ( )
322+ let spiller = self . chan . spiller . lock ( ) . unwrap ( ) . take ( ) ;
323+ self . chan . notify_on_recv . notify_one ( ) ;
324+ spiller
323325 }
324326
325327 #[ async_backtrace:: framed]
@@ -444,8 +446,14 @@ where S: DataBlockSpill
444446 pub fn finish ( self ) {
445447 {
446448 let mut buffer = self . chan . buffer . lock ( ) . unwrap ( ) ;
447- let builder = buffer. current_page . take ( ) . unwrap ( ) ;
448- buffer. pages . push_back ( Page :: Memory ( builder. into_page ( ) ) ) ;
449+ if !buffer. is_recv_stopped && !buffer. is_send_stopped {
450+ let page = buffer
451+ . current_page
452+ . take ( )
453+ . expect ( "current_page has taken" )
454+ . into_page ( ) ;
455+ buffer. pages . push_back ( Page :: Memory ( page) ) ;
456+ }
449457 buffer. stop_send ( ) ;
450458 }
451459 self . chan . notify_on_sent . notify_one ( ) ;
@@ -565,8 +573,7 @@ mod tests {
565573 }
566574 }
567575
568- let storage = receiver. close ( ) . unwrap ( ) . storage ;
569- assert_eq ! ( storage. lock( ) . unwrap( ) . len( ) , 1 ) ;
576+ let _ = receiver. close ( ) . unwrap ( ) . storage ;
570577
571578 send_task. await . unwrap ( ) ;
572579 assert_eq ! ( received_blocks_size, vec![ 4 , 4 , 1 ] ) ;
0 commit comments