@@ -12,8 +12,12 @@ use prometheus_client::collector::Collector;
1212use prometheus_client:: encoding:: EncodeMetric ;
1313use prometheus_client:: metrics:: counter:: ConstCounter ;
1414
15- use super :: SessionManager ;
15+ use crate :: sessions :: SessionManager ;
1616
17+ /// [`SessionManagerMetricsCollector`] dumps the progress metrics of scan/write/spills
18+ /// from the [`SessionManager`]'s running queries to the prometheus. To avoid the progress
19+ /// metrics being decreased, we also need to accumulate these progress values after the query
20+ /// is finished.
1721#[ derive( Clone ) ]
1822pub struct SessionManagerMetricsCollector {
1923 inner : Arc < Mutex < SessionManagerMetricsCollectorInner > > ,
@@ -82,81 +86,66 @@ impl Collector for SessionManagerMetricsCollector {
8286 }
8387 } ;
8488
85- let mut scan_bytes = 0 ;
86- let mut scan_rows = 0 ;
87- let mut write_bytes = 0 ;
88- let mut write_rows = 0 ;
89- let mut spill_bytes = 0 ;
90- let mut spill_rows = 0 ;
91- // TODO: ensure the process is RUNNING
89+ let ( mut scan_progress, mut write_progress, mut spill_progress) = {
90+ let guard = self . inner . lock ( ) ;
91+ (
92+ guard. finished_scan_total . clone ( ) ,
93+ guard. finished_write_total . clone ( ) ,
94+ guard. finished_spill_total . clone ( ) ,
95+ )
96+ } ;
9297 for process in processes {
9398 if let Some ( scan) = & process. scan_progress_value {
94- scan_bytes += scan. bytes ;
95- scan_rows += scan. rows ;
99+ scan_progress = scan_progress. add ( scan) ;
96100 }
97101 if let Some ( write) = & process. write_progress_value {
98- write_bytes += write. bytes ;
99- write_rows += write. rows ;
102+ write_progress = write_progress. add ( write) ;
100103 }
101104 if let Some ( spill) = & process. spill_progress_value {
102- spill_bytes += spill. bytes ;
103- spill_rows += spill. rows ;
105+ spill_progress = spill_progress. add ( spill) ;
104106 }
105107 }
106108
107- let scan_rows_counter = ConstCounter :: new ( scan_rows as f64 ) ;
108- let scan_rows_encoder = encoder. encode_descriptor (
109- METRIC_QUERY_SCAN_PROGRESS_ROWS ,
110- "Total scan rows in progress." ,
111- None ,
112- scan_rows_counter. metric_type ( ) ,
113- ) ?;
114- scan_rows_counter. encode ( scan_rows_encoder) ?;
115-
116- let scan_bytes_counter = ConstCounter :: new ( scan_bytes as f64 ) ;
117- let scan_bytes_encoder = encoder. encode_descriptor (
118- METRIC_QUERY_SCAN_PROGRESS_BYTES ,
119- "Total scan bytes in progress." ,
120- None ,
121- scan_bytes_counter. metric_type ( ) ,
122- ) ?;
123- scan_bytes_counter. encode ( scan_bytes_encoder) ?;
124-
125- let write_rows_counter = ConstCounter :: new ( write_rows as f64 ) ;
126- let write_rows_encoder = encoder. encode_descriptor (
127- METRIC_QUERY_WRITE_PROGRESS_ROWS ,
128- "Total write rows in progress." ,
129- None ,
130- write_rows_counter. metric_type ( ) ,
131- ) ?;
132- write_rows_counter. encode ( write_rows_encoder) ?;
133-
134- let write_bytes_counter = ConstCounter :: new ( write_bytes as f64 ) ;
135- let write_bytes_encoder = encoder. encode_descriptor (
136- METRIC_QUERY_WRITE_PROGRESS_BYTES ,
137- "Total write bytes in progress." ,
138- None ,
139- write_bytes_counter. metric_type ( ) ,
140- ) ?;
141- write_bytes_counter. encode ( write_bytes_encoder) ?;
142-
143- let spill_rows_counter = ConstCounter :: new ( spill_rows as f64 ) ;
144- let spill_rows_encoder = encoder. encode_descriptor (
145- METRIC_QUERY_SPILL_PROGRESS_ROWS ,
146- "Total spill rows in progress." ,
147- None ,
148- spill_rows_counter. metric_type ( ) ,
149- ) ?;
150- spill_rows_counter. encode ( spill_rows_encoder) ?;
109+ let metrics = vec ! [
110+ (
111+ METRIC_QUERY_SCAN_PROGRESS_ROWS ,
112+ scan_progress. rows as f64 ,
113+ "Total scan rows in progress." ,
114+ ) ,
115+ (
116+ METRIC_QUERY_SCAN_PROGRESS_BYTES ,
117+ scan_progress. bytes as f64 ,
118+ "Total scan bytes in progress." ,
119+ ) ,
120+ (
121+ METRIC_QUERY_WRITE_PROGRESS_ROWS ,
122+ write_progress. rows as f64 ,
123+ "Total write rows in progress." ,
124+ ) ,
125+ (
126+ METRIC_QUERY_WRITE_PROGRESS_BYTES ,
127+ write_progress. bytes as f64 ,
128+ "Total write bytes in progress." ,
129+ ) ,
130+ (
131+ METRIC_QUERY_SPILL_PROGRESS_ROWS ,
132+ spill_progress. rows as f64 ,
133+ "Total spill rows in progress." ,
134+ ) ,
135+ (
136+ METRIC_QUERY_SPILL_PROGRESS_BYTES ,
137+ spill_progress. bytes as f64 ,
138+ "Total spill bytes in progress." ,
139+ ) ,
140+ ] ;
141+
142+ for ( name, value, help) in metrics {
143+ let counter = ConstCounter :: new ( value) ;
144+ let counter_encoder =
145+ encoder. encode_descriptor ( name, help, None , counter. metric_type ( ) ) ?;
146+ counter. encode ( counter_encoder) ?;
147+ }
151148
152- let spill_bytes_counter = ConstCounter :: new ( spill_bytes as f64 ) ;
153- let spill_bytes_encoder = encoder. encode_descriptor (
154- METRIC_QUERY_SPILL_PROGRESS_BYTES ,
155- "Total spill bytes in progress." ,
156- None ,
157- spill_bytes_counter. metric_type ( ) ,
158- ) ?;
159- spill_bytes_counter. encode ( spill_bytes_encoder) ?;
160149 Ok ( ( ) )
161150 }
162151}
0 commit comments