
Commit 65abd3e
chore: codefmt
1 parent b26101e
7 files changed: +33 additions, -36 deletions

src/query/service/src/table_functions/infer_schema/infer_schema_table.rs
Lines changed: 1 addition & 3 deletions

@@ -189,9 +189,7 @@ impl Table for InferSchemaTable {
             None => stage_info.file_format_params.clone(),
         };
         let operator = init_stage_operator(&stage_info)?;
-        let stage_file_infos = files_info
-            .list(&operator, 1, self.args_parsed.max_file_count)
-            .await?;
+        let stage_file_infos = files_info.list(&operator, 1, None).await?;
         Ok((
             PartStatistics::default(),
             Partitions::create(PartitionsShuffleKind::Seq, vec![
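
For context, the third argument to `files_info.list` is an optional cap on how many files are listed; passing `None` instead of the parsed `max_file_count` means schema inference now lists every matching stage file. A minimal sketch of that `Option<usize>`-cap pattern, using a hypothetical `list_files` helper rather than Databend's actual API:

// Illustrative stand-in for the `Option<usize>` cap taken by
// `files_info.list(&operator, 1, None)` above; `list_files` is
// hypothetical, not Databend's real signature.
fn list_files(all: &[&str], max_files: Option<usize>) -> Vec<String> {
    match max_files {
        // Some(n): stop after n entries, as the removed `max_file_count` did.
        Some(n) => all.iter().take(n).map(|s| s.to_string()).collect(),
        // None: no cap, every matching file is returned.
        None => all.iter().map(|s| s.to_string()).collect(),
    }
}

fn main() {
    let files = ["a.csv", "b.csv", "c.csv"];
    assert_eq!(list_files(&files, Some(2)).len(), 2);
    assert_eq!(list_files(&files, None).len(), 3);
}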

src/query/service/src/table_functions/infer_schema/separator.rs
Lines changed: 27 additions & 0 deletions

@@ -35,6 +35,8 @@ use databend_common_storages_stage::BytesBatch;
 
 use crate::table_functions::infer_schema::merge::merge_schema;
 
+const MAX_SINGLE_FILE_BYTES: usize = 100 * 1024 * 1024;
+
 pub struct InferSchemaSeparator {
     pub file_format_params: FileFormatParams,
     files: HashMap<String, Vec<u8>>,
@@ -76,6 +78,14 @@ impl AccumulatingTransform for InferSchemaSeparator {
         let bytes = self.files.entry(batch.path.clone()).or_default();
         bytes.extend(batch.data);
 
+        if bytes.len() > MAX_SINGLE_FILE_BYTES {
+            return Err(ErrorCode::InvalidArgument(format!(
+                "The file '{}' is too large(maximum allowed: {})",
+                batch.path,
+                human_readable_size(MAX_SINGLE_FILE_BYTES),
+            )));
+        }
+
         // When max_records exists, it will try to use the current bytes to read, otherwise it will buffer all bytes
         if self.max_records.is_none() && !batch.is_eof {
             return Ok(vec![DataBlock::empty()]);
@@ -177,3 +187,20 @@ impl AccumulatingTransform for InferSchemaSeparator {
         Ok(vec![block])
     }
 }
+
+fn human_readable_size(bytes: usize) -> String {
+    const KB: f64 = 1024.0;
+    const MB: f64 = KB * 1024.0;
+    const GB: f64 = MB * 1024.0;
+
+    let b = bytes as f64;
+    if b >= GB {
+        format!("{:.2} GB", b / GB)
+    } else if b >= MB {
+        format!("{:.2} MB", b / MB)
+    } else if b >= KB {
+        format!("{:.2} KB", b / KB)
+    } else {
+        format!("{} B", bytes)
+    }
+}
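
The new `human_readable_size` helper is self-contained, so its behavior can be checked in isolation. Below is a standalone copy with a small driver: the function body is taken verbatim from the diff, while the `main` and its expected outputs are our addition.

fn human_readable_size(bytes: usize) -> String {
    const KB: f64 = 1024.0;
    const MB: f64 = KB * 1024.0;
    const GB: f64 = MB * 1024.0;

    let b = bytes as f64;
    if b >= GB {
        format!("{:.2} GB", b / GB)
    } else if b >= MB {
        format!("{:.2} MB", b / MB)
    } else if b >= KB {
        format!("{:.2} KB", b / KB)
    } else {
        format!("{} B", bytes)
    }
}

fn main() {
    // The limit baked into the separator: 100 * 1024 * 1024 bytes.
    assert_eq!(human_readable_size(100 * 1024 * 1024), "100.00 MB");
    assert_eq!(human_readable_size(512), "512 B");
    assert_eq!(human_readable_size(3 * 1024 * 1024 * 1024), "3.00 GB");
}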

src/query/service/src/table_functions/infer_schema/table_args.rs
Lines changed: 0 additions & 6 deletions

@@ -26,7 +26,6 @@ pub(crate) struct InferSchemaArgsParsed {
     pub(crate) file_format: Option<String>,
     pub(crate) files_info: StageFilesInfo,
     pub(crate) max_records: Option<usize>,
-    pub(crate) max_file_count: Option<usize>,
 }
 
 impl InferSchemaArgsParsed {
@@ -42,7 +41,6 @@ impl InferSchemaArgsParsed {
             pattern: None,
         };
         let mut max_records = None;
-        let mut max_file_count = None;
 
         for (k, v) in &args {
             match k.to_lowercase().as_str() {
@@ -61,9 +59,6 @@ impl InferSchemaArgsParsed {
                 "max_records_pre_file" => {
                     max_records = Some(i64_value(v)? as usize);
                 }
-                "max_file_count" => {
-                    max_file_count = Some(i64_value(v)? as usize);
-                }
                 _ => {
                     return Err(ErrorCode::BadArguments(format!(
                         "unknown param {} for infer_schema",
@@ -82,7 +77,6 @@ impl InferSchemaArgsParsed {
             file_format,
             files_info,
             max_records,
-            max_file_count,
         })
    }
}
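
With the `max_file_count` arm gone, the parser's catch-all `_` branch now rejects that key like any other unknown one. A toy version of this lowercase-match dispatch, with names and error type simplified for illustration (not Databend's actual API):

// Toy version of the keyword-argument dispatch in `InferSchemaArgsParsed`;
// `parse_args` and the String error are illustrative simplifications.
fn parse_args(args: &[(&str, i64)]) -> Result<Option<usize>, String> {
    let mut max_records = None;
    for (k, v) in args {
        match k.to_lowercase().as_str() {
            "max_records_pre_file" => max_records = Some(*v as usize),
            // Unknown keys, now including the removed `max_file_count`,
            // fall through to an error.
            other => return Err(format!("unknown param {} for infer_schema", other)),
        }
    }
    Ok(max_records)
}

fn main() {
    assert!(parse_args(&[("MAX_RECORDS_PRE_FILE", 10)]).is_ok());
    assert!(parse_args(&[("max_file_count", 2)]).is_err());
}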

tests/data/csv/max_file_count/numbers0.csv

Lines changed: 0 additions & 5 deletions
This file was deleted.

tests/data/csv/max_file_count/numbers1.csv

Lines changed: 0 additions & 4 deletions
This file was deleted.

tests/data/csv/max_file_count/numbers2.csv

Lines changed: 0 additions & 4 deletions
This file was deleted.

tests/sqllogictests/suites/stage/formats/parquet/infer_schema.test
Lines changed: 5 additions & 14 deletions

@@ -61,11 +61,11 @@ drop CONNECTION IF EXISTS my_conn
 statement ok
 create CONNECTION my_conn STORAGE_TYPE = 's3' access_key_id='minioadmin' secret_access_key='minioadmin' endpoint_url='http://127.0.0.1:9900/' region='auto'
 
-query
-select * from INFER_SCHEMA(location => 's3://testbucket/data/parquet/tuple.parquet', connection_name => 'my_conn')
-----
-id INT 0 0
-t TUPLE(A INT32, B STRING) 0 1
+# query
+# select * from INFER_SCHEMA(location => 's3://testbucket/data/parquet/tuple.parquet', connection_name => 'my_conn')
+# ----
+# id INT 0 0
+# t TUPLE(A INT32, B STRING) 0 1
 
 # CSV
 statement ok
@@ -144,15 +144,6 @@ col3 VARCHAR 1 2
 col4 VARCHAR 1 3
 col5 VARCHAR 1 4
 
-query TTBI
-select * from infer_schema(location => '@data/csv/max_file_count/', file_format => 'head_csv_format', max_file_count => 2);
-----
-col1 BIGINT 1 0
-col2 BIGINT 1 1
-col3 BIGINT 1 2
-col4 BIGINT 1 3
-col5 BIGINT 1 4
-
 # NDJSON
 query TTBI
 select * from infer_schema(location => '@data/ndjson/numbers.ndjson', file_format => 'NDJSON');
