diff --git a/src/common/base/src/base/mod.rs b/src/common/base/src/base/mod.rs index ad268f081a8f4..61f88e83d1799 100644 --- a/src/common/base/src/base/mod.rs +++ b/src/common/base/src/base/mod.rs @@ -40,6 +40,7 @@ pub use net::get_free_udp_port; pub use ordered_float::OrderedFloat; pub use profiling::Profiling; pub use progress::Progress; +pub use progress::ProgressHook; pub use progress::ProgressValues; pub use progress::SpillProgress; pub use select::select3; diff --git a/src/common/base/src/base/progress.rs b/src/common/base/src/base/progress.rs index 0b444164fe690..d03d903d16637 100644 --- a/src/common/base/src/base/progress.rs +++ b/src/common/base/src/base/progress.rs @@ -24,10 +24,17 @@ pub struct ProgressValues { pub bytes: usize, } +/// [`ProgressHook`] can be used to hook the progress to update the global metrics +/// whenever the progress is updated. +pub trait ProgressHook: std::fmt::Debug + Send + Sync { + fn incr(&self, progress_values: &ProgressValues); +} + #[derive(Debug)] pub struct Progress { rows: AtomicUsize, bytes: AtomicUsize, + hook: Option>, } impl Progress { @@ -35,6 +42,15 @@ impl Progress { Self { rows: AtomicUsize::new(0), bytes: AtomicUsize::new(0), + hook: None, + } + } + + pub fn with_hook(self, hook: Box) -> Self { + Self { + rows: self.rows, + bytes: self.bytes, + hook: Some(hook), } } @@ -42,6 +58,9 @@ impl Progress { self.rows.fetch_add(progress_values.rows, Ordering::Relaxed); self.bytes .fetch_add(progress_values.bytes, Ordering::Relaxed); + if let Some(hook) = &self.hook { + hook.incr(progress_values); + } } pub fn set(&self, progress_values: &ProgressValues) { diff --git a/src/common/base/src/runtime/metrics/mod.rs b/src/common/base/src/runtime/metrics/mod.rs index cc1e5e3a36708..8d729224176c2 100644 --- a/src/common/base/src/runtime/metrics/mod.rs +++ b/src/common/base/src/runtime/metrics/mod.rs @@ -22,6 +22,9 @@ mod registry; mod sample; pub use counter::Counter; +pub use family_metrics::FamilyCounter as InnerFamilyCounter; +pub use family_metrics::FamilyGauge as InnerFamilyGauge; +pub use family_metrics::FamilyHistogram as InnerFamilyHistogram; pub use gauge::Gauge; pub use histogram::Histogram; pub use histogram::BUCKET_MILLISECONDS; diff --git a/src/common/metrics/src/metrics/interpreter.rs b/src/common/metrics/src/metrics/interpreter.rs index e9c1c2e303205..aa809f928e142 100644 --- a/src/common/metrics/src/metrics/interpreter.rs +++ b/src/common/metrics/src/metrics/interpreter.rs @@ -38,6 +38,10 @@ const METRIC_QUERY_SCAN_PARTITIONS: &str = "query_scan_partitions"; const METRIC_QUERY_TOTAL_PARTITIONS: &str = "query_total_partitions"; const METRIC_QUERY_RESULT_ROWS: &str = "query_result_rows"; const METRIC_QUERY_RESULT_BYTES: &str = "query_result_bytes"; +const METRIC_QUERY_PROGRESS_SCAN_ROWS: &str = "query_progress_scan_rows"; +const METRIC_QUERY_PROGRESS_SCAN_BYTES: &str = "query_progress_scan_bytes"; +const METRIC_QUERY_PROGRESS_WRITE_ROWS: &str = "query_progress_write_rows"; +const METRIC_QUERY_PROGRESS_WRITE_BYTES: &str = "query_progress_write_bytes"; pub static QUERY_START: LazyLock> = LazyLock::new(|| register_counter_family(METRIC_QUERY_START)); @@ -73,3 +77,12 @@ pub static QUERY_RESULT_ROWS: LazyLock> = LazyLock::new(|| register_counter_family(METRIC_QUERY_RESULT_ROWS)); pub static QUERY_RESULT_BYTES: LazyLock> = LazyLock::new(|| register_counter_family(METRIC_QUERY_RESULT_BYTES)); + +pub static QUERY_PROGRESS_SCAN_ROWS: LazyLock> = + LazyLock::new(|| register_counter_family(METRIC_QUERY_PROGRESS_SCAN_ROWS)); +pub static QUERY_PROGRESS_SCAN_BYTES: LazyLock> = + LazyLock::new(|| register_counter_family(METRIC_QUERY_PROGRESS_SCAN_BYTES)); +pub static QUERY_PROGRESS_WRITE_ROWS: LazyLock> = + LazyLock::new(|| register_counter_family(METRIC_QUERY_PROGRESS_WRITE_ROWS)); +pub static QUERY_PROGRESS_WRITE_BYTES: LazyLock> = + LazyLock::new(|| register_counter_family(METRIC_QUERY_PROGRESS_WRITE_BYTES)); diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index c31fe083316b2..563ea53bad544 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -24,8 +24,11 @@ use std::time::SystemTime; use dashmap::DashMap; use databend_common_base::base::short_sql; use databend_common_base::base::Progress; +use databend_common_base::base::ProgressHook; +use databend_common_base::base::ProgressValues; use databend_common_base::base::SpillProgress; use databend_common_base::runtime::drop_guard; +use databend_common_base::runtime::metrics::InnerFamilyCounter; use databend_common_base::runtime::Runtime; use databend_common_catalog::catalog::Catalog; use databend_common_catalog::catalog::CatalogManager; @@ -154,6 +157,10 @@ impl QueryContextShared { session: Arc, cluster_cache: Arc, ) -> Result> { + let progress_metrics = QueryProgressMetrics::new( + session.get_current_tenant().tenant_name(), + &cluster_cache.local_id, + ); Ok(Arc::new(QueryContextShared { query_settings: Settings::create(session.get_current_tenant()), catalog_manager: CatalogManager::instance(), @@ -162,9 +169,13 @@ impl QueryContextShared { data_operator: DataOperator::instance(), init_query_id: Arc::new(RwLock::new(Uuid::new_v4().to_string())), total_scan_values: Arc::new(Progress::create()), - scan_progress: Arc::new(Progress::create()), + scan_progress: Arc::new( + Progress::create().with_hook(progress_metrics.scan_progress_hook()), + ), result_progress: Arc::new(Progress::create()), - write_progress: Arc::new(Progress::create()), + write_progress: Arc::new( + Progress::create().with_hook(progress_metrics.write_progress_hook()), + ), error: Arc::new(Mutex::new(None)), warnings: Arc::new(Mutex::new(vec![])), runtime: Arc::new(RwLock::new(None)), @@ -661,3 +672,74 @@ impl Drop for QueryContextShared { }) } } + +struct QueryProgressMetrics { + scan_rows: Arc>>, + scan_bytes: Arc>>, + write_rows: Arc>>, + write_bytes: Arc>>, +} + +impl QueryProgressMetrics { + fn new(tenant: &str, cluster: &str) -> Self { + let common_labels = vec![ + ("tenant", tenant.to_string()), + ("cluster", cluster.to_string()), + ]; + + let scan_rows = databend_common_metrics::interpreter::QUERY_PROGRESS_SCAN_ROWS + .get_or_create(&common_labels); + let scan_bytes = databend_common_metrics::interpreter::QUERY_PROGRESS_SCAN_BYTES + .get_or_create(&common_labels); + let write_rows = databend_common_metrics::interpreter::QUERY_PROGRESS_WRITE_ROWS + .get_or_create(&common_labels); + let write_bytes = databend_common_metrics::interpreter::QUERY_PROGRESS_WRITE_BYTES + .get_or_create(&common_labels); + + Self { + scan_rows, + scan_bytes, + write_rows, + write_bytes, + } + } + + fn scan_progress_hook(&self) -> Box { + Box::new(QueryProgressMetricsHook::new( + self.scan_rows.clone(), + self.scan_bytes.clone(), + )) + } + + fn write_progress_hook(&self) -> Box { + Box::new(QueryProgressMetricsHook::new( + self.write_rows.clone(), + self.write_bytes.clone(), + )) + } +} + +#[derive(Debug)] +pub struct QueryProgressMetricsHook { + rows_metrics: Arc>>, + bytes_metrics: Arc>>, +} + +impl QueryProgressMetricsHook { + pub fn new( + rows_metrics: Arc>>, + bytes_metrics: Arc>>, + ) -> Self { + Self { + rows_metrics, + bytes_metrics, + } + } +} + +impl ProgressHook for QueryProgressMetricsHook { + fn incr(&self, progress_values: &ProgressValues) { + self.rows_metrics.inc_by(progress_values.rows as u64); + self.bytes_metrics.inc_by(progress_values.bytes as u64); + } +}