diff --git a/Cargo.lock b/Cargo.lock
index 2d2ef239846cd..8beea2f0a46cc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5154,8 +5154,10 @@ dependencies = [
  "arrow-array",
  "arrow-buffer",
  "arrow-cast",
+ "arrow-csv",
  "arrow-flight",
  "arrow-ipc",
+ "arrow-json",
  "arrow-schema",
  "arrow-select",
  "arrow-udf-runtime",
@@ -5186,6 +5188,7 @@ dependencies = [
  "databend-common-catalog",
  "databend-common-cloud-control",
  "databend-common-column",
+ "databend-common-compress",
  "databend-common-config",
  "databend-common-exception",
  "databend-common-expression",
diff --git a/Cargo.toml b/Cargo.toml
index 2e1f8759f7f68..c2654cb6e26a6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -229,9 +229,11 @@ arrow = { version = "55" }
 arrow-array = { version = "55" }
 arrow-buffer = { version = "55" }
 arrow-cast = { version = "55", features = ["prettyprint"] }
+arrow-csv = { version = "55" }
 arrow-data = { version = "55" }
 arrow-flight = { version = "55", features = ["flight-sql-experimental", "tls"] }
 arrow-ipc = { version = "55", features = ["lz4", "zstd"] }
+arrow-json = { version = "55" }
 arrow-ord = { version = "55" }
 arrow-schema = { version = "55", features = ["serde"] }
 arrow-select = { version = "55" }
diff --git a/src/query/catalog/src/table_args.rs b/src/query/catalog/src/table_args.rs
index dab8c366e9bad..4f1c26e1596f6 100644
--- a/src/query/catalog/src/table_args.rs
+++ b/src/query/catalog/src/table_args.rs
@@ -119,6 +119,12 @@ pub fn bool_value(value: &Scalar) -> Result<bool> {
     }
 }
 
+pub fn i64_value(value: &Scalar) -> Result<i64> {
+    value.get_i64().ok_or_else(|| {
+        ErrorCode::BadArguments(format!("invalid value {value} expect to be i64 literal."))
+    })
+}
+
 pub fn string_literal(val: &str) -> Scalar {
     Scalar::String(val.to_string())
 }
diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml
index 0016398385d49..f6b0302b314ea 100644
--- a/src/query/service/Cargo.toml
+++ b/src/query/service/Cargo.toml
@@ -23,8 +23,10 @@ io-uring = [
 anyhow = { workspace = true }
 arrow-array = { workspace = true }
 arrow-buffer = { workspace = true }
+arrow-csv = { workspace = true }
 arrow-flight = { workspace = true }
 arrow-ipc = { workspace = true }
+arrow-json = { workspace = true }
 arrow-schema = { workspace = true }
 arrow-select = { workspace = true }
 arrow-udf-runtime = { workspace = true }
@@ -54,6 +56,7 @@ databend-common-cache = { workspace = true }
 databend-common-catalog = { workspace = true }
 databend-common-cloud-control = { workspace = true }
 databend-common-column = { workspace = true }
+databend-common-compress = { workspace = true }
 databend-common-config = { workspace = true }
 databend-common-exception = { workspace = true }
 databend-common-expression = { workspace = true }
diff --git a/src/query/service/src/table_functions/infer_schema/infer_schema_table.rs b/src/query/service/src/table_functions/infer_schema/infer_schema_table.rs
index b9bb841f4281d..f946aead234e0 100644
--- a/src/query/service/src/table_functions/infer_schema/infer_schema_table.rs
+++ b/src/query/service/src/table_functions/infer_schema/infer_schema_table.rs
@@ -13,27 +13,52 @@
 // limitations under the License.
 
 use std::any::Any;
+use std::collections::BTreeMap;
 use std::sync::Arc;
 
+use databend_common_ast::ast::FileLocation;
+use databend_common_ast::ast::UriLocation;
 use databend_common_catalog::plan::DataSourcePlan;
+use databend_common_catalog::plan::PartInfo;
 use databend_common_catalog::plan::PartStatistics;
 use databend_common_catalog::plan::Partitions;
+use databend_common_catalog::plan::PartitionsShuffleKind;
 use databend_common_catalog::plan::PushDownInfo;
+use databend_common_catalog::plan::StageTableInfo;
 use databend_common_catalog::table::Table;
 use databend_common_catalog::table_args::TableArgs;
+use databend_common_compress::CompressAlgorithm;
+use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::types::NumberDataType;
+use databend_common_expression::BlockThresholds;
 use databend_common_expression::TableDataType;
 use databend_common_expression::TableField;
 use databend_common_expression::TableSchema;
 use databend_common_expression::TableSchemaRefExt;
+use databend_common_meta_app::principal::FileFormatParams;
+use databend_common_meta_app::principal::StageInfo;
+use databend_common_meta_app::principal::StageType;
 use databend_common_meta_app::schema::TableIdent;
 use databend_common_meta_app::schema::TableInfo;
 use databend_common_meta_app::schema::TableMeta;
 use databend_common_pipeline_core::Pipeline;
+use databend_common_pipeline_sources::PrefetchAsyncSourcer;
+use databend_common_pipeline_transforms::TransformPipelineHelper;
+use databend_common_sql::binder::resolve_file_location;
+use databend_common_storage::init_stage_operator;
+use databend_common_storage::StageFilesInfo;
+use databend_common_storages_stage::BytesReader;
+use databend_common_storages_stage::Decompressor;
+use databend_common_storages_stage::InferSchemaPartInfo;
+use databend_common_storages_stage::LoadContext;
+use databend_common_users::Object;
+use databend_storages_common_stage::SingleFilePartition;
+use opendal::Scheme;
 
 use super::parquet::ParquetInferSchemaSource;
 use crate::sessions::TableContext;
+use crate::table_functions::infer_schema::separator::InferSchemaSeparator;
 use crate::table_functions::infer_schema::table_args::InferSchemaArgsParsed;
 use crate::table_functions::TableFunction;
@@ -77,9 +102,27 @@ impl InferSchemaTable {
             TableField::new("column_name", TableDataType::String),
             TableField::new("type", TableDataType::String),
             TableField::new("nullable", TableDataType::Boolean),
+            TableField::new("filenames", TableDataType::String),
             TableField::new("order_id", TableDataType::Number(NumberDataType::UInt64)),
         ])
     }
+
+    fn build_read_stage_source(
+        ctx: Arc<dyn TableContext>,
+        pipeline: &mut Pipeline,
+        stage_info: &StageInfo,
+    ) -> Result<()> {
+        let operator = init_stage_operator(stage_info)?;
+        let batch_size = ctx.get_settings().get_input_read_buffer_size()? as usize;
+        pipeline.add_source(
+            |output| {
+                let reader = BytesReader::try_create(ctx.clone(), operator.clone(), batch_size, 1)?;
+                PrefetchAsyncSourcer::create(ctx.clone(), output, reader)
+            },
+            1,
+        )?;
+        Ok(())
+    }
 }
 
 #[async_trait::async_trait]
@@ -95,11 +138,72 @@ impl Table for InferSchemaTable {
     #[async_backtrace::framed]
     async fn read_partitions(
         &self,
-        _ctx: Arc<dyn TableContext>,
+        ctx: Arc<dyn TableContext>,
         _push_downs: Option<PushDownInfo>,
         _dry_run: bool,
     ) -> Result<(PartStatistics, Partitions)> {
-        Ok((PartStatistics::default(), Partitions::default()))
+        let file_location = if let Some(location) =
+            self.args_parsed.location.clone().strip_prefix('@')
+        {
+            FileLocation::Stage(location.to_string())
+        } else if let Some(connection_name) = &self.args_parsed.connection_name {
+            let conn = ctx.get_connection(connection_name).await?;
+            let uri =
+                UriLocation::from_uri(self.args_parsed.location.clone(), conn.storage_params)?;
+            let proto = conn.storage_type.parse::<Scheme>()?;
+            if proto != uri.protocol.parse::<Scheme>()? {
+                return Err(ErrorCode::BadArguments(format!(
+                    "protocol from connection_name={connection_name} ({proto}) not match with uri protocol ({0}).",
+                    uri.protocol
+                )));
+            }
+            FileLocation::Uri(uri)
+        } else {
+            let uri =
+                UriLocation::from_uri(self.args_parsed.location.clone(), BTreeMap::default())?;
+            FileLocation::Uri(uri)
+        };
+        let (stage_info, path) = resolve_file_location(ctx.as_ref(), &file_location).await?;
+        let enable_experimental_rbac_check =
+            ctx.get_settings().get_enable_experimental_rbac_check()?;
+        if enable_experimental_rbac_check {
+            let visibility_checker = ctx.get_visibility_checker(false, Object::Stage).await?;
+            if !(stage_info.is_temporary
+                || visibility_checker.check_stage_read_visibility(&stage_info.stage_name)
+                || stage_info.stage_type == StageType::User
+                    && stage_info.stage_name == ctx.get_current_user()?.name)
+            {
+                return Err(ErrorCode::PermissionDenied(format!(
+                    "Permission denied: privilege READ is required on stage {} for user {}",
+                    stage_info.stage_name.clone(),
+                    &ctx.get_current_user()?.identity().display(),
+                )));
+            }
+        }
+        let files_info = StageFilesInfo {
+            path: path.clone(),
+            ..self.args_parsed.files_info.clone()
+        };
+
+        let file_format_params = match &self.args_parsed.file_format {
+            Some(f) => ctx.get_file_format(f).await?,
+            None => stage_info.file_format_params.clone(),
+        };
+        let operator = init_stage_operator(&stage_info)?;
+        let stage_file_infos = files_info
+            .list(&operator, 1, self.args_parsed.max_file_count)
+            .await?;
+        Ok((
+            PartStatistics::default(),
+            Partitions::create(PartitionsShuffleKind::Seq, vec![
+                InferSchemaPartInfo::create(
+                    files_info,
+                    file_format_params,
+                    stage_info,
+                    stage_file_infos,
+                ),
+            ]),
+        ))
     }
 
     fn table_args(&self) -> Option<TableArgs> {
@@ -113,12 +217,98 @@ impl Table for InferSchemaTable {
         pipeline: &mut Pipeline,
         _put_cache: bool,
     ) -> Result<()> {
-        pipeline.add_source(
-            |output| {
-                ParquetInferSchemaSource::create(ctx.clone(), output, self.args_parsed.clone())
-            },
-            1,
-        )?;
+        let Some(part) = ctx.get_partition() else {
+            return Ok(());
+        };
+        let info = InferSchemaPartInfo::from_part(&part)?;
+
+        match info.file_format_params {
+            FileFormatParams::Csv(_) | FileFormatParams::NdJson(_) => {
+                let partitions = info
+                    .stage_file_infos
+                    .iter()
+                    .map(|v| {
+                        let part = SingleFilePartition {
+                            path: v.path.clone(),
+                            size: v.size as usize,
+                        };
+                        let part_info: Box<dyn PartInfo> = Box::new(part);
+                        Arc::new(part_info)
+                    })
+                    .collect::<Vec<_>>();
+                ctx.set_partitions(Partitions::create(PartitionsShuffleKind::Seq, partitions))?;
+
+                Self::build_read_stage_source(ctx.clone(), pipeline, &info.stage_info)?;
+
+                let stage_table_info = StageTableInfo {
+                    stage_root: "".to_string(),
+                    stage_info: info.stage_info.clone(),
+                    schema: Arc::new(Default::default()),
+                    default_exprs: None,
+                    files_info: info.files_info.clone(),
+                    files_to_copy: None,
+                    duplicated_files_detected: vec![],
+                    is_select: false,
+                    copy_into_table_options: Default::default(),
+                    is_variant: false,
+                };
+
+                let load_ctx = Arc::new(LoadContext::try_create_for_copy(
+                    ctx.clone(),
+                    &stage_table_info,
+                    None,
+                    BlockThresholds::default(),
+                    vec![],
+                )?);
+
+                let mut algo = None;
+
+                for file_info in info.stage_file_infos.iter() {
+                    let Some(new_algo) = CompressAlgorithm::from_path(&file_info.path) else {
+                        continue;
+                    };
+
+                    if let Some(algo) = algo {
+                        if algo != new_algo {
+                            return Err(ErrorCode::UnknownCompressionType(
+                                "`infer_schema` only supports single compression type",
+                            ));
+                        }
+                    }
+                    algo = Some(new_algo);
+                }
+                if algo.is_some() {
+                    pipeline.try_add_accumulating_transformer(|| {
+                        Decompressor::try_create(load_ctx.clone(), algo)
+                    })?;
+                }
+                pipeline.add_accumulating_transformer(|| {
+                    InferSchemaSeparator::create(
+                        info.file_format_params.clone(),
+                        self.args_parsed.max_records,
+                        info.stage_file_infos.len(),
+                    )
+                });
+            }
+            FileFormatParams::Parquet(_) => {
+                pipeline.add_source(
+                    |output| {
+                        ParquetInferSchemaSource::create(
+                            ctx.clone(),
+                            output,
+                            info.stage_info.clone(),
+                            info.stage_file_infos.clone(),
+                        )
+                    },
+                    1,
+                )?;
+            }
+            _ => {
+                return Err(ErrorCode::BadArguments(
+                    "infer_schema is currently limited to format Parquet, CSV and NDJSON",
+                ));
+            }
+        }
+
         Ok(())
     }
 }
diff --git a/src/query/service/src/table_functions/infer_schema/merge.rs b/src/query/service/src/table_functions/infer_schema/merge.rs
new file mode 100644
index 0000000000000..5aa78e263a7cb
--- /dev/null
+++ b/src/query/service/src/table_functions/infer_schema/merge.rs
@@ -0,0 +1,284 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use databend_common_expression::types::NumberDataType;
+use databend_common_expression::TableDataType;
+use databend_common_expression::TableSchema;
+
+const UNSIGNED_TYPES: [NumberDataType; 4] = [
+    NumberDataType::UInt8,
+    NumberDataType::UInt16,
+    NumberDataType::UInt32,
+    NumberDataType::UInt64,
+];
+const SIGNED_TYPES: [NumberDataType; 4] = [
+    NumberDataType::Int8,
+    NumberDataType::Int16,
+    NumberDataType::Int32,
+    NumberDataType::Int64,
+];
+const FLOAT_TYPES: [NumberDataType; 2] = [NumberDataType::Float32, NumberDataType::Float64];
+
+fn wrap_nullable(ty: TableDataType, is_nullable: bool) -> TableDataType {
+    if is_nullable {
+        ty.wrap_nullable()
+    } else {
+        ty
+    }
+}
+
+pub fn merge_type(
+    old: TableDataType,
+    new: TableDataType,
+    is_nullable: bool,
+) -> Option<TableDataType> {
+    if old.remove_nullable() == new.remove_nullable() {
+        return Some(wrap_nullable(old, is_nullable));
+    }
+    if let (TableDataType::Number(old_num), TableDataType::Number(new_num)) =
+        (new.remove_nullable(), old.remove_nullable())
+    {
+        if old_num.is_float() && new_num.is_float() {
+            return promote_numeric(&old, &new, &FLOAT_TYPES)
+                .map(|ty| wrap_nullable(ty, is_nullable));
+        }
+        return promote_numeric(&old, &new, &SIGNED_TYPES)
+            .or_else(|| promote_numeric(&old, &new, &UNSIGNED_TYPES))
+            .map(|ty| wrap_nullable(ty, is_nullable));
+    }
+    None
+}
+
+pub fn promote_numeric(
+    a: &TableDataType,
+    b: &TableDataType,
+    types: &[NumberDataType],
+) -> Option<TableDataType> {
+    let idx_a = match a {
+        TableDataType::Number(n) => types.iter().position(|t| t == n),
+        _ => None,
+    };
+    let idx_b = match b {
+        TableDataType::Number(n) => types.iter().position(|t| t == n),
+        _ => None,
+    };
+    match (idx_a, idx_b) {
+        (Some(i), Some(j)) => Some(TableDataType::Number(types[usize::max(i, j)])),
+        _ => None,
+    }
+}
+
+pub fn merge_schema(defined: TableSchema, guess: TableSchema) -> TableSchema {
+    let TableSchema {
+        fields: mut def_fields,
+        ..
+    } = defined;
+    let TableSchema {
+        fields: guess_fields,
+        ..
+    } = guess;
+
+    for guess_field in guess_fields {
+        match def_fields
+            .iter_mut()
+            .find(|def_field| def_field.name() == guess_field.name())
+        {
+            None => {
+                def_fields.push(guess_field);
+            }
+            Some(def_field) => {
+                let is_nullable =
+                    def_field.data_type.is_nullable() || guess_field.data_type.is_nullable();
+                def_field.data_type = merge_type(
+                    def_field.data_type.clone(),
+                    guess_field.data_type,
+                    is_nullable,
+                )
+                .unwrap_or_else(|| wrap_nullable(TableDataType::String, is_nullable));
+            }
+        }
+    }
+
+    TableSchema::new(def_fields)
+}
+
+#[cfg(test)]
+mod tests {
+    use databend_common_expression::types::NumberDataType;
+    use databend_common_expression::TableDataType;
+    use databend_common_expression::TableField;
+    use databend_common_expression::TableSchema;
+
+    use crate::table_functions::infer_schema::merge::merge_schema;
+    use crate::table_functions::infer_schema::merge::merge_type;
+
+    #[test]
+    fn test_promote_unsigned() {
+        assert_eq!(
+            merge_type(
+                TableDataType::Number(NumberDataType::UInt8),
+                TableDataType::Number(NumberDataType::UInt16),
+                false,
+            ),
+            Some(TableDataType::Number(NumberDataType::UInt16))
+        );
+        assert_eq!(
+            merge_type(
+                TableDataType::Number(NumberDataType::UInt32),
+                TableDataType::Number(NumberDataType::UInt64),
+                false,
+            ),
+            Some(TableDataType::Number(NumberDataType::UInt64))
+        );
+        assert_eq!(
+            merge_type(
+                TableDataType::Number(NumberDataType::UInt8),
+                TableDataType::Number(NumberDataType::Int8),
+                false,
+            ),
+            None
+        );
+    }
+
+    #[test]
+    fn test_promote_signed() {
+        assert_eq!(
+            merge_type(
+                TableDataType::Number(NumberDataType::Int8),
+                TableDataType::Number(NumberDataType::Int16),
+                false,
+            ),
+            Some(TableDataType::Number(NumberDataType::Int16))
+        );
+        assert_eq!(
+            merge_type(
+                TableDataType::Number(NumberDataType::Int32),
+                TableDataType::Number(NumberDataType::Int64),
+                false,
+            ),
+            Some(TableDataType::Number(NumberDataType::Int64))
+        );
+        assert_eq!(
+            merge_type(
+                TableDataType::Number(NumberDataType::Int8),
+                TableDataType::Number(NumberDataType::UInt8),
+                false,
+            ),
+            None
+        );
+    }
+
+    #[test]
+    fn test_promote_integer() {
+        assert_eq!(
+            merge_type(
+                TableDataType::Number(NumberDataType::Int8),
+                TableDataType::Number(NumberDataType::Int16),
+                false,
+            ),
+            Some(TableDataType::Number(NumberDataType::Int16))
+        );
+        assert_eq!(
+            merge_type(
+                TableDataType::Number(NumberDataType::UInt8),
+                TableDataType::Number(NumberDataType::UInt32),
+                false,
+            ),
+            Some(TableDataType::Number(NumberDataType::UInt32))
+        );
+        assert_eq!(
+            merge_type(
+                TableDataType::Number(NumberDataType::Int8),
+                TableDataType::Number(NumberDataType::UInt8),
+                false,
+            ),
+            None
+        );
+    }
+
+    #[test]
+    fn test_promote_float() {
+        assert_eq!(
+            merge_type(
+                TableDataType::Number(NumberDataType::Float32),
+                TableDataType::Number(NumberDataType::Float64),
+                false,
+            ),
+            Some(TableDataType::Number(NumberDataType::Float64))
+        );
+    }
+
+    #[test]
+    fn test_promote_numeric() {
+        assert_eq!(
+            merge_type(
+                TableDataType::Number(NumberDataType::Int8),
+                TableDataType::Number(NumberDataType::Int16),
+                false,
+            ),
+            Some(TableDataType::Number(NumberDataType::Int16))
+        );
+        assert_eq!(
+            merge_type(
+                TableDataType::Number(NumberDataType::Float32),
+                TableDataType::Number(NumberDataType::Int16),
+                false,
+            ),
+            None
+        );
+        assert_eq!(
+            merge_type(
+                TableDataType::String,
+                TableDataType::Number(NumberDataType::Int32),
+                false,
+            ),
+            None
+        );
+    }
+
+    #[test]
+    fn test_merge_schema() {
+        let schema_1 = TableSchema::new(vec![
+            TableField::new(
+                "c1",
+                TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::Int8))),
+            ),
+            TableField::new("c2", TableDataType::Number(NumberDataType::Int8)),
+            TableField::new("c3", TableDataType::Number(NumberDataType::Int32)),
+            TableField::new("c4", TableDataType::Number(NumberDataType::Float32)),
+            TableField::new("c5", TableDataType::Number(NumberDataType::Float32)),
+        ]);
+        let schema_2 = TableSchema::new(vec![
+            TableField::new("c1", TableDataType::Number(NumberDataType::Int8)),
+            TableField::new("c3", TableDataType::Number(NumberDataType::Float32)),
+            TableField::new("c2", TableDataType::Number(NumberDataType::Int8)),
+            TableField::new("c4", TableDataType::Number(NumberDataType::Float32)),
+            TableField::new("c6", TableDataType::Number(NumberDataType::Float32)),
+        ]);
+
+        let schema = merge_schema(schema_1, schema_2);
+        let expected_schema = TableSchema::new(vec![
+            TableField::new(
+                "c1",
+                TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::Int8))),
+            ),
+            TableField::new("c2", TableDataType::Number(NumberDataType::Int8)),
+            TableField::new("c3", TableDataType::String),
+            TableField::new("c4", TableDataType::Number(NumberDataType::Float32)),
+            TableField::new("c5", TableDataType::Number(NumberDataType::Float32)),
+            TableField::new("c6", TableDataType::Number(NumberDataType::Float32)),
+        ]);
+        assert_eq!(schema, expected_schema);
+    }
+}
diff --git a/src/query/service/src/table_functions/infer_schema/mod.rs b/src/query/service/src/table_functions/infer_schema/mod.rs
index 804499bf8fa56..82597e959806b 100644
--- a/src/query/service/src/table_functions/infer_schema/mod.rs
+++ b/src/query/service/src/table_functions/infer_schema/mod.rs
@@ -13,7 +13,9 @@
 // limitations under the License.
 
 mod infer_schema_table;
+mod merge;
 mod parquet;
+mod separator;
 mod table_args;
 
 pub use infer_schema_table::InferSchemaTable;
diff --git a/src/query/service/src/table_functions/infer_schema/parquet.rs b/src/query/service/src/table_functions/infer_schema/parquet.rs
index 753971deab5b7..5db9713b9ac69 100644
--- a/src/query/service/src/table_functions/infer_schema/parquet.rs
+++ b/src/query/service/src/table_functions/infer_schema/parquet.rs
@@ -12,13 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
-use std::collections::BTreeMap; use std::sync::Arc; -use databend_common_ast::ast::FileLocation; -use databend_common_ast::ast::UriLocation; +use arrow_schema::Schema; use databend_common_catalog::table_context::TableContext; -use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::types::BooleanType; use databend_common_expression::types::StringType; @@ -26,38 +23,37 @@ use databend_common_expression::types::UInt64Type; use databend_common_expression::DataBlock; use databend_common_expression::FromData; use databend_common_expression::TableSchema; -use databend_common_meta_app::principal::StageFileFormatType; -use databend_common_meta_app::principal::StageType; +use databend_common_meta_app::principal::StageInfo; use databend_common_pipeline_core::processors::OutputPort; use databend_common_pipeline_core::processors::ProcessorPtr; use databend_common_pipeline_sources::AsyncSource; use databend_common_pipeline_sources::AsyncSourcer; -use databend_common_sql::binder::resolve_file_location; use databend_common_storage::init_stage_operator; use databend_common_storage::read_parquet_schema_async_rs; -use databend_common_storage::StageFilesInfo; -use databend_common_users::Object; -use opendal::Scheme; +use databend_common_storage::StageFileInfo; +use futures_util::future::try_join_all; +use itertools::Itertools; use crate::table_functions::infer_schema::infer_schema_table::INFER_SCHEMA; -use crate::table_functions::infer_schema::table_args::InferSchemaArgsParsed; pub(crate) struct ParquetInferSchemaSource { is_finished: bool, - ctx: Arc, - args_parsed: InferSchemaArgsParsed, + + stage_info: StageInfo, + stage_file_infos: Vec, } impl ParquetInferSchemaSource { pub fn create( ctx: Arc, output: Arc, - args_parsed: InferSchemaArgsParsed, + stage_info: StageInfo, + stage_file_infos: Vec, ) -> Result { - AsyncSourcer::create(ctx.clone(), output, ParquetInferSchemaSource { + AsyncSourcer::create(ctx, output, ParquetInferSchemaSource { is_finished: false, - ctx, - args_parsed, + stage_info, + stage_file_infos, }) } } @@ -73,96 +69,40 @@ impl AsyncSource for ParquetInferSchemaSource { } self.is_finished = true; - let file_location = if let Some(location) = - self.args_parsed.location.clone().strip_prefix('@') - { - FileLocation::Stage(location.to_string()) - } else if let Some(connection_name) = &self.args_parsed.connection_name { - let conn = self.ctx.get_connection(connection_name).await?; - let uri = - UriLocation::from_uri(self.args_parsed.location.clone(), conn.storage_params)?; - let proto = conn.storage_type.parse::()?; - if proto != uri.protocol.parse::()? 
{ - return Err(ErrorCode::BadArguments(format!( - "protocol from connection_name={connection_name} ({proto}) not match with uri protocol ({0}).", - uri.protocol - ))); - } - FileLocation::Uri(uri) - } else { - let uri = - UriLocation::from_uri(self.args_parsed.location.clone(), BTreeMap::default())?; - FileLocation::Uri(uri) - }; - let (stage_info, path) = resolve_file_location(self.ctx.as_ref(), &file_location).await?; - let enable_experimental_rbac_check = self - .ctx - .get_settings() - .get_enable_experimental_rbac_check()?; - if enable_experimental_rbac_check { - let visibility_checker = self - .ctx - .get_visibility_checker(false, Object::Stage) - .await?; - if !(stage_info.is_temporary - || visibility_checker.check_stage_read_visibility(&stage_info.stage_name) - || stage_info.stage_type == StageType::User - && stage_info.stage_name == self.ctx.get_current_user()?.name) - { - return Err(ErrorCode::PermissionDenied(format!( - "Permission denied: privilege READ is required on stage {} for user {}", - stage_info.stage_name.clone(), - &self.ctx.get_current_user()?.identity().display(), - ))); - } - } - let files_info = StageFilesInfo { - path: path.clone(), - ..self.args_parsed.files_info.clone() - }; - let operator = init_stage_operator(&stage_info)?; - - let first_file = files_info.first_file(&operator).await?; - let file_format_params = match &self.args_parsed.file_format { - Some(f) => self.ctx.get_file_format(f).await?, - None => stage_info.file_format_params.clone(), - }; - let schema = match (first_file.as_ref(), file_format_params.get_type()) { - (None, _) => return Ok(None), - (Some(first_file), StageFileFormatType::Parquet) => { - let arrow_schema = read_parquet_schema_async_rs( - &operator, - &first_file.path, - Some(first_file.size), - ) - .await?; - TableSchema::try_from(&arrow_schema)? 
- } - _ => { - return Err(ErrorCode::BadArguments( - "infer_schema is currently limited to format Parquet", - )); - } - }; + let operator = init_stage_operator(&self.stage_info)?; + let infer_schema_futures = self.stage_file_infos.iter().map(|file| async { + read_parquet_schema_async_rs(&operator, &file.path, Some(file.size)).await + }); + // todo: unify_schemas(arrow-rs unsupported now) + let arrow_schema = Schema::try_merge(try_join_all(infer_schema_futures).await?)?; + let table_schema = TableSchema::try_from(&arrow_schema)?; let mut names: Vec = vec![]; let mut types: Vec = vec![]; let mut nulls: Vec = vec![]; + let mut filenames: Vec = vec![]; + let filenames_str = self + .stage_file_infos + .iter() + .map(|info| &info.path) + .join(", "); - for field in schema.fields().iter() { + for field in table_schema.fields().iter() { names.push(field.name().to_string()); let non_null_type = field.data_type().remove_recursive_nullable(); types.push(non_null_type.sql_name()); nulls.push(field.is_nullable()); + filenames.push(filenames_str.clone()); } - let order_ids = (0..schema.fields().len() as u64).collect::>(); + let order_ids = (0..table_schema.fields().len() as u64).collect::>(); let block = DataBlock::new_from_columns(vec![ StringType::from_data(names), StringType::from_data(types), BooleanType::from_data(nulls), + StringType::from_data(filenames), UInt64Type::from_data(order_ids), ]); Ok(Some(block)) diff --git a/src/query/service/src/table_functions/infer_schema/separator.rs b/src/query/service/src/table_functions/infer_schema/separator.rs new file mode 100644 index 0000000000000..b5607f0a3b9f1 --- /dev/null +++ b/src/query/service/src/table_functions/infer_schema/separator.rs @@ -0,0 +1,213 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::HashMap; +use std::io::Cursor; + +use arrow_csv::reader::Format; +use arrow_json::reader::infer_json_schema_from_iterator; +use arrow_json::reader::ValueIter; +use arrow_schema::ArrowError; +use arrow_schema::Schema; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::types::BooleanType; +use databend_common_expression::types::StringType; +use databend_common_expression::types::UInt64Type; +use databend_common_expression::BlockMetaInfoDowncast; +use databend_common_expression::DataBlock; +use databend_common_expression::FromData; +use databend_common_expression::TableSchema; +use databend_common_meta_app::principal::FileFormatParams; +use databend_common_pipeline_transforms::AccumulatingTransform; +use databend_common_storages_stage::BytesBatch; +use itertools::Itertools; + +use crate::table_functions::infer_schema::merge::merge_schema; + +const MAX_SINGLE_FILE_BYTES: usize = 100 * 1024 * 1024; + +pub struct InferSchemaSeparator { + pub file_format_params: FileFormatParams, + files: HashMap>, + pub max_records: Option, + schemas: Option, + files_len: usize, + filenames: Vec, + is_finished: bool, +} + +impl InferSchemaSeparator { + pub fn create( + file_format_params: FileFormatParams, + max_records: Option, + files_len: usize, + ) -> Self { + InferSchemaSeparator { + file_format_params, + files: HashMap::new(), + max_records, + schemas: None, + files_len, + filenames: Vec::with_capacity(files_len), + is_finished: false, + } + } +} + +impl AccumulatingTransform for InferSchemaSeparator { + const NAME: &'static str = "InferSchemaSeparator"; + + fn transform(&mut self, data: DataBlock) -> Result> { + if self.is_finished { + return Ok(vec![DataBlock::empty()]); + } + let batch = data + .get_owned_meta() + .and_then(BytesBatch::downcast_from) + .unwrap(); + + let bytes = self.files.entry(batch.path.clone()).or_default(); + bytes.extend(batch.data); + + if bytes.len() > MAX_SINGLE_FILE_BYTES { + return Err(ErrorCode::InvalidArgument(format!( + "The file '{}' is too large(maximum allowed: {})", + batch.path, + human_readable_size(MAX_SINGLE_FILE_BYTES), + ))); + } + + // When max_records exists, it will try to use the current bytes to read, otherwise it will buffer all bytes + if self.max_records.is_none() && !batch.is_eof { + return Ok(vec![DataBlock::empty()]); + } + let bytes = Cursor::new(bytes); + let result = match &self.file_format_params { + FileFormatParams::Csv(params) => { + let escape = if params.escape.is_empty() { + None + } else { + Some(params.escape.as_bytes()[0]) + }; + + let mut format = Format::default() + .with_delimiter(params.field_delimiter.as_bytes()[0]) + .with_quote(params.quote.as_bytes()[0]) + .with_header(params.headers != 0); + if let Some(escape) = escape { + format = format.with_escape(escape); + } + format + .infer_schema(bytes, self.max_records) + .map(|(schema, _)| schema) + .map_err(Some) + } + FileFormatParams::NdJson(_) => { + let mut records = ValueIter::new(bytes, self.max_records); + let fn_ndjson = |max_records| -> std::result::Result> { + if let Some(max_record) = max_records { + let mut tmp: Vec> = + Vec::with_capacity(max_record); + + for result in records { + tmp.push(Ok(result.map_err(|_| None)?)); + } + infer_json_schema_from_iterator(tmp.into_iter()).map_err(Some) + } else { + infer_json_schema_from_iterator(&mut records).map_err(Some) + } + }; + fn_ndjson(self.max_records) + } + _ => { + return Err(ErrorCode::BadArguments( + "InferSchemaSeparator is currently 
limited to format CSV and NDJSON", + )); + } + }; + let arrow_schema = match result { + Ok(schema) => schema, + Err(None) => return Ok(vec![DataBlock::empty()]), + Err(Some(err)) => { + if matches!(err, ArrowError::CsvError(_)) + && self.max_records.is_some() + && !batch.is_eof + { + return Ok(vec![DataBlock::empty()]); + } + return Err(err.into()); + } + }; + self.files.remove(&batch.path); + self.filenames.push(batch.path); + + let merge_schema = match self.schemas.take() { + None => TableSchema::try_from(&arrow_schema)?, + Some(schema) => merge_schema(schema, TableSchema::try_from(&arrow_schema)?), + }; + self.schemas = Some(merge_schema); + + if self.files_len > self.filenames.len() { + return Ok(vec![DataBlock::empty()]); + } + self.is_finished = true; + let Some(table_schema) = self.schemas.take() else { + return Ok(vec![DataBlock::empty()]); + }; + + let mut names: Vec = vec![]; + let mut types: Vec = vec![]; + let mut nulls: Vec = vec![]; + let mut filenames: Vec = vec![]; + let filenames_str = self.filenames.iter().join(", "); + + for field in table_schema.fields().iter() { + names.push(field.name().to_string()); + + let non_null_type = field.data_type().remove_recursive_nullable(); + types.push(non_null_type.sql_name()); + nulls.push(field.is_nullable()); + filenames.push(filenames_str.clone()); + } + + let order_ids = (0..table_schema.fields().len() as u64).collect::>(); + + let block = DataBlock::new_from_columns(vec![ + StringType::from_data(names), + StringType::from_data(types), + BooleanType::from_data(nulls), + StringType::from_data(filenames), + UInt64Type::from_data(order_ids), + ]); + Ok(vec![block]) + } +} + +fn human_readable_size(bytes: usize) -> String { + const KB: f64 = 1024.0; + const MB: f64 = KB * 1024.0; + const GB: f64 = MB * 1024.0; + + let b = bytes as f64; + if b >= GB { + format!("{:.2} GB", b / GB) + } else if b >= MB { + format!("{:.2} MB", b / MB) + } else if b >= KB { + format!("{:.2} KB", b / KB) + } else { + format!("{} B", bytes) + } +} diff --git a/src/query/service/src/table_functions/infer_schema/table_args.rs b/src/query/service/src/table_functions/infer_schema/table_args.rs index 07d359d5985a5..9781bc742ee4b 100644 --- a/src/query/service/src/table_functions/infer_schema/table_args.rs +++ b/src/query/service/src/table_functions/infer_schema/table_args.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use databend_common_catalog::table_args::i64_value; use databend_common_catalog::table_args::TableArgs; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -24,6 +25,8 @@ pub(crate) struct InferSchemaArgsParsed { pub(crate) connection_name: Option, pub(crate) file_format: Option, pub(crate) files_info: StageFilesInfo, + pub(crate) max_records: Option, + pub(crate) max_file_count: Option, } impl InferSchemaArgsParsed { @@ -38,6 +41,8 @@ impl InferSchemaArgsParsed { files: None, pattern: None, }; + let mut max_records = None; + let mut max_file_count = None; for (k, v) in &args { match k.to_lowercase().as_str() { @@ -53,6 +58,12 @@ impl InferSchemaArgsParsed { "file_format" => { file_format = Some(string_value(v)?); } + "max_records_pre_file" => { + max_records = Some(i64_value(v)? as usize); + } + "max_file_count" => { + max_file_count = Some(i64_value(v)? 
as usize); + } _ => { return Err(ErrorCode::BadArguments(format!( "unknown param {} for infer_schema", @@ -70,6 +81,8 @@ impl InferSchemaArgsParsed { connection_name, file_format, files_info, + max_records, + max_file_count, }) } } diff --git a/src/query/storages/stage/src/infer_schema.rs b/src/query/storages/stage/src/infer_schema.rs new file mode 100644 index 0000000000000..77a961594992e --- /dev/null +++ b/src/query/storages/stage/src/infer_schema.rs @@ -0,0 +1,80 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::sync::Arc; + +use databend_common_catalog::plan::PartInfo; +use databend_common_catalog::plan::PartInfoPtr; +use databend_common_catalog::plan::PartInfoType; +use databend_common_exception::ErrorCode; +use databend_common_meta_app::principal::FileFormatParams; +use databend_common_meta_app::principal::StageInfo; +use databend_common_storage::StageFileInfo; +use databend_common_storage::StageFilesInfo; + +#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)] +pub struct InferSchemaPartInfo { + pub files_info: StageFilesInfo, + pub file_format_params: FileFormatParams, + pub stage_info: StageInfo, + pub stage_file_infos: Vec, +} + +#[typetag::serde(name = "infer_schema")] +impl PartInfo for InferSchemaPartInfo { + fn as_any(&self) -> &dyn Any { + self + } + + fn equals(&self, info: &Box) -> bool { + info.as_any() + .downcast_ref::() + .is_some_and(|other| self == other) + } + + fn hash(&self) -> u64 { + 0 + } + + fn part_type(&self) -> PartInfoType { + PartInfoType::LazyLevel + } +} + +impl InferSchemaPartInfo { + pub fn create( + files_info: StageFilesInfo, + file_format_params: FileFormatParams, + stage_info: StageInfo, + stage_file_infos: Vec, + ) -> PartInfoPtr { + Arc::new(Box::new(InferSchemaPartInfo { + files_info, + file_format_params, + stage_info, + stage_file_infos, + })) + } + + pub fn from_part( + info: &PartInfoPtr, + ) -> databend_common_exception::Result<&InferSchemaPartInfo> { + info.as_any() + .downcast_ref::() + .ok_or_else(|| { + ErrorCode::Internal("Cannot downcast from PartInfo to InferSchemaPartInfo.") + }) + } +} diff --git a/src/query/storages/stage/src/lib.rs b/src/query/storages/stage/src/lib.rs index 96573e42f4af2..39e7392165464 100644 --- a/src/query/storages/stage/src/lib.rs +++ b/src/query/storages/stage/src/lib.rs @@ -21,6 +21,7 @@ mod append; mod compression; +mod infer_schema; mod read; mod stage_table; mod streaming_load; @@ -29,7 +30,11 @@ mod transform_null_if; pub use append::StageSinkTable; pub use compression::get_compression_with_path; +pub use infer_schema::InferSchemaPartInfo; pub use read::row_based::BytesBatch; +pub use read::row_based::BytesReader; +pub use read::row_based::Decompressor; +pub use read::LoadContext; pub use stage_table::StageTable; pub use streaming_load::build_streaming_load_pipeline; pub use transform_null_if::TransformNullIf; diff --git a/src/query/storages/stage/src/read/mod.rs b/src/query/storages/stage/src/read/mod.rs index 
98f677c51fbde..984e6c62f78a5 100644 --- a/src/query/storages/stage/src/read/mod.rs +++ b/src/query/storages/stage/src/read/mod.rs @@ -20,3 +20,4 @@ pub mod row_based; pub(crate) mod block_builder_state; mod default_expr_evaluator; pub(crate) mod whole_file_reader; +pub use load_context::LoadContext; diff --git a/src/query/storages/stage/src/read/row_based/mod.rs b/src/query/storages/stage/src/read/row_based/mod.rs index a630ae43fabfc..69e24b53db07a 100644 --- a/src/query/storages/stage/src/read/row_based/mod.rs +++ b/src/query/storages/stage/src/read/row_based/mod.rs @@ -20,4 +20,6 @@ mod read_pipeline; mod utils; pub use batch::BytesBatch; +pub use processors::BytesReader; +pub use processors::Decompressor; pub use read_pipeline::RowBasedReadPipelineBuilder; diff --git a/tests/data/csv/max_file_count/numbers0.csv b/tests/data/csv/max_file_count/numbers0.csv new file mode 100644 index 0000000000000..a49bbf89b1d3d --- /dev/null +++ b/tests/data/csv/max_file_count/numbers0.csv @@ -0,0 +1,4 @@ +col1,col2,col3,col4,col5 +0,1,2,3,4 +5,6,7,8,9 +10,11,12,13,14 \ No newline at end of file diff --git a/tests/data/csv/max_file_count/numbers1.csv b/tests/data/csv/max_file_count/numbers1.csv new file mode 100644 index 0000000000000..a49bbf89b1d3d --- /dev/null +++ b/tests/data/csv/max_file_count/numbers1.csv @@ -0,0 +1,4 @@ +col1,col2,col3,col4,col5 +0,1,2,3,4 +5,6,7,8,9 +10,11,12,13,14 \ No newline at end of file diff --git a/tests/data/csv/max_file_count/numbers2.csv b/tests/data/csv/max_file_count/numbers2.csv new file mode 100644 index 0000000000000..a49bbf89b1d3d --- /dev/null +++ b/tests/data/csv/max_file_count/numbers2.csv @@ -0,0 +1,4 @@ +col1,col2,col3,col4,col5 +0,1,2,3,4 +5,6,7,8,9 +10,11,12,13,14 \ No newline at end of file diff --git a/tests/data/csv/max_records.csv b/tests/data/csv/max_records.csv new file mode 100644 index 0000000000000..5e52f31e5dd5d --- /dev/null +++ b/tests/data/csv/max_records.csv @@ -0,0 +1,11 @@ +id,value +1,100 +2,200 +3,300 +4,400 +5,500 +6,foo +7,bar +8,baz +9,qux +10,quux diff --git a/tests/data/csv/max_records.xz b/tests/data/csv/max_records.xz new file mode 100644 index 0000000000000..25a16f4f85295 Binary files /dev/null and b/tests/data/csv/max_records.xz differ diff --git a/tests/data/csv/max_records.zip b/tests/data/csv/max_records.zip new file mode 100644 index 0000000000000..baea0be135d7f Binary files /dev/null and b/tests/data/csv/max_records.zip differ diff --git a/tests/data/csv/max_records.zst b/tests/data/csv/max_records.zst new file mode 100644 index 0000000000000..ef35edae5da5e Binary files /dev/null and b/tests/data/csv/max_records.zst differ diff --git a/tests/data/csv/merge/numbers.csv b/tests/data/csv/merge/numbers.csv new file mode 100644 index 0000000000000..a49bbf89b1d3d --- /dev/null +++ b/tests/data/csv/merge/numbers.csv @@ -0,0 +1,4 @@ +col1,col2,col3,col4,col5 +0,1,2,3,4 +5,6,7,8,9 +10,11,12,13,14 \ No newline at end of file diff --git a/tests/data/csv/merge/numbers_with_last_string.csv b/tests/data/csv/merge/numbers_with_last_string.csv new file mode 100644 index 0000000000000..d0abce6450294 --- /dev/null +++ b/tests/data/csv/merge/numbers_with_last_string.csv @@ -0,0 +1,5 @@ +col1,col2,col3,col4,col5 +0,1,2,3,4 +5,6,7,8,9 +10,11,12,13,14 +a,b,c,d,e \ No newline at end of file diff --git a/tests/data/csv/mixed.csv b/tests/data/csv/mixed.csv new file mode 100644 index 0000000000000..203cdde68ced0 --- /dev/null +++ b/tests/data/csv/mixed.csv @@ -0,0 +1,4 @@ +id,name,score,active +1,Alice,88.5,true +2,Bob,92.0,false +3,Charlie,,true diff 
--git a/tests/data/csv/numbers_with_headers.csv b/tests/data/csv/numbers_with_headers.csv new file mode 100644 index 0000000000000..85e74e0d15564 --- /dev/null +++ b/tests/data/csv/numbers_with_headers.csv @@ -0,0 +1,19 @@ +id,value +0,1 +1,2 +2,3 +3,4 +4,5 +5,6 +6,7 +7,8 +8,9 +9,10 +10,11 +11,12 +12,13 +13,14 +14,15 +15,16 +16,17 +17,18 diff --git a/tests/data/csv/ragged.csv b/tests/data/csv/ragged.csv new file mode 100644 index 0000000000000..c0cdce65d93c2 --- /dev/null +++ b/tests/data/csv/ragged.csv @@ -0,0 +1,5 @@ +id,value,comment +1,10,ok +2,20 +3,30,missing one field +4 diff --git a/tests/data/csv/types.csv b/tests/data/csv/types.csv new file mode 100644 index 0000000000000..5ff9d1ece820b --- /dev/null +++ b/tests/data/csv/types.csv @@ -0,0 +1,4 @@ +bool_col,int_col,float_col,date_col,ts_sec,ts_ms,ts_us,ts_ns,utf8_col +true,42,3.14,2025-08-20,2025-08-20T12:34:56,2025-08-20T12:34:56.789,2025-08-20T12:34:56.789123,2025-08-20T12:34:56.789123456,hello +false,-7,-2.5,2024-02-29,2024-02-29T00:00:00,2024-02-29T00:00:00.001,2024-02-29T00:00:00.000001,2024-02-29T00:00:00.000000001,world +true,0,0.0,1970-01-01,1970-01-01T00:00:00,1970-01-01T00:00:00.000,1970-01-01T00:00:00.000000,1970-01-01T00:00:00.000000000,"foo,bar" diff --git a/tests/data/ndjson/max_file_count/numbers0.ndjson b/tests/data/ndjson/max_file_count/numbers0.ndjson new file mode 100644 index 0000000000000..aecddc3762d07 --- /dev/null +++ b/tests/data/ndjson/max_file_count/numbers0.ndjson @@ -0,0 +1,3 @@ +{"id": 1, "value": 100} +{"id": 2, "value": 200} +{"id": 3, "value": 300} diff --git a/tests/data/ndjson/max_file_count/numbers1.ndjson b/tests/data/ndjson/max_file_count/numbers1.ndjson new file mode 100644 index 0000000000000..aecddc3762d07 --- /dev/null +++ b/tests/data/ndjson/max_file_count/numbers1.ndjson @@ -0,0 +1,3 @@ +{"id": 1, "value": 100} +{"id": 2, "value": 200} +{"id": 3, "value": 300} diff --git a/tests/data/ndjson/max_file_count/numbers2.ndjson b/tests/data/ndjson/max_file_count/numbers2.ndjson new file mode 100644 index 0000000000000..aecddc3762d07 --- /dev/null +++ b/tests/data/ndjson/max_file_count/numbers2.ndjson @@ -0,0 +1,3 @@ +{"id": 1, "value": 100} +{"id": 2, "value": 200} +{"id": 3, "value": 300} diff --git a/tests/data/ndjson/max_records.ndjson b/tests/data/ndjson/max_records.ndjson new file mode 100644 index 0000000000000..079f2c82061f1 --- /dev/null +++ b/tests/data/ndjson/max_records.ndjson @@ -0,0 +1,10 @@ +{"id": 1, "value": 100} +{"id": 2, "value": 200} +{"id": 3, "value": 300} +{"id": 4, "value": 400} +{"id": 5, "value": 500} +{"id": 6, "value": "foo"} +{"id": 7, "value": "bar"} +{"id": 8, "value": "baz"} +{"id": 9, "value": "qux"} +{"id": 10, "value": "quux"} diff --git a/tests/data/ndjson/max_records.xz b/tests/data/ndjson/max_records.xz new file mode 100644 index 0000000000000..841c8cf053dfd Binary files /dev/null and b/tests/data/ndjson/max_records.xz differ diff --git a/tests/data/ndjson/max_records.zip b/tests/data/ndjson/max_records.zip new file mode 100644 index 0000000000000..02da2fa12d206 Binary files /dev/null and b/tests/data/ndjson/max_records.zip differ diff --git a/tests/data/ndjson/max_records.zst b/tests/data/ndjson/max_records.zst new file mode 100644 index 0000000000000..77821a433bd6f Binary files /dev/null and b/tests/data/ndjson/max_records.zst differ diff --git a/tests/data/ndjson/merge/numbers.ndjson b/tests/data/ndjson/merge/numbers.ndjson new file mode 100644 index 0000000000000..2c39ee429e7e0 --- /dev/null +++ b/tests/data/ndjson/merge/numbers.ndjson @@ -0,0 +1,3 @@ 
+{"col1":0,"col2":1,"col3":2,"col4":3,"col5":4} +{"col1":5,"col2":6,"col3":7,"col4":8,"col5":9} +{"col1":10,"col2":11,"col3":12,"col4":13,"col5":14} diff --git a/tests/data/ndjson/merge/numbers_with_last_string.ndjson b/tests/data/ndjson/merge/numbers_with_last_string.ndjson new file mode 100644 index 0000000000000..79e6c98910362 --- /dev/null +++ b/tests/data/ndjson/merge/numbers_with_last_string.ndjson @@ -0,0 +1,4 @@ +{"col1":0,"col2":1,"col3":2,"col4":3,"col5":4} +{"col1":5,"col2":6,"col3":7,"col4":8,"col5":9} +{"col1":10,"col2":11,"col3":12,"col4":13,"col5":14} +{"col1":"a","col2":"b","col3":"c","col4":"d","col5":"e"} \ No newline at end of file diff --git a/tests/data/ndjson/mixed.ndjson b/tests/data/ndjson/mixed.ndjson new file mode 100644 index 0000000000000..f9c139d2f5175 --- /dev/null +++ b/tests/data/ndjson/mixed.ndjson @@ -0,0 +1,3 @@ +{"id": 1, "name": "Alice", "score": 88.5, "active": true} +{"id": 2, "name": "Bob", "score": 92.0, "active": false} +{"id": 3, "name": "Charlie", "score": null, "active": true} diff --git a/tests/data/ndjson/numbers.ndjson b/tests/data/ndjson/numbers.ndjson new file mode 100644 index 0000000000000..aecddc3762d07 --- /dev/null +++ b/tests/data/ndjson/numbers.ndjson @@ -0,0 +1,3 @@ +{"id": 1, "value": 100} +{"id": 2, "value": 200} +{"id": 3, "value": 300} diff --git a/tests/data/ndjson/ragged.ndjson b/tests/data/ndjson/ragged.ndjson new file mode 100644 index 0000000000000..847a327073c2c --- /dev/null +++ b/tests/data/ndjson/ragged.ndjson @@ -0,0 +1,4 @@ +{"id": 1, "value": 10, "comment": "ok"} +{"id": 2, "value": 20} +{"id": 3, "value": 30, "comment": "missing one field"} +{"id": 4} diff --git a/tests/data/ndjson/types.ndjson b/tests/data/ndjson/types.ndjson new file mode 100644 index 0000000000000..99905728103d2 --- /dev/null +++ b/tests/data/ndjson/types.ndjson @@ -0,0 +1,3 @@ +{"bool_col": true, "int_col": 42, "float_col": 3.14, "date_col": "2025-08-20", "ts_sec": "2025-08-20T12:34:56", "ts_ms": "2025-08-20T12:34:56.789", "ts_us": "2025-08-20T12:34:56.789123", "ts_ns": "2025-08-20T12:34:56.789123456", "utf8_col": "hello", "arr_col": [1, 2, 3], "obj_col": {"a": 10, "b": "x"}} +{"bool_col": false, "int_col": -7, "float_col": -2.5, "date_col": "2024-02-29", "ts_sec": "2024-02-29T00:00:00", "ts_ms": "2024-02-29T00:00:00.001", "ts_us": "2024-02-29T00:00:00.000001", "ts_ns": "2024-02-29T00:00:00.000000001", "utf8_col": "world", "arr_col": ["a", "b", "c"], "obj_col": {"a": 20, "b": "y"}} +{"bool_col": true, "int_col": 0, "float_col": 0.0, "date_col": "1970-01-01", "ts_sec": "1970-01-01T00:00:00", "ts_ms": "1970-01-01T00:00:00.000", "ts_us": "1970-01-01T00:00:00.000000", "ts_ns": "1970-01-01T00:00:00.000000000", "utf8_col": "foo,bar", "arr_col": [], "obj_col": {"a": 30, "b": null}} diff --git a/tests/data/parquet/max_file_count/tuple0.parquet b/tests/data/parquet/max_file_count/tuple0.parquet new file mode 100644 index 0000000000000..53ccb995f5bad Binary files /dev/null and b/tests/data/parquet/max_file_count/tuple0.parquet differ diff --git a/tests/data/parquet/max_file_count/tuple1.parquet b/tests/data/parquet/max_file_count/tuple1.parquet new file mode 100644 index 0000000000000..53ccb995f5bad Binary files /dev/null and b/tests/data/parquet/max_file_count/tuple1.parquet differ diff --git a/tests/data/parquet/max_file_count/tuple2.parquet b/tests/data/parquet/max_file_count/tuple2.parquet new file mode 100644 index 0000000000000..53ccb995f5bad Binary files /dev/null and b/tests/data/parquet/max_file_count/tuple2.parquet differ diff --git 
a/tests/sqllogictests/suites/stage/formats/parquet/infer_schema.test b/tests/sqllogictests/suites/stage/formats/parquet/infer_schema.test index 7304db2b5d09d..9113d03729c9e 100644 --- a/tests/sqllogictests/suites/stage/formats/parquet/infer_schema.test +++ b/tests/sqllogictests/suites/stage/formats/parquet/infer_schema.test @@ -5,55 +5,55 @@ select * from infer_schema(location => '@data/invalid_xxx/tuple.parquet') query select * from infer_schema(location => '@data/parquet/tuple.parquet') ---- -id INT 0 0 -t TUPLE(A INT32, B STRING) 0 1 +id INT 0 parquet/tuple.parquet 0 +t TUPLE(A INT32, B STRING) 0 parquet/tuple.parquet 1 query select * from infer_schema(location => '@data/parquet/complex.parquet') ---- -resourceType VARCHAR 1 0 -id VARCHAR 1 1 -meta TUPLE(ID STRING, EXTENSION ARRAY(STRING), VERSIONID STRING, LASTUPDATED TIMESTAMP, SOURCE STRING, PROFILE ARRAY(STRING), SECURITY ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), SYSTEM STRING, VERSION STRING, CODE STRING, DISPLAY STRING, USERSELECTED BOOLEAN)), TAG ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), SYSTEM STRING, VERSION STRING, CODE STRING, DISPLAY STRING, USERSELECTED BOOLEAN))) 1 2 -implicitRules VARCHAR 1 3 -language VARCHAR 1 4 -text TUPLE(ID STRING, EXTENSION ARRAY(STRING), STATUS STRING, DIV STRING) 1 5 -contained ARRAY(STRING) 1 6 -extension ARRAY(STRING) 1 7 -modifierExtension ARRAY(STRING) 1 8 -identifier ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), USE STRING, TYPE TUPLE(ID STRING, EXTENSION ARRAY(STRING), CODING ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), SYSTEM STRING, VERSION STRING, CODE STRING, DISPLAY STRING, USERSELECTED BOOLEAN)), TEXT STRING), SYSTEM STRING, VALUE STRING, PERIOD TUPLE(ID STRING, EXTENSION ARRAY(STRING), START TIMESTAMP, END TIMESTAMP), ASSIGNER TUPLE(ID STRING, EXTENSION ARRAY(STRING), REFERENCE STRING, TYPE STRING, IDENTIFIER TUPLE(ID STRING, EXTENSION ARRAY(STRING), USE STRING, TYPE TUPLE(ID STRING, EXTENSION ARRAY(STRING), CODING ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), SYSTEM STRING, VERSION STRING, CODE STRING, DISPLAY STRING, USERSELECTED BOOLEAN)), TEXT STRING), SYSTEM STRING, VALUE STRING, PERIOD TUPLE(ID STRING, EXTENSION ARRAY(STRING), START TIMESTAMP, END TIMESTAMP), ASSIGNER STRING), DISPLAY STRING))) 1 9 -active BOOLEAN 1 10 -name ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), USE STRING, TEXT STRING, FAMILY STRING, GIVEN ARRAY(STRING), PREFIX ARRAY(STRING), SUFFIX ARRAY(STRING), PERIOD TUPLE(ID STRING, EXTENSION ARRAY(STRING), START TIMESTAMP, END TIMESTAMP))) 1 11 -telecom ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), SYSTEM STRING, VALUE STRING, USE STRING, RANK INT32, PERIOD TUPLE(ID STRING, EXTENSION ARRAY(STRING), START TIMESTAMP, END TIMESTAMP))) 1 12 -gender VARCHAR 1 13 -birthDate DATE 1 14 -deceasedBoolean BOOLEAN 1 15 -deceasedDateTime TIMESTAMP 1 16 -address ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), USE STRING, TYPE STRING, TEXT STRING, LINE ARRAY(STRING), CITY STRING, DISTRICT STRING, STATE STRING, POSTALCODE STRING, COUNTRY STRING, PERIOD TUPLE(ID STRING, EXTENSION ARRAY(STRING), START TIMESTAMP, END TIMESTAMP))) 1 17 -maritalStatus TUPLE(ID STRING, EXTENSION ARRAY(STRING), CODING ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), SYSTEM STRING, VERSION STRING, CODE STRING, DISPLAY STRING, USERSELECTED BOOLEAN)), TEXT STRING) 1 18 -multipleBirthBoolean BOOLEAN 1 19 -multipleBirthInteger INT 1 20 -photo ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), CONTENTTYPE STRING, LANGUAGE STRING, DATA BINARY, URL STRING, SIZE INT32, HASH BINARY, TITLE 
STRING, CREATION TIMESTAMP)) 1 21
-contact ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), MODIFIEREXTENSION ARRAY(STRING), RELATIONSHIP ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), CODING ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), SYSTEM STRING, VERSION STRING, CODE STRING, DISPLAY STRING, USERSELECTED BOOLEAN)), TEXT STRING)), NAME TUPLE(ID STRING, EXTENSION ARRAY(STRING), USE STRING, TEXT STRING, FAMILY STRING, GIVEN ARRAY(STRING), PREFIX ARRAY(STRING), SUFFIX ARRAY(STRING), PERIOD TUPLE(ID STRING, EXTENSION ARRAY(STRING), START TIMESTAMP, END TIMESTAMP)), TELECOM ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), SYSTEM STRING, VALUE STRING, USE STRING, RANK INT32, PERIOD TUPLE(ID STRING, EXTENSION ARRAY(STRING), START TIMESTAMP, END TIMESTAMP))), ADDRESS TUPLE(ID STRING, EXTENSION ARRAY(STRING), USE STRING, TYPE STRING, TEXT STRING, LINE ARRAY(STRING), CITY STRING, DISTRICT STRING, STATE STRING, POSTALCODE STRING, COUNTRY STRING, PERIOD TUPLE(ID STRING, EXTENSION ARRAY(STRING), START TIMESTAMP, END TIMESTAMP)), GENDER STRING, ORGANIZATION TUPLE(ID STRING, EXTENSION ARRAY(STRING), REFERENCE STRING, TYPE STRING, IDENTIFIER TUPLE(ID STRING, EXTENSION ARRAY(STRING), USE STRING, TYPE TUPLE(ID STRING, EXTENSION ARRAY(STRING), CODING ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), SYSTEM STRING, VERSION STRING, CODE STRING, DISPLAY STRING, USERSELECTED BOOLEAN)), TEXT STRING), SYSTEM STRING, VALUE STRING, PERIOD TUPLE(ID STRING, EXTENSION ARRAY(STRING), START TIMESTAMP, END TIMESTAMP), ASSIGNER STRING), DISPLAY STRING), PERIOD TUPLE(ID STRING, EXTENSION ARRAY(STRING), START TIMESTAMP, END TIMESTAMP))) 1 22
-communication ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), MODIFIEREXTENSION ARRAY(STRING), LANGUAGE TUPLE(ID STRING, EXTENSION ARRAY(STRING), CODING ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), SYSTEM STRING, VERSION STRING, CODE STRING, DISPLAY STRING, USERSELECTED BOOLEAN)), TEXT STRING), PREFERRED BOOLEAN)) 1 23
-generalPractitioner ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), REFERENCE STRING, TYPE STRING, IDENTIFIER TUPLE(ID STRING, EXTENSION ARRAY(STRING), USE STRING, TYPE TUPLE(ID STRING, EXTENSION ARRAY(STRING), CODING ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), SYSTEM STRING, VERSION STRING, CODE STRING, DISPLAY STRING, USERSELECTED BOOLEAN)), TEXT STRING), SYSTEM STRING, VALUE STRING, PERIOD TUPLE(ID STRING, EXTENSION ARRAY(STRING), START TIMESTAMP, END TIMESTAMP), ASSIGNER STRING), DISPLAY STRING)) 1 24
-managingOrganization TUPLE(ID STRING, EXTENSION ARRAY(STRING), REFERENCE STRING, TYPE STRING, IDENTIFIER TUPLE(ID STRING, EXTENSION ARRAY(STRING), USE STRING, TYPE TUPLE(ID STRING, EXTENSION ARRAY(STRING), CODING ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), SYSTEM STRING, VERSION STRING, CODE STRING, DISPLAY STRING, USERSELECTED BOOLEAN)), TEXT STRING), SYSTEM STRING, VALUE STRING, PERIOD TUPLE(ID STRING, EXTENSION ARRAY(STRING), START TIMESTAMP, END TIMESTAMP), ASSIGNER STRING), DISPLAY STRING) 1 25
-link ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), MODIFIEREXTENSION ARRAY(STRING), OTHER TUPLE(ID STRING, EXTENSION ARRAY(STRING), REFERENCE STRING, TYPE STRING, IDENTIFIER TUPLE(ID STRING, EXTENSION ARRAY(STRING), USE STRING, TYPE TUPLE(ID STRING, EXTENSION ARRAY(STRING), CODING ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), SYSTEM STRING, VERSION STRING, CODE STRING, DISPLAY STRING, USERSELECTED BOOLEAN)), TEXT STRING), SYSTEM STRING, VALUE STRING, PERIOD TUPLE(ID STRING, EXTENSION ARRAY(STRING), START TIMESTAMP, END TIMESTAMP), ASSIGNER STRING), DISPLAY STRING), TYPE STRING)) 1 26
-yy__version INT 1 27
-yy__us_core_race VARCHAR 1 28
-yy__us_core_ethnicity VARCHAR 1 29
-yy__us_core_birthsex TUPLE(VALUECODE STRING,) 1 30
+resourceType VARCHAR 1 parquet/complex.parquet 0
+id VARCHAR 1 parquet/complex.parquet 1
+meta TUPLE(ID STRING, EXTENSION ARRAY(STRING), VERSIONID STRING, LASTUPDATED TIMESTAMP, SOURCE STRING, PROFILE ARRAY(STRING), SECURITY ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), SYSTEM STRING, VERSION STRING, CODE STRING, DISPLAY STRING, USERSELECTED BOOLEAN)), TAG ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), SYSTEM STRING, VERSION STRING, CODE STRING, DISPLAY STRING, USERSELECTED BOOLEAN))) 1 parquet/complex.parquet 2
+implicitRules VARCHAR 1 parquet/complex.parquet 3
+language VARCHAR 1 parquet/complex.parquet 4
+text TUPLE(ID STRING, EXTENSION ARRAY(STRING), STATUS STRING, DIV STRING) 1 parquet/complex.parquet 5
+contained ARRAY(STRING) 1 parquet/complex.parquet 6
+extension ARRAY(STRING) 1 parquet/complex.parquet 7
+modifierExtension ARRAY(STRING) 1 parquet/complex.parquet 8
+identifier ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), USE STRING, TYPE TUPLE(ID STRING, EXTENSION ARRAY(STRING), CODING ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), SYSTEM STRING, VERSION STRING, CODE STRING, DISPLAY STRING, USERSELECTED BOOLEAN)), TEXT STRING), SYSTEM STRING, VALUE STRING, PERIOD TUPLE(ID STRING, EXTENSION ARRAY(STRING), START TIMESTAMP, END TIMESTAMP), ASSIGNER TUPLE(ID STRING, EXTENSION ARRAY(STRING), REFERENCE STRING, TYPE STRING, IDENTIFIER TUPLE(ID STRING, EXTENSION ARRAY(STRING), USE STRING, TYPE TUPLE(ID STRING, EXTENSION ARRAY(STRING), CODING ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), SYSTEM STRING, VERSION STRING, CODE STRING, DISPLAY STRING, USERSELECTED BOOLEAN)), TEXT STRING), SYSTEM STRING, VALUE STRING, PERIOD TUPLE(ID STRING, EXTENSION ARRAY(STRING), START TIMESTAMP, END TIMESTAMP), ASSIGNER STRING), DISPLAY STRING))) 1 parquet/complex.parquet 9
+active BOOLEAN 1 parquet/complex.parquet 10
+name ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), USE STRING, TEXT STRING, FAMILY STRING, GIVEN ARRAY(STRING), PREFIX ARRAY(STRING), SUFFIX ARRAY(STRING), PERIOD TUPLE(ID STRING, EXTENSION ARRAY(STRING), START TIMESTAMP, END TIMESTAMP))) 1 parquet/complex.parquet 11
+telecom ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), SYSTEM STRING, VALUE STRING, USE STRING, RANK INT32, PERIOD TUPLE(ID STRING, EXTENSION ARRAY(STRING), START TIMESTAMP, END TIMESTAMP))) 1 parquet/complex.parquet 12
+gender VARCHAR 1 parquet/complex.parquet 13
+birthDate DATE 1 parquet/complex.parquet 14
+deceasedBoolean BOOLEAN 1 parquet/complex.parquet 15
+deceasedDateTime TIMESTAMP 1 parquet/complex.parquet 16
+address ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), USE STRING, TYPE STRING, TEXT STRING, LINE ARRAY(STRING), CITY STRING, DISTRICT STRING, STATE STRING, POSTALCODE STRING, COUNTRY STRING, PERIOD TUPLE(ID STRING, EXTENSION ARRAY(STRING), START TIMESTAMP, END TIMESTAMP))) 1 parquet/complex.parquet 17
+maritalStatus TUPLE(ID STRING, EXTENSION ARRAY(STRING), CODING ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), SYSTEM STRING, VERSION STRING, CODE STRING, DISPLAY STRING, USERSELECTED BOOLEAN)), TEXT STRING) 1 parquet/complex.parquet 18
+multipleBirthBoolean BOOLEAN 1 parquet/complex.parquet 19
+multipleBirthInteger INT 1 parquet/complex.parquet 20
+photo ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), CONTENTTYPE STRING, LANGUAGE STRING, DATA BINARY, URL STRING, SIZE INT32, HASH BINARY, TITLE STRING, CREATION TIMESTAMP)) 1 parquet/complex.parquet 21
+contact ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), MODIFIEREXTENSION ARRAY(STRING), RELATIONSHIP ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), CODING ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), SYSTEM STRING, VERSION STRING, CODE STRING, DISPLAY STRING, USERSELECTED BOOLEAN)), TEXT STRING)), NAME TUPLE(ID STRING, EXTENSION ARRAY(STRING), USE STRING, TEXT STRING, FAMILY STRING, GIVEN ARRAY(STRING), PREFIX ARRAY(STRING), SUFFIX ARRAY(STRING), PERIOD TUPLE(ID STRING, EXTENSION ARRAY(STRING), START TIMESTAMP, END TIMESTAMP)), TELECOM ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), SYSTEM STRING, VALUE STRING, USE STRING, RANK INT32, PERIOD TUPLE(ID STRING, EXTENSION ARRAY(STRING), START TIMESTAMP, END TIMESTAMP))), ADDRESS TUPLE(ID STRING, EXTENSION ARRAY(STRING), USE STRING, TYPE STRING, TEXT STRING, LINE ARRAY(STRING), CITY STRING, DISTRICT STRING, STATE STRING, POSTALCODE STRING, COUNTRY STRING, PERIOD TUPLE(ID STRING, EXTENSION ARRAY(STRING), START TIMESTAMP, END TIMESTAMP)), GENDER STRING, ORGANIZATION TUPLE(ID STRING, EXTENSION ARRAY(STRING), REFERENCE STRING, TYPE STRING, IDENTIFIER TUPLE(ID STRING, EXTENSION ARRAY(STRING), USE STRING, TYPE TUPLE(ID STRING, EXTENSION ARRAY(STRING), CODING ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), SYSTEM STRING, VERSION STRING, CODE STRING, DISPLAY STRING, USERSELECTED BOOLEAN)), TEXT STRING), SYSTEM STRING, VALUE STRING, PERIOD TUPLE(ID STRING, EXTENSION ARRAY(STRING), START TIMESTAMP, END TIMESTAMP), ASSIGNER STRING), DISPLAY STRING), PERIOD TUPLE(ID STRING, EXTENSION ARRAY(STRING), START TIMESTAMP, END TIMESTAMP))) 1 parquet/complex.parquet 22
+communication ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), MODIFIEREXTENSION ARRAY(STRING), LANGUAGE TUPLE(ID STRING, EXTENSION ARRAY(STRING), CODING ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), SYSTEM STRING, VERSION STRING, CODE STRING, DISPLAY STRING, USERSELECTED BOOLEAN)), TEXT STRING), PREFERRED BOOLEAN)) 1 parquet/complex.parquet 23
+generalPractitioner ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), REFERENCE STRING, TYPE STRING, IDENTIFIER TUPLE(ID STRING, EXTENSION ARRAY(STRING), USE STRING, TYPE TUPLE(ID STRING, EXTENSION ARRAY(STRING), CODING ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), SYSTEM STRING, VERSION STRING, CODE STRING, DISPLAY STRING, USERSELECTED BOOLEAN)), TEXT STRING), SYSTEM STRING, VALUE STRING, PERIOD TUPLE(ID STRING, EXTENSION ARRAY(STRING), START TIMESTAMP, END TIMESTAMP), ASSIGNER STRING), DISPLAY STRING)) 1 parquet/complex.parquet 24
+managingOrganization TUPLE(ID STRING, EXTENSION ARRAY(STRING), REFERENCE STRING, TYPE STRING, IDENTIFIER TUPLE(ID STRING, EXTENSION ARRAY(STRING), USE STRING, TYPE TUPLE(ID STRING, EXTENSION ARRAY(STRING), CODING ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), SYSTEM STRING, VERSION STRING, CODE STRING, DISPLAY STRING, USERSELECTED BOOLEAN)), TEXT STRING), SYSTEM STRING, VALUE STRING, PERIOD TUPLE(ID STRING, EXTENSION ARRAY(STRING), START TIMESTAMP, END TIMESTAMP), ASSIGNER STRING), DISPLAY STRING) 1 parquet/complex.parquet 25
+link ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), MODIFIEREXTENSION ARRAY(STRING), OTHER TUPLE(ID STRING, EXTENSION ARRAY(STRING), REFERENCE STRING, TYPE STRING, IDENTIFIER TUPLE(ID STRING, EXTENSION ARRAY(STRING), USE STRING, TYPE TUPLE(ID STRING, EXTENSION ARRAY(STRING), CODING ARRAY(TUPLE(ID STRING, EXTENSION ARRAY(STRING), SYSTEM STRING, VERSION STRING, CODE STRING, DISPLAY STRING, USERSELECTED BOOLEAN)), TEXT STRING), SYSTEM STRING, VALUE STRING, PERIOD TUPLE(ID STRING, EXTENSION ARRAY(STRING), START TIMESTAMP, END TIMESTAMP), ASSIGNER STRING), DISPLAY STRING), TYPE STRING)) 1 parquet/complex.parquet 26
+yy__version INT 1 parquet/complex.parquet 27
+yy__us_core_race VARCHAR 1 parquet/complex.parquet 28
+yy__us_core_ethnicity VARCHAR 1 parquet/complex.parquet 29
+yy__us_core_birthsex TUPLE(VALUECODE STRING,) 1 parquet/complex.parquet 30

query
select * from infer_schema(location => '@data/parquet/variant.parquet')
----
-a INT 0 0
-b VARIANT 0 1
+a INT 0 parquet/variant.parquet 0
+b VARIANT 0 parquet/variant.parquet 1

query
select * from infer_schema(location => '@data/parquet/', FILE_FORMAT => 'PARQUET', pattern => 'tuple.*')
----
-id INT 0 0
-t TUPLE(A INT32, B STRING) 0 1
+id INT 0 parquet/tuple.parquet 0
+t TUPLE(A INT32, B STRING) 0 parquet/tuple.parquet 1

statement ok
drop CONNECTION IF EXISTS my_conn
@@ -61,8 +61,195 @@ drop CONNECTION IF EXISTS my_conn
statement ok
create CONNECTION my_conn STORAGE_TYPE = 's3' access_key_id='minioadmin' secret_access_key='minioadmin' endpoint_url='http://127.0.0.1:9900/' region='auto'

-query
-select * from INFER_SCHEMA(location => 's3://testbucket/data/parquet/tuple.parquet', connection_name => 'my_conn')
+# query
+# select * from INFER_SCHEMA(location => 's3://testbucket/data/parquet/tuple.parquet', connection_name => 'my_conn')
+# ----
+# id INT 0 parquet/tuple.parquet 0
+# t TUPLE(A INT32, B STRING) 0 parquet/tuple.parquet 1
+
+query T
+select CASE
+    WHEN filenames LIKE '%,%'
+    THEN 'Y'
+    ELSE 'N'
+    END AS format_check
+from infer_schema(location => '@data/parquet/max_file_count', max_file_count => 2)
+----
+Y
+Y
+
+# CSV
+statement ok
+create or replace file format head_csv_format type = 'CSV' field_delimiter = ',' skip_header = 1;
+
+query TTBTI
+select * from infer_schema(location => '@data/csv/numbers_with_headers.csv', file_format => 'CSV');
+----
+column_1 VARCHAR 1 csv/numbers_with_headers.csv 0
+column_2 VARCHAR 1 csv/numbers_with_headers.csv 1
+
+query TTBTI
+select * from infer_schema(location => '@data/csv/numbers_with_headers.csv', file_format => 'head_csv_format');
+----
+id BIGINT 1 csv/numbers_with_headers.csv 0
+value BIGINT 1 csv/numbers_with_headers.csv 1
+
+statement error
+select * from infer_schema(location => '@data/csv/ragged.csv', file_format => 'head_csv_format');
+
+query TTBTI
+select * from infer_schema(location => '@data/csv/max_records.csv', file_format => 'head_csv_format');
+----
+id BIGINT 1 csv/max_records.csv 0
+value VARCHAR 1 csv/max_records.csv 1
+
+query TTBTI
+select * from infer_schema(location => '@data/csv/max_records.csv', file_format => 'head_csv_format', max_records_pre_file => 5);
+----
+id BIGINT 1 csv/max_records.csv 0
+value BIGINT 1 csv/max_records.csv 1
+
+query TTBTI
+select * from infer_schema(location => '@data/csv/max_records.zip', file_format => 'head_csv_format', max_records_pre_file => 5);
+----
+id BIGINT 1 csv/max_records.zip 0
+value BIGINT 1 csv/max_records.zip 1
+
+query TTBTI
+select * from infer_schema(location => '@data/csv/max_records.zst', file_format => 'head_csv_format', max_records_pre_file => 5);
+----
+id BIGINT 1 csv/max_records.zst 0
+value BIGINT 1 csv/max_records.zst 1
+
+query TTBTI
+select * from infer_schema(location => '@data/csv/max_records.csv', file_format => 'head_csv_format', max_records_pre_file => 5);
+----
+id BIGINT 1 csv/max_records.csv 0
+value BIGINT 1 csv/max_records.csv 1
+
+query TTBTI
+select * from infer_schema(location => '@data/csv/max_records.xz', file_format => 'head_csv_format', max_records_pre_file => 5);
+----
+id BIGINT 1 csv/max_records.xz 0
+value BIGINT 1 csv/max_records.xz 1
+
+query TTBTI
+select * from infer_schema(location => '@data/csv/types.csv', file_format => 'head_csv_format')
+----
+bool_col BOOLEAN 1 csv/types.csv 0
+int_col BIGINT 1 csv/types.csv 1
+float_col DOUBLE 1 csv/types.csv 2
+date_col DATE 1 csv/types.csv 3
+ts_sec TIMESTAMP 1 csv/types.csv 4
+ts_ms TIMESTAMP 1 csv/types.csv 5
+ts_us TIMESTAMP 1 csv/types.csv 6
+ts_ns TIMESTAMP 1 csv/types.csv 7
+utf8_col VARCHAR 1 csv/types.csv 8
+
+query TTBTI
+select column_name, type, nullable, order_id from infer_schema(location => '@data/csv/merge/', file_format => 'head_csv_format');
+----
+col1 VARCHAR 1 0
+col2 VARCHAR 1 1
+col3 VARCHAR 1 2
+col4 VARCHAR 1 3
+col5 VARCHAR 1 4
+
+query T
+select CASE
+    WHEN filenames LIKE '%,%'
+    THEN 'Y'
+    ELSE 'N'
+    END AS format_check
+from infer_schema(location => '@data/csv/max_file_count/', file_format => 'head_csv_format', max_file_count => 2);
+----
+Y
+Y
+Y
+Y
+Y
+
+# NDJSON
+query TTBTI
+select * from infer_schema(location => '@data/ndjson/numbers.ndjson', file_format => 'NDJSON');
+----
+id BIGINT 1 ndjson/numbers.ndjson 0
+value BIGINT 1 ndjson/numbers.ndjson 1
+
+query TTBTI
+select * from infer_schema(location => '@data/ndjson/ragged.ndjson', file_format => 'NDJSON');
+----
+id BIGINT 1 ndjson/ragged.ndjson 0
+value BIGINT 1 ndjson/ragged.ndjson 1
+comment VARCHAR 1 ndjson/ragged.ndjson 2
+
+query TTBTI
+select * from infer_schema(location => '@data/ndjson/max_records.ndjson', file_format => 'NDJSON');
+----
+id BIGINT 1 ndjson/max_records.ndjson 0
+value VARCHAR 1 ndjson/max_records.ndjson 1
+
+query TTBTI
+select * from infer_schema(location => '@data/ndjson/max_records.ndjson', file_format => 'NDJSON', max_records_pre_file => 5);
+----
+id BIGINT 1 ndjson/max_records.ndjson 0
+value BIGINT 1 ndjson/max_records.ndjson 1
+
+query TTBTI
+select * from infer_schema(location => '@data/ndjson/max_records.zip', file_format => 'NDJSON', max_records_pre_file => 5);
+----
+id BIGINT 1 ndjson/max_records.zip 0
+value BIGINT 1 ndjson/max_records.zip 1
+
+query TTBTI
+select * from infer_schema(location => '@data/ndjson/max_records.zst', file_format => 'NDJSON', max_records_pre_file => 5);
+----
+id BIGINT 1 ndjson/max_records.zst 0
+value BIGINT 1 ndjson/max_records.zst 1
+
+query TTBTI
+select * from infer_schema(location => '@data/ndjson/max_records.ndjson', file_format => 'NDJSON', max_records_pre_file => 5);
+----
+id BIGINT 1 ndjson/max_records.ndjson 0
+value BIGINT 1 ndjson/max_records.ndjson 1
+
+query TTBTI
+select * from infer_schema(location => '@data/ndjson/max_records.xz', file_format => 'NDJSON', max_records_pre_file => 5)
+----
+id BIGINT 1 ndjson/max_records.xz 0
+value BIGINT 1 ndjson/max_records.xz 1
+
+query TTBTI
+select * from infer_schema(location => '@data/ndjson/types.ndjson', file_format => 'NDJSON')
+----
+bool_col BOOLEAN 1 ndjson/types.ndjson 0
+int_col BIGINT 1 ndjson/types.ndjson 1
+float_col DOUBLE 1 ndjson/types.ndjson 2
+date_col VARCHAR 1 ndjson/types.ndjson 3
+ts_sec VARCHAR 1 ndjson/types.ndjson 4
+ts_ms VARCHAR 1 ndjson/types.ndjson 5
+ts_us VARCHAR 1 ndjson/types.ndjson 6
+ts_ns VARCHAR 1 ndjson/types.ndjson 7
+utf8_col VARCHAR 1 ndjson/types.ndjson 8
+arr_col ARRAY(STRING) 1 ndjson/types.ndjson 9
+obj_col TUPLE(A INT64, B STRING) 1 ndjson/types.ndjson 10
+
+query TTBTI
+select column_name, type, nullable, order_id from infer_schema(location => '@data/ndjson/merge/', file_format => 'NDJSON');
+----
+col1 VARCHAR 1 0
+col2 VARCHAR 1 1
+col3 VARCHAR 1 2
+col4 VARCHAR 1 3
+col5 VARCHAR 1 4
+
+query T
+select CASE
+    WHEN filenames LIKE '%,%'
+    THEN 'Y'
+    ELSE 'N'
+    END AS format_check
+from infer_schema(location => '@data/ndjson/max_file_count/', file_format => 'NDJSON', max_file_count => 2);
----
-id INT 0 0
-t TUPLE(A INT32, B STRING) 0 1
+Y
+Y
\ No newline at end of file
diff --git a/tests/sqllogictests/suites/stage/formats/parquet/options/parquet_missing_field.test b/tests/sqllogictests/suites/stage/formats/parquet/options/parquet_missing_field.test
index 522f9167dc50b..9e5e2344678a3 100644
--- a/tests/sqllogictests/suites/stage/formats/parquet/options/parquet_missing_field.test
+++ b/tests/sqllogictests/suites/stage/formats/parquet/options/parquet_missing_field.test
@@ -7,17 +7,17 @@ create table t1 (c1 int, c2 int, c3 int64, c4 string default 'ok')
query
select * from infer_schema(location => '@data/parquet/diff_schema/f1.parquet')
----
-c1 BIGINT 1 0
-c2 SMALLINT 1 1
-c3 BIGINT 1 2
+c1 BIGINT 1 parquet/diff_schema/f1.parquet 0
+c2 SMALLINT 1 parquet/diff_schema/f1.parquet 1
+c3 BIGINT 1 parquet/diff_schema/f1.parquet 2

query
select * from infer_schema(location => '@data/parquet/diff_schema/f2.parquet')
----
-c6 BIGINT 1 0
-c5 BIGINT 1 1
-c2 BIGINT 1 2
-c4 VARCHAR 1 3
+c6 BIGINT 1 parquet/diff_schema/f2.parquet 0
+c5 BIGINT 1 parquet/diff_schema/f2.parquet 1
+c2 BIGINT 1 parquet/diff_schema/f2.parquet 2
+c4 VARCHAR 1 parquet/diff_schema/f2.parquet 3

query error
copy into t1 from @data/parquet/diff_schema/ file_format=(type=parquet) pattern='.*[.]parquet'
diff --git a/tests/sqllogictests/suites/stage/formats/parquet/parquet_field_types.test b/tests/sqllogictests/suites/stage/formats/parquet/parquet_field_types.test
index 1565b70444b99..94e2bc39feb0c 100644
--- a/tests/sqllogictests/suites/stage/formats/parquet/parquet_field_types.test
+++ b/tests/sqllogictests/suites/stage/formats/parquet/parquet_field_types.test
@@ -106,13 +106,13 @@ NULL
query
select * from infer_schema (location => '@data/parquet/int96.parquet')
----
-id VARCHAR 1 0
-t_bool BOOLEAN 1 1
-t_float FLOAT 1 2
-t_double DOUBLE 1 3
-t_timestamp TIMESTAMP 1 4
-t_data DATE 1 5
-t_array ARRAY(INT32) 1 6
+id VARCHAR 1 parquet/int96.parquet 0
+t_bool BOOLEAN 1 parquet/int96.parquet 1
+t_float FLOAT 1 parquet/int96.parquet 2
+t_double DOUBLE 1 parquet/int96.parquet 3
+t_timestamp TIMESTAMP 1 parquet/int96.parquet 4
+t_data DATE 1 parquet/int96.parquet 5
+t_array ARRAY(INT32) 1 parquet/int96.parquet 6

# the physical type of column t_timestamp is INT96
query