// See the License for the specific language governing permissions and
// limitations under the License.

+use std::any::Any;
+use std::future::Future;
use std::sync::Arc;

+use databend_common_catalog::plan::DataSourcePlan;
+use databend_common_catalog::plan::PartStatistics;
+use databend_common_catalog::plan::Partitions;
+use databend_common_catalog::plan::PartitionsShuffleKind;
use databend_common_catalog::plan::PushDownInfo;
use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::TableContext;
@@ -27,99 +33,85 @@ use databend_common_expression::BlockEntry;
use databend_common_expression::DataBlock;
use databend_common_expression::FromData;
use databend_common_expression::Scalar;
+use databend_common_expression::SendableDataBlockStream;
use databend_common_expression::TableDataType;
use databend_common_expression::TableField;
use databend_common_expression::TableSchemaRefExt;
use databend_common_expression::Value;
use databend_common_meta_app::schema::TableIdent;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableMeta;
+use databend_common_pipeline_core::processors::OutputPort;
+use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_core::query_spill_prefix;
+use databend_common_pipeline_core::Pipeline;
+use databend_common_pipeline_sources::EmptySource;
+use databend_common_pipeline_sources::StreamSource;
use databend_common_storage::DataOperator;
+use futures::stream;
+use futures::stream::Chunks;
+use futures::stream::Take;
use futures::StreamExt;
-use futures::TryStreamExt;
+use opendal::operator_futures::FutureLister;
+use opendal::Entry;
+use opendal::Lister;
use opendal::Metakey;

-use crate::table::AsyncOneBlockSystemTable;
-use crate::table::AsyncSystemTable;
+use crate::table::SystemTablePart;

pub struct TempFilesTable {
    table_info: TableInfo,
}

#[async_trait::async_trait]
-impl AsyncSystemTable for TempFilesTable {
-    const NAME: &'static str = "system.temp_files";
+impl Table for TempFilesTable {
+    fn is_local(&self) -> bool {
+        // Follow the practice of `SyncOneBlockSystemTable::is_local`.
+        false
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }

    fn get_table_info(&self) -> &TableInfo {
        &self.table_info
    }

    #[async_backtrace::framed]
-    async fn get_full_data(
+    async fn read_partitions(
        &self,
-        ctx: Arc<dyn TableContext>,
-        push_downs: Option<PushDownInfo>,
-    ) -> Result<DataBlock> {
-        let tenant = ctx.get_tenant();
-        let operator = DataOperator::instance().operator();
-
-        let mut temp_files_name: Vec<String> = vec![];
-        let mut temp_files_content_length = vec![];
-        let mut temp_files_last_modified = vec![];
+        _ctx: Arc<dyn TableContext>,
+        _push_downs: Option<PushDownInfo>,
+        _dry_run: bool,
+    ) -> Result<(PartStatistics, Partitions)> {
+        Ok((
+            PartStatistics::default(),
+            Partitions::create(PartitionsShuffleKind::Seq, vec![Arc::new(Box::new(
+                SystemTablePart,
+            ))]),
+        ))
+    }

-        let location_prefix = format!("{}/", query_spill_prefix(tenant.tenant_name(), ""));
-        if let Ok(lister) = operator
-            .lister_with(&location_prefix)
-            .recursive(true)
-            .metakey(Metakey::LastModified | Metakey::ContentLength)
-            .await
-        {
-            let limit = push_downs.and_then(|x| x.limit).unwrap_or(usize::MAX);
-            let mut lister = lister.take(limit);
-
-            while let Some(entry) = lister.try_next().await? {
-                let metadata = entry.metadata();
-
-                if metadata.is_file() {
-                    temp_files_name.push(
-                        entry
-                            .path()
-                            .trim_start_matches(&location_prefix)
-                            .to_string(),
-                    );
-
-                    temp_files_last_modified
-                        .push(metadata.last_modified().map(|x| x.timestamp_micros()));
-                    temp_files_content_length.push(metadata.content_length());
-                }
-            }
+    fn read_data(
+        &self,
+        ctx: Arc<dyn TableContext>,
+        plan: &DataSourcePlan,
+        pipeline: &mut Pipeline,
+        _put_cache: bool,
+    ) -> Result<()> {
+        // Avoid duplicate reads in cluster mode: only the node that received
+        // the single `SystemTablePart` actually lists the spill files.
+        if plan.parts.partitions.is_empty() {
+            pipeline.add_source(EmptySource::create, 1)?;
+            return Ok(());
        }

-        let num_rows = temp_files_name.len();
-        let data_block = DataBlock::new(
-            vec![
-                BlockEntry::new(
-                    DataType::String,
-                    Value::Scalar(Scalar::String("Spill".to_string())),
-                ),
-                BlockEntry::new(
-                    DataType::String,
-                    Value::Column(StringType::from_data(temp_files_name)),
-                ),
-                BlockEntry::new(
-                    DataType::Number(NumberDataType::UInt64),
-                    Value::Column(NumberType::from_data(temp_files_content_length)),
-                ),
-                BlockEntry::new(
-                    DataType::Timestamp.wrap_nullable(),
-                    Value::Column(TimestampType::from_opt_data(temp_files_last_modified)),
-                ),
-            ],
-            num_rows,
-        );
+        pipeline.add_source(
+            |output| TempFilesTable::create_source(ctx.clone(), output, plan.push_downs.clone()),
+            1,
+        )?;

-        Ok(data_block.consume_convert_to_full())
+        Ok(())
    }
}

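Aside: the hunk above swaps the old collect-everything-then-build-one-block approach for a chunked stream. A minimal, self-contained sketch of the `take`/`chunks` combinators the new code leans on, using plain `futures` streams over illustrative integer values and assuming a tokio runtime (not part of this patch):

```rust
use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    // `take(n)` caps how many entries are pulled (the pushed-down LIMIT);
    // `chunks(k)` groups them so each group can become one DataBlock.
    let entries = stream::iter(0..25);
    let mut chunked = entries.take(10).chunks(4);
    while let Some(chunk) = chunked.next().await {
        println!("{:?}", chunk); // [0, 1, 2, 3], then [4, 5, 6, 7], then [8, 9]
    }
}
```
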
@@ -151,6 +143,184 @@ impl TempFilesTable {
            ..Default::default()
        };

-        AsyncOneBlockSystemTable::create(Self { table_info })
+        Arc::new(Self { table_info })
+    }
+
+    pub fn create_source(
+        ctx: Arc<dyn TableContext>,
+        output: Arc<OutputPort>,
+        push_downs: Option<PushDownInfo>,
+    ) -> Result<ProcessorPtr> {
+        let tenant = ctx.get_tenant();
+        let location_prefix = format!("{}/", query_spill_prefix(tenant.tenant_name(), ""));
+        let limit = push_downs.as_ref().and_then(|x| x.limit);
+
+        let operator = DataOperator::instance().operator();
+        let lister = operator
+            .lister_with(&location_prefix)
+            .recursive(true)
+            .metakey(Metakey::LastModified | Metakey::ContentLength);
+
+        let stream = {
+            let prefix = location_prefix.clone();
+            let mut counter = 0;
+            let ctx = ctx.clone();
+            let builder = ListerStreamSourceBuilder::with_lister_fut(lister);
+            builder
+                .limit_opt(limit)
+                .chunk_size(MAX_BATCH_SIZE)
+                .build(move |entries| {
+                    counter += entries.len();
+                    let block = Self::block_from_entries(&prefix, entries)?;
+                    ctx.set_status_info(format!("{} entries processed", counter).as_str());
+                    Ok(block)
+                })?
+        };
+
+        StreamSource::create(ctx, Some(stream), output)
+    }
+
+    fn build_block(
+        names: Vec<String>,
+        file_lens: Vec<u64>,
+        file_last_modifieds: Vec<Option<i64>>,
+    ) -> DataBlock {
+        let row_number = names.len();
+        DataBlock::new(
+            vec![
+                BlockEntry::new(
+                    DataType::String,
+                    Value::Scalar(Scalar::String("Spill".to_string())),
+                ),
+                BlockEntry::new(
+                    DataType::String,
+                    Value::Column(StringType::from_data(names)),
+                ),
+                BlockEntry::new(
+                    DataType::Number(NumberDataType::UInt64),
+                    Value::Column(NumberType::from_data(file_lens)),
+                ),
+                BlockEntry::new(
+                    DataType::Timestamp.wrap_nullable(),
+                    Value::Column(TimestampType::from_opt_data(file_last_modifieds)),
+                ),
+            ],
+            row_number,
+        )
+    }
+
+    fn block_from_entries(location_prefix: &str, entries: Vec<Entry>) -> Result<DataBlock> {
+        let num_items = entries.len();
+        let mut temp_files_name: Vec<String> = Vec::with_capacity(num_items);
+        let mut temp_files_content_length = Vec::with_capacity(num_items);
+        let mut temp_files_last_modified = Vec::with_capacity(num_items);
+        for entry in entries {
+            let metadata = entry.metadata();
+            if metadata.is_file() {
+                temp_files_name.push(entry.path().trim_start_matches(location_prefix).to_string());
+
+                temp_files_last_modified
+                    .push(metadata.last_modified().map(|x| x.timestamp_micros()));
+                temp_files_content_length.push(metadata.content_length());
+            }
+        }
+
+        let data_block = TempFilesTable::build_block(
+            temp_files_name,
+            temp_files_content_length,
+            temp_files_last_modified,
+        );
+        Ok(data_block)
+    }
+}
+
+const MAX_BATCH_SIZE: usize = 1000;
+
+pub struct ListerStreamSourceBuilder<T>
+where T: Future<Output = opendal::Result<Lister>> + Send + 'static
+{
+    lister_fut: FutureLister<T>,
+    limit: Option<usize>,
+    chunk_size: usize,
+}
+
+impl<T> ListerStreamSourceBuilder<T>
+where T: Future<Output = opendal::Result<Lister>> + Send + 'static
+{
+    pub fn with_lister_fut(lister_fut: FutureLister<T>) -> Self {
+        Self {
+            lister_fut,
+            limit: None,
+            chunk_size: MAX_BATCH_SIZE,
+        }
+    }
+
+    pub fn limit(mut self, limit: usize) -> Self {
+        self.limit = Some(limit);
+        self
    }
+
+    pub fn limit_opt(mut self, limit: Option<usize>) -> Self {
+        self.limit = limit;
+        self
+    }
+
+    pub fn chunk_size(mut self, chunk_size: usize) -> Self {
+        self.chunk_size = chunk_size;
+        self
+    }
+
+    pub fn build(
+        self,
+        block_builder: (impl FnMut(Vec<Entry>) -> Result<DataBlock> + Sync + Send + 'static),
+    ) -> Result<SendableDataBlockStream> {
+        stream_source_from_entry_lister_with_chunk_size(
+            self.lister_fut,
+            self.limit,
+            self.chunk_size,
+            block_builder,
+        )
+    }
+}
+
+fn stream_source_from_entry_lister_with_chunk_size<T>(
+    lister_fut: FutureLister<T>,
+    limit: Option<usize>,
+    chunk_size: usize,
+    block_builder: (impl FnMut(Vec<Entry>) -> Result<DataBlock> + Sync + Send + 'static),
+) -> Result<SendableDataBlockStream>
+where
+    T: Future<Output = opendal::Result<Lister>> + Send + 'static,
+{
+    enum ListerState<U: Future<Output = opendal::Result<Lister>> + Send + 'static> {
+        Uninitialized(FutureLister<U>),
+        Initialized(Chunks<Take<Lister>>),
+    }
+
+    let state = ListerState::<T>::Uninitialized(lister_fut);
+
+    let stream = stream::try_unfold(
+        (state, block_builder),
+        move |(mut state, mut builder)| async move {
+            let mut lister = {
+                match state {
+                    ListerState::Uninitialized(fut) => {
+                        let lister = fut.await?;
+                        lister.take(limit.unwrap_or(usize::MAX)).chunks(chunk_size)
+                    }
+                    ListerState::Initialized(l) => l,
+                }
+            };
+            if let Some(entries) = lister.next().await {
+                let entries = entries.into_iter().collect::<opendal::Result<Vec<_>>>()?;
+                let data_block = builder(entries)?;
+                state = ListerState::Initialized(lister);
+                Ok(Some((data_block, (state, builder))))
+            } else {
+                Ok(None)
+            }
+        },
+    );
+
+    Ok(stream.boxed())
}
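
A hypothetical usage sketch of the builder this patch introduces, grounded in the `create_source` call above; `operator`, the `"spill_prefix/"` path, the `10_000` limit, and the `DataBlock::empty()` body are placeholders, and the `?` operators assume an enclosing function that returns `Result`:

```rust
// List a prefix, honor an optional pushed-down limit, and emit one
// DataBlock per chunk of up to MAX_BATCH_SIZE entries.
let lister = operator
    .lister_with("spill_prefix/")
    .recursive(true)
    .metakey(Metakey::LastModified | Metakey::ContentLength);

let stream = ListerStreamSourceBuilder::with_lister_fut(lister)
    .limit_opt(Some(10_000)) // e.g. a LIMIT pushed down from the query
    .chunk_size(MAX_BATCH_SIZE)
    .build(|entries| {
        // `entries` is a Vec<opendal::Entry>; a real caller maps it to its
        // table schema. DataBlock::empty() is just a stand-in here.
        let _ = entries;
        Ok(DataBlock::empty())
    })?;
// `stream` is a SendableDataBlockStream, ready to feed StreamSource::create.
```

The design choice worth noting: `build` takes an `FnMut` closure, so per-chunk state (like the running `counter` in `create_source`) can live in the closure, and the lister future is only awaited when the stream is first polled.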