Commit 48cd72c (parent: 2e4c03f)

chore: add check on csv and ndjson compression


src/query/service/src/table_functions/infer_schema/source.rs

Lines changed: 17 additions & 7 deletions
@@ -13,7 +13,6 @@
 // limitations under the License.
 
 use std::collections::BTreeMap;
-use std::io::Cursor;
 use std::sync::Arc;
 
 use arrow_csv::reader::Format;
@@ -32,6 +31,7 @@ use databend_common_expression::FromData;
 use databend_common_expression::TableSchema;
 use databend_common_meta_app::principal::CsvFileFormatParams;
 use databend_common_meta_app::principal::FileFormatParams;
+use databend_common_meta_app::principal::StageFileCompression;
 use databend_common_meta_app::principal::StageType;
 use databend_common_pipeline_core::processors::OutputPort;
 use databend_common_pipeline_core::processors::ProcessorPtr;
@@ -145,6 +145,11 @@ impl AsyncSource for InferSchemaSource {
                 TableSchema::try_from(&arrow_schema)?
             }
             (Some(first_file), FileFormatParams::Csv(params)) => {
+                if params.compression != StageFileCompression::None {
+                    return Err(ErrorCode::InvalidCompressionData(
+                        "Compressed CSV files are not supported",
+                    ));
+                }
                 let arrow_schema = read_csv_metadata_async(
                     &first_file.path,
                     &operator,
@@ -155,7 +160,12 @@ impl AsyncSource for InferSchemaSource {
                 .await?;
                 TableSchema::try_from(&arrow_schema)?
             }
-            (Some(first_file), FileFormatParams::NdJson(_)) => {
+            (Some(first_file), FileFormatParams::NdJson(params)) => {
+                if params.compression != StageFileCompression::None {
+                    return Err(ErrorCode::InvalidCompressionData(
+                        "Compressed NDJSON files are not supported",
+                    ));
+                }
                 let arrow_schema = read_json_metadata_async(
                     &first_file.path,
                     &operator,
@@ -167,7 +177,7 @@ impl AsyncSource for InferSchemaSource {
             }
             _ => {
                 return Err(ErrorCode::BadArguments(
-                    "infer_schema is currently limited to format Parquet",
+                    "infer_schema is currently limited to format Parquet, CSV and NDJSON",
                 ));
             }
         };
@@ -214,7 +224,7 @@ pub async fn read_csv_metadata_async(
     };
 
     // TODO: It would be better if it could be read in the form of Read trait
-    let buf = operator.read_with(path).range(..file_size).await?.to_vec();
+    let buf = operator.read_with(path).range(..file_size).await?;
     let mut format = Format::default()
         .with_delimiter(params.field_delimiter.as_bytes()[0])
         .with_quote(params.quote.as_bytes()[0])
@@ -223,7 +233,7 @@ pub async fn read_csv_metadata_async(
     if let Some(escape) = escape {
         format = format.with_escape(escape);
     }
-    let (schema, _) = format.infer_schema(Cursor::new(&buf), max_records)?;
+    let (schema, _) = format.infer_schema(buf, max_records)?;
 
     Ok(schema)
 }
@@ -239,8 +249,8 @@ pub async fn read_json_metadata_async(
         Some(n) => n,
     };
     // TODO: It would be better if it could be read in the form of Read trait
-    let buf = operator.read_with(path).range(..file_size).await?.to_vec();
-    let (schema, _) = infer_json_schema(Cursor::new(&buf), max_records)?;
+    let buf = operator.read_with(path).range(..file_size).await?;
+    let (schema, _) = infer_json_schema(buf, max_records)?;
 
     Ok(schema)
 }
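
For context on the first change: the guard has to run before any bytes are fetched, because read_csv_metadata_async and read_json_metadata_async hand the raw stage bytes straight to the Arrow schema readers, which would mis-parse a gzip or zstd stream. Below is a minimal, self-contained sketch of that fail-fast pattern; Compression and SchemaError are hypothetical stand-ins for StageFileCompression and ErrorCode, not the Databend types.

// A rough sketch of the commit's fail-fast compression guard.
// `Compression` and `SchemaError` are simplified stand-ins, not the
// databend_common_meta_app / databend_common_exception types.

#[derive(Debug, PartialEq)]
enum Compression {
    None,
    Gzip,
}

#[derive(Debug)]
struct SchemaError(String);

// Reject compressed inputs up front, before schema inference is attempted.
fn check_uncompressed(compression: &Compression, format_name: &str) -> Result<(), SchemaError> {
    if *compression != Compression::None {
        return Err(SchemaError(format!(
            "Compressed {format_name} files are not supported"
        )));
    }
    Ok(())
}

fn main() {
    assert!(check_uncompressed(&Compression::None, "CSV").is_ok());
    assert!(check_uncompressed(&Compression::Gzip, "NDJSON").is_err());
}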

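The second change removes the to_vec() copy and the Cursor wrapper: arrow_csv's Format::infer_schema accepts any std::io::Read, so the buffer returned by the operator can be consumed as a reader directly. A standalone illustration of the equivalence follows; the inline sample data, header option, and record limit are assumptions for the sketch, not values from the commit.

use std::io::Cursor;

use arrow_csv::reader::Format;

fn main() -> Result<(), arrow_schema::ArrowError> {
    let data = b"a,b\n1,x\n2,y\n";

    // Before: copy into a Vec, then wrap it in a Cursor to get a reader.
    let copied = data.to_vec();
    let (schema_old, _) = Format::default()
        .with_header(true)
        .infer_schema(Cursor::new(&copied), Some(100))?;

    // After: any `impl std::io::Read` works, so a byte slice can be
    // handed over directly and the intermediate copy disappears.
    let (schema_new, _) = Format::default()
        .with_header(true)
        .infer_schema(&data[..], Some(100))?;

    assert_eq!(schema_old, schema_new);
    Ok(())
}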