
Commit 806680f

Creating static query execution JSON parser
1 parent c3da148 commit 806680f

2 files changed (+275 / -47 lines)


integration/spark/shared/src/main/java/io/openlineage/spark/agent/lifecycle/StaticQueryExecutionParser.java

Lines changed: 215 additions & 29 deletions
@@ -14,6 +14,8 @@

 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
 import java.time.ZonedDateTime;
 import java.util.*;
 import java.util.stream.Collectors;
@@ -30,11 +32,38 @@ public class StaticQueryExecutionParser {
   private final Map<String, String> datasetIdCache;

   public StaticQueryExecutionParser() {
-    this.openLineage = new OpenLineage(Versions.OPEN_LINEAGE_PRODUCER_URI);
+    // Create producer URI without relying on static initialization
+    URI producerUri = createProducerUri();
+    this.openLineage = new OpenLineage(producerUri);
     this.objectMapper = new ObjectMapper();
     this.datasetIdCache = new HashMap<>();
   }

+  private URI createProducerUri() {
+    try {
+      String version = getVersionSafely();
+      return URI.create(
+          String.format("https://github.com/OpenLineage/OpenLineage/tree/%s/integration/spark", version));
+    } catch (Exception e) {
+      log.warn("Failed to load version from properties, using default URI: {}", e.getMessage());
+      return URI.create("https://github.com/OpenLineage/OpenLineage/tree/main/integration/spark");
+    }
+  }
+
+  private String getVersionSafely() {
+    // try-with-resources so the properties stream is always closed
+    try (InputStream is =
+        this.getClass().getResourceAsStream("/version.properties")) {
+      Properties properties = new Properties();
+      if (is != null) {
+        properties.load(is);
+        return properties.getProperty("version", "main");
+      }
+    } catch (Exception e) {
+      log.debug("Could not load version properties: {}", e.getMessage());
+    }
+    return "main";
+  }
+
   /**
    * Main entry point to parse a JSON execution plan file and generate OpenLineage event
    */
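The constructor now derives the producer URI from version.properties instead of the static Versions.OPEN_LINEAGE_PRODUCER_URI. A minimal standalone sketch of that lookup pattern, assuming only the resource name and property key shown in the diff (the class name and main method below are illustrative):

import java.io.InputStream;
import java.util.Properties;

public class VersionLookupSketch {
  public static void main(String[] args) throws Exception {
    // Default used when the resource is absent or unreadable
    String version = "main";
    try (InputStream is =
        VersionLookupSketch.class.getResourceAsStream("/version.properties")) {
      if (is != null) {
        Properties props = new Properties();
        props.load(is);
        version = props.getProperty("version", "main");
      }
    }
    // Same producer URI format as createProducerUri above
    System.out.println(String.format(
        "https://github.com/OpenLineage/OpenLineage/tree/%s/integration/spark", version));
  }
}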
@@ -300,13 +329,10 @@ private OpenLineage.InputDataset createMockInputDataset() {
    * Extract output dataset from the execution plan
    */
   private OpenLineage.OutputDataset extractOutputDataset(ExecutionPlanContext context, String jobName) {
-    PlanNode rootNode = context.getRootNode();
-    if (rootNode == null) {
-      throw new IllegalStateException("No root node found in execution plan");
-    }
-
     String outputId = jobName + "_output";
-    OpenLineage.SchemaDatasetFacet schema = extractSchemaFromNode(rootNode.getNode());
+
+    // Extract schema from the final output (first node in array-based plans)
+    OpenLineage.SchemaDatasetFacet schema = extractOutputSchema(context);

     return openLineage.newOutputDatasetBuilder()
         .namespace("memory://dataframes")
@@ -324,16 +350,13 @@ private OpenLineage.ColumnLineageDatasetFacet buildColumnLineage(ExecutionPlanCo
     OpenLineage.ColumnLineageDatasetFacetFieldsBuilder fieldsBuilder =
         openLineage.newColumnLineageDatasetFacetFieldsBuilder();

-    PlanNode rootNode = context.getRootNode();
-    if (rootNode == null) return null;
-
-    // Extract output columns
-    List<String> outputColumns = extractColumnNames(rootNode.getNode());
+    // Extract output columns from the final transformation (first node)
+    List<String> outputColumns = extractOutputColumnNames(context);

     // For each output column, trace back to input columns
     for (String outputColumn : outputColumns) {
-      OpenLineage.ColumnLineageDatasetFacetFieldsAdditional field = traceColumnLineage(
-          outputColumn, rootNode, context);
+      OpenLineage.ColumnLineageDatasetFacetFieldsAdditional field = traceColumnLineageImproved(
+          outputColumn, context);
       if (field != null) {
         fieldsBuilder.put(outputColumn, field);
       }
@@ -350,7 +373,91 @@ private OpenLineage.ColumnLineageDatasetFacet buildColumnLineage(ExecutionPlanCo
   }

   /**
-   * Trace lineage for a specific column
+   * Improved column lineage tracing for a specific column
+   */
+  private OpenLineage.ColumnLineageDatasetFacetFieldsAdditional traceColumnLineageImproved(
+      String columnName, ExecutionPlanContext context) {
+
+    List<OpenLineage.InputField> inputFields = new ArrayList<>();
+    Set<String> processedDatasets = new HashSet<>();
+
+    // Trace through input sources to find columns with matching names
+    for (PlanNode inputSource : context.getInputSources()) {
+      String datasetId = generateDatasetId(inputSource);
+
+      // Avoid duplicates for the same dataset
+      if (processedDatasets.contains(datasetId)) {
+        continue;
+      }
+      processedDatasets.add(datasetId);
+
+      List<String> inputColumns = extractColumnNames(inputSource.getNode());
+      if (inputColumns.contains(columnName)) {
+        OpenLineage.InputField inputField =
+            openLineage.newInputFieldBuilder()
+                .namespace("memory://dataframes")
+                .name(datasetId)
+                .field(columnName)
+                .build();
+
+        inputFields.add(inputField);
+      }
+    }
+
+    // If no direct matches found, try to trace through transformations
+    if (inputFields.isEmpty()) {
+      inputFields = traceColumnThroughTransformations(columnName, context);
+    }
+
+    if (inputFields.isEmpty()) {
+      return null;
+    }
+
+    return openLineage.newColumnLineageDatasetFacetFieldsAdditionalBuilder()
+        .inputFields(inputFields)
+        .transformationDescription("Column lineage traced through Spark execution plan")
+        .transformationType(inputFields.size() == 1 ? "DIRECT" : "INDIRECT")
+        .build();
+  }
+
+  /**
+   * Trace column lineage through transformations when direct matching fails
+   */
+  private List<OpenLineage.InputField> traceColumnThroughTransformations(String columnName, ExecutionPlanContext context) {
+    List<OpenLineage.InputField> inputFields = new ArrayList<>();
+    Set<String> processedDatasets = new HashSet<>();
+
+    // For complex transformations, fall back to mapping all input columns
+    // This is a simplified approach - in practice, you'd analyze the transformation logic
+    for (PlanNode inputSource : context.getInputSources()) {
+      String datasetId = generateDatasetId(inputSource);
+
+      if (processedDatasets.contains(datasetId)) {
+        continue;
+      }
+      processedDatasets.add(datasetId);
+
+      List<String> inputColumns = extractColumnNames(inputSource.getNode());
+      if (!inputColumns.isEmpty()) {
+        // Use the first column as a representative (simplified approach)
+        String firstColumn = inputColumns.get(0);
+
+        OpenLineage.InputField inputField =
+            openLineage.newInputFieldBuilder()
+                .namespace("memory://dataframes")
+                .name(datasetId)
+                .field(firstColumn)
+                .build();
+
+        inputFields.add(inputField);
+      }
+    }
+
+    return inputFields;
+  }
+
+  /**
+   * Trace lineage for a specific column (legacy method)
    */
   private OpenLineage.ColumnLineageDatasetFacetFieldsAdditional traceColumnLineage(
       String columnName, PlanNode node, ExecutionPlanContext context) {
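traceColumnLineageImproved matches an output column to every input dataset that exposes a column with the same name, then classifies a single match as DIRECT and multiple matches as INDIRECT. A self-contained sketch of that heuristic on plain collections (dataset and column names are made up for illustration):

import java.util.*;

public class ColumnMatchSketch {
  public static void main(String[] args) {
    // Input datasets and their columns, as extractColumnNames might report them
    Map<String, List<String>> inputs = new LinkedHashMap<>();
    inputs.put("orders_df", Arrays.asList("order_id", "amount"));
    inputs.put("customers_df", Arrays.asList("customer_id", "amount"));

    String outputColumn = "amount";
    List<String> matches = new ArrayList<>();
    for (Map.Entry<String, List<String>> entry : inputs.entrySet()) {
      if (entry.getValue().contains(outputColumn)) {
        matches.add(entry.getKey() + "." + outputColumn);
      }
    }
    // Same classification rule as traceColumnLineageImproved
    String type = matches.size() == 1 ? "DIRECT" : "INDIRECT";
    System.out.println(outputColumn + " <- " + matches + " (" + type + ")");
  }
}

Name matching alone cannot see renames or derived columns; that is why the fallback traceColumnThroughTransformations maps a representative input column instead.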
@@ -385,6 +492,26 @@ private OpenLineage.ColumnLineageDatasetFacetFieldsAdditional traceColumnLineage
         .build();
   }

+  /**
+   * Extract output schema from the execution plan context
+   */
+  private OpenLineage.SchemaDatasetFacet extractOutputSchema(ExecutionPlanContext context) {
+    List<OpenLineage.SchemaDatasetFacetFields> fields = new ArrayList<>();
+
+    // For array-based plans, extract schema from the first node (final output)
+    if (context.getAllNodesArray() != null && !context.getAllNodesArray().isEmpty()) {
+      JsonNode outputNode = context.getAllNodesArray().get(0);
+      extractSchemaFromProjectList(outputNode, fields);
+    } else if (context.getRootNode() != null) {
+      // Fallback to root node
+      extractSchemaFromOutput(context.getRootNode().getNode(), fields);
+    }
+
+    return openLineage.newSchemaDatasetFacetBuilder()
+        .fields(fields)
+        .build();
+  }
+
   /**
    * Extract schema from a plan node
    */
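Both extractOutputSchema and extractOutputColumnNames (further down) treat the first element of an array-based plan as the final operator. A sketch of the assumed layout, hand-written to mirror Spark's LogicalPlan.toJSON output rather than captured from a real run:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ArrayPlanSketch {
  public static void main(String[] args) throws Exception {
    // Node 0 is the final operator; later entries are its children
    String json =
        "[{\"class\":\"org.apache.spark.sql.catalyst.plans.logical.Project\",\"projectList\":[]},"
            + "{\"class\":\"org.apache.spark.sql.execution.datasources.LogicalRelation\"}]";
    JsonNode plan = new ObjectMapper().readTree(json);
    if (plan.isArray() && plan.size() > 0) {
      JsonNode outputNode = plan.get(0); // what extractOutputSchema reads
      System.out.println(outputNode.path("class").asText());
    }
  }
}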
@@ -403,6 +530,25 @@ private OpenLineage.SchemaDatasetFacet extractSchemaFromNode(JsonNode node) {
         .build();
   }

+  /**
+   * Extract schema from projectList field (for Project nodes)
+   */
+  private void extractSchemaFromProjectList(JsonNode node, List<OpenLineage.SchemaDatasetFacetFields> fields) {
+    JsonNode projectList = node.path("projectList");
+    if (projectList.isArray()) {
+      for (JsonNode projectionArray : projectList) {
+        if (projectionArray.isArray() && projectionArray.size() > 0) {
+          // Each projection is an array; its first element describes the output column
+          JsonNode columnNode = projectionArray.get(0);
+          OpenLineage.SchemaDatasetFacetFields field = extractFieldFromAttributeReference(columnNode);
+          if (field != null) {
+            fields.add(field);
+          }
+        }
+      }
+    }
+  }
+
   /**
    * Extract schema from output field
    */
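extractSchemaFromProjectList assumes each entry of projectList is itself an array whose first element carries the output column's attributes. A runnable sketch of that shape (the JSON is an illustrative approximation, not captured output):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ProjectListSketch {
  public static void main(String[] args) throws Exception {
    String json =
        "{\"projectList\":[[{\"class\":\"org.apache.spark.sql.catalyst.expressions.AttributeReference\","
            + "\"name\":\"amount\",\"dataType\":\"double\",\"nullable\":true}]]}";
    JsonNode node = new ObjectMapper().readTree(json);
    for (JsonNode projection : node.path("projectList")) {
      // First element of each projection describes the output column
      JsonNode column = projection.get(0);
      System.out.println(column.path("name").asText() + ": " + column.path("dataType").asText());
    }
  }
}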
@@ -412,13 +558,13 @@ private void extractSchemaFromOutput(JsonNode node, List<OpenLineage.SchemaDatas
     for (JsonNode outputItem : output) {
       if (outputItem.isArray()) {
         for (JsonNode fieldNode : outputItem) {
-          OpenLineage.SchemaDatasetFacetFields field = extractFieldFromNode(fieldNode);
+          OpenLineage.SchemaDatasetFacetFields field = extractFieldFromAttributeReference(fieldNode);
           if (field != null) {
             fields.add(field);
           }
         }
       } else {
-        OpenLineage.SchemaDatasetFacetFields field = extractFieldFromNode(outputItem);
+        OpenLineage.SchemaDatasetFacetFields field = extractFieldFromAttributeReference(outputItem);
         if (field != null) {
           fields.add(field);
         }
@@ -434,7 +580,7 @@ private void extractSchemaFromAttributes(JsonNode node, List<OpenLineage.SchemaD
     JsonNode attributes = node.path("attributes");
     if (attributes.isArray()) {
       for (JsonNode attr : attributes) {
-        OpenLineage.SchemaDatasetFacetFields field = extractFieldFromNode(attr);
+        OpenLineage.SchemaDatasetFacetFields field = extractFieldFromAttributeReference(attr);
         if (field != null) {
           fields.add(field);
         }
@@ -443,20 +589,23 @@ private void extractSchemaFromAttributes(JsonNode node, List<OpenLineage.SchemaD
   }

   /**
-   * Extract field information from a node
+   * Extract field information from an AttributeReference node
    */
-  private OpenLineage.SchemaDatasetFacetFields extractFieldFromNode(JsonNode fieldNode) {
+  private OpenLineage.SchemaDatasetFacetFields extractFieldFromAttributeReference(JsonNode fieldNode) {
+    // Handle both direct AttributeReference and Alias nodes
     String name = fieldNode.path("name").asText();
     String dataType = fieldNode.path("dataType").asText();
-
-    // Handle different dataType structures
-    if (dataType.isEmpty()) {
-      JsonNode dataTypeNode = fieldNode.path("dataType");
-      if (dataTypeNode.isObject()) {
-        dataType = dataTypeNode.path("type").asText();
-        if (dataType.isEmpty()) {
-          dataType = dataTypeNode.path("class").asText();
-        }
+    boolean nullable = fieldNode.path("nullable").asBoolean(true);
+
+    // Handle Alias nodes that wrap AttributeReference
+    String nodeClass = fieldNode.path("class").asText();
+    if (nodeClass.contains("Alias") && fieldNode.has("child")) {
+      // For Alias nodes, use the alias name but get type from the child
+      JsonNode childRef = fieldNode.path("child");
+      if (childRef.isInt()) {
+        // This is a reference to another node by index - we'll keep the alias name
+        // but use a generic type for now
+        dataType = "unknown";
       }
     }

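The Alias branch above covers projections whose "child" is an integer back-reference into the node array, where the type cannot be resolved locally. A small sketch of that case (illustrative JSON, not captured from Spark):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class AliasFieldSketch {
  public static void main(String[] args) throws Exception {
    String json =
        "{\"class\":\"org.apache.spark.sql.catalyst.expressions.Alias\","
            + "\"name\":\"total\",\"child\":0,\"nullable\":true}";
    JsonNode fieldNode = new ObjectMapper().readTree(json);
    String dataType = fieldNode.path("dataType").asText(); // empty here
    if (fieldNode.path("class").asText().contains("Alias")
        && fieldNode.path("child").isInt()) {
      dataType = "unknown"; // alias name is kept; the type lives in another node
    }
    System.out.println(fieldNode.path("name").asText() + ": " + dataType); // total: unknown
  }
}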
471620
.build();
472621
}
473622

623+
/**
624+
* Extract output column names from the execution plan context
625+
*/
626+
private List<String> extractOutputColumnNames(ExecutionPlanContext context) {
627+
List<String> columnNames = new ArrayList<>();
628+
629+
// For array-based plans, extract column names from the first node (final output)
630+
if (context.getAllNodesArray() != null && !context.getAllNodesArray().isEmpty()) {
631+
JsonNode outputNode = context.getAllNodesArray().get(0);
632+
extractColumnNamesFromProjectList(outputNode, columnNames);
633+
} else if (context.getRootNode() != null) {
634+
// Fallback to root node
635+
extractColumnNamesFromOutput(context.getRootNode().getNode(), columnNames);
636+
}
637+
638+
return columnNames;
639+
}
640+
641+
/**
642+
* Extract column names from projectList field
643+
*/
644+
private void extractColumnNamesFromProjectList(JsonNode node, List<String> columnNames) {
645+
JsonNode projectList = node.path("projectList");
646+
if (projectList.isArray()) {
647+
for (JsonNode projectionArray : projectList) {
648+
if (projectionArray.isArray() && projectionArray.size() > 0) {
649+
// Each projection is an array, get the first element which describes the output column
650+
JsonNode columnNode = projectionArray.get(0);
651+
String name = columnNode.path("name").asText();
652+
if (!name.isEmpty()) {
653+
columnNames.add(name);
654+
}
655+
}
656+
}
657+
}
658+
}
659+
474660
/**
475661
* Extract column names from a node
476662
*/

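Putting the pieces together, a hypothetical driver for the new parser. Only the constructor and the "Main entry point" javadoc appear in this diff; the method name parseExecutionPlan and its File-in, RunEvent-out signature are assumptions for illustration:

import java.io.File;
import io.openlineage.client.OpenLineage;

public class ParserUsageSketch {
  public static void main(String[] args) throws Exception {
    StaticQueryExecutionParser parser = new StaticQueryExecutionParser();
    // Hypothetical entry point; actual name and signature are not shown in this diff
    OpenLineage.RunEvent event = parser.parseExecutionPlan(new File(args[0]));
    System.out.println(event);
  }
}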