Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/common/base/src/runtime/metrics/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ pub static BUCKET_MILLISECONDS: [f64; 15] = [
300000.0, 600000.0, 1800000.0,
];

/// Bucket upper bounds (in rows) for histograms that observe row counts,
/// spanning a single row up to 50 million rows.
pub static BUCKET_ROWS: [f64; 12] = [
    1.0,
    10.0,
    100.0,
    1_000.0,
    10_000.0,
    50_000.0,
    100_000.0,
    500_000.0,
    1_000_000.0,
    5_000_000.0,
    10_000_000.0,
    50_000_000.0,
];

/// Histogram is a port of prometheus-client's Histogram. The only difference is that
/// we can reset the histogram.
#[derive(Debug)]
Expand Down
1 change: 1 addition & 0 deletions src/common/base/src/runtime/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub use registry::register_gauge_family;
pub use registry::register_histogram;
pub use registry::register_histogram_family;
pub use registry::register_histogram_family_in_milliseconds;
pub use registry::register_histogram_family_in_rows;
pub use registry::register_histogram_family_in_seconds;
pub use registry::register_histogram_in_milliseconds;
pub use registry::register_histogram_in_seconds;
Expand Down
6 changes: 6 additions & 0 deletions src/common/base/src/runtime/metrics/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::runtime::metrics::family_metrics::FamilyHistogram as InnerFamilyHisto
use crate::runtime::metrics::gauge::Gauge;
use crate::runtime::metrics::histogram::Histogram;
use crate::runtime::metrics::histogram::BUCKET_MILLISECONDS;
use crate::runtime::metrics::histogram::BUCKET_ROWS;
use crate::runtime::metrics::histogram::BUCKET_SECONDS;
use crate::runtime::metrics::process_collector::ProcessCollector;
use crate::runtime::metrics::sample::MetricSample;
Expand Down Expand Up @@ -309,6 +310,11 @@ where T: FamilyLabels {
register_histogram_family(name, BUCKET_MILLISECONDS.iter().copied())
}

/// Registers a histogram family whose buckets are row counts
/// (`BUCKET_ROWS`), for metrics that observe numbers of rows.
pub fn register_histogram_family_in_rows<T: FamilyLabels>(name: &str) -> FamilyHistogram<T> {
    register_histogram_family(name, BUCKET_ROWS.iter().copied())
}

pub type FamilyGauge<T> = Family<T, InnerFamilyGauge<T>>;
pub type FamilyCounter<T> = Family<T, InnerFamilyCounter<T>>;
pub type FamilyHistogram<T> = Family<T, InnerFamilyHistogram<T>>;
Expand Down
18 changes: 15 additions & 3 deletions src/common/metrics/src/metrics/external_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use std::sync::LazyLock;
use std::time::Duration;

use databend_common_base::runtime::metrics::register_counter_family;
use databend_common_base::runtime::metrics::register_histogram_family_in_seconds;
use databend_common_base::runtime::metrics::register_histogram_family_in_milliseconds;
use databend_common_base::runtime::metrics::register_histogram_family_in_rows;
use databend_common_base::runtime::metrics::FamilyCounter;
use databend_common_base::runtime::metrics::FamilyHistogram;

Expand All @@ -28,12 +29,13 @@ const METRIC_RETRY: &str = "external_retry";
const METRIC_ERROR: &str = "external_error";
const METRIC_RUNNING_REQUESTS: &str = "external_running_requests";
const METRIC_REQUESTS: &str = "external_requests";
const METRIC_EXTERNAL_BLOCK_ROWS: &str = "external_block_rows";

static REQUEST_EXTERNAL_DURATION: LazyLock<FamilyHistogram<VecLabels>> =
LazyLock::new(|| register_histogram_family_in_seconds(METRIC_REQUEST_EXTERNAL_DURATION));
LazyLock::new(|| register_histogram_family_in_milliseconds(METRIC_REQUEST_EXTERNAL_DURATION));

static CONNECT_EXTERNAL_DURATION: LazyLock<FamilyHistogram<VecLabels>> =
LazyLock::new(|| register_histogram_family_in_seconds(METRIC_CONNECT_EXTERNAL_DURATION));
LazyLock::new(|| register_histogram_family_in_milliseconds(METRIC_CONNECT_EXTERNAL_DURATION));

static RETRY_EXTERNAL: LazyLock<FamilyCounter<VecLabels>> =
LazyLock::new(|| register_counter_family(METRIC_RETRY));
Expand All @@ -47,6 +49,9 @@ static RUNNING_REQUESTS_EXTERNAL: LazyLock<FamilyCounter<VecLabels>> =
static REQUESTS_EXTERNAL_EXTERNAL: LazyLock<FamilyCounter<VecLabels>> =
LazyLock::new(|| register_counter_family(METRIC_REQUESTS));

// Histogram of row counts observed per external request; registered lazily
// on first use under the `external_block_rows` metric name.
static EXTERNAL_BLOCK_ROWS: LazyLock<FamilyHistogram<VecLabels>> =
    LazyLock::new(|| register_histogram_family_in_rows(METRIC_EXTERNAL_BLOCK_ROWS));

const LABEL_FUNCTION_NAME: &str = "function_name";
const LABEL_ERROR_KIND: &str = "error_kind";

Expand All @@ -64,6 +69,13 @@ pub fn record_request_external_duration(function_name: impl Into<String>, durati
.observe(duration.as_millis_f64());
}

/// Records `rows` (the number of rows in a data block) into the
/// external-block-rows histogram, labeled with the given function name.
pub fn record_request_external_block_rows(function_name: impl Into<String>, rows: usize) {
    let labels = vec![(LABEL_FUNCTION_NAME, function_name.into())];
    EXTERNAL_BLOCK_ROWS.get_or_create(&labels).observe(rows as f64);
}

pub fn record_retry_external(function_name: impl Into<String>, error_kind: impl Into<String>) {
let labels = &vec![
(LABEL_FUNCTION_NAME, function_name.into()),
Expand Down
3 changes: 2 additions & 1 deletion src/query/service/src/pipelines/builders/builder_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ impl PipelineBuilder {
))
})
} else {
let endpoints = TransformUdfServer::init_endpoints(self.ctx.clone(), &udf.udf_funcs)?;
self.main_pipeline.try_add_async_transformer(|| {
TransformUdfServer::new(self.ctx.clone(), udf.udf_funcs.clone())
TransformUdfServer::new(self.ctx.clone(), udf.udf_funcs.clone(), endpoints.clone())
})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use databend_common_expression::DataField;
use databend_common_expression::DataSchema;
use databend_common_metrics::external_server::record_connect_external_duration;
use databend_common_metrics::external_server::record_error_external;
use databend_common_metrics::external_server::record_request_external_block_rows;
use databend_common_metrics::external_server::record_request_external_duration;
use databend_common_metrics::external_server::record_retry_external;
use databend_common_metrics::external_server::record_running_requests_external_finish;
Expand All @@ -60,15 +61,13 @@ pub struct TransformUdfServer {
}

impl TransformUdfServer {
pub fn new(ctx: Arc<QueryContext>, funcs: Vec<UdfFunctionDesc>) -> Result<Self> {
pub fn init_endpoints(
ctx: Arc<QueryContext>,
funcs: &[UdfFunctionDesc],
) -> Result<BTreeMap<String, Arc<Endpoint>>> {
let settings = ctx.get_settings();
let connect_timeout = settings.get_external_server_connect_timeout_secs()?;
let request_timeout = settings.get_external_server_request_timeout_secs()?;
let request_batch_rows = settings.get_external_server_request_batch_rows()? as usize;
let request_max_threads = settings.get_external_server_request_max_threads()? as usize;
let retry_times = settings.get_external_server_request_retry_times()? as usize;
let semaphore = Arc::new(Semaphore::new(request_max_threads));

let mut endpoints: BTreeMap<String, Arc<Endpoint>> = BTreeMap::new();
for func in funcs.iter() {
let server_addr = func.udf_type.as_server().unwrap();
Expand All @@ -79,6 +78,20 @@ impl TransformUdfServer {
UDFFlightClient::build_endpoint(server_addr, connect_timeout, request_timeout)?;
endpoints.insert(server_addr.clone(), endpoint);
}
Ok(endpoints)
}

pub fn new(
ctx: Arc<QueryContext>,
funcs: Vec<UdfFunctionDesc>,
endpoints: BTreeMap<String, Arc<Endpoint>>,
) -> Result<Self> {
let settings = ctx.get_settings();
let connect_timeout = settings.get_external_server_connect_timeout_secs()?;
let request_batch_rows = settings.get_external_server_request_batch_rows()? as usize;
let request_max_threads = settings.get_external_server_request_max_threads()? as usize;
let retry_times = settings.get_external_server_request_retry_times()? as usize;
let semaphore = Arc::new(Semaphore::new(request_max_threads));

Ok(Self {
ctx,
Expand Down Expand Up @@ -222,6 +235,7 @@ impl AsyncTransform for TransformUdfServer {
.map(|start| data_block.slice(start..start + batch_rows.min(rows - start)))
.collect();
for func in self.funcs.iter() {
record_request_external_block_rows(func.func_name.clone(), rows);
let server_addr = func.udf_type.as_server().unwrap();
let endpoint = self.endpoints.get(server_addr).unwrap();
let tasks: Vec<_> = batch_blocks
Expand Down
Loading