Skip to content

Commit 97df01c

Browse files
committed
refactor: extract ListerStreamSourceBuilder
1 parent 115013e commit 97df01c

File tree

1 file changed

+121
-75
lines changed

1 file changed

+121
-75
lines changed

src/query/storages/system/src/temp_files_table.rs

Lines changed: 121 additions & 75 deletions
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::any::Any;
16+
use std::future::Future;
1617
use std::sync::Arc;
1718

1819
use databend_common_catalog::plan::DataSourcePlan;
@@ -32,6 +33,7 @@ use databend_common_expression::BlockEntry;
3233
use databend_common_expression::DataBlock;
3334
use databend_common_expression::FromData;
3435
use databend_common_expression::Scalar;
36+
use databend_common_expression::SendableDataBlockStream;
3537
use databend_common_expression::TableDataType;
3638
use databend_common_expression::TableField;
3739
use databend_common_expression::TableSchemaRefExt;
@@ -43,17 +45,17 @@ use databend_common_pipeline_core::processors::OutputPort;
4345
use databend_common_pipeline_core::processors::ProcessorPtr;
4446
use databend_common_pipeline_core::query_spill_prefix;
4547
use databend_common_pipeline_core::Pipeline;
46-
use databend_common_pipeline_sources::AsyncSource;
47-
use databend_common_pipeline_sources::AsyncSourcer;
4848
use databend_common_pipeline_sources::EmptySource;
49+
use databend_common_pipeline_sources::StreamSource;
4950
use databend_common_storage::DataOperator;
51+
use futures::stream;
5052
use futures::stream::Chunks;
5153
use futures::stream::Take;
5254
use futures::StreamExt;
55+
use opendal::operator_futures::FutureLister;
5356
use opendal::Entry;
5457
use opendal::Lister;
5558
use opendal::Metakey;
56-
use opendal::Operator;
5759

5860
use crate::table::SystemTablePart;
5961

@@ -63,15 +65,15 @@ pub struct TempFilesTable {
6365

6466
#[async_trait::async_trait]
6567
impl Table for TempFilesTable {
66-
fn as_any(&self) -> &dyn Any {
67-
self
68-
}
69-
7068
fn is_local(&self) -> bool {
7169
// Follow the practice of `SyncOneBlockSystemTable::is_local`
7270
false
7371
}
7472

73+
fn as_any(&self) -> &dyn Any {
74+
self
75+
}
76+
7577
fn get_table_info(&self) -> &TableInfo {
7678
&self.table_info
7779
}
@@ -105,9 +107,7 @@ impl Table for TempFilesTable {
105107
}
106108

107109
pipeline.add_source(
108-
|output| {
109-
TempFilesTableAsyncSource::create(ctx.clone(), output, plan.push_downs.clone())
110-
},
110+
|output| TempFilesTable::create_source(ctx.clone(), output, plan.push_downs.clone()),
111111
1,
112112
)?;
113113

@@ -145,40 +145,39 @@ impl TempFilesTable {
145145

146146
Arc::new(Self { table_info })
147147
}
148-
}
149148

150-
struct TempFilesTableAsyncSource {
151-
finished: bool,
152-
context: Arc<dyn TableContext>,
153-
operator: Operator,
154-
location_prefix: String,
155-
entries_processed: usize,
156-
limit: usize,
157-
lister: Option<Chunks<Take<Lister>>>,
158-
}
159-
160-
impl TempFilesTableAsyncSource {
161-
pub fn create(
149+
pub fn create_source(
162150
ctx: Arc<dyn TableContext>,
163151
output: Arc<OutputPort>,
164152
push_downs: Option<PushDownInfo>,
165153
) -> Result<ProcessorPtr> {
166154
let tenant = ctx.get_tenant();
167155
let location_prefix = format!("{}/", query_spill_prefix(tenant.tenant_name(), ""));
168-
let limit = push_downs
169-
.as_ref()
170-
.and_then(|x| x.limit)
171-
.unwrap_or(usize::MAX);
172-
173-
AsyncSourcer::create(ctx.clone(), output, TempFilesTableAsyncSource {
174-
finished: false,
175-
context: ctx,
176-
operator: DataOperator::instance().operator(),
177-
location_prefix,
178-
entries_processed: 0,
179-
limit,
180-
lister: None,
181-
})
156+
let limit = push_downs.as_ref().and_then(|x| x.limit);
157+
158+
let operator = DataOperator::instance().operator();
159+
let lister = operator
160+
.lister_with(&location_prefix)
161+
.recursive(true)
162+
.metakey(Metakey::LastModified | Metakey::ContentLength);
163+
164+
let stream = {
165+
let prefix = location_prefix.clone();
166+
let mut counter = 0;
167+
let ctx = ctx.clone();
168+
let builder = ListerStreamSourceBuilder::with_lister_fut(lister);
169+
builder
170+
.with_limit_opt(limit)
171+
.with_chunk_size(MAX_BATCH_SIZE)
172+
.build(move |entries| {
173+
counter += entries.len();
174+
let block = Self::block_from_entries(&prefix, entries)?;
175+
ctx.set_status_info(format!("{} entries processed", counter).as_str());
176+
Ok(block)
177+
})?
178+
};
179+
180+
StreamSource::create(ctx, Some(stream), output)
182181
}
183182

184183
fn build_block(
@@ -210,29 +209,23 @@ impl TempFilesTableAsyncSource {
210209
)
211210
}
212211

213-
fn block_from_entries(&self, entries: Vec<opendal::Result<Entry>>) -> Result<DataBlock> {
212+
fn block_from_entries(location_prefix: &str, entries: Vec<Entry>) -> Result<DataBlock> {
214213
let num_items = entries.len();
215214
let mut temp_files_name: Vec<String> = Vec::with_capacity(num_items);
216215
let mut temp_files_content_length = Vec::with_capacity(num_items);
217216
let mut temp_files_last_modified = Vec::with_capacity(num_items);
218217
for entry in entries {
219-
let entry = entry?;
220218
let metadata = entry.metadata();
221219
if metadata.is_file() {
222-
temp_files_name.push(
223-
entry
224-
.path()
225-
.trim_start_matches(&self.location_prefix)
226-
.to_string(),
227-
);
220+
temp_files_name.push(entry.path().trim_start_matches(location_prefix).to_string());
228221

229222
temp_files_last_modified
230223
.push(metadata.last_modified().map(|x| x.timestamp_micros()));
231224
temp_files_content_length.push(metadata.content_length());
232225
}
233226
}
234227

235-
let data_block = Self::build_block(
228+
let data_block = TempFilesTable::build_block(
236229
temp_files_name,
237230
temp_files_content_length,
238231
temp_files_last_modified,
@@ -243,38 +236,91 @@ impl TempFilesTableAsyncSource {
243236

244237
const MAX_BATCH_SIZE: usize = 1000;
245238

246-
#[async_trait::async_trait]
247-
impl AsyncSource for TempFilesTableAsyncSource {
248-
const NAME: &'static str = "system.temp_files";
239+
pub struct ListerStreamSourceBuilder<T>
240+
where T: Future<Output = opendal::Result<Lister>> + Send + 'static
241+
{
242+
lister_fut: FutureLister<T>,
243+
limit: Option<usize>,
244+
chunk_size: usize,
245+
}
249246

250-
#[async_backtrace::framed]
251-
async fn generate(&mut self) -> Result<Option<DataBlock>> {
252-
if self.finished || self.limit == 0 {
253-
return Ok(None);
247+
impl<T> ListerStreamSourceBuilder<T>
248+
where T: Future<Output = opendal::Result<Lister>> + Send + 'static
249+
{
250+
pub fn with_lister_fut(lister_fut: FutureLister<T>) -> Self {
251+
Self {
252+
lister_fut,
253+
limit: None,
254+
chunk_size: MAX_BATCH_SIZE,
254255
}
256+
}
255257

256-
let lister = {
257-
if self.lister.is_none() {
258-
let lister = self
259-
.operator
260-
.lister_with(&self.location_prefix)
261-
.recursive(true)
262-
.metakey(Metakey::LastModified | Metakey::ContentLength)
263-
.await?;
264-
self.lister = Some(lister.take(self.limit).chunks(MAX_BATCH_SIZE));
265-
}
266-
self.lister.as_mut().unwrap()
267-
};
258+
pub fn with_limit(mut self, limit: usize) -> Self {
259+
self.limit = Some(limit);
260+
self
261+
}
268262

269-
if let Some(entries) = lister.next().await {
270-
let data_block = self.block_from_entries(entries)?;
271-
self.entries_processed += data_block.num_rows();
272-
self.context
273-
.set_status_info(&format!("{} entries processed", self.entries_processed));
274-
Ok(Some(data_block))
275-
} else {
276-
self.finished = true;
277-
Ok(None)
278-
}
263+
pub fn with_limit_opt(mut self, limit: Option<usize>) -> Self {
264+
self.limit = limit;
265+
self
279266
}
267+
268+
pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
269+
self.chunk_size = chunk_size;
270+
self
271+
}
272+
273+
pub fn build(
274+
self,
275+
block_builder: (impl FnMut(Vec<Entry>) -> Result<DataBlock> + Sync + Send + 'static),
276+
) -> Result<SendableDataBlockStream> {
277+
stream_source_from_entry_lister_with_chunk_size(
278+
self.lister_fut,
279+
self.limit,
280+
self.chunk_size,
281+
block_builder,
282+
)
283+
}
284+
}
285+
286+
fn stream_source_from_entry_lister_with_chunk_size<T>(
287+
lister_fut: FutureLister<T>,
288+
limit: Option<usize>,
289+
chunk_size: usize,
290+
block_builder: (impl FnMut(Vec<Entry>) -> Result<DataBlock> + Sync + Send + 'static),
291+
) -> Result<SendableDataBlockStream>
292+
where
293+
T: Future<Output = opendal::Result<Lister>> + Send + 'static,
294+
{
295+
enum ListerState<U: Future<Output = opendal::Result<Lister>> + Send + 'static> {
296+
Uninitialized(FutureLister<U>),
297+
Initialized(Chunks<Take<Lister>>),
298+
}
299+
300+
let state = ListerState::<T>::Uninitialized(lister_fut);
301+
302+
let stream = stream::try_unfold(
303+
(state, block_builder),
304+
move |(mut state, mut builder)| async move {
305+
let mut lister = {
306+
match state {
307+
ListerState::Uninitialized(fut) => {
308+
let lister = fut.await?;
309+
lister.take(limit.unwrap_or(usize::MAX)).chunks(chunk_size)
310+
}
311+
ListerState::Initialized(l) => l,
312+
}
313+
};
314+
if let Some(entries) = lister.next().await {
315+
let entries = entries.into_iter().collect::<opendal::Result<Vec<_>>>()?;
316+
let data_block = builder(entries)?;
317+
state = ListerState::Initialized(lister);
318+
Ok(Some((data_block, (state, builder))))
319+
} else {
320+
Ok(None)
321+
}
322+
},
323+
);
324+
325+
Ok(stream.boxed())
280326
}

0 commit comments

Comments
 (0)