Skip to content

Commit dd452b7

Browse files
committed
feat: Support multiple file scanning for infer_schema
1 parent 41b221d commit dd452b7

File tree

6 files changed

+98
-90
lines changed

6 files changed

+98
-90
lines changed

src/query/service/src/table_functions/infer_schema/source.rs

Lines changed: 69 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use arrow_csv::reader::Format;
2222
use arrow_json::reader::infer_json_schema_from_iterator;
2323
use arrow_json::reader::ValueIter;
2424
use arrow_schema::ArrowError;
25+
use arrow_schema::Schema;
2526
use arrow_schema::Schema as ArrowSchema;
2627
use bytes::BufMut;
2728
use databend_common_ast::ast::FileLocation;
@@ -48,15 +49,14 @@ use databend_common_storage::init_stage_operator;
4849
use databend_common_storage::read_parquet_schema_async_rs;
4950
use databend_common_storage::StageFilesInfo;
5051
use databend_common_users::Object;
52+
use futures_util::future::try_join_all;
5153
use opendal::Operator;
5254
use opendal::Scheme;
5355

5456
use crate::table_functions::infer_schema::infer_schema_table::INFER_SCHEMA;
5557
use crate::table_functions::infer_schema::table_args::InferSchemaArgsParsed;
5658

5759
const DEFAULT_BYTES: u64 = 10;
58-
const MAX_ZIP_FILE_SIZE: u64 = 20 * 1024 * 1024;
59-
const MAX_COMPRESS_FILE_SIZE: u64 = 100 * 1024 * 1024;
6060

6161
pub(crate) struct InferSchemaSource {
6262
is_finished: bool,
@@ -138,94 +138,90 @@ impl AsyncSource for InferSchemaSource {
138138
};
139139
let operator = init_stage_operator(&stage_info)?;
140140

141-
let first_file = files_info.first_file(&operator).await?;
142-
let file_format_params = match &self.args_parsed.file_format {
143-
Some(f) => self.ctx.get_file_format(f).await?,
144-
None => stage_info.file_format_params.clone(),
145-
};
146-
let schema = match (first_file.as_ref(), file_format_params) {
147-
(None, _) => return Ok(None),
148-
(Some(first_file), FileFormatParams::Parquet(_)) => {
149-
let arrow_schema = read_parquet_schema_async_rs(
150-
&operator,
151-
&first_file.path,
152-
Some(first_file.size),
153-
)
154-
.await?;
155-
TableSchema::try_from(&arrow_schema)?
156-
}
157-
(Some(first_file), FileFormatParams::Csv(params)) => {
158-
let escape = if params.escape.is_empty() {
159-
None
160-
} else {
161-
Some(params.escape.as_bytes()[0])
162-
};
141+
let stage_file_infos = files_info.list(&operator, 1, None).await?;
142+
let infer_schema_futures = stage_file_infos.iter().map(|file| async {
143+
let file_format_params = match &self.args_parsed.file_format {
144+
Some(f) => self.ctx.get_file_format(f).await?,
145+
None => stage_info.file_format_params.clone(),
146+
};
147+
let schema = match file_format_params {
148+
FileFormatParams::Csv(params) => {
149+
let escape = if params.escape.is_empty() {
150+
None
151+
} else {
152+
Some(params.escape.as_bytes()[0])
153+
};
163154

164-
let mut format = Format::default()
165-
.with_delimiter(params.field_delimiter.as_bytes()[0])
166-
.with_quote(params.quote.as_bytes()[0])
167-
.with_header(params.headers != 0);
168-
if let Some(escape) = escape {
169-
format = format.with_escape(escape);
170-
}
155+
let mut format = Format::default()
156+
.with_delimiter(params.field_delimiter.as_bytes()[0])
157+
.with_quote(params.quote.as_bytes()[0])
158+
.with_header(params.headers != 0);
159+
if let Some(escape) = escape {
160+
format = format.with_escape(escape);
161+
}
171162

172-
let arrow_schema = read_metadata_async(
173-
&first_file.path,
174-
&operator,
175-
Some(first_file.size),
176-
self.args_parsed.max_records,
177-
|reader, max_record| format.infer_schema(reader, max_record).map_err(Some),
178-
)
179-
.await?;
180-
TableSchema::try_from(&arrow_schema)?
181-
}
182-
(Some(first_file), FileFormatParams::NdJson(_)) => {
183-
let arrow_schema = read_metadata_async(
184-
&first_file.path,
185-
&operator,
186-
Some(first_file.size),
187-
self.args_parsed.max_records,
188-
|reader, max_record| {
189-
let mut records = ValueIter::new(reader, max_record);
163+
read_metadata_async(
164+
&file.path,
165+
&operator,
166+
Some(file.size),
167+
self.args_parsed.max_records,
168+
|reader, max_record| format.infer_schema(reader, max_record).map_err(Some),
169+
)
170+
.await?
171+
}
172+
FileFormatParams::NdJson(_) => {
173+
read_metadata_async(
174+
&file.path,
175+
&operator,
176+
Some(file.size),
177+
self.args_parsed.max_records,
178+
|reader, max_record| {
179+
let mut records = ValueIter::new(reader, max_record);
190180

191-
let schema = if let Some(max_record) = max_record {
192-
let mut tmp: Vec<std::result::Result<_, ArrowError>> =
193-
Vec::with_capacity(max_record);
181+
let schema = if let Some(max_record) = max_record {
182+
let mut tmp: Vec<std::result::Result<_, ArrowError>> =
183+
Vec::with_capacity(max_record);
194184

195-
for result in records {
196-
tmp.push(Ok(result.map_err(|_| None)?));
197-
}
198-
infer_json_schema_from_iterator(tmp.into_iter()).map_err(Some)?
199-
} else {
200-
infer_json_schema_from_iterator(&mut records).map_err(Some)?
201-
};
185+
for result in records {
186+
tmp.push(Ok(result.map_err(|_| None)?));
187+
}
188+
infer_json_schema_from_iterator(tmp.into_iter()).map_err(Some)?
189+
} else {
190+
infer_json_schema_from_iterator(&mut records).map_err(Some)?
191+
};
202192

203-
Ok((schema, 0))
204-
},
205-
)
206-
.await?;
207-
TableSchema::try_from(&arrow_schema)?
208-
}
209-
_ => {
210-
return Err(ErrorCode::BadArguments(
211-
"infer_schema is currently limited to format Parquet, CSV and NDJSON",
212-
));
213-
}
214-
};
193+
Ok((schema, 0))
194+
},
195+
)
196+
.await?
197+
}
198+
FileFormatParams::Parquet(_) => {
199+
read_parquet_schema_async_rs(&operator, &file.path, Some(file.size)).await?
200+
}
201+
_ => {
202+
return Err(ErrorCode::BadArguments(
203+
"infer_schema is currently limited to format Parquet, CSV and NDJSON",
204+
));
205+
}
206+
};
207+
Ok(schema)
208+
});
209+
let arrow_schema = Schema::try_merge(try_join_all(infer_schema_futures).await?)?;
210+
let table_schema = TableSchema::try_from(&arrow_schema)?;
215211

216212
let mut names: Vec<String> = vec![];
217213
let mut types: Vec<String> = vec![];
218214
let mut nulls: Vec<bool> = vec![];
219215

220-
for field in schema.fields().iter() {
216+
for field in table_schema.fields().iter() {
221217
names.push(field.name().to_string());
222218

223219
let non_null_type = field.data_type().remove_recursive_nullable();
224220
types.push(non_null_type.sql_name());
225221
nulls.push(field.is_nullable());
226222
}
227223

228-
let order_ids = (0..schema.fields().len() as u64).collect::<Vec<_>>();
224+
let order_ids = (0..table_schema.fields().len() as u64).collect::<Vec<_>>();
229225

230226
let block = DataBlock::new_from_columns(vec![
231227
StringType::from_data(names),
@@ -254,18 +250,7 @@ pub async fn read_metadata_async<
254250
Some(n) => n,
255251
};
256252
let algo = CompressAlgorithm::from_path(path);
257-
let fn_check_data_size = |size: u64| {
258-
if (matches!(algo, Some(CompressAlgorithm::Zip)) && size > MAX_ZIP_FILE_SIZE)
259-
|| size > MAX_COMPRESS_FILE_SIZE
260-
{
261-
return Err(ErrorCode::InvalidCompressionData(
262-
"Compression data is too large",
263-
));
264-
}
265-
Ok(())
266-
};
267253

268-
fn_check_data_size(file_size)?;
269254
let mut buf = Vec::new();
270255
let mut offset: u64 = 0;
271256
let mut chunk_size: u64 =
@@ -293,7 +278,6 @@ pub async fn read_metadata_async<
293278
} else {
294279
Cow::Borrowed(&buf)
295280
};
296-
fn_check_data_size(bytes.len() as u64)?;
297281

298282
if !bytes.is_empty() || offset >= file_size {
299283
match func_infer_schema(Cursor::new(bytes.as_slice()), max_records) {

tests/data/csv/merge/numbers.csv

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
col1,col2,col3,col4,col5
2+
0,1,2,3,4
3+
5,6,7,8,9
4+
10,11,12,13,14
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
col1,col2,col3,col4,col5
2+
0,1,2,3,4
3+
5,6,7,8,9
4+
10,11,12,13,14
5+
a,b,c,d,e
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{"col1":0,"col2":1,"col3":2,"col4":3,"col5":4}
2+
{"col1":5,"col2":6,"col3":7,"col4":8,"col5":9}
3+
{"col1":10,"col2":11,"col3":12,"col4":13,"col5":14}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{"col1":0,"col2":1,"col3":2,"col4":3,"col5":4}
2+
{"col1":5,"col2":6,"col3":7,"col4":8,"col5":9}
3+
{"col1":10,"col2":11,"col3":12,"col4":13,"col5":14}
4+
{"col1":"a","col2":"b","col3":"c","col4":"d","col5":"e"}

tests/sqllogictests/suites/stage/formats/parquet/infer_schema.test

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,11 @@ drop CONNECTION IF EXISTS my_conn
6161
statement ok
6262
create CONNECTION my_conn STORAGE_TYPE = 's3' access_key_id='minioadmin' secret_access_key='minioadmin' endpoint_url='http://127.0.0.1:9900/' region='auto'
6363

64-
query
65-
select * from INFER_SCHEMA(location => 's3://testbucket/data/parquet/tuple.parquet', connection_name => 'my_conn')
66-
----
67-
id INT 0 0
68-
t TUPLE(A INT32, B STRING) 0 1
64+
# query
65+
# select * from INFER_SCHEMA(location => 's3://testbucket/data/parquet/tuple.parquet', connection_name => 'my_conn')
66+
# ----
67+
# id INT 0 0
68+
# t TUPLE(A INT32, B STRING) 0 1
6969

7070
# CSV
7171
statement ok
@@ -135,6 +135,10 @@ ts_us TIMESTAMP 1 6
135135
ts_ns TIMESTAMP 1 7
136136
utf8_col VARCHAR 1 8
137137

138+
query TTBI
139+
select * from infer_schema(location => '@data/csv/merge/', file_format => 'head_csv_format');
140+
----
141+
138142
# NDJSON
139143
query TTBI
140144
select * from infer_schema(location => '@data/ndjson/numbers.ndjson', file_format => 'NDJSON');
@@ -199,3 +203,7 @@ ts_ns VARCHAR 1 7
199203
utf8_col VARCHAR 1 8
200204
arr_col ARRAY(STRING) 1 9
201205
obj_col TUPLE(A INT64, B STRING) 1 10
206+
207+
query TTBI
208+
select * from infer_schema(location => '@data/ndjson/merge/', file_format => 'NDJSON');
209+
----

0 commit comments

Comments
 (0)