File tree Expand file tree Collapse file tree 5 files changed +50
-45
lines changed
service/src/pipelines/builders
storages/fuse/src/operations Expand file tree Collapse file tree 5 files changed +50
-45
lines changed Original file line number Diff line number Diff line change @@ -151,24 +151,25 @@ impl Runtime {
151151
152152 let n = name. clone ( ) ;
153153 // Block the runtime to shutdown.
154- let join_handler = Thread :: spawn ( move || {
155- let _ = runtime. block_on ( recv_stop) ;
156- info ! (
157- "Runtime({:?}) received shutdown signal, start to shut down" ,
158- n
159- ) ;
160-
161- match !cfg ! ( debug_assertions) {
162- true => false ,
163- false => {
164- let instant = Instant :: now ( ) ;
165- // We wait up to 3 seconds to complete the runtime shutdown.
166- runtime. shutdown_timeout ( Duration :: from_secs ( 3 ) ) ;
167-
168- instant. elapsed ( ) >= Duration :: from_secs ( 3 )
154+ let join_handler =
155+ Thread :: named_spawn ( n. as_ref ( ) . map ( |n| format ! ( "wait-to-drop-{n}" ) ) , move || {
156+ let _ = runtime. block_on ( recv_stop) ;
157+ info ! (
158+ "Runtime({:?}) received shutdown signal, start to shut down" ,
159+ n
160+ ) ;
161+
162+ match !cfg ! ( debug_assertions) {
163+ true => false ,
164+ false => {
165+ let instant = Instant :: now ( ) ;
166+ // We wait up to 3 seconds to complete the runtime shutdown.
167+ runtime. shutdown_timeout ( Duration :: from_secs ( 3 ) ) ;
168+
169+ instant. elapsed ( ) >= Duration :: from_secs ( 3 )
170+ }
169171 }
170- }
171- } ) ;
172+ } ) ;
172173
173174 Ok ( Runtime {
174175 handle,
Original file line number Diff line number Diff line change @@ -73,18 +73,20 @@ impl PipelineBuilder {
7373 let column_ids = compact_block. column_ids . clone ( ) ;
7474 self . main_pipeline . set_on_init ( move || {
7575 let ctx = query_ctx. clone ( ) ;
76- let partitions = Runtime :: with_worker_threads ( 2 , None ) ?. block_on ( async move {
77- let partitions = BlockCompactMutator :: build_compact_tasks (
78- ctx. clone ( ) ,
79- column_ids. clone ( ) ,
80- cluster_key_id,
81- thresholds,
82- lazy_parts,
83- )
84- . await ?;
76+ let partitions =
77+ Runtime :: with_worker_threads ( 2 , Some ( "build_compact_tasks" . to_string ( ) ) ) ?
78+ . block_on ( async move {
79+ let partitions = BlockCompactMutator :: build_compact_tasks (
80+ ctx. clone ( ) ,
81+ column_ids. clone ( ) ,
82+ cluster_key_id,
83+ thresholds,
84+ lazy_parts,
85+ )
86+ . await ?;
8587
86- Result :: < _ > :: Ok ( partitions)
87- } ) ?;
88+ Result :: < _ > :: Ok ( partitions)
89+ } ) ?;
8890
8991 let partitions = Partitions :: create ( PartitionsShuffleKind :: Mod , partitions) ;
9092 query_ctx. set_partitions ( partitions) ?;
Original file line number Diff line number Diff line change @@ -68,20 +68,21 @@ impl PipelineBuilder {
6868 segment_locations,
6969 block_count : None ,
7070 } ;
71- Runtime :: with_worker_threads ( 2 , None ) ?. block_on ( async move {
72- let ( partitions, _) = table_clone
73- . do_mutation_block_pruning (
74- ctx_clone,
75- filters_clone,
76- projection,
77- prune_ctx,
78- true ,
79- true ,
80- )
81- . await ?;
82- ctx. set_partitions ( partitions) ?;
83- Ok ( ( ) )
84- } ) ?;
71+ Runtime :: with_worker_threads ( 2 , Some ( "do_mutation_block_pruning" . to_string ( ) ) ) ?
72+ . block_on ( async move {
73+ let ( partitions, _) = table_clone
74+ . do_mutation_block_pruning (
75+ ctx_clone,
76+ filters_clone,
77+ projection,
78+ prune_ctx,
79+ true ,
80+ true ,
81+ )
82+ . await ?;
83+ ctx. set_partitions ( partitions) ?;
84+ Ok ( ( ) )
85+ } ) ?;
8586 } else {
8687 self . ctx
8788 . set_partitions ( mutation_source. partitions . clone ( ) ) ?;
Original file line number Diff line number Diff line change @@ -209,7 +209,8 @@ impl FuseTable {
209209 // We cannot use the runtime associated with the query to avoid increasing its lifetime.
210210 GlobalIORuntime :: instance ( ) . spawn ( async move {
211211 // avoid block global io runtime
212- let runtime = Runtime :: with_worker_threads ( 2 , None ) ?;
212+ let runtime =
213+ Runtime :: with_worker_threads ( 2 , Some ( "prune-snap-blk" . to_string ( ) ) ) ?;
213214 let handler = runtime. spawn ( async move {
214215 match table
215216 . prune_snapshot_blocks (
Original file line number Diff line number Diff line change @@ -200,7 +200,7 @@ impl FuseTable {
200200 // We cannot use the runtime associated with the query to avoid increasing its lifetime.
201201 GlobalIORuntime :: instance ( ) . spawn ( async move {
202202 // avoid block global io runtime
203- let runtime = Runtime :: with_worker_threads ( 2 , None ) ?;
203+ let runtime = Runtime :: with_worker_threads ( 2 , Some ( "send-parts" . to_string ( ) ) ) ?;
204204
205205 let join_handler = runtime. spawn ( async move {
206206 for part in part. partitions {
@@ -241,7 +241,7 @@ impl FuseTable {
241241 // We cannot use the runtime associated with the query to avoid increasing its lifetime.
242242 GlobalIORuntime :: instance ( ) . spawn ( async move {
243243 // avoid block global io runtime
244- let runtime = Runtime :: with_worker_threads ( 2 , None ) ?;
244+ let runtime = Runtime :: with_worker_threads ( 2 , Some ( "prune-seg" . to_string ( ) ) ) ?;
245245 let join_handler = runtime. spawn ( async move {
246246 let segment_pruned_result =
247247 pruner. clone ( ) . segment_pruning ( lazy_init_segments) . await ?;
You can’t perform that action at this time.
0 commit comments