Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,18 @@ pub struct Config {
/// smaller time ranges if possible in a query.
#[clap(long = "query-file-limit", env = "INFLUXDB3_QUERY_FILE_LIMIT", action)]
pub query_file_limit: Option<usize>,

/// Max memory for snapshot in bytes, as this determines how many bytes are
/// allowed per parquet file it is set in bytes to allow tests to be ran with
/// few kb or mb
#[clap(
long = "max-memory-for-snapshot-bytes",
env = "INFLUXDB3_MAX_MEMORY_FOR_SNAPSHOT_BYTES",
// 100 MB by default
default_value = "100000000",
action
)]
pub max_memory_for_snapshot_bytes: u64,
}

/// Specified size of the Parquet cache in megabytes (MB)
Expand Down Expand Up @@ -568,6 +580,7 @@ pub async fn command(config: Config) -> Result<()> {
metric_registry: Arc::clone(&metrics),
snapshotted_wal_files_to_keep: config.snapshotted_wal_files_to_keep,
query_file_limit: config.query_file_limit,
max_memory_for_snapshot_bytes: config.max_memory_for_snapshot_bytes,
})
.await
.map_err(|e| Error::WriteBufferInit(e.into()))?;
Expand Down
10 changes: 9 additions & 1 deletion influxdb3_catalog/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use influxdb3_wal::{
use iox_time::Time;
use observability_deps::tracing::{debug, info, warn};
use parking_lot::RwLock;
use schema::{Schema, SchemaBuilder};
use schema::{Schema, SchemaBuilder, sort::SortKey};
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::cmp::Ordering;
Expand Down Expand Up @@ -1134,6 +1134,14 @@ impl TableDefinition {
pub fn series_key_names(&self) -> &[Arc<str>] {
&self.series_key_names
}

pub fn sort_key(&self) -> SortKey {
let cols = self
.series_key
.iter()
.map(|c| Arc::clone(&self.column_id_to_name_unchecked(c)));
SortKey::from_columns(cols)
}
}

trait TableUpdate {
Expand Down
1 change: 1 addition & 0 deletions influxdb3_processing_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,7 @@ mod tests {
metric_registry: Arc::clone(&metric_registry),
snapshotted_wal_files_to_keep: 10,
query_file_limit: None,
max_memory_for_snapshot_bytes: 100_000_000,
})
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions influxdb3_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,7 @@ mod tests {
metric_registry: Arc::clone(&metrics),
snapshotted_wal_files_to_keep: 100,
query_file_limit: None,
max_memory_for_snapshot_bytes: 100_000_000,
},
)
.await
Expand Down
33 changes: 17 additions & 16 deletions influxdb3_server/src/query_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,7 @@ mod tests {
metric_registry: Default::default(),
snapshotted_wal_files_to_keep: 1,
query_file_limit,
max_memory_for_snapshot_bytes: 100_000_000,
})
.await
.unwrap();
Expand Down Expand Up @@ -932,9 +933,9 @@ mod tests {
"+------------+------------+-----------+----------+----------+",
"| table_name | size_bytes | row_count | min_time | max_time |",
"+------------+------------+-----------+----------+----------+",
"| cpu | 1961 | 3 | 0 | 20 |",
"| cpu | 1961 | 3 | 30 | 50 |",
"| cpu | 1961 | 3 | 60 | 80 |",
"| cpu | 2133 | 3 | 0 | 20 |",
"| cpu | 2133 | 3 | 30 | 50 |",
"| cpu | 2133 | 3 | 60 | 80 |",
"+------------+------------+-----------+----------+----------+",
],
},
Expand All @@ -947,9 +948,9 @@ mod tests {
"+------------+------------+-----------+----------+----------+",
"| table_name | size_bytes | row_count | min_time | max_time |",
"+------------+------------+-----------+----------+----------+",
"| mem | 1961 | 3 | 0 | 20 |",
"| mem | 1961 | 3 | 30 | 50 |",
"| mem | 1961 | 3 | 60 | 80 |",
"| mem | 2133 | 3 | 0 | 20 |",
"| mem | 2133 | 3 | 30 | 50 |",
"| mem | 2133 | 3 | 60 | 80 |",
"+------------+------------+-----------+----------+----------+",
],
},
Expand All @@ -961,12 +962,12 @@ mod tests {
"+------------+------------+-----------+----------+----------+",
"| table_name | size_bytes | row_count | min_time | max_time |",
"+------------+------------+-----------+----------+----------+",
"| cpu | 1961 | 3 | 0 | 20 |",
"| cpu | 1961 | 3 | 30 | 50 |",
"| cpu | 1961 | 3 | 60 | 80 |",
"| mem | 1961 | 3 | 0 | 20 |",
"| mem | 1961 | 3 | 30 | 50 |",
"| mem | 1961 | 3 | 60 | 80 |",
"| cpu | 2133 | 3 | 0 | 20 |",
"| cpu | 2133 | 3 | 30 | 50 |",
"| cpu | 2133 | 3 | 60 | 80 |",
"| mem | 2133 | 3 | 0 | 20 |",
"| mem | 2133 | 3 | 30 | 50 |",
"| mem | 2133 | 3 | 60 | 80 |",
"+------------+------------+-----------+----------+----------+",
],
},
Expand All @@ -979,10 +980,10 @@ mod tests {
"+------------+------------+-----------+----------+----------+",
"| table_name | size_bytes | row_count | min_time | max_time |",
"+------------+------------+-----------+----------+----------+",
"| cpu | 1961 | 3 | 0 | 20 |",
"| cpu | 1961 | 3 | 30 | 50 |",
"| cpu | 1961 | 3 | 60 | 80 |",
"| mem | 1961 | 3 | 60 | 80 |",
"| cpu | 2133 | 3 | 0 | 20 |",
"| cpu | 2133 | 3 | 30 | 50 |",
"| cpu | 2133 | 3 | 60 | 80 |",
"| mem | 2133 | 3 | 60 | 80 |",
"+------------+------------+-----------+----------+----------+",
],
},
Expand Down
18 changes: 17 additions & 1 deletion influxdb3_wal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,16 @@ impl Gen1Duration {
self.0.as_nanos() as i64
}

pub fn as_10m(&self) -> u64 {
let duration_secs = self.0.as_secs();
let ten_min_secs = 600;
if duration_secs >= ten_min_secs {
1
} else {
ten_min_secs / duration_secs
}
}

pub fn new_1m() -> Self {
Self(Duration::from_secs(60))
}
Expand Down Expand Up @@ -239,7 +249,7 @@ impl Default for Gen1Duration {

#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub struct NoopDetails {
timestamp_ns: i64,
pub timestamp_ns: i64,
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
Expand Down Expand Up @@ -902,6 +912,12 @@ pub struct Row {
pub fields: Vec<Field>,
}

impl AsRef<Row> for Row {
fn as_ref(&self) -> &Row {
self
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum FieldData {
Timestamp(i64),
Expand Down
2 changes: 2 additions & 0 deletions influxdb3_write/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ workspace = true

[dependencies]
# Core Crates
arrow_util.workspace = true
data_types.workspace = true
datafusion_util.workspace = true
executor.workspace = true
Expand Down Expand Up @@ -61,6 +62,7 @@ serde_json.workspace = true
serde_with.workspace = true
sha2.workspace = true
snap.workspace = true
sysinfo.workspace = true
thiserror.workspace = true
tokio.workspace = true
url.workspace = true
Expand Down
21 changes: 13 additions & 8 deletions influxdb3_write/src/write_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ pub struct WriteBufferImplArgs {
pub metric_registry: Arc<Registry>,
pub snapshotted_wal_files_to_keep: u64,
pub query_file_limit: Option<usize>,
pub max_memory_for_snapshot_bytes: u64,
}

impl WriteBufferImpl {
Expand All @@ -191,6 +192,7 @@ impl WriteBufferImpl {
metric_registry,
snapshotted_wal_files_to_keep,
query_file_limit,
max_memory_for_snapshot_bytes,
}: WriteBufferImplArgs,
) -> Result<Arc<Self>> {
// load snapshots and replay the wal into the in memory buffer
Expand Down Expand Up @@ -222,6 +224,7 @@ impl WriteBufferImpl {
distinct_cache_provider: Arc::clone(&distinct_cache),
persisted_files: Arc::clone(&persisted_files),
parquet_cache: parquet_cache.clone(),
max_size_per_parquet_file_bytes: max_memory_for_snapshot_bytes,
}));

// create the wal instance, which will replay into the queryable buffer and start
Expand Down Expand Up @@ -1077,6 +1080,7 @@ mod tests {
metric_registry: Default::default(),
snapshotted_wal_files_to_keep: 10,
query_file_limit: None,
max_memory_for_snapshot_bytes: 100_000_000,
})
.await
.unwrap();
Expand Down Expand Up @@ -1172,6 +1176,7 @@ mod tests {
metric_registry: Default::default(),
snapshotted_wal_files_to_keep: 10,
query_file_limit: None,
max_memory_for_snapshot_bytes: 100_000_000,
})
.await
.unwrap();
Expand Down Expand Up @@ -1245,6 +1250,7 @@ mod tests {
metric_registry: Default::default(),
snapshotted_wal_files_to_keep: 10,
query_file_limit: None,
max_memory_for_snapshot_bytes: 100_000_000,
})
.await
.unwrap()
Expand Down Expand Up @@ -1492,6 +1498,7 @@ mod tests {
metric_registry: Default::default(),
snapshotted_wal_files_to_keep: 10,
query_file_limit: None,
max_memory_for_snapshot_bytes: 100_000_000,
})
.await
.unwrap();
Expand Down Expand Up @@ -2089,7 +2096,7 @@ mod tests {
);
}

#[tokio::test]
#[test_log::test(tokio::test)]
async fn notifies_watchers_of_snapshot() {
let obj_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let (wbuf, _, _) = setup(
Expand Down Expand Up @@ -2759,10 +2766,7 @@ mod tests {
#[test_log::test(tokio::test)]
async fn test_out_of_order_data() {
let tmp_dir = test_helpers::tmp_dir().unwrap();
debug!(
?tmp_dir,
">>> using tmp dir for test_check_mem_and_force_snapshot"
);
debug!(?tmp_dir, ">>> using tmp dir");
let obj_store: Arc<dyn ObjectStore> =
Arc::new(LocalFileSystem::new_with_prefix(tmp_dir).unwrap());
let (write_buffer, _, _) = setup(
Expand Down Expand Up @@ -2833,6 +2837,9 @@ mod tests {
"| a | us | 1970-01-01T00:00:28Z | 10.0 |",
"| a | us | 1970-01-01T00:00:29Z | 10.0 |",
"| a | us | 1970-01-01T00:00:30Z | 10.0 |",
"| a | us | 1970-01-01T00:00:20Z | 10.0 |",
"| a | us | 1970-01-01T00:00:21Z | 10.0 |",
"| a | us | 1970-01-01T00:00:22Z | 10.0 |",
"| a | us | 1970-01-01T00:01:40Z | 10.0 |",
"| a | us | 1970-01-01T00:01:41Z | 10.0 |",
"| a | us | 1970-01-01T00:01:42Z | 10.0 |",
Expand All @@ -2845,9 +2852,6 @@ mod tests {
"| a | us | 1970-01-01T00:01:49Z | 10.0 |",
"| a | us | 1970-01-01T00:01:50Z | 10.0 |",
"| a | us | 1970-01-01T00:01:51Z | 10.0 |",
"| a | us | 1970-01-01T00:00:20Z | 10.0 |",
"| a | us | 1970-01-01T00:00:21Z | 10.0 |",
"| a | us | 1970-01-01T00:00:22Z | 10.0 |",
"+------+--------+----------------------+-------+",
],
&actual
Expand Down Expand Up @@ -3388,6 +3392,7 @@ mod tests {
metric_registry: Arc::clone(&metric_registry),
snapshotted_wal_files_to_keep: 10,
query_file_limit: None,
max_memory_for_snapshot_bytes: 100_000_000,
})
.await
.unwrap();
Expand Down
Loading