Commit e64a239

add support for Expand nodes

1 parent b7c90e0 commit e64a239

7 files changed: +185 -1 lines changed

Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
+package io.dataflint.example
+
+import org.apache.spark.sql.{SparkSession, functions => F}
+
+object ExpandExample extends App {
+  val spark = SparkSession
+    .builder()
+    .appName("Expand ETL")
+    .config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin")
+    .config("spark.ui.port", "10000")
+    .master("local[*]")
+    .getOrCreate()
+
+  val salesFilesLocation = sys.env.getOrElse("SALES_FILES_LOCATION", throw new Exception("SALES_FILES_LOCATION env var not set"))
+
+  val df = spark.read.load(salesFilesLocation)
+
+  // Query with multiple count distincts
+  val dfCounts = df.select(
+    F.countDistinct(F.col("ss_customer_sk")).alias("distinct_customers"),
+    F.countDistinct(F.col("ss_item_sk")).alias("distinct_items"),
+    F.countDistinct(F.col("ss_store_sk")).alias("distinct_stores"),
+    F.countDistinct(F.col("ss_promo_sk")).alias("distinct_promotions")
+  )
+
+  dfCounts.show()
+
+  scala.io.StdIn.readLine("job ended, press any key to continue..")
+
+  spark.stop()
+}

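For context: Spark plans a query with several countDistinct aggregations as an Expand node that replays every input row once per distinct aggregation and tags each copy with a grouping id. A minimal sketch, assuming the parser added in this commit and reusing the plan-description string from its test further down, of what DataFlint extracts from such a node (the console.log call is illustrative only):

import { parseExpand } from "./PlanParsers/ExpandParser";

// Expand node plan description for a four-countDistinct query like the example above
// (same string as in the ExpandParser test in this commit).
const planDescription =
  "Expand [[ss_customer_sk#2, null, null, null, 1], [null, ss_item_sk#1, null, null, 2], " +
  "[null, null, ss_store_sk#6, null, 3], [null, null, null, ss_promo_sk#7, 4]], " +
  "[ss_customer_sk#60, ss_item_sk#61, ss_store_sk#62, ss_promo_sk#63, gid#59]";

// Expected result: the Expand output columns plus the grouping-id column, i.e.
// { fields: ["ss_customer_sk", "ss_item_sk", "ss_store_sk", "ss_promo_sk"], idField: "gid" }
console.log(parseExpand(planDescription));
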
spark-ui/src/components/SqlFlow/StageNode.tsx

Lines changed: 23 additions & 0 deletions
@@ -594,6 +594,29 @@ export const StageNode: FC<{
         );
       }
       break;
+    case "Expand":
+      if (
+        parsedPlan.plan.fields !== undefined &&
+        parsedPlan.plan.fields.length > 0
+      ) {
+        addTruncatedCodeTooltipMultiline(
+          dataTable,
+          "Expanded Fields",
+          parsedPlan.plan.fields,
+        );
+      }
+      if (parsedPlan.plan.idField !== undefined) {
+        addTruncatedSmallTooltip(
+          dataTable,
+          "ID Field",
+          parsedPlan.plan.idField,
+          25,
+          true,
+          false,
+          false,
+        );
+      }
+      break;
   }
 }

spark-ui/src/interfaces/AppStore.ts

Lines changed: 7 additions & 1 deletion
@@ -255,6 +255,11 @@ export type ParsedGeneratePlan = {
   selectedFields: string[];
 };
 
+export type ParsedExpandPlan = {
+  fields: string[];
+  idField: string;
+};
+
 export type ParsedNodePlan =
   | { type: "HashAggregate"; plan: ParsedHashAggregatePlan }
   | { type: "TakeOrderedAndProject"; plan: ParsedTakeOrderedAndProjectPlan }
@@ -269,7 +274,8 @@ export type ParsedNodePlan =
   | { type: "Window"; plan: ParsedWindowPlan }
   | { type: "Coalesce"; plan: ParsedCoalescePlan }
   | { type: "BatchEvalPython"; plan: ParsedBatchEvalPythonPlan }
-  | { type: "Generate"; plan: ParsedGeneratePlan };
+  | { type: "Generate"; plan: ParsedGeneratePlan }
+  | { type: "Expand"; plan: ParsedExpandPlan };
 
 
 export interface ExchangeMetrics {

spark-ui/src/reducers/PlanParsers/ExpandParser.test.ts

Lines changed: 41 additions & 0 deletions
@@ -0,0 +1,41 @@
+import { parseExpand } from "./ExpandParser";
+
+describe("parseExpand", () => {
+  test("should parse expand plan correctly", () => {
+    const input =
+      "Expand [[ss_customer_sk#2, null, null, null, 1], [null, ss_item_sk#1, null, null, 2], [null, null, ss_store_sk#6, null, 3], [null, null, null, ss_promo_sk#7, 4]], [ss_customer_sk#60, ss_item_sk#61, ss_store_sk#62, ss_promo_sk#63, gid#59]";
+
+    const result = parseExpand(input);
+
+    expect(result).toEqual({
+      fields: [
+        "ss_customer_sk",
+        "ss_item_sk",
+        "ss_store_sk",
+        "ss_promo_sk"
+      ],
+      idField: "gid"
+    });
+  });
+
+  test("should handle expand plan with different field names", () => {
+    const input =
+      "Expand [[field1#1, null, 0], [null, field2#2, 1]], [output1#10, output2#11, group_id#12]";
+
+    const result = parseExpand(input);
+
+    expect(result).toEqual({
+      fields: [
+        "output1",
+        "output2"
+      ],
+      idField: "group_id"
+    });
+  });
+
+  test("should throw error for invalid format", () => {
+    const input = "Invalid expand format";
+
+    expect(() => parseExpand(input)).toThrow("Invalid Expand plan format: could not find output fields");
+  });
+});

spark-ui/src/reducers/PlanParsers/ExpandParser.ts

Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
+import { ParsedExpandPlan } from "../../interfaces/AppStore";
+import { bracedSplit, hashNumbersRemover } from "./PlanParserUtils";
+
+export function parseExpand(input: string): ParsedExpandPlan {
+  // Clean input by removing hash numbers
+  const cleanInput = hashNumbersRemover(input);
+
+  // Parse the Expand plan: "Expand [[ss_customer_sk#2, null, null, null, 1], [null, ss_item_sk#1, null, null, 2], [null, null, ss_store_sk#6, null, 3], [null, null, null, ss_promo_sk#7, 4]], [ss_customer_sk#60, ss_item_sk#61, ss_store_sk#62, ss_promo_sk#63, gid#59]"
+  // We need to extract the output fields from the final bracketed section after the nested arrays
+
+  // Look for the pattern: ]], [fields...]
+  const finalBracketRegex = /\]\],\s*\[([^\]]+)\]$/;
+  const match = cleanInput.match(finalBracketRegex);
+
+  if (!match) {
+    throw new Error("Invalid Expand plan format: could not find output fields");
+  }
+
+  const fieldsStr = match[1];
+
+  // Split the fields by comma and clean them up
+  const allFields = bracedSplit(fieldsStr)
+    .map(field => field.trim())
+    .filter(field => field.length > 0);
+
+  if (allFields.length === 0) {
+    throw new Error("Invalid Expand plan format: no output fields found");
+  }
+
+  // Extract the last field as idField and remove it from the fields array
+  const idField = allFields[allFields.length - 1];
+  const fields = allFields.slice(0, -1);
+
+  return {
+    fields: fields,
+    idField: idField
+  };
+}
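
The parser leans on two helpers from PlanParserUtils that this diff does not include. A hedged sketch of the behaviour parseExpand assumes from them (illustration only; the real implementations in PlanParserUtils may differ):

// Assumed behaviour of the PlanParserUtils helpers, sketched for illustration.

// hashNumbersRemover: strips Spark's "#<exprId>" attribute suffixes, e.g. "gid#59" -> "gid".
function hashNumbersRemoverSketch(input: string): string {
  return input.replace(/#\d+/g, "");
}

// bracedSplit: splits on commas that are not nested inside brackets, so the output-field
// list "ss_customer_sk, ss_item_sk, gid" yields one entry per column while any bracketed
// group stays intact.
function bracedSplitSketch(input: string): string[] {
  const parts: string[] = [];
  let depth = 0;
  let current = "";
  for (const ch of input) {
    if (ch === "[" || ch === "(") depth++;
    else if (ch === "]" || ch === ")") depth--;
    if (ch === "," && depth === 0) {
      parts.push(current);
      current = "";
    } else {
      current += ch;
    }
  }
  if (current.length > 0) parts.push(current);
  return parts;
}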

spark-ui/src/reducers/SqlReducer.ts

Lines changed: 40 additions & 0 deletions
@@ -26,6 +26,7 @@ import { findLastNodeWithInputRows, generateGraph, getRowsFromMetrics } from "./
 import { parseCoalesce } from "./PlanParsers/CoalesceParser";
 import { parseCollectLimit } from "./PlanParsers/CollectLimitParser";
 import { parseExchange } from "./PlanParsers/ExchangeParser";
+import { parseExpand } from "./PlanParsers/ExpandParser";
 import { parseFilter } from "./PlanParsers/FilterParser";
 import { parseGenerate } from "./PlanParsers/GenerateParser";
 import { parseJoin } from "./PlanParsers/JoinParser";
@@ -168,6 +169,11 @@ export function parseNodePlan(
         type: "Generate",
         plan: parseGenerate(plan.planDescription),
       };
+    case "Expand":
+      return {
+        type: "Expand",
+        plan: parseExpand(plan.planDescription),
+      };
   }
   if (node.nodeName.includes("Scan")) {
     return {
@@ -733,6 +739,36 @@ function addGenerateMetrics(
   return null;
 }
 
+function addExpandMetrics(
+  node: EnrichedSqlNode,
+  updatedMetrics: EnrichedSqlMetric[],
+  graph: Graph,
+  allNodes: EnrichedSqlNode[],
+): EnrichedSqlMetric | null {
+  if (node.nodeName === "Expand") {
+    const inputNode = findLastNodeWithInputRows(node, graph, allNodes);
+    if (!inputNode) {
+      return null;
+    }
+
+    const inputRows = getRowsFromMetrics(inputNode.metrics);
+    if (inputRows === null || inputRows === 0) {
+      return null;
+    }
+
+    const outputRows = getRowsFromMetrics(updatedMetrics);
+    if (outputRows === null) {
+      return null;
+    }
+
+    const ratio = outputRows / inputRows;
+    const ratioFormatted = ratio.toFixed(2);
+
+    return { name: `Expand Ratio`, value: `${ratioFormatted}X` };
+  }
+  return null;
+}
+
 function updateNodeMetrics(
   node: EnrichedSqlNode,
   metrics: EnrichedSqlMetric[],
@@ -744,6 +780,7 @@ function updateNodeMetrics(
   const crossJoinFilterRatio = addCrossJoinFilterRatioMetric(node, updatedOriginalMetrics, graph, allNodes);
   const joinMetrics = addJoinMetrics(node, updatedOriginalMetrics, graph, allNodes);
   const generateMetrics = addGenerateMetrics(node, updatedOriginalMetrics, graph, allNodes);
+  const expandMetrics = addExpandMetrics(node, updatedOriginalMetrics, graph, allNodes);
   return [
     ...updatedOriginalMetrics,
     ...(filterRatio !== null
@@ -758,6 +795,9 @@ function updateNodeMetrics(
     ...(generateMetrics !== null
       ? [generateMetrics]
      : []),
+    ...(expandMetrics !== null
+      ? [expandMetrics]
+      : []),
   ];
 }

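The new Expand Ratio metric follows the same pattern as the existing Generate metric: the Expand node's output rows divided by the input rows found via findLastNodeWithInputRows. A small illustration with made-up numbers, assuming the four-countDistinct example above (Expand emits one copy of every input row per grouping projection):

// Hypothetical row counts, only to show how the new "Expand Ratio" metric reads in the UI.
const inputRows = 1_000_000;      // rows entering the Expand node
const outputRows = 4 * inputRows; // four projections -> four copies of every row
const metric = {
  name: "Expand Ratio",
  value: `${(outputRows / inputRows).toFixed(2)}X`, // "4.00X"
};
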
spark-ui/src/reducers/SqlReducerUtils.ts

Lines changed: 4 additions & 0 deletions
@@ -157,6 +157,7 @@ const nodeTypeDict: Record<string, NodeType> = {
   FlatMapGroupsInPandas: "transformation",
   BatchEvalPython: "transformation",
   Generate: "transformation",
+  Expand: "transformation",
 };
 
 const nodeRenamerDict: Record<string, string> = {
@@ -225,6 +226,7 @@ const nodeRenamerDict: Record<string, string> = {
   ArrowEvalPython: "Select (with Arrow)",
   FlatMapGroupsInPandas: "Select Flat (with Pandas)",
   BatchEvalPython: "Run Python UDF",
+  Expand: "Expand",
 };
 
 export function extractTotalFromStatisticsMetric(
@@ -280,6 +282,8 @@ export function nodeEnrichedNameBuilder(
         return plan.plan.operation;
       }
       return "Generate";
+    case "Expand":
+      return "Expand";
     case "Exchange":
       if (plan.plan.isBroadcast) {
         return "Broadcast";
