Skip to content

Commit 9a8784e

Browse files
authored
chore(query): improve hook vacuum temp after query (#17125)
* chore(query): improve ctx lock
* chore(query): improve vacuum
* send spill nums via progress in cluster
1 parent 0c94ab5 commit 9a8784e

File tree

9 files changed

+193
-74
lines changed

9 files changed

+193
-74
lines changed

src/query/ee/src/storages/fuse/operations/vacuum_temporary_files.rs

Lines changed: 56 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,15 @@ pub async fn do_vacuum_temporary_files(
4545
}
4646

4747
match options {
48-
VacuumTempOptions::QueryHook(nodes_num, query_id) => {
49-
vacuum_query_hook(abort_checker, &temporary_dir, *nodes_num, query_id, limit).await
48+
VacuumTempOptions::QueryHook(nodes, query_id) => {
49+
vacuum_query_hook(
50+
abort_checker,
51+
&temporary_dir,
52+
nodes.as_slice(),
53+
query_id,
54+
limit,
55+
)
56+
.await
5057
}
5158
VacuumTempOptions::VacuumCommand(duration) => {
5259
vacuum_by_duration(abort_checker, &temporary_dir, limit, duration).await
@@ -172,41 +179,53 @@ async fn vacuum_by_duration(
172179
async fn vacuum_query_hook(
173180
abort_checker: AbortChecker,
174181
temporary_dir: &str,
175-
nodes_num: usize,
182+
nodes: &[usize],
176183
query_id: &str,
177184
mut limit: usize,
178185
) -> Result<usize> {
179186
let mut removed_total = 0;
187+
let metas_f = nodes
188+
.iter()
189+
.map(|i| async move {
190+
let operator = DataOperator::instance().operator();
191+
let meta_file_path =
192+
format!("{}/{}_{}{}", temporary_dir, query_id, i, SPILL_META_SUFFIX);
193+
let buffer = operator.read(&meta_file_path).await?;
194+
std::result::Result::<(String, Buffer), opendal::Error>::Ok((meta_file_path, buffer))
195+
})
196+
.collect::<Vec<_>>();
180197

181-
for i in 0..nodes_num {
182-
if limit == 0 {
183-
break;
184-
}
198+
let metas = futures_util::future::join_all(metas_f)
199+
.await
200+
.into_iter()
201+
.filter_map(|x| x.is_ok().then(|| x.unwrap()));
202+
203+
for (meta_file_path, buffer) in metas {
185204
abort_checker.try_check_aborting()?;
186-
let meta_file_path = format!("{}/{}_{}{}", temporary_dir, query_id, i, SPILL_META_SUFFIX);
187-
let removed =
188-
vacuum_by_meta(temporary_dir, &meta_file_path, limit, &mut removed_total).await?;
205+
let removed = vacuum_by_meta_buffer(
206+
&meta_file_path,
207+
temporary_dir,
208+
buffer,
209+
limit,
210+
&mut removed_total,
211+
)
212+
.await?;
189213
limit = limit.saturating_sub(removed);
190214
}
215+
191216
Ok(removed_total)
192217
}
193218

194-
async fn vacuum_by_meta(
195-
temporary_dir: &str,
219+
async fn vacuum_by_meta_buffer(
196220
meta_file_path: &str,
221+
temporary_dir: &str,
222+
meta: Buffer,
197223
limit: usize,
198224
removed_total: &mut usize,
199225
) -> Result<usize> {
200226
let operator = DataOperator::instance().operator();
201-
let meta: Buffer;
202-
let r = operator.read(meta_file_path).await;
203-
match r {
204-
Ok(r) => meta = r,
205-
Err(e) if e.kind() == ErrorKind::NotFound => return Ok(0),
206-
Err(e) => return Err(e.into()),
207-
}
208-
let meta = meta.to_bytes();
209227
let start_time = Instant::now();
228+
let meta = meta.to_bytes();
210229
let files: Vec<String> = meta.lines().map(|x| Ok(x?)).collect::<Result<Vec<_>>>()?;
211230

212231
let (to_be_removed, remain) = files.split_at(limit.min(files.len()));
@@ -240,6 +259,23 @@ async fn vacuum_by_meta(
240259
Ok(cur_removed)
241260
}
242261

262+
async fn vacuum_by_meta(
263+
temporary_dir: &str,
264+
meta_file_path: &str,
265+
limit: usize,
266+
removed_total: &mut usize,
267+
) -> Result<usize> {
268+
let operator = DataOperator::instance().operator();
269+
let meta: Buffer;
270+
let r = operator.read(meta_file_path).await;
271+
match r {
272+
Ok(r) => meta = r,
273+
Err(e) if e.kind() == ErrorKind::NotFound => return Ok(0),
274+
Err(e) => return Err(e.into()),
275+
}
276+
vacuum_by_meta_buffer(meta_file_path, temporary_dir, meta, limit, removed_total).await
277+
}
278+
243279
async fn vacuum_by_list_dir(
244280
dir_path: &str,
245281
limit: usize,

src/query/ee_features/vacuum_handler/src/vacuum_handler.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ pub trait VacuumHandler: Sync + Send {
5858

5959
#[derive(Debug, Clone)]
6060
pub enum VacuumTempOptions {
61-
// nodes_num, query_id
62-
QueryHook(usize, String),
61+
// nodes, query_id
62+
QueryHook(Vec<usize>, String),
6363
VacuumCommand(Option<Duration>),
6464
}
6565

src/query/service/src/clusters/cluster.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ pub trait ClusterHelper {
8080
fn is_local(&self, node: &NodeInfo) -> bool;
8181
fn local_id(&self) -> String;
8282
fn ordered_index(&self) -> usize;
83+
fn index_of_nodeid(&self, node_id: &str) -> Option<usize>;
8384

8485
fn get_nodes(&self) -> Vec<Arc<NodeInfo>>;
8586

@@ -125,6 +126,12 @@ impl ClusterHelper for Cluster {
125126
.unwrap_or(0)
126127
}
127128

129+
fn index_of_nodeid(&self, node_id: &str) -> Option<usize> {
130+
let mut nodes = self.get_nodes();
131+
nodes.sort_by(|a, b| a.id.cmp(&b.id));
132+
nodes.iter().position(|x| x.id == node_id)
133+
}
134+
128135
fn get_nodes(&self) -> Vec<Arc<NodeInfo>> {
129136
self.nodes.to_vec()
130137
}

src/query/service/src/interpreters/hook/vacuum_hook.rs

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,19 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashMap;
1516
use std::sync::Arc;
1617

1718
use databend_common_base::runtime::GlobalIORuntime;
1819
use databend_common_catalog::table_context::TableContext;
20+
use databend_common_exception::ErrorCode;
1921
use databend_common_exception::Result;
2022
use databend_common_license::license::Feature::Vacuum;
2123
use databend_common_license::license_manager::LicenseManagerSwitch;
22-
use databend_common_storage::DataOperator;
2324
use databend_enterprise_vacuum_handler::get_vacuum_handler;
2425
use databend_enterprise_vacuum_handler::vacuum_handler::VacuumTempOptions;
2526
use databend_storages_common_cache::TempDirManager;
2627
use log::warn;
27-
use opendal::Buffer;
2828
use rand::Rng;
2929

3030
use crate::clusters::ClusterHelper;
@@ -43,16 +43,35 @@ pub fn hook_vacuum_temp_files(query_ctx: &Arc<QueryContext>) -> Result<()> {
4343
{
4444
let handler = get_vacuum_handler();
4545

46-
let cluster_nodes = query_ctx.get_cluster().get_nodes().len();
46+
let cluster = query_ctx.get_cluster();
4747
let query_id = query_ctx.get_id();
4848

49+
let mut node_files = HashMap::new();
50+
for node in cluster.nodes.iter() {
51+
let num = query_ctx.get_spill_file_nums(Some(node.id.clone()));
52+
if num != 0 {
53+
if let Some(index) = cluster.index_of_nodeid(&node.id) {
54+
node_files.insert(index, num);
55+
}
56+
}
57+
}
58+
if node_files.is_empty() {
59+
return Ok(());
60+
}
61+
62+
log::info!(
63+
"Vacuum temporary files by hook, node files: {:?}",
64+
node_files
65+
);
66+
67+
let nodes = node_files.keys().cloned().collect::<Vec<usize>>();
4968
let abort_checker = query_ctx.clone().get_abort_checker();
50-
let _ = GlobalIORuntime::instance().block_on(async move {
69+
let _ = GlobalIORuntime::instance().block_on::<(), ErrorCode, _>(async move {
5170
let removed_files = handler
5271
.do_vacuum_temporary_files(
5372
abort_checker,
5473
spill_prefix.clone(),
55-
&VacuumTempOptions::QueryHook(cluster_nodes, query_id),
74+
&VacuumTempOptions::QueryHook(nodes, query_id),
5675
vacuum_limit as usize,
5776
)
5877
.await;
@@ -61,15 +80,6 @@ pub fn hook_vacuum_temp_files(query_ctx: &Arc<QueryContext>) -> Result<()> {
6180
log::warn!("Vacuum temporary files has error: {:?}", cause);
6281
}
6382

64-
if vacuum_limit != 0 && matches!(removed_files, Ok(res) if res == vacuum_limit as usize)
65-
{
66-
// Have not been removed files
67-
let op = DataOperator::instance().operator();
68-
op.create_dir(&format!("{}/", spill_prefix)).await?;
69-
op.write(&format!("{}/finished", spill_prefix), Buffer::new())
70-
.await?;
71-
}
72-
7383
Ok(())
7484
});
7585
}

src/query/service/src/servers/flight/v1/exchange/statistics_receiver.rs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ impl StatisticsReceiver {
4444
let mut exchange_handler = Vec::with_capacity(statistics_exchanges.len());
4545
let runtime = Runtime::with_worker_threads(2, Some(String::from("StatisticsReceiver")))?;
4646

47-
for (_source, exchange) in statistics_exchanges.into_iter() {
47+
for (source_target, exchange) in statistics_exchanges.into_iter() {
4848
let rx = exchange.convert_to_receiver();
4949
exchange_handler.push(runtime.spawn({
5050
let ctx = ctx.clone();
@@ -62,7 +62,11 @@ impl StatisticsReceiver {
6262
return Ok(());
6363
}
6464
Either::Left((Ok(false), recv)) => {
65-
match StatisticsReceiver::recv_data(&ctx, recv.await) {
65+
match StatisticsReceiver::recv_data(
66+
&ctx,
67+
&source_target,
68+
recv.await,
69+
) {
6670
Ok(true) => {
6771
return Ok(());
6872
}
@@ -71,7 +75,11 @@ impl StatisticsReceiver {
7175
return Err(cause);
7276
}
7377
_ => loop {
74-
match StatisticsReceiver::recv_data(&ctx, rx.recv().await) {
78+
match StatisticsReceiver::recv_data(
79+
&ctx,
80+
&source_target,
81+
rx.recv().await,
82+
) {
7583
Ok(true) => {
7684
return Ok(());
7785
}
@@ -86,7 +94,7 @@ impl StatisticsReceiver {
8694
}
8795
}
8896
Either::Right((res, left)) => {
89-
match StatisticsReceiver::recv_data(&ctx, res) {
97+
match StatisticsReceiver::recv_data(&ctx, &source_target, res) {
9098
Ok(true) => {
9199
return Ok(());
92100
}
@@ -113,7 +121,11 @@ impl StatisticsReceiver {
113121
})
114122
}
115123

116-
fn recv_data(ctx: &Arc<QueryContext>, recv_data: Result<Option<DataPacket>>) -> Result<bool> {
124+
fn recv_data(
125+
ctx: &Arc<QueryContext>,
126+
source_target: &str,
127+
recv_data: Result<Option<DataPacket>>,
128+
) -> Result<bool> {
117129
match recv_data {
118130
Ok(None) => Ok(true),
119131
Err(transport_error) => Err(transport_error),
@@ -122,7 +134,7 @@ impl StatisticsReceiver {
122134
Ok(Some(DataPacket::FragmentData(_))) => unreachable!(),
123135
Ok(Some(DataPacket::SerializeProgress(progress))) => {
124136
for progress_info in progress {
125-
progress_info.inc(ctx);
137+
progress_info.inc(source_target, ctx);
126138
}
127139

128140
Ok(false)

src/query/service/src/servers/flight/v1/exchange/statistics_sender.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,10 @@ impl StatisticsSender {
230230
progress_info.push(ProgressInfo::ResultProgress(result_progress_values));
231231
}
232232

233+
let spill_file_nums = ctx.get_spill_file_nums(None);
234+
if spill_file_nums != 0 {
235+
progress_info.push(ProgressInfo::SpillTotalFileNums(spill_file_nums));
236+
}
233237
progress_info
234238
}
235239

src/query/service/src/servers/flight/v1/packets/packet_data_progressinfo.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,18 @@ pub enum ProgressInfo {
3333
ScanProgress(ProgressValues),
3434
WriteProgress(ProgressValues),
3535
ResultProgress(ProgressValues),
36+
SpillTotalFileNums(usize),
3637
}
3738

3839
impl ProgressInfo {
39-
pub fn inc(&self, ctx: &Arc<QueryContext>) {
40+
pub fn inc(&self, source_target: &str, ctx: &Arc<QueryContext>) {
4041
match self {
4142
ProgressInfo::ScanProgress(values) => ctx.get_scan_progress().incr(values),
4243
ProgressInfo::WriteProgress(values) => ctx.get_write_progress().incr(values),
4344
ProgressInfo::ResultProgress(values) => ctx.get_result_progress().incr(values),
45+
ProgressInfo::SpillTotalFileNums(values) => {
46+
ctx.set_cluster_spill_file_nums(source_target, *values)
47+
}
4448
};
4549
}
4650

@@ -49,6 +53,11 @@ impl ProgressInfo {
4953
ProgressInfo::ScanProgress(values) => (1_u8, values),
5054
ProgressInfo::WriteProgress(values) => (2_u8, values),
5155
ProgressInfo::ResultProgress(values) => (3_u8, values),
56+
ProgressInfo::SpillTotalFileNums(values) => {
57+
bytes.write_u8(4)?;
58+
bytes.write_u64::<BigEndian>(values as u64)?;
59+
return Ok(());
60+
}
5261
};
5362

5463
bytes.write_u8(info_type)?;
@@ -59,6 +68,12 @@ impl ProgressInfo {
5968

6069
pub fn read<T: Read>(bytes: &mut T) -> Result<ProgressInfo> {
6170
let info_type = bytes.read_u8()?;
71+
72+
if info_type == 4 {
73+
let values = bytes.read_u64::<BigEndian>()? as usize;
74+
return Ok(ProgressInfo::SpillTotalFileNums(values));
75+
}
76+
6277
let rows = bytes.read_u64::<BigEndian>()? as usize;
6378
let bytes = bytes.read_u64::<BigEndian>()? as usize;
6479

0 commit comments

Comments (0)