Skip to content

Commit ff78344

Browse files
authored
feat(query): Add external_block_rows metrics (#17116)
* feat(query): Add `external_block_rows` metrics
* fix
* add init_semaphore
* add rows metrics
1 parent 89aa80e commit ff78344

File tree

6 files changed

+60
-9
lines changed

6 files changed

+60
-9
lines changed

src/common/base/src/runtime/metrics/histogram.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ pub static BUCKET_MILLISECONDS: [f64; 15] = [
3939
300000.0, 600000.0, 1800000.0,
4040
];
4141

42+
/// Bucket values used for row-count histograms: 14 increasing values
/// from 1 up to 50,000,000. Consumed by `register_histogram_family_in_rows`,
/// which passes a copy of these values to `register_histogram_family`.
pub static BUCKET_ROWS: [f64; 14] = [
43+
1.0, 10.0, 100.0, 500.0, 1000.0, 5000.0, 10000.0, 50000.0, 100000.0, 500000.0, 1000000.0,
44+
5000000.0, 10000000.0, 50000000.0,
45+
];
46+
4247
/// Histogram is a port of prometheus-client's Histogram. The only difference is that
4348
/// we can reset the histogram.
4449
#[derive(Debug)]

src/common/base/src/runtime/metrics/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ pub use registry::register_gauge_family;
3434
pub use registry::register_histogram;
3535
pub use registry::register_histogram_family;
3636
pub use registry::register_histogram_family_in_milliseconds;
37+
pub use registry::register_histogram_family_in_rows;
3738
pub use registry::register_histogram_family_in_seconds;
3839
pub use registry::register_histogram_in_milliseconds;
3940
pub use registry::register_histogram_in_seconds;

src/common/base/src/runtime/metrics/registry.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use crate::runtime::metrics::family_metrics::FamilyHistogram as InnerFamilyHisto
3838
use crate::runtime::metrics::gauge::Gauge;
3939
use crate::runtime::metrics::histogram::Histogram;
4040
use crate::runtime::metrics::histogram::BUCKET_MILLISECONDS;
41+
use crate::runtime::metrics::histogram::BUCKET_ROWS;
4142
use crate::runtime::metrics::histogram::BUCKET_SECONDS;
4243
use crate::runtime::metrics::process_collector::ProcessCollector;
4344
use crate::runtime::metrics::sample::MetricSample;
@@ -309,6 +310,11 @@ where T: FamilyLabels {
309310
register_histogram_family(name, BUCKET_MILLISECONDS.iter().copied())
310311
}
311312

313+
/// Registers a histogram family under `name` whose buckets are the values
/// of `BUCKET_ROWS`, and returns the resulting `FamilyHistogram<T>`.
///
/// This delegates to `register_histogram_family` in the same way as the
/// `BUCKET_MILLISECONDS`-based variant, differing only in the bucket set.
pub fn register_histogram_family_in_rows<T>(name: &str) -> FamilyHistogram<T>
314+
where T: FamilyLabels {
315+
register_histogram_family(name, BUCKET_ROWS.iter().copied())
316+
}
317+
312318
pub type FamilyGauge<T> = Family<T, InnerFamilyGauge<T>>;
313319
pub type FamilyCounter<T> = Family<T, InnerFamilyCounter<T>>;
314320
pub type FamilyHistogram<T> = Family<T, InnerFamilyHistogram<T>>;

src/common/metrics/src/metrics/external_server.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ use std::sync::LazyLock;
1616
use std::time::Duration;
1717

1818
use databend_common_base::runtime::metrics::register_counter_family;
19-
use databend_common_base::runtime::metrics::register_histogram_family_in_seconds;
19+
use databend_common_base::runtime::metrics::register_histogram_family_in_milliseconds;
20+
use databend_common_base::runtime::metrics::register_histogram_family_in_rows;
2021
use databend_common_base::runtime::metrics::FamilyCounter;
2122
use databend_common_base::runtime::metrics::FamilyHistogram;
2223

@@ -28,12 +29,13 @@ const METRIC_RETRY: &str = "external_retry";
2829
const METRIC_ERROR: &str = "external_error";
2930
const METRIC_RUNNING_REQUESTS: &str = "external_running_requests";
3031
const METRIC_REQUESTS: &str = "external_requests";
32+
const METRIC_EXTERNAL_BLOCK_ROWS: &str = "external_block_rows";
3133

3234
static REQUEST_EXTERNAL_DURATION: LazyLock<FamilyHistogram<VecLabels>> =
33-
LazyLock::new(|| register_histogram_family_in_seconds(METRIC_REQUEST_EXTERNAL_DURATION));
35+
LazyLock::new(|| register_histogram_family_in_milliseconds(METRIC_REQUEST_EXTERNAL_DURATION));
3436

3537
static CONNECT_EXTERNAL_DURATION: LazyLock<FamilyHistogram<VecLabels>> =
36-
LazyLock::new(|| register_histogram_family_in_seconds(METRIC_CONNECT_EXTERNAL_DURATION));
38+
LazyLock::new(|| register_histogram_family_in_milliseconds(METRIC_CONNECT_EXTERNAL_DURATION));
3739

3840
static RETRY_EXTERNAL: LazyLock<FamilyCounter<VecLabels>> =
3941
LazyLock::new(|| register_counter_family(METRIC_RETRY));
@@ -47,6 +49,9 @@ static RUNNING_REQUESTS_EXTERNAL: LazyLock<FamilyCounter<VecLabels>> =
4749
static REQUESTS_EXTERNAL_EXTERNAL: LazyLock<FamilyCounter<VecLabels>> =
4850
LazyLock::new(|| register_counter_family(METRIC_REQUESTS));
4951

52+
/// Histogram family for the `external_block_rows` metric
/// (`METRIC_EXTERNAL_BLOCK_ROWS`), registered with row-count buckets on first
/// access through `LazyLock`.
static EXTERNAL_BLOCK_ROWS: LazyLock<FamilyHistogram<VecLabels>> =
53+
LazyLock::new(|| register_histogram_family_in_rows(METRIC_EXTERNAL_BLOCK_ROWS));
54+
5055
const LABEL_FUNCTION_NAME: &str = "function_name";
5156
const LABEL_ERROR_KIND: &str = "error_kind";
5257

@@ -64,6 +69,13 @@ pub fn record_request_external_duration(function_name: impl Into<String>, durati
6469
.observe(duration.as_millis_f64());
6570
}
6671

72+
/// Records one observation of `rows` in the `external_block_rows` histogram,
/// labelled with `function_name` under the `function_name` label key.
///
/// `rows` is cast to `f64` before being observed.
///
/// NOTE(review): the call site added in `transform_udf_server.rs` invokes this
/// once per UDF function with `rows`, which appears to be the row count of the
/// whole input block rather than of each batch slice — confirm that is the
/// intended granularity.
pub fn record_request_external_block_rows(function_name: impl Into<String>, rows: usize) {
73+
let labels = &vec![(LABEL_FUNCTION_NAME, function_name.into())];
74+
EXTERNAL_BLOCK_ROWS
75+
.get_or_create(labels)
76+
.observe(rows as f64);
77+
}
78+
6779
pub fn record_retry_external(function_name: impl Into<String>, error_kind: impl Into<String>) {
6880
let labels = &vec![
6981
(LABEL_FUNCTION_NAME, function_name.into()),

src/query/service/src/pipelines/builders/builder_udf.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,15 @@ impl PipelineBuilder {
4141
))
4242
})
4343
} else {
44+
let semaphore = TransformUdfServer::init_semaphore(self.ctx.clone())?;
45+
let endpoints = TransformUdfServer::init_endpoints(self.ctx.clone(), &udf.udf_funcs)?;
4446
self.main_pipeline.try_add_async_transformer(|| {
45-
TransformUdfServer::new(self.ctx.clone(), udf.udf_funcs.clone())
47+
TransformUdfServer::new(
48+
self.ctx.clone(),
49+
udf.udf_funcs.clone(),
50+
semaphore.clone(),
51+
endpoints.clone(),
52+
)
4653
})
4754
}
4855
}

src/query/service/src/pipelines/processors/transforms/transform_udf_server.rs

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use databend_common_expression::DataField;
3434
use databend_common_expression::DataSchema;
3535
use databend_common_metrics::external_server::record_connect_external_duration;
3636
use databend_common_metrics::external_server::record_error_external;
37+
use databend_common_metrics::external_server::record_request_external_block_rows;
3738
use databend_common_metrics::external_server::record_request_external_duration;
3839
use databend_common_metrics::external_server::record_retry_external;
3940
use databend_common_metrics::external_server::record_running_requests_external_finish;
@@ -60,15 +61,20 @@ pub struct TransformUdfServer {
6061
}
6162

6263
impl TransformUdfServer {
63-
pub fn new(ctx: Arc<QueryContext>, funcs: Vec<UdfFunctionDesc>) -> Result<Self> {
64+
pub fn init_semaphore(ctx: Arc<QueryContext>) -> Result<Arc<Semaphore>> {
6465
let settings = ctx.get_settings();
65-
let connect_timeout = settings.get_external_server_connect_timeout_secs()?;
66-
let request_timeout = settings.get_external_server_request_timeout_secs()?;
67-
let request_batch_rows = settings.get_external_server_request_batch_rows()? as usize;
6866
let request_max_threads = settings.get_external_server_request_max_threads()? as usize;
69-
let retry_times = settings.get_external_server_request_retry_times()? as usize;
7067
let semaphore = Arc::new(Semaphore::new(request_max_threads));
68+
Ok(semaphore)
69+
}
7170

71+
pub fn init_endpoints(
72+
ctx: Arc<QueryContext>,
73+
funcs: &[UdfFunctionDesc],
74+
) -> Result<BTreeMap<String, Arc<Endpoint>>> {
75+
let settings = ctx.get_settings();
76+
let connect_timeout = settings.get_external_server_connect_timeout_secs()?;
77+
let request_timeout = settings.get_external_server_request_timeout_secs()?;
7278
let mut endpoints: BTreeMap<String, Arc<Endpoint>> = BTreeMap::new();
7379
for func in funcs.iter() {
7480
let server_addr = func.udf_type.as_server().unwrap();
@@ -79,6 +85,19 @@ impl TransformUdfServer {
7985
UDFFlightClient::build_endpoint(server_addr, connect_timeout, request_timeout)?;
8086
endpoints.insert(server_addr.clone(), endpoint);
8187
}
88+
Ok(endpoints)
89+
}
90+
91+
pub fn new(
92+
ctx: Arc<QueryContext>,
93+
funcs: Vec<UdfFunctionDesc>,
94+
semaphore: Arc<Semaphore>,
95+
endpoints: BTreeMap<String, Arc<Endpoint>>,
96+
) -> Result<Self> {
97+
let settings = ctx.get_settings();
98+
let connect_timeout = settings.get_external_server_connect_timeout_secs()?;
99+
let request_batch_rows = settings.get_external_server_request_batch_rows()? as usize;
100+
let retry_times = settings.get_external_server_request_retry_times()? as usize;
82101

83102
Ok(Self {
84103
ctx,
@@ -222,6 +241,7 @@ impl AsyncTransform for TransformUdfServer {
222241
.map(|start| data_block.slice(start..start + batch_rows.min(rows - start)))
223242
.collect();
224243
for func in self.funcs.iter() {
244+
record_request_external_block_rows(func.func_name.clone(), rows);
225245
let server_addr = func.udf_type.as_server().unwrap();
226246
let endpoint = self.endpoints.get(server_addr).unwrap();
227247
let tasks: Vec<_> = batch_blocks

0 commit comments

Comments
 (0)