Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/common/base/src/runtime/workload_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ pub const MEMORY_QUOTA_KEY: &str = "memory_quota";
pub const QUERY_TIMEOUT_QUOTA_KEY: &str = "query_timeout";
pub const MAX_CONCURRENCY_QUOTA_KEY: &str = "max_concurrency";
pub const QUERY_QUEUED_TIMEOUT_QUOTA_KEY: &str = "query_queued_timeout";
pub const MAX_MEMORY_USAGE_RATIO: &str = "max_memory_usage_ratio";

pub const DEFAULT_MAX_MEMORY_USAGE_RATIO: usize = 25;

#[derive(serde::Serialize, serde::Deserialize, Clone, Eq, PartialEq, Debug)]
pub enum QuotaValue {
Expand Down Expand Up @@ -57,6 +60,18 @@ impl WorkloadGroup {
pub fn get_quota(&self, key: &'static str) -> Option<QuotaValue> {
self.quotas.get(key).cloned()
}

pub fn get_max_memory_usage_ratio(&self) -> usize {
let Some(QuotaValue::Percentage(v)) = self.quotas.get(MAX_MEMORY_USAGE_RATIO) else {
return DEFAULT_MAX_MEMORY_USAGE_RATIO;
};

if *v == 0 {
return DEFAULT_MAX_MEMORY_USAGE_RATIO;
}

std::cmp::min(*v, 100)
}
}

pub struct WorkloadGroupResource {
Expand Down
1 change: 1 addition & 0 deletions src/query/ast/src/ast/statements/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ impl QuotaValueStmt {
"query_timeout" => Self::parse_human_timeout(&v).ok_or("Invalid query timeout value, expected duration (e.g. '30s', '5min', '1h')"),
"max_concurrency" => Self::parse_number(&v).filter(|x| !matches!(x, QuotaValueStmt::Number(0))).ok_or("Invalid max concurrency value, expected positive integer"),
"query_queued_timeout" => Self::parse_human_timeout(&v).ok_or("Invalid queued query timeout value, expected duration (e.g. '30s', '5min', '1h')"),
"max_memory_usage_ratio" => Self::parse_percentage(&v).ok_or("Invalid max_memory_usage_ratio value, expected percentage (e.g. '50%') between 0-100"),
_ => Err("Unknown quota key"),
}
}
Expand Down
51 changes: 30 additions & 21 deletions src/query/management/src/workload/workload_resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,8 @@ impl WorkloadGroupResourceManagerInner {
(None, None) => {
return Ok(workload_resource);
}
(None, Some(QuotaValue::Bytes(v))) => {
workload_resource
.max_memory_usage
.store(*v, Ordering::Relaxed);
(None, Some(QuotaValue::Bytes(_v))) => {
self.update_mem_usage(&online_workload_group);
return Ok(workload_resource);
}
(None, Some(QuotaValue::Percentage(v))) => {
Expand Down Expand Up @@ -218,10 +216,8 @@ impl WorkloadGroupResourceManagerInner {
self.percent_normalizer.update(*v);
self.update_mem_usage(&online_workload_group);
}
(Some(QuotaValue::Bytes(_old)), Some(QuotaValue::Bytes(new))) => {
workload_resource
.max_memory_usage
.store(*new, Ordering::Relaxed);
(Some(QuotaValue::Bytes(_old)), Some(QuotaValue::Bytes(_new))) => {
self.update_mem_usage(&online_workload_group);
return Ok(workload_resource);
}
_ => {}
Expand All @@ -239,16 +235,29 @@ impl WorkloadGroupResourceManagerInner {
{
if let Some(v) = self.percent_normalizer.get_normalized(*v) {
let limit = self.global_mem_stat.get_limit();
let usage_ratio = workload_group.meta.get_max_memory_usage_ratio();
if limit > 0 {
workload_group
.max_memory_usage
.store(limit as usize / 100 * v, Ordering::Relaxed);
workload_group.max_memory_usage.store(
limit as usize / 100 * usage_ratio / 100 * v,
Ordering::Relaxed,
);
}
}
} else if let Some(QuotaValue::Bytes(v)) =
workload_group.meta.quotas.get(MEMORY_QUOTA_KEY)
{
workload_group.max_memory_usage.store(*v, Ordering::Relaxed);
let limit = self.global_mem_stat.get_limit();
let usage_ratio = workload_group.meta.get_max_memory_usage_ratio();

let mut memory_usage = *v;
if limit > 0 {
let max_memory_usage = limit as usize / 100 * usage_ratio;
memory_usage = std::cmp::min(max_memory_usage, memory_usage);
}

workload_group
.max_memory_usage
.store(memory_usage, Ordering::Relaxed);
} else {
workload_group.max_memory_usage.store(0, Ordering::Relaxed)
}
Expand Down Expand Up @@ -386,7 +395,7 @@ mod tests {
// Check memory usage was calculated (100% since it's the only workload)
assert_eq!(
workload1.max_memory_usage.load(Ordering::Relaxed),
(LIMIT / 100 * 100) as usize
(LIMIT / 100 * 25 / 100 * 100) as usize
);
}

Expand Down Expand Up @@ -423,11 +432,11 @@ mod tests {
// Check memory allocations are calculated correctly
assert_eq!(
resource1.max_memory_usage.load(Ordering::Relaxed),
(LIMIT / 100 * 30) as usize
(LIMIT / 100 * 25 / 100 * 30) as usize
); // 30% of total 100
assert_eq!(
resource2.max_memory_usage.load(Ordering::Relaxed),
(LIMIT / 100 * 70) as usize
(LIMIT / 100 * 25 / 100 * 70) as usize
); // 70% of total 100

// Drop first workload
Expand All @@ -438,7 +447,7 @@ mod tests {
assert_eq!(inner.percent_normalizer.sum.load(Ordering::Relaxed), 70);
assert_eq!(
resource2.max_memory_usage.load(Ordering::Relaxed),
(LIMIT / 100 * 100) as usize
(LIMIT / 100 * 25 / 100 * 100) as usize
); // Now 100% of remaining 70

Ok(())
Expand All @@ -464,11 +473,11 @@ mod tests {

assert_eq!(
resource1.max_memory_usage.load(Ordering::Relaxed),
(LIMIT / 100 * 50) as usize
(LIMIT / 100 * 25 / 100 * 50) as usize
);
assert_eq!(
resource2.max_memory_usage.load(Ordering::Relaxed),
(LIMIT / 100 * 50) as usize
(LIMIT / 100 * 25 / 100 * 50) as usize
);

workload_mgr
Expand All @@ -493,15 +502,15 @@ mod tests {
// Memory usage should be recalculated
assert_eq!(
resource1.max_memory_usage.load(Ordering::Relaxed),
(LIMIT / 100 * (70 * 100 / (70 + 50))) as usize
(LIMIT / 100 * 25 / 100 * (70 * 100 / (70 + 50))) as usize
);
assert_eq!(
resource2.max_memory_usage.load(Ordering::Relaxed),
(LIMIT / 100 * (50 * 100 / (70 + 50))) as usize
(LIMIT / 100 * 25 / 100 * (50 * 100 / (70 + 50))) as usize
);
assert_eq!(
updated_resource.max_memory_usage.load(Ordering::Relaxed),
(LIMIT / 100 * (70 * 100 / (70 + 50))) as usize
(LIMIT / 100 * 25 / 100 * (70 * 100 / (70 + 50))) as usize
);

Ok(())
Expand Down
8 changes: 8 additions & 0 deletions src/query/service/src/servers/http/v1/http_query_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,14 @@ pub(crate) async fn query_handler(
}
};

log::info!(
"[Workload-Group] attach workload group {}({}) for query {}, quotas: {:?}",
workload_group.meta.name,
workload_group.meta.id,
ctx.query_id,
workload_group.meta.quotas
);

parent_mem_stat = ParentMemStat::Normal(workload_group.mem_stat.clone());
tracking_workload_group = Some(workload_group);
}
Expand Down
Loading