File tree Expand file tree Collapse file tree 3 files changed +7
-5
lines changed
ee/src/storages/fuse/operations Expand file tree Collapse file tree 3 files changed +7
-5
lines changed Original file line number Diff line number Diff line change @@ -173,8 +173,10 @@ async fn vacuum_by_duration(
173173 Ok ( removed_total)
174174}
175175
176+ // Vacuum temporary files by query hook
177+ // If query was killed, we still need to clean up the temporary files
176178async fn vacuum_query_hook (
177- abort_checker : AbortChecker ,
179+ _abort_checker : AbortChecker ,
178180 temporary_dir : & str ,
179181 nodes : & [ usize ] ,
180182 query_id : & str ,
@@ -198,7 +200,6 @@ async fn vacuum_query_hook(
198200 . filter_map ( |x| x. is_ok ( ) . then ( || x. unwrap ( ) ) ) ;
199201
200202 for ( meta_file_path, buffer) in metas {
201- abort_checker. try_check_aborting ( ) ?;
202203 let removed = vacuum_by_meta_buffer (
203204 & meta_file_path,
204205 temporary_dir,
Original file line number Diff line number Diff line change @@ -45,11 +45,12 @@ pub fn hook_vacuum_temp_files(query_ctx: &Arc<QueryContext>) -> Result<()> {
4545
4646 let cluster = query_ctx. get_cluster ( ) ;
4747 let query_id = query_ctx. get_id ( ) ;
48-
48+ let abort_checker = query_ctx . clone ( ) . get_abort_checker ( ) ;
4949 let mut node_files = HashMap :: new ( ) ;
5050 for node in cluster. nodes . iter ( ) {
5151 let stats = query_ctx. get_spill_file_stats ( Some ( node. id . clone ( ) ) ) ;
52- if stats. file_nums != 0 {
52+ // if query was aborted, we can't trust the stats
53+ if stats. file_nums != 0 || abort_checker. try_check_aborting ( ) . is_err ( ) {
5354 if let Some ( index) = cluster. index_of_nodeid ( & node. id ) {
5455 node_files. insert ( index, stats. file_nums ) ;
5556 }
@@ -65,7 +66,6 @@ pub fn hook_vacuum_temp_files(query_ctx: &Arc<QueryContext>) -> Result<()> {
6566 ) ;
6667
6768 let nodes = node_files. keys ( ) . cloned ( ) . collect :: < Vec < usize > > ( ) ;
68- let abort_checker = query_ctx. clone ( ) . get_abort_checker ( ) ;
6969 let _ = GlobalIORuntime :: instance ( ) . block_on :: < ( ) , ErrorCode , _ > ( async move {
7070 let removed_files = handler
7171 . do_vacuum_temporary_files (
Original file line number Diff line number Diff line change @@ -401,6 +401,7 @@ impl QueryContext {
401401 pub fn get_spill_file_stats ( & self , node_id : Option < String > ) -> SpillProgress {
402402 let r = self . shared . cluster_spill_progress . read ( ) ;
403403 let node_id = node_id. unwrap_or ( self . get_cluster ( ) . local_id ( ) ) ;
404+
404405 r. get ( & node_id) . cloned ( ) . unwrap_or ( SpillProgress :: default ( ) )
405406 }
406407
You can’t perform that action at this time.
0 commit comments