Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions influxdb3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ influxdb3_sys_events = { path = "../influxdb3_sys_events" }
anyhow.workspace = true
backtrace.workspace = true
base64.workspace = true
bytes.workspace = true
clap.workspace = true
dotenvy.workspace = true
hashbrown.workspace = true
Expand Down Expand Up @@ -92,6 +93,7 @@ assert_cmd.workspace = true
futures.workspace = true
hyper.workspace = true
insta.workspace = true
parquet.workspace = true
pretty_assertions.workspace = true
reqwest.workspace = true
serde.workspace = true
Expand Down
52 changes: 47 additions & 5 deletions influxdb3/src/commands/show.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,33 @@
use clap::Parser;
use secrecy::{ExposeSecret, Secret};
use std::error::Error;
use std::io;
use std::str::Utf8Error;
use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;
use url::Url;

use crate::commands::common::Format;
use system::Error as SystemCommandError;
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
#[error(transparent)]
Client(#[from] influxdb3_client::Error),

#[error(
"must specify an output file path with `--output` parameter when formatting \
the output as `parquet`"
)]
NoOutputFileForParquet,

#[error("invalid UTF8 received from server: {0}")]
Utf8(#[from] Utf8Error),

#[error("io error: {0}")]
Io(#[from] io::Error),

#[error(transparent)]
SystemCommand(#[from] SystemCommandError),
}

mod system;
use system::SystemConfig;
Expand Down Expand Up @@ -45,30 +69,48 @@ pub struct DatabaseConfig {
/// The format in which to output the list of databases
#[clap(value_enum, long = "format", default_value = "pretty")]
output_format: Format,

/// Put the list of databases into `output`
#[clap(short = 'o', long = "output")]
output_file_path: Option<String>,
}

pub(crate) async fn command(config: Config) -> Result<(), Box<dyn Error>> {
pub(crate) async fn command(config: Config) -> Result<(), Error> {
match config.cmd {
SubCommand::Databases(DatabaseConfig {
host_url,
auth_token,
show_deleted,
output_format,
output_file_path,
}) => {
let mut client = influxdb3_client::Client::new(host_url)?;

if let Some(t) = auth_token {
client = client.with_auth_token(t.expose_secret());
}

let resp_bytes = client
let mut resp_bytes = client
.api_v3_configure_db_show()
.with_format(output_format.into())
.with_format(output_format.clone().into())
.with_show_deleted(show_deleted)
.send()
.await?;

println!("{}", std::str::from_utf8(&resp_bytes)?);
if let Some(path) = output_file_path {
let mut f = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)
.await?;
f.write_all_buf(&mut resp_bytes).await?;
} else {
if output_format.is_parquet() {
Err(Error::NoOutputFileForParquet)?
}
println!("{}", std::str::from_utf8(&resp_bytes)?);
}
}
SubCommand::System(cfg) => system::command(cfg).await?,
}
Expand Down
116 changes: 98 additions & 18 deletions influxdb3/src/commands/show/system.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use super::super::common::{Format, InfluxDb3Config};
use bytes::Bytes;
use clap::Parser;
use influxdb3_client::Client;
use secrecy::ExposeSecret;
use serde::Deserialize;

use super::super::common::{Format, InfluxDb3Config};
use std::io;
use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;

#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
Expand All @@ -15,6 +18,15 @@ pub(crate) enum Error {

#[error("system table '{0}' not found: {1}")]
SystemTableNotFound(String, SystemTableNotFound),

#[error(
"must specify an output file path with `--output` parameter when formatting \
the output as `parquet`"
)]
NoOutputFileForParquet,

#[error("io error: {0}")]
Io(#[from] io::Error),
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -77,6 +89,10 @@ pub struct TableListConfig {
/// The format in which to output the query
#[clap(value_enum, long = "format", default_value = "pretty")]
output_format: Format,

/// Put the table lists output into `output`
#[clap(short = 'o', long = "output")]
output_file_path: Option<String>,
}

const SYS_TABLES_QUERY: &str = "WITH cols (table_name, column_name) AS (SELECT table_name, column_name FROM information_schema.columns WHERE table_schema = 'system' ORDER BY (table_name, column_name)) SELECT table_name, array_agg(column_name) AS column_names FROM cols GROUP BY table_name ORDER BY table_name";
Expand All @@ -100,14 +116,19 @@ impl std::fmt::Display for SystemTableNotFound {

impl SystemCommandRunner {
async fn list(&self, config: TableListConfig) -> Result<()> {
let bs = self
let TableListConfig {
output_format,
output_file_path,
} = &config;

let mut bs = self
.client
.api_v3_query_sql(self.db.as_str(), SYS_TABLES_QUERY)
.format(config.output_format.into())
.format(output_format.clone().into())
.send()
.await?;

println!("{}", String::from_utf8(bs.as_ref().to_vec()).unwrap());
write_to_std_out_or_file(output_file_path, output_format, &mut bs).await?;

Ok(())
}
Expand All @@ -124,7 +145,7 @@ pub struct TableConfig {
limit: u16,

/// Order by the specified fields.
#[clap(long = "order-by", short = 'o', num_args = 1, value_delimiter = ',')]
#[clap(long = "order-by", num_args = 1, value_delimiter = ',')]
order_by: Vec<String>,

/// Select specified fields from table.
Expand All @@ -134,6 +155,10 @@ pub struct TableConfig {
/// The format in which to output the query
#[clap(value_enum, long = "format", default_value = "pretty")]
output_format: Format,

/// Put the table output into `output`
#[clap(short = 'o', long = "output")]
output_file_path: Option<String>,
}

impl SystemCommandRunner {
Expand All @@ -157,6 +182,7 @@ impl SystemCommandRunner {
select,
order_by,
output_format,
output_file_path,
} = &config;

let select_expr = if !select.is_empty() {
Expand Down Expand Up @@ -185,7 +211,7 @@ impl SystemCommandRunner {

let query = clauses.join("\n");

let bs = match client
let mut bs = match client
.api_v3_query_sql(db, query)
.format(output_format.clone().into())
.send()
Expand All @@ -204,13 +230,33 @@ impl SystemCommandRunner {
return Err(e.into());
}
};

println!("{}", String::from_utf8(bs.as_ref().to_vec()).unwrap());

write_to_std_out_or_file(output_file_path, output_format, &mut bs).await?;
Ok(())
}
}

async fn write_to_std_out_or_file(
output_file_path: &Option<String>,
output_format: &Format,
bytes: &mut Bytes,
) -> Result<()> {
if let Some(path) = output_file_path {
let mut f = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)
.await?;
f.write_all_buf(bytes).await?;
} else {
if output_format.is_parquet() {
Err(Error::NoOutputFileForParquet)?
}
println!("{}", String::from_utf8(bytes.as_ref().to_vec()).unwrap());
}
Ok(())
}

#[derive(Debug, Parser)]
pub struct SummaryConfig {
/// The maximum number of entries from each table to display in the output. Default is 10 and 0
Expand All @@ -221,25 +267,44 @@ pub struct SummaryConfig {
/// The format in which to output the query
#[clap(value_enum, long = "format", default_value = "pretty")]
output_format: Format,

/// Put the summary into `output`
#[clap(short = 'o', long = "output")]
output_file_path: Option<String>,
}

impl SystemCommandRunner {
async fn summary(&self, config: SummaryConfig) -> Result<()> {
self.summarize_all_tables(config.limit, &config.output_format)
.await?;
self.summarize_all_tables(
config.limit,
&config.output_format,
&config.output_file_path,
)
.await?;
Ok(())
}

async fn summarize_all_tables(&self, limit: u16, format: &Format) -> Result<()> {
async fn summarize_all_tables(
&self,
limit: u16,
format: &Format,
output_file_path: &Option<String>,
) -> Result<()> {
let system_tables = self.get_system_tables().await?;
for table in system_tables {
self.summarize_table(table.table_name.as_str(), limit, format)
self.summarize_table(table.table_name.as_str(), limit, format, output_file_path)
.await?;
}
Ok(())
}

async fn summarize_table(&self, table_name: &str, limit: u16, format: &Format) -> Result<()> {
async fn summarize_table(
&self,
table_name: &str,
limit: u16,
format: &Format,
output_file_path: &Option<String>,
) -> Result<()> {
let Self { db, client } = self;
let mut clauses = vec![format!("SELECT * FROM system.{table_name}")];

Expand All @@ -257,14 +322,29 @@ impl SystemCommandRunner {

let query = clauses.join("\n");

let bs = client
let mut bs = client
.api_v3_query_sql(db, query)
.format(format.clone().into())
.send()
.await?;

println!("{table_name} summary:");
println!("{}", String::from_utf8(bs.as_ref().to_vec()).unwrap());
if let Some(path) = output_file_path {
let mut f = OpenOptions::new()
.write(true)
.create(true)
.append(true)
.open(path)
.await?;
f.write_all_buf(&mut bs).await?;
} else {
if format.is_parquet() {
Err(Error::NoOutputFileForParquet)?
}

println!("{table_name} summary:");
println!("{}", String::from_utf8(bs.as_ref().to_vec()).unwrap());
}

Ok(())
}
}
Expand Down
Loading