diff --git a/Cargo.lock b/Cargo.lock index 2262adf9186..06a65bf6ca7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2761,6 +2761,7 @@ dependencies = [ "authz", "backtrace", "base64 0.21.7", + "bytes", "clap", "console-subscriber", "datafusion_util", @@ -2793,6 +2794,7 @@ dependencies = [ "observability_deps", "panic_logging", "parking_lot", + "parquet", "parquet_file", "pretty_assertions", "rand", diff --git a/influxdb3/Cargo.toml b/influxdb3/Cargo.toml index 189a318fa61..2d545ce39f0 100644 --- a/influxdb3/Cargo.toml +++ b/influxdb3/Cargo.toml @@ -40,6 +40,7 @@ influxdb3_sys_events = { path = "../influxdb3_sys_events" } anyhow.workspace = true backtrace.workspace = true base64.workspace = true +bytes.workspace = true clap.workspace = true dotenvy.workspace = true hashbrown.workspace = true @@ -92,6 +93,7 @@ assert_cmd.workspace = true futures.workspace = true hyper.workspace = true insta.workspace = true +parquet.workspace = true pretty_assertions.workspace = true reqwest.workspace = true serde.workspace = true diff --git a/influxdb3/src/commands/show.rs b/influxdb3/src/commands/show.rs index 8e71d753e3c..e5105529376 100644 --- a/influxdb3/src/commands/show.rs +++ b/influxdb3/src/commands/show.rs @@ -1,9 +1,33 @@ use clap::Parser; use secrecy::{ExposeSecret, Secret}; -use std::error::Error; +use std::io; +use std::str::Utf8Error; +use tokio::fs::OpenOptions; +use tokio::io::AsyncWriteExt; use url::Url; use crate::commands::common::Format; +use system::Error as SystemCommandError; +#[derive(Debug, thiserror::Error)] +pub(crate) enum Error { + #[error(transparent)] + Client(#[from] influxdb3_client::Error), + + #[error( + "must specify an output file path with `--output` parameter when formatting \ + the output as `parquet`" + )] + NoOutputFileForParquet, + + #[error("invalid UTF8 received from server: {0}")] + Utf8(#[from] Utf8Error), + + #[error("io error: {0}")] + Io(#[from] io::Error), + + #[error(transparent)] + SystemCommand(#[from] SystemCommandError), +} mod system; use system::SystemConfig; @@ -45,15 +69,20 @@ pub struct DatabaseConfig { /// The format in which to output the list of databases #[clap(value_enum, long = "format", default_value = "pretty")] output_format: Format, + + /// Put the list of databases into `output` + #[clap(short = 'o', long = "output")] + output_file_path: Option, } -pub(crate) async fn command(config: Config) -> Result<(), Box> { +pub(crate) async fn command(config: Config) -> Result<(), Error> { match config.cmd { SubCommand::Databases(DatabaseConfig { host_url, auth_token, show_deleted, output_format, + output_file_path, }) => { let mut client = influxdb3_client::Client::new(host_url)?; @@ -61,14 +90,27 @@ pub(crate) async fn command(config: Config) -> Result<(), Box> { client = client.with_auth_token(t.expose_secret()); } - let resp_bytes = client + let mut resp_bytes = client .api_v3_configure_db_show() - .with_format(output_format.into()) + .with_format(output_format.clone().into()) .with_show_deleted(show_deleted) .send() .await?; - println!("{}", std::str::from_utf8(&resp_bytes)?); + if let Some(path) = output_file_path { + let mut f = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(path) + .await?; + f.write_all_buf(&mut resp_bytes).await?; + } else { + if output_format.is_parquet() { + Err(Error::NoOutputFileForParquet)? + } + println!("{}", std::str::from_utf8(&resp_bytes)?); + } } SubCommand::System(cfg) => system::command(cfg).await?, } diff --git a/influxdb3/src/commands/show/system.rs b/influxdb3/src/commands/show/system.rs index 7d0d6cce701..509378879d5 100644 --- a/influxdb3/src/commands/show/system.rs +++ b/influxdb3/src/commands/show/system.rs @@ -1,9 +1,12 @@ +use super::super::common::{Format, InfluxDb3Config}; +use bytes::Bytes; use clap::Parser; use influxdb3_client::Client; use secrecy::ExposeSecret; use serde::Deserialize; - -use super::super::common::{Format, InfluxDb3Config}; +use std::io; +use tokio::fs::OpenOptions; +use tokio::io::AsyncWriteExt; #[derive(Debug, thiserror::Error)] pub(crate) enum Error { @@ -15,6 +18,15 @@ pub(crate) enum Error { #[error("system table '{0}' not found: {1}")] SystemTableNotFound(String, SystemTableNotFound), + + #[error( + "must specify an output file path with `--output` parameter when formatting \ + the output as `parquet`" + )] + NoOutputFileForParquet, + + #[error("io error: {0}")] + Io(#[from] io::Error), } pub type Result = std::result::Result; @@ -77,6 +89,10 @@ pub struct TableListConfig { /// The format in which to output the query #[clap(value_enum, long = "format", default_value = "pretty")] output_format: Format, + + /// Put the table lists output into `output` + #[clap(short = 'o', long = "output")] + output_file_path: Option, } const SYS_TABLES_QUERY: &str = "WITH cols (table_name, column_name) AS (SELECT table_name, column_name FROM information_schema.columns WHERE table_schema = 'system' ORDER BY (table_name, column_name)) SELECT table_name, array_agg(column_name) AS column_names FROM cols GROUP BY table_name ORDER BY table_name"; @@ -100,14 +116,19 @@ impl std::fmt::Display for SystemTableNotFound { impl SystemCommandRunner { async fn list(&self, config: TableListConfig) -> Result<()> { - let bs = self + let TableListConfig { + output_format, + output_file_path, + } = &config; + + let mut bs = self .client .api_v3_query_sql(self.db.as_str(), SYS_TABLES_QUERY) - .format(config.output_format.into()) + .format(output_format.clone().into()) .send() .await?; - println!("{}", String::from_utf8(bs.as_ref().to_vec()).unwrap()); + write_to_std_out_or_file(output_file_path, output_format, &mut bs).await?; Ok(()) } @@ -124,7 +145,7 @@ pub struct TableConfig { limit: u16, /// Order by the specified fields. - #[clap(long = "order-by", short = 'o', num_args = 1, value_delimiter = ',')] + #[clap(long = "order-by", num_args = 1, value_delimiter = ',')] order_by: Vec, /// Select specified fields from table. @@ -134,6 +155,10 @@ pub struct TableConfig { /// The format in which to output the query #[clap(value_enum, long = "format", default_value = "pretty")] output_format: Format, + + /// Put the table output into `output` + #[clap(short = 'o', long = "output")] + output_file_path: Option, } impl SystemCommandRunner { @@ -157,6 +182,7 @@ impl SystemCommandRunner { select, order_by, output_format, + output_file_path, } = &config; let select_expr = if !select.is_empty() { @@ -185,7 +211,7 @@ impl SystemCommandRunner { let query = clauses.join("\n"); - let bs = match client + let mut bs = match client .api_v3_query_sql(db, query) .format(output_format.clone().into()) .send() @@ -204,13 +230,33 @@ impl SystemCommandRunner { return Err(e.into()); } }; - - println!("{}", String::from_utf8(bs.as_ref().to_vec()).unwrap()); - + write_to_std_out_or_file(output_file_path, output_format, &mut bs).await?; Ok(()) } } +async fn write_to_std_out_or_file( + output_file_path: &Option, + output_format: &Format, + bytes: &mut Bytes, +) -> Result<()> { + if let Some(path) = output_file_path { + let mut f = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(path) + .await?; + f.write_all_buf(bytes).await?; + } else { + if output_format.is_parquet() { + Err(Error::NoOutputFileForParquet)? + } + println!("{}", String::from_utf8(bytes.as_ref().to_vec()).unwrap()); + } + Ok(()) +} + #[derive(Debug, Parser)] pub struct SummaryConfig { /// The maximum number of entries from each table to display in the output. Default is 10 and 0 @@ -221,25 +267,44 @@ pub struct SummaryConfig { /// The format in which to output the query #[clap(value_enum, long = "format", default_value = "pretty")] output_format: Format, + + /// Put the summary into `output` + #[clap(short = 'o', long = "output")] + output_file_path: Option, } impl SystemCommandRunner { async fn summary(&self, config: SummaryConfig) -> Result<()> { - self.summarize_all_tables(config.limit, &config.output_format) - .await?; + self.summarize_all_tables( + config.limit, + &config.output_format, + &config.output_file_path, + ) + .await?; Ok(()) } - async fn summarize_all_tables(&self, limit: u16, format: &Format) -> Result<()> { + async fn summarize_all_tables( + &self, + limit: u16, + format: &Format, + output_file_path: &Option, + ) -> Result<()> { let system_tables = self.get_system_tables().await?; for table in system_tables { - self.summarize_table(table.table_name.as_str(), limit, format) + self.summarize_table(table.table_name.as_str(), limit, format, output_file_path) .await?; } Ok(()) } - async fn summarize_table(&self, table_name: &str, limit: u16, format: &Format) -> Result<()> { + async fn summarize_table( + &self, + table_name: &str, + limit: u16, + format: &Format, + output_file_path: &Option, + ) -> Result<()> { let Self { db, client } = self; let mut clauses = vec![format!("SELECT * FROM system.{table_name}")]; @@ -257,14 +322,29 @@ impl SystemCommandRunner { let query = clauses.join("\n"); - let bs = client + let mut bs = client .api_v3_query_sql(db, query) .format(format.clone().into()) .send() .await?; - println!("{table_name} summary:"); - println!("{}", String::from_utf8(bs.as_ref().to_vec()).unwrap()); + if let Some(path) = output_file_path { + let mut f = OpenOptions::new() + .write(true) + .create(true) + .append(true) + .open(path) + .await?; + f.write_all_buf(&mut bs).await?; + } else { + if format.is_parquet() { + Err(Error::NoOutputFileForParquet)? + } + + println!("{table_name} summary:"); + println!("{}", String::from_utf8(bs.as_ref().to_vec()).unwrap()); + } + Ok(()) } } diff --git a/influxdb3/tests/cli/mod.rs b/influxdb3/tests/cli/mod.rs index 57e47259613..1a5f0813347 100644 --- a/influxdb3/tests/cli/mod.rs +++ b/influxdb3/tests/cli/mod.rs @@ -1,10 +1,14 @@ use crate::server::{ConfigProvider, TestServer}; +use arrow_array::RecordBatch; +use arrow_util::assert_batches_eq; use assert_cmd::cargo::CommandCargoExt; use assert_cmd::Command as AssertCmd; use observability_deps::tracing::debug; +use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use pretty_assertions::assert_eq; use serde_json::{json, Value}; use std::{ + fs::File, io::Write, process::{Command, Stdio}, thread, @@ -333,6 +337,45 @@ async fn test_show_databases() { ]); // don't assert on actual output since it contains a time stamp which would be flaky assert_contains!(output, "foo-"); + + let expected_output = [ + "+---------------+", + "| iox::database |", + "+---------------+", + "| bar |", + "+---------------+", + ]; + let temp_file = NamedTempFile::new().unwrap(); + let file_path = temp_file.path().to_str().unwrap(); + + // Success test cases for show commands output to be copied in a file + let _ = run(&[ + "show", + "databases", + "--host", + &server_addr, + "--format", + "parquet", + "--output", + file_path, + ]); + let file = File::open(file_path).unwrap(); + let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + let actual: Vec = builder.build().unwrap().map(|r| r.unwrap()).collect(); + assert_batches_eq!(expected_output, &actual); + + // Failure test case for show command when the format is parquet and output not provided + let test_name = "fail without output-file when format is parquet for show databases"; + let output = run_and_err(&[ + "show", + "databases", + "--host", + &server_addr, + "--format", + "parquet", + ]); + let snap_name = test_name.replace(' ', "_"); + insta::assert_snapshot!(snap_name, output); } #[test_log::test(tokio::test)] @@ -1136,18 +1179,15 @@ async fn test_show_system() { ], }, ]; - for case in cases { let output = run(&case.args); let snap_name = case.name.replace(' ', "_"); insta::assert_snapshot!(snap_name, output); } - struct FailTestCase<'a> { name: &'static str, args: Vec<&'a str>, } - let cases = vec![ FailTestCase { name: "fail without database name", @@ -1161,6 +1201,18 @@ async fn test_show_system() { name: "iox schema table name exists, but should error because we're concerned here with system tables", args: vec!["show", "system", "--host", server_addr.as_str(), "--database", db_name, "table", "cpu"], }, + FailTestCase { + name: "fail without output-file when format is parquet for table", + args: vec!["show", "system", "--host", server_addr.as_str(), "--database", db_name, "table", "--format", "parquet", "distinct_caches"] + }, + FailTestCase { + name: "fail without output-file when format is parquet for table-list", + args: vec!["show", "system", "--host", server_addr.as_str(), "--database", db_name, "table-list", "--format", "parquet"] + }, + FailTestCase { + name: "fail without output-file when format is parquet for summary", + args: vec!["show", "system", "--host", server_addr.as_str(), "--database", db_name, "summary", "--format", "parquet"] + } ]; for case in cases { @@ -1168,6 +1220,77 @@ async fn test_show_system() { let snap_name = case.name.replace(' ', "_"); insta::assert_snapshot!(snap_name, output); } + let temp_file = NamedTempFile::new().unwrap(); + let file_path = temp_file.path().to_str().unwrap(); + + let expected_output = vec![ "+----------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+", + "| table_name | column_names |", + "+----------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+", + "| distinct_caches | [table, name, column_ids, column_names, max_cardinality, max_age_seconds] |", + "| last_caches | [table, name, key_column_ids, key_column_names, value_column_ids, value_column_names, count, ttl] |", + "| parquet_files | [table_name, path, size_bytes, row_count, min_time, max_time] |", + "| processing_engine_triggers | [trigger_name, plugin_filename, trigger_specification, disabled] |", + "| queries | [id, phase, issue_time, query_type, query_text, partitions, parquet_files, plan_duration, permit_duration, execute_duration, end2end_duration, compute_duration, max_memory, success, running, cancelled, trace_id] |", + "+----------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",]; + let args = vec![ + "show", + "system", + "--host", + server_addr.as_str(), + "--database", + db_name, + "table-list", + "--format", + "parquet", + "--output", + file_path, + ]; + run(&args); + let file = File::open(file_path).unwrap(); + let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + let actual: Vec = builder.build().unwrap().map(|r| r.unwrap()).collect(); + assert_batches_eq!(expected_output, &actual); + + let expected_output = vec!["++", "++"]; + let args = vec![ + "show", + "system", + "--host", + server_addr.as_str(), + "--database", + db_name, + "table", + "--format", + "parquet", + "distinct_caches", + "--output", + file_path, + ]; + run(&args); + let file = File::open(file_path).unwrap(); + let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + let actual: Vec = builder.build().unwrap().map(|r| r.unwrap()).collect(); + assert_batches_eq!(expected_output, &actual); + + let expected_output = vec!["++", "++"]; + let args = vec![ + "show", + "system", + "--host", + server_addr.as_str(), + "--database", + db_name, + "summary", + "--format", + "parquet", + "--output", + file_path, + ]; + run(&args); + let file = File::open(file_path).unwrap(); + let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + let actual: Vec = builder.build().unwrap().map(|r| r.unwrap()).collect(); + assert_batches_eq!(expected_output, &actual); } #[tokio::test] @@ -1412,7 +1535,7 @@ async fn test_wal_plugin_errors() { expected_error: &'static str, } - let tests = vec![ + let tests = vec![ Test { name: "invalid_python", plugin_code: r#" diff --git a/influxdb3/tests/cli/snapshots/lib__cli__fail_without_output-file_when_format_is_parquet_for_show_databases.snap b/influxdb3/tests/cli/snapshots/lib__cli__fail_without_output-file_when_format_is_parquet_for_show_databases.snap new file mode 100644 index 00000000000..f1a9fe3c7a0 --- /dev/null +++ b/influxdb3/tests/cli/snapshots/lib__cli__fail_without_output-file_when_format_is_parquet_for_show_databases.snap @@ -0,0 +1,6 @@ +--- +source: influxdb3/tests/cli/mod.rs +assertion_line: 391 +expression: output +--- +Show command failed: must specify an output file path with `--output` parameter when formatting the output as `parquet` diff --git a/influxdb3/tests/cli/snapshots/lib__cli__fail_without_output-file_when_format_is_parquet_for_summary.snap b/influxdb3/tests/cli/snapshots/lib__cli__fail_without_output-file_when_format_is_parquet_for_summary.snap new file mode 100644 index 00000000000..27cb1387134 --- /dev/null +++ b/influxdb3/tests/cli/snapshots/lib__cli__fail_without_output-file_when_format_is_parquet_for_summary.snap @@ -0,0 +1,5 @@ +--- +source: influxdb3/tests/cli/mod.rs +expression: output +--- +Show command failed: must specify an output file path with `--output` parameter when formatting the output as `parquet` \ No newline at end of file diff --git a/influxdb3/tests/cli/snapshots/lib__cli__fail_without_output-file_when_format_is_parquet_for_table-list.snap b/influxdb3/tests/cli/snapshots/lib__cli__fail_without_output-file_when_format_is_parquet_for_table-list.snap new file mode 100644 index 00000000000..27cb1387134 --- /dev/null +++ b/influxdb3/tests/cli/snapshots/lib__cli__fail_without_output-file_when_format_is_parquet_for_table-list.snap @@ -0,0 +1,5 @@ +--- +source: influxdb3/tests/cli/mod.rs +expression: output +--- +Show command failed: must specify an output file path with `--output` parameter when formatting the output as `parquet` \ No newline at end of file diff --git a/influxdb3/tests/cli/snapshots/lib__cli__fail_without_output-file_when_format_is_parquet_for_table.snap b/influxdb3/tests/cli/snapshots/lib__cli__fail_without_output-file_when_format_is_parquet_for_table.snap new file mode 100644 index 00000000000..27cb1387134 --- /dev/null +++ b/influxdb3/tests/cli/snapshots/lib__cli__fail_without_output-file_when_format_is_parquet_for_table.snap @@ -0,0 +1,5 @@ +--- +source: influxdb3/tests/cli/mod.rs +expression: output +--- +Show command failed: must specify an output file path with `--output` parameter when formatting the output as `parquet` \ No newline at end of file