Skip to content

Commit ac03117

Browse files
authored
fix: deadlock in table lock (#16632)
* fix * fix * update
1 parent 894656f commit ac03117

File tree

10 files changed

+217
-165
lines changed

10 files changed

+217
-165
lines changed

src/query/service/src/interpreters/interpreter.rs

Lines changed: 79 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,12 @@ use std::collections::BTreeMap;
1616
use std::sync::Arc;
1717
use std::time::SystemTime;
1818

19+
use databend_common_ast::ast::AlterTableAction;
20+
use databend_common_ast::ast::AlterTableStmt;
1921
use databend_common_ast::ast::Literal;
22+
use databend_common_ast::ast::ModifyColumnAction;
23+
use databend_common_ast::ast::OptimizeTableAction;
24+
use databend_common_ast::ast::OptimizeTableStmt;
2025
use databend_common_ast::ast::Statement;
2126
use databend_common_base::base::short_sql;
2227
use databend_common_base::runtime::profile::get_statistics_desc;
@@ -54,7 +59,10 @@ use crate::pipelines::executor::PipelineCompleteExecutor;
5459
use crate::pipelines::executor::PipelinePullingExecutor;
5560
use crate::pipelines::PipelineBuildResult;
5661
use crate::schedulers::ServiceQueryExecutor;
62+
use crate::sessions::AcquireQueueGuard;
63+
use crate::sessions::QueriesQueueManager;
5764
use crate::sessions::QueryContext;
65+
use crate::sessions::QueryEntry;
5866
use crate::sessions::SessionManager;
5967
use crate::stream::DataBlockStream;
6068
use crate::stream::ProgressStream;
@@ -202,17 +210,18 @@ fn log_query_finished(ctx: &QueryContext, error: Option<ErrorCode>, has_profiles
202210
/// 2. Execute the plan -- interpreter
203211
///
204212
/// This function is used to plan the SQL. If an error occurs, we will log the query start and finished.
205-
pub async fn interpreter_plan_sql(ctx: Arc<QueryContext>, sql: &str) -> Result<(Plan, PlanExtras)> {
206-
let mut planner = Planner::new_with_query_executor(
207-
ctx.clone(),
208-
Arc::new(ServiceQueryExecutor::new(ctx.clone())),
209-
);
210-
let result = planner.plan_sql(sql).await;
213+
pub async fn interpreter_plan_sql(
214+
ctx: Arc<QueryContext>,
215+
sql: &str,
216+
acquire_queue: bool,
217+
) -> Result<(Plan, PlanExtras, AcquireQueueGuard)> {
218+
let result = plan_sql(ctx.clone(), sql, acquire_queue).await;
219+
211220
let short_sql = short_sql(
212221
sql.to_string(),
213222
ctx.get_settings().get_short_sql_max_length()?,
214223
);
215-
let mut stmt = if let Ok((_, extras)) = &result {
224+
let mut stmt = if let Ok((_, extras, _)) = &result {
216225
Some(extras.statement.clone())
217226
} else {
218227
// Only log if there's an error
@@ -227,6 +236,42 @@ pub async fn interpreter_plan_sql(ctx: Arc<QueryContext>, sql: &str) -> Result<(
227236
result
228237
}
229238

239+
async fn plan_sql(
240+
ctx: Arc<QueryContext>,
241+
sql: &str,
242+
acquire_queue: bool,
243+
) -> Result<(Plan, PlanExtras, AcquireQueueGuard)> {
244+
let mut planner = Planner::new_with_query_executor(
245+
ctx.clone(),
246+
Arc::new(ServiceQueryExecutor::new(ctx.clone())),
247+
);
248+
249+
// Parse the SQL query, get extract additional information.
250+
let extras = planner.parse_sql(sql)?;
251+
if !acquire_queue {
252+
// If queue guard is not required, plan the statement directly.
253+
let plan = planner.plan_stmt(&extras.statement).await?;
254+
return Ok((plan, extras, AcquireQueueGuard::create(None)));
255+
}
256+
257+
let need_acquire_lock = need_acquire_lock(ctx.clone(), &extras.statement);
258+
if need_acquire_lock {
259+
// If a lock is required, acquire the queue guard before
260+
// planning the statement, to avoid potential deadlocks.
261+
// See PR https://github.com/databendlabs/databend/pull/16632
262+
let query_entry = QueryEntry::create_entry(&ctx, &extras, true)?;
263+
let guard = QueriesQueueManager::instance().acquire(query_entry).await?;
264+
let plan = planner.plan_stmt(&extras.statement).await?;
265+
Ok((plan, extras, guard))
266+
} else {
267+
// No lock is needed, plan the statement first, then acquire the queue guard.
268+
let plan = planner.plan_stmt(&extras.statement).await?;
269+
let query_entry = QueryEntry::create(&ctx, &plan, &extras)?;
270+
let guard = QueriesQueueManager::instance().acquire(query_entry).await?;
271+
Ok((plan, extras, guard))
272+
}
273+
}
274+
230275
fn attach_query_hash(ctx: &Arc<QueryContext>, stmt: &mut Option<Statement>, sql: &str) {
231276
let (query_hash, query_parameterized_hash) = if let Some(stmt) = stmt {
232277
let query_hash = format!("{:x}", Md5::digest(stmt.to_string()));
@@ -299,3 +344,30 @@ pub fn on_execution_finished(info: &ExecutionInfo, query_ctx: Arc<QueryContext>)
299344
Err(error) => Err(error.clone()),
300345
}
301346
}
347+
348+
/// Check if the statement need acquire a table lock.
349+
fn need_acquire_lock(ctx: Arc<QueryContext>, stmt: &Statement) -> bool {
350+
if !ctx.get_settings().get_enable_table_lock().unwrap_or(false) {
351+
return false;
352+
}
353+
354+
match stmt {
355+
Statement::Replace(_)
356+
| Statement::MergeInto(_)
357+
| Statement::Update(_)
358+
| Statement::Delete(_)
359+
| Statement::TruncateTable(_) => true,
360+
Statement::OptimizeTable(OptimizeTableStmt { action, .. }) => matches!(
361+
action,
362+
OptimizeTableAction::All | OptimizeTableAction::Compact { .. }
363+
),
364+
Statement::AlterTable(AlterTableStmt { action, .. }) => matches!(
365+
action,
366+
AlterTableAction::ReclusterTable { .. }
367+
| AlterTableAction::ModifyColumn {
368+
action: ModifyColumnAction::SetDataType(_),
369+
}
370+
),
371+
_ => false,
372+
}
373+
}

src/query/service/src/servers/http/clickhouse_handler.rs

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ use databend_common_expression::DataSchemaRef;
2727
use databend_common_formats::ClickhouseFormatType;
2828
use databend_common_formats::FileFormatOptionsExt;
2929
use databend_common_formats::FileFormatTypeExt;
30-
use databend_common_sql::Planner;
3130
use fastrace::func_path;
3231
use fastrace::prelude::*;
3332
use futures::StreamExt;
@@ -56,9 +55,7 @@ use crate::interpreters::InterpreterFactory;
5655
use crate::interpreters::InterpreterPtr;
5756
use crate::servers::http::middleware::sanitize_request_headers;
5857
use crate::servers::http::v1::HttpQueryContext;
59-
use crate::sessions::QueriesQueueManager;
6058
use crate::sessions::QueryContext;
61-
use crate::sessions::QueryEntry;
6259
use crate::sessions::SessionType;
6360
use crate::sessions::TableContext;
6461

@@ -256,16 +253,11 @@ pub async fn clickhouse_handler_get(
256253
let default_format = get_default_format(&params, headers).map_err(BadRequest)?;
257254
let sql = params.query();
258255
// Use interpreter_plan_sql, we can write the query log if an error occurs.
259-
let (plan, extras) = interpreter_plan_sql(context.clone(), &sql)
256+
let (plan, extras, _guard) = interpreter_plan_sql(context.clone(), &sql, true)
260257
.await
261258
.map_err(|err| err.display_with_sql(&sql))
262259
.map_err(BadRequest)?;
263260

264-
let query_entry = QueryEntry::create(&context, &plan, &extras).map_err(BadRequest)?;
265-
let _guard = QueriesQueueManager::instance()
266-
.acquire(query_entry)
267-
.await
268-
.map_err(BadRequest)?;
269261
let format = get_format_with_default(extras.format, default_format)?;
270262
let interpreter = InterpreterFactory::get(context.clone(), &plan)
271263
.await
@@ -345,19 +337,11 @@ pub async fn clickhouse_handler_post(
345337
};
346338
info!("receive clickhouse http post, (query + body) = {}", &msg);
347339

348-
let mut planner = Planner::new(ctx.clone());
349-
let (mut plan, extras) = planner
350-
.plan_sql(&sql)
340+
let (mut plan, extras, _guard) = interpreter_plan_sql(ctx.clone(), &sql, true)
351341
.await
352342
.map_err(|err| err.display_with_sql(&sql))
353343
.map_err(BadRequest)?;
354344

355-
let entry = QueryEntry::create(&ctx, &plan, &extras).map_err(BadRequest)?;
356-
let _guard = QueriesQueueManager::instance()
357-
.acquire(entry)
358-
.await
359-
.map_err(BadRequest)?;
360-
361345
let mut handle = None;
362346
let output_schema = plan.schema();
363347

src/query/service/src/servers/http/v1/query/execute_state.rs

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,8 @@ use crate::servers::http::v1::http_query_handlers::QueryResponseField;
4545
use crate::servers::http::v1::query::http_query::ResponseState;
4646
use crate::servers::http::v1::query::sized_spsc::SizedChannelSender;
4747
use crate::sessions::AcquireQueueGuard;
48-
use crate::sessions::QueriesQueueManager;
4948
use crate::sessions::QueryAffect;
5049
use crate::sessions::QueryContext;
51-
use crate::sessions::QueryEntry;
5250
use crate::sessions::Session;
5351
use crate::sessions::TableContext;
5452

@@ -346,32 +344,15 @@ impl ExecuteState {
346344
info!("http query prepare to plan sql");
347345

348346
// Use interpreter_plan_sql, we can write the query log if an error occurs.
349-
let (plan, extras) = interpreter_plan_sql(ctx.clone(), &sql)
347+
let (plan, _, queue_guard) = interpreter_plan_sql(ctx.clone(), &sql, true)
350348
.await
351349
.map_err(|err| err.display_with_sql(&sql))
352350
.with_context(make_error)?;
353-
354-
let query_queue_manager = QueriesQueueManager::instance();
355-
356-
info!(
357-
"http query preparing to acquire from query queue, length: {}",
358-
query_queue_manager.length()
359-
);
360-
361-
let entry = QueryEntry::create(&ctx, &plan, &extras).with_context(make_error)?;
362-
let queue_guard = query_queue_manager
363-
.acquire(entry)
364-
.await
365-
.with_context(make_error)?;
366351
{
367352
// set_var may change settings
368353
let mut guard = format_settings.write();
369354
*guard = Some(ctx.get_format_settings().with_context(make_error)?);
370355
}
371-
info!(
372-
"http query finished acquiring from queue, length: {}",
373-
query_queue_manager.length()
374-
);
375356

376357
let interpreter = InterpreterFactory::get(ctx.clone(), &plan)
377358
.await

src/query/service/src/servers/mysql/mysql_interactive_worker.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,7 @@ use crate::servers::mysql::writers::ProgressReporter;
5656
use crate::servers::mysql::writers::QueryResult;
5757
use crate::servers::mysql::MySQLFederated;
5858
use crate::servers::mysql::MYSQL_VERSION;
59-
use crate::sessions::QueriesQueueManager;
6059
use crate::sessions::QueryContext;
61-
use crate::sessions::QueryEntry;
6260
use crate::sessions::Session;
6361
use crate::sessions::TableContext;
6462
use crate::stream::DataBlockStream;
@@ -377,10 +375,7 @@ impl InteractiveWorkerBase {
377375
context.set_id(query_id);
378376

379377
// Use interpreter_plan_sql, we can write the query log if an error occurs.
380-
let (plan, extras) = interpreter_plan_sql(context.clone(), query).await?;
381-
382-
let entry = QueryEntry::create(&context, &plan, &extras)?;
383-
let _guard = QueriesQueueManager::instance().acquire(entry).await?;
378+
let (plan, _, _guard) = interpreter_plan_sql(context.clone(), query, true).await?;
384379

385380
let interpreter = InterpreterFactory::get(context.clone(), &plan).await?;
386381
let has_result_set = plan.has_result_set();

src/query/service/src/sessions/queue_mgr.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ use databend_common_metrics::session::incr_session_queue_acquire_error_count;
4040
use databend_common_metrics::session::incr_session_queue_acquire_timeout_count;
4141
use databend_common_metrics::session::record_session_queue_acquire_duration_ms;
4242
use databend_common_metrics::session::set_session_queued_queries;
43+
use databend_common_sql::plans::ModifyColumnAction;
44+
use databend_common_sql::plans::ModifyTableColumnPlan;
4345
use databend_common_sql::plans::Plan;
4446
use databend_common_sql::PlanExtras;
4547
use log::info;
@@ -131,6 +133,11 @@ impl<Data: QueueData> QueueManager<Data> {
131133

132134
pub async fn acquire(self: &Arc<Self>, data: Data) -> Result<AcquireQueueGuard> {
133135
if data.need_acquire_to_queue() {
136+
info!(
137+
"preparing to acquire from query queue, length: {}",
138+
self.length()
139+
);
140+
134141
let timeout = data.timeout();
135142
let future = AcquireQueueFuture::create(
136143
Arc::new(data),
@@ -141,6 +148,8 @@ impl<Data: QueueData> QueueManager<Data> {
141148

142149
return match future.await {
143150
Ok(v) => {
151+
info!("finished acquiring from queue, length: {}", self.length());
152+
144153
inc_session_running_acquired_queries();
145154
record_session_queue_acquire_duration_ms(
146155
start_time.elapsed().unwrap_or_default(),
@@ -316,7 +325,7 @@ pub struct QueryEntry {
316325
}
317326

318327
impl QueryEntry {
319-
fn create_entry(
328+
pub fn create_entry(
320329
ctx: &Arc<QueryContext>,
321330
plan_extras: &PlanExtras,
322331
need_acquire_to_queue: bool,
@@ -402,12 +411,19 @@ impl QueryEntry {
402411
| Plan::VacuumTable(_)
403412
| Plan::VacuumTemporaryFiles(_)
404413
| Plan::RefreshIndex(_)
414+
| Plan::ReclusterTable { .. }
405415
| Plan::TruncateTable(_) => {
406416
return true;
407417
}
408418
Plan::DropTable(v) if v.all => {
409419
return true;
410420
}
421+
Plan::ModifyTableColumn(box ModifyTableColumnPlan {
422+
action: ModifyColumnAction::SetDataType(_),
423+
..
424+
}) => {
425+
return true;
426+
}
411427

412428
// Light actions.
413429
_ => {

src/query/service/src/table_functions/others/suggested_background_tasks.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ impl SuggestedBackgroundTasksSource {
160160
sql: String,
161161
) -> Result<Option<RecordBatch>> {
162162
// Use interpreter_plan_sql, we can write the query log if an error occurs.
163-
let (plan, _) = interpreter_plan_sql(ctx.clone(), sql.as_str()).await?;
163+
let (plan, _, _) = interpreter_plan_sql(ctx.clone(), sql.as_str(), false).await?;
164164

165165
let data_schema = plan.schema();
166166
let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?;

src/query/service/tests/it/servers/admin/v1/status.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ async fn run_query(query_ctx: &Arc<QueryContext>) -> Result<Arc<dyn Interpreter>
6464
.get_current_session()
6565
.set_authed_user(user, None)
6666
.await?;
67-
let (plan, _) = interpreter_plan_sql(query_ctx.clone(), sql).await?;
67+
let (plan, _, _) = interpreter_plan_sql(query_ctx.clone(), sql, false).await?;
6868

6969
InterpreterFactory::get(query_ctx.clone(), &plan).await
7070
}

0 commit comments

Comments
 (0)