
Conversation

@szehon-ho (Member) commented Nov 4, 2025:

What changes were proposed in this pull request?

Change the MERGE INTO schema evolution scope: limit schema evolution to adding only those columns/nested fields that exist in the source and are directly assigned from the corresponding source column without transformation.

i.e., only assignments of these forms:

UPDATE SET new_col = source.new_col 
UPDATE SET struct.new_field = source.struct.new_field
INSERT (old_col, new_col) VALUES (s.old_col, s.new_col)
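
By contrast, an assignment that transforms the source value rather than referencing it directly would no longer trigger schema evolution. An illustrative counter-example (not from the PR; upper() stands in for any transformation):

UPDATE SET new_col = upper(source.new_col)
INSERT (old_col, new_col) VALUES (s.old_col, s.new_col + 1)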

Why are the changes needed?

#51698 added schema evolution support for MERGE INTO statements. However, its scope is too broad: in some cases the source table has many more fields than the target table, but the user only needs a few of the new ones added to the target for the MERGE INTO statement.

Does this PR introduce any user-facing change?

No, MERGE INTO schema evolution is not yet released in Spark 4.1.

How was this patch tested?

Added unit tests in MergeIntoTableSuiteBase.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions bot added the SQL label on Nov 4, 2025
@szehon-ho force-pushed the merge_schema_evolution_limit_cols branch 3 times, most recently from 41731d2 to 6c6de51 on November 4, 2025 20:02
@szehon-ho (Member, Author) commented:

@cloud-fan @aokolnychyi can you take a look? I think this is an important improvement to get in before we release the MERGE INTO WITH SCHEMA EVOLUTION feature in Spark 4.1, thanks!

@szehon-ho force-pushed the merge_schema_evolution_limit_cols branch from 6c6de51 to 24b1a51 on November 4, 2025 20:06
|USING source s
|ON t.pk = s.pk
|WHEN MATCHED THEN
| UPDATE SET dep='software'
Contributor:

This test is weird: dep is an existing column in the target table, so we certainly do not need schema evolution. What was the behavior before this PR?

Member (Author):

Oh, it's because the source table has more columns, but they are not used.
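
A hedged sketch of the scenario; the schemas and the exact MERGE line (elided in the quoted test) are assumptions, not from the PR:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
// Assumed schemas for illustration:
//   target: (pk INT, salary INT, dep STRING)
//   source: (pk INT, salary INT, dep STRING, extra_col STRING)
spark.sql(
  """MERGE WITH SCHEMA EVOLUTION INTO target t
    |USING source s
    |ON t.pk = s.pk
    |WHEN MATCHED THEN
    |  UPDATE SET dep = 'software'
    |""".stripMargin)
// Before this PR, extra_col was added to the target simply because the source
// schema contained it; after this PR nothing is added, since no assignment
// references extra_col.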

|ON t.pk = s.pk
|WHEN NOT MATCHED THEN
| INSERT (pk, info, dep) VALUES (s.pk,
| named_struct('salary', s.info.salary, 'status', 'active'), 'marketing')
Contributor:

Why do we trigger schema evolution for this case?

Member (Author):

Discussed offline; refined the logic to be more selective (direct assignment from a source column).

@szehon-ho force-pushed the merge_schema_evolution_limit_cols branch from 448bfdf to 8ecc4ad on November 6, 2025 22:59
@szehon-ho force-pushed the merge_schema_evolution_limit_cols branch from 8ecc4ad to abbeb1e on November 6, 2025 23:02
private lazy val sourceSchemaForEvolution: StructType =
MergeIntoTable.sourceSchemaForSchemaEvolution(this)

lazy val needSchemaEvolution: Boolean = {
@cloud-fan (Contributor) commented Nov 7, 2025:

I think the rule ResolveMergeIntoSchemaEvolution should be triggered as long as MergeIntoTable#schemaEvolutionEnabled is true. This complicated logic should be moved into ResolveMergeIntoSchemaEvolution, and the rule should return the merge command unchanged if schema evolution is not needed.

To make ResolveMergeIntoSchemaEvolution robust to rule ordering, we should wait for the merge assignment values to be resolved before entering the rule. At the beginning of the rule, resolve the merge assignment keys again so that rule order does not matter. We can stop early if an assignment value is not a pure field reference and there is no star.
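
A minimal sketch of the suggested rule shape, assuming hypothetical helper names for each step (this is not the actual Spark implementation):

import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable}
import org.apache.spark.sql.catalyst.rules.Rule

object ResolveMergeIntoSchemaEvolution extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    // Fire purely on the WITH SCHEMA EVOLUTION flag; all other checks live
    // inside, and we wait until the assignment values are resolved.
    case m: MergeIntoTable if m.schemaEvolutionEnabled && assignmentValuesResolved(m) =>
      // Re-resolve the assignment keys first so the decision is independent of rule order.
      val resolvedKeys = reResolveAssignmentKeys(m)
      if (needsSchemaEvolution(resolvedKeys)) evolveTargetSchema(resolvedKeys)
      else m // return the merge command unchanged when no evolution is needed
  }

  // Hypothetical helpers, named after the steps described above:
  private def assignmentValuesResolved(m: MergeIntoTable): Boolean = ???
  private def reResolveAssignmentKeys(m: MergeIntoTable): MergeIntoTable = ???
  private def needsSchemaEvolution(m: MergeIntoTable): Boolean = ???
  private def evolveTargetSchema(m: MergeIntoTable): LogicalPlan = ???
}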

val assignmentValueExpr = extractFieldPath(assignment.value)
// Valid assignments are: col = s.col or col.nestedField = s.col.nestedField
assignmentKeyExpr.length == path.length && isPrefix(assignmentKeyExpr, path) &&
isSuffix(path, assignmentValueExpr)
Contributor:

Is this only to skip the source table qualifier? It seems wrong to trigger schema evolution for col = wrong_table.col, which should fail analysis without schema evolution.
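
A self-contained sketch of what that suffix check accepts; the helper bodies here are assumptions inferred from the snippet, only the call shape comes from the diff:

object PathCheckDemo extends App {
  // Assumed semantics: isPrefix(p, q) holds when p is a prefix of q, and
  // isSuffix(p, q) holds when p is a suffix of q (argument order as in the diff).
  def isPrefix(prefix: Seq[String], path: Seq[String]): Boolean = path.startsWith(prefix)
  def isSuffix(suffix: Seq[String], path: Seq[String]): Boolean = path.endsWith(suffix)

  // UPDATE SET col = s.col: key path ["col"], value path ["s", "col"].
  val keyPath = Seq("col")
  println(isSuffix(keyPath, Seq("s", "col")))           // true: the qualifier is skipped
  // The concern above: a wrong qualifier passes the same check, even though
  // col = wrong_table.col should fail analysis instead of evolving the schema.
  println(isSuffix(keyPath, Seq("wrong_table", "col"))) // also true
}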
