Skip to content

Commit 996a147

Browse files
committed
Support stage identification via stage id in metrics
1 parent e06af25 commit 996a147

File tree

4 files changed

+33
-26
lines changed

4 files changed

+33
-26
lines changed

spark-ui/src/interfaces/AppStore.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,7 @@ export interface EnrichedSqlNode {
302302
rddScopeId: string | undefined;
303303
icebergCommit: IcebergCommitsInfo | undefined;
304304
cachedStorage: RddStorageInfo | undefined;
305+
nodeIdFromMetrics: number | undefined;
305306
}
306307

307308
export interface SQLNodeExchangeStageData {
@@ -322,6 +323,7 @@ export interface SQLNodeStageData {
322323
export interface EnrichedSqlMetric {
323324
name: string;
324325
value: string;
326+
stageId?: number | undefined;
325327
}
326328

327329
export interface EnrichedSqlEdge {

spark-ui/src/reducers/SQLNodeStageReducer.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import {
1010
import { calculatePercentage } from "../utils/FormatUtils";
1111
import { generateGraph } from "./PlanGraphUtils";
1212
import { calculateNodeToStorageInfo } from "./SqlReducer";
13-
import { findExchangeStageIds, findStageIdFromMetrics, isExchangeNode } from "./SqlReducerUtils";
13+
import { findExchangeStageIds, isExchangeNode } from "./SqlReducerUtils";
1414

1515
export function calculateSQLNodeStage(sql: EnrichedSparkSQL, sqlStages: SparkStagesStore): EnrichedSparkSQL {
1616
let nodes = sql.nodes;
@@ -316,8 +316,8 @@ export function calculateSqlStage(
316316
// Use exchange-specific metric parsing
317317
const { readStageId, writeStageId } = findExchangeStageIds(node.metrics);
318318
stageData = createExchangeStageData(readStageId, writeStageId, stages);
319-
} else if (databricksRddStageId !== undefined || stageCodegen?.stage?.stageId !== undefined) {
320-
metricsStageIdHint = findStageIdFromMetrics(node.metrics);
319+
} else {
320+
metricsStageIdHint = stageCodegen?.nodeIdFromMetrics ?? node.nodeIdFromMetrics;
321321
}
322322

323323
stageData = stageData ?? stageDataFromStage(

spark-ui/src/reducers/SqlReducer.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import {
4242
calcNodeMetrics,
4343
calcNodeType,
4444
extractTotalFromStatisticsMetric,
45+
findStageIdFromMetrics,
4546
nodeEnrichedNameBuilder,
4647
} from "./SqlReducerUtils";
4748

@@ -251,7 +252,8 @@ function calculateSql(
251252
.filter((node) => node.isCodegenNode)
252253
.map((node) => {
253254
const codegenDuration = calcCodegenDuration(node.metrics);
254-
return { ...node, codegenDuration: codegenDuration };
255+
const nodeIdFromMetrics = findStageIdFromMetrics(node.metrics);
256+
return { ...node, codegenDuration: codegenDuration, nodeIdFromMetrics: nodeIdFromMetrics };
255257
});
256258

257259
const onlyGraphNodes = typeEnrichedNodes.filter(
@@ -276,8 +278,12 @@ function calculateSql(
276278
node.metrics,
277279
);
278280

281+
const nodeIdFromMetrics = findStageIdFromMetrics(node.metrics);
282+
console.log(`node.metrics from sql plan: ${JSON.stringify(node.metrics)}, nodeIdFromMetrics: ${nodeIdFromMetrics}`);
283+
279284
return {
280285
...node,
286+
nodeIdFromMetrics: nodeIdFromMetrics,
281287
metrics: updateNodeMetrics(node, node.metrics, graph, onlyGraphNodes),
282288
enrichedName: updateNodeEnrichedName(node, onlyGraphNodes, graph),
283289
parsedPlan: updateParsedPlan(node, onlyGraphNodes, graph),
@@ -449,13 +455,15 @@ export function updateSqlNodeMetrics(
449455

450456
// TODO: cache the graph
451457
const graph = generateGraph(runningSql.edges, runningSql.nodes);
458+
const nodeIdFromMetrics = findStageIdFromMetrics(matchedMetricsNodes[0].metrics);
452459
const metrics = updateNodeMetrics(node, matchedMetricsNodes[0].metrics, graph, runningSql.nodes);
453460
const exchangeMetrics = calcExchangeMetrics(node.nodeName, metrics);
454461

455462
// TODO: maybe do a smarter replacement, or send only the initialized metrics
456463
return {
457464
...node,
458465
metrics: metrics,
466+
nodeIdFromMetrics: nodeIdFromMetrics,
459467
exchangeMetrics: exchangeMetrics,
460468
};
461469
});
@@ -468,12 +476,13 @@ export function updateSqlNodeMetrics(
468476
return node;
469477
}
470478

479+
const nodeIdFromMetrics = findStageIdFromMetrics(matchedMetricsNodes[0].metrics);
471480
const metrics = calcNodeMetrics(node.type, matchedMetricsNodes[0].metrics);
472481
const codegenDuration = calcCodegenDuration(metrics);
473-
474482
return {
475483
...node,
476484
codegenDuration: codegenDuration,
485+
nodeIdFromMetrics: nodeIdFromMetrics,
477486
};
478487
});
479488
const nodesWithStorageInfo = calculateNodeToStorageInfo(stages, nodes);

spark-ui/src/reducers/SqlReducerUtils.ts

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,12 @@ export function calcNodeMetrics(
391391
return (
392392
metrics
393393
.filter((metric) => allowList.includes(metric.name))
394+
.map((metric) => {
395+
const stageId = extractStageFromSummaryMetric(metric.value);
396+
return stageId === undefined
397+
? metric
398+
: { ...metric, stageId: stageId };
399+
})
394400
.map((metric) => {
395401
const valueTransformer = metricsValueTransformer[metric.name];
396402
if (valueTransformer === undefined) {
@@ -420,7 +426,7 @@ export function calcNodeMetrics(
420426
* @returns The stage ID as a number, or undefined if parsing fails or no stage info is found
421427
*/
422428
export function extractStageFromSummaryMetric(metricValue: string): number | undefined {
423-
if (!metricValue) {
429+
if (!metricValue || !metricValue.includes("(stage")) {
424430
return undefined;
425431
}
426432

@@ -450,13 +456,12 @@ export function extractStageFromSummaryMetric(metricValue: string): number | und
450456
*/
451457
export function findStageIdFromMetrics(metrics: EnrichedSqlMetric[]): number | undefined {
452458
for (const metric of metrics) {
453-
if (metric.value && metric.value.includes("(stage")) {
454-
const stageId = extractStageFromSummaryMetric(metric.value);
455-
if (stageId !== undefined) {
456-
return stageId;
457-
}
459+
const stageId = extractStageFromSummaryMetric(metric.value);
460+
if (stageId !== undefined) {
461+
return stageId;
458462
}
459463
}
464+
460465
return undefined;
461466
}
462467

@@ -491,15 +496,15 @@ export function isExchangeNode(nodeName: string): boolean {
491496
const EXCHANGE_READ_STAGE_METRICS = [
492497
"local bytes read",
493498
"fetch wait time total"
494-
] as const;
499+
];
495500

496501
/**
497502
* Metric names that indicate write stage information for Exchange nodes.
498503
*/
499504
const EXCHANGE_WRITE_STAGE_METRICS = [
500505
"shuffle write time",
501506
"shuffle bytes written"
502-
] as const;
507+
];
503508

504509
/**
505510
* Extracts stage information from Exchange node metrics for read and write stages.
@@ -514,23 +519,14 @@ export function findExchangeStageIds(metrics: EnrichedSqlMetric[]): {
514519
let writeStageId: number | undefined;
515520

516521
for (const metric of metrics) {
517-
if (!metric.value || !metric.value.includes("(stage")) {
518-
continue;
519-
}
520-
521-
const stageId = extractStageFromSummaryMetric(metric.value);
522-
if (stageId === undefined) {
523-
continue;
524-
}
525-
526522
// Check if this metric indicates a read stage
527-
if (EXCHANGE_READ_STAGE_METRICS.includes(metric.name as any)) {
528-
readStageId = stageId;
523+
if (EXCHANGE_READ_STAGE_METRICS.includes(metric.name)) {
524+
readStageId = metric.stageId;
529525
}
530526

531527
// Check if this metric indicates a write stage
532-
if (EXCHANGE_WRITE_STAGE_METRICS.includes(metric.name as any)) {
533-
writeStageId = stageId;
528+
if (EXCHANGE_WRITE_STAGE_METRICS.includes(metric.name)) {
529+
writeStageId = metric.stageId;
534530
}
535531
}
536532

0 commit comments

Comments (0)