[SPARK-54172][SQL] Merge Into Schema Evolution should only add referenced columns #52866
Conversation
Force-pushed 41731d2 to 6c6de51
@cloud-fan @aokolnychyi can you take a look? I think this is an important improvement to get in before we release the MERGE INTO WITH SCHEMA EVOLUTION feature in Spark 4.1. Thanks!
Force-pushed 6c6de51 to 24b1a51
```
|USING source s
|ON t.pk = s.pk
|WHEN MATCHED THEN
|  UPDATE SET dep='software'
```
This test is weird: dep is an existing column in the target table, and we for sure do not need schema evolution. What was the behavior before this PR?
Oh, it's because the source table has more columns, but they are not used.
```
|ON t.pk = s.pk
|WHEN NOT MATCHED THEN
|  INSERT (pk, info, dep) VALUES (s.pk,
|    named_struct('salary', s.info.salary, 'status', 'active'), 'marketing')
```
Why do we trigger schema evolution for this case?
Discussed offline; refined the logic to be more selective (evolution only for a direct assignment from a source column).
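A minimal sketch of the refined behavior, assuming hypothetical tables and the MERGE WITH SCHEMA EVOLUTION syntax from the feature under discussion:

```scala
// Hypothetical tables: target(pk, info struct<salary:int>, dep),
// source(pk, info struct<salary:int, status:string>, dep, new_col).

// Evolves the schema: new_col is a direct, untransformed reference to a
// source column, so it is added to the target.
spark.sql(
  """MERGE WITH SCHEMA EVOLUTION INTO target t
    |USING source s
    |ON t.pk = s.pk
    |WHEN MATCHED THEN UPDATE SET new_col = s.new_col
    |""".stripMargin)

// Does not evolve the schema: the assigned value is a transformation
// (named_struct), not a direct source column reference, so no new nested
// fields are added for it.
spark.sql(
  """MERGE WITH SCHEMA EVOLUTION INTO target t
    |USING source s
    |ON t.pk = s.pk
    |WHEN MATCHED THEN
    |  UPDATE SET info = named_struct('salary', s.info.salary, 'status', 'active')
    |""".stripMargin)
```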
Force-pushed 448bfdf to 8ecc4ad
…ignment where value is same name in source
Force-pushed 8ecc4ad to abbeb1e
```scala
private lazy val sourceSchemaForEvolution: StructType =
  MergeIntoTable.sourceSchemaForSchemaEvolution(this)

lazy val needSchemaEvolution: Boolean = {
```
I think the rule ResolveMergeIntoSchemaEvolution should be triggered as long as MergeIntoTable#schemaEvolutionEnabled is true. This complicated logic should be moved into ResolveMergeIntoSchemaEvolution, and the rule should return the merge command unchanged if schema evolution is not needed.
To make ResolveMergeIntoSchemaEvolution more robust against rule ordering, we should wait for the merge assignment values to be resolved before entering the rule. At the beginning of the rule, resolve the merge assignment keys again so that rule order does not matter. We can stop early if the assignment values are not pure field references and there is no star.
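Roughly, the suggested shape would be something like the following sketch; assignmentValuesResolved, reResolveAssignmentKeys, needsEvolution, and evolveTargetSchema are hypothetical placeholders, not real Spark APIs:

```scala
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable}
import org.apache.spark.sql.catalyst.rules.Rule

// Sketch only: trigger purely on schemaEvolutionEnabled and decide inside
// the rule whether the command actually needs to change.
object ResolveMergeIntoSchemaEvolutionSketch extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
    case m: MergeIntoTable if m.schemaEvolutionEnabled =>
      if (!assignmentValuesResolved(m)) {
        m // wait until the assignment values are resolved
      } else {
        // Re-resolve the assignment keys here so rule order does not matter.
        val withKeys = reResolveAssignmentKeys(m)
        // Return the merge command unchanged when no evolution is needed.
        if (needsEvolution(withKeys)) evolveTargetSchema(withKeys) else m
      }
  }

  // Placeholder helpers standing in for logic that would live in the rule.
  private def assignmentValuesResolved(m: MergeIntoTable): Boolean = ???
  private def reResolveAssignmentKeys(m: MergeIntoTable): MergeIntoTable = ???
  private def needsEvolution(m: MergeIntoTable): Boolean = ???
  private def evolveTargetSchema(m: MergeIntoTable): MergeIntoTable = ???
}
```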
```scala
val assignmentValueExpr = extractFieldPath(assignment.value)
// Valid assignments are: col = s.col or col.nestedField = s.col.nestedField
assignmenKeyExpr.length == path.length && isPrefix(assignmenKeyExpr, path) &&
  isSuffix(path, assignmentValueExpr)
```
Is this only to skip the source table qualifier? It seems wrong to trigger schema evolution for col = wrong_table.col, which should fail analysis without schema evolution.
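To make the quoted condition concrete, here is a REPL-style sketch; the isPrefix/isSuffix semantics are assumptions inferred from the inline comment in the diff:

```scala
// Assumed semantics: isPrefix(a, b) means a is a prefix of b;
// isSuffix(a, b) means a is a suffix of b.
def isPrefix[A](a: Seq[A], b: Seq[A]): Boolean = b.startsWith(a)
def isSuffix[A](a: Seq[A], b: Seq[A]): Boolean = b.endsWith(a)

// Paths for `col.nestedField = s.col.nestedField`:
val key   = Seq("col", "nestedField")      // assignment key path
val value = Seq("s", "col", "nestedField") // assignment value path
val path  = Seq("col", "nestedField")      // candidate field path

val qualifies =
  key.length == path.length && isPrefix(key, path) && isSuffix(path, value)
// qualifies == true: the extra leading element of `value` is treated as the
// source qualifier, which also admits `wrong_table.col`, as noted above.
```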
What changes were proposed in this pull request?
Change the scope of MERGE INTO schema evolution: only add columns/nested fields that exist in the source and are directly assigned from the corresponding source column without transformation, i.e. assignments of the form col = s.col or col.nestedField = s.col.nestedField.
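For example, a sketch with invented schemas (target(pk, data), source(pk, data, new_col, unused_col)):

```scala
// Invented schemas: target(pk, data), source(pk, data, new_col, unused_col).
spark.sql(
  """MERGE WITH SCHEMA EVOLUTION INTO target t
    |USING source s
    |ON t.pk = s.pk
    |WHEN NOT MATCHED THEN
    |  INSERT (pk, data, new_col) VALUES (s.pk, s.data, s.new_col)
    |""".stripMargin)
// With this PR, only new_col is added to the target schema: it is the only
// missing column that is directly assigned from the source. unused_col is
// never referenced, so it is no longer added.
```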
Why are the changes needed?
#51698 added schema evolution support for MERGE INTO statements. However, it is a bit too broad: in some cases the source table may have many more fields than the target table, but the user may only need a few new ones added to the target for the MERGE INTO statement.
Does this PR introduce any user-facing change?
No, MERGE INTO schema evolution is not yet released in Spark 4.1.
How was this patch tested?
Added many unit tests in MergeIntoTableSuiteBase
Was this patch authored or co-authored using generative AI tooling?
No