-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-54172][SQL] Merge Into Schema Evolution should only add referenced columns #52866
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 5 commits
3daaf68
24b1a51
abbeb1e
1461265
451da3e
cee88a2
f23a985
ea00e3c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical | |
|
|
||
| import org.apache.spark.{SparkIllegalArgumentException, SparkUnsupportedOperationException} | ||
| import org.apache.spark.sql.AnalysisException | ||
| import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AssignmentUtils, EliminateSubqueryAliases, FieldName, NamedRelation, PartitionSpec, ResolvedIdentifier, ResolvedProcedure, TypeCheckResult, UnresolvedException, UnresolvedProcedure, ViewSchemaMode} | ||
| import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AssignmentUtils, EliminateSubqueryAliases, FieldName, NamedRelation, PartitionSpec, ResolvedIdentifier, ResolvedProcedure, TypeCheckResult, UnresolvedAttribute, UnresolvedException, UnresolvedProcedure, ViewSchemaMode} | ||
| import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess} | ||
| import org.apache.spark.sql.catalyst.catalog.{FunctionResource, RoutineLanguage} | ||
| import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec | ||
|
|
@@ -860,7 +860,8 @@ case class MergeIntoTable( | |
| matchedActions: Seq[MergeAction], | ||
| notMatchedActions: Seq[MergeAction], | ||
| notMatchedBySourceActions: Seq[MergeAction], | ||
| withSchemaEvolution: Boolean) extends BinaryCommand with SupportsSubquery { | ||
| withSchemaEvolution: Boolean) | ||
| extends BinaryCommand with SupportsSubquery { | ||
|
|
||
| lazy val aligned: Boolean = { | ||
| val actions = matchedActions ++ notMatchedActions ++ notMatchedBySourceActions | ||
|
|
@@ -892,9 +893,13 @@ case class MergeIntoTable( | |
| case _ => false | ||
| } | ||
|
|
||
| lazy val needSchemaEvolution: Boolean = | ||
| private lazy val sourceSchemaForEvolution: StructType = | ||
| MergeIntoTable.sourceSchemaForSchemaEvolution(this) | ||
|
|
||
| lazy val needSchemaEvolution: Boolean = { | ||
|
||
| schemaEvolutionEnabled && | ||
| MergeIntoTable.schemaChanges(targetTable.schema, sourceTable.schema).nonEmpty | ||
| MergeIntoTable.schemaChanges(targetTable.schema, sourceSchemaForEvolution).nonEmpty | ||
| } | ||
|
|
||
| private def schemaEvolutionEnabled: Boolean = withSchemaEvolution && { | ||
| EliminateSubqueryAliases(targetTable) match { | ||
|
|
@@ -911,6 +916,7 @@ case class MergeIntoTable( | |
| } | ||
|
|
||
| object MergeIntoTable { | ||
|
|
||
| def getWritePrivileges( | ||
| matchedActions: Iterable[MergeAction], | ||
| notMatchedActions: Iterable[MergeAction], | ||
|
|
@@ -990,6 +996,79 @@ object MergeIntoTable { | |
| CaseInsensitiveMap(fieldMap) | ||
| } | ||
| } | ||
|
|
||
| // Builds a pruned version of the source schema that only contains columns/nested fields | ||
| // explicitly and directly assigned to a target counterpart in the matched / not-matched UPDATE and INSERT actions of MERGE INTO; if any UPDATE * / INSERT * action is present, every source field is kept. | ||
| // New columns/nested fields not existing in target will be added for schema evolution. | ||
| def sourceSchemaForSchemaEvolution(merge: MergeIntoTable): StructType = { | ||
|
|
||
| val actions = merge.matchedActions ++ merge.notMatchedActions | ||
| val assignments = actions.collect { | ||
| case a: UpdateAction => a.assignments | ||
| case a: InsertAction => a.assignments | ||
| }.flatten | ||
|
|
||
| val containsStarAction = actions.exists { | ||
| case _: UpdateStarAction => true | ||
| case _: InsertStarAction => true | ||
| case _ => false | ||
| } | ||
|
|
||
| def filterSchema(sourceSchema: StructType, basePath: Seq[String]): StructType = | ||
cloud-fan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| StructType(sourceSchema.flatMap { field => | ||
| val fieldPath = basePath :+ field.name | ||
|
|
||
| field.dataType match { | ||
| // Directly assigned in some clause (assignment key and value both match this field, see isEqual): | ||
| // always keep, including all nested attributes | ||
| case _ if assignments.exists(isEqual(_, fieldPath)) => Some(field) | ||
| // If this is a struct and one of the children is being assigned to in a merge clause, | ||
| // keep it and continue filtering children. NOTE(review): isPrefix also holds when the key path equals fieldPath itself - confirm that case is intended here. | ||
| case struct: StructType if assignments.exists(assign => | ||
| isPrefix(fieldPath, extractFieldPath(assign.key))) => | ||
| Some(field.copy(dataType = filterSchema(struct, fieldPath))) | ||
| // The field isn't assigned to directly or indirectly (i.e. its children) in any non-* | ||
| // clause. Check if it should be kept with any * action. | ||
| case struct: StructType if containsStarAction => | ||
| Some(field.copy(dataType = filterSchema(struct, fieldPath))) | ||
| case _ if containsStarAction => Some(field) | ||
| // The field and its children are not assigned to in any * or non-* action, drop it. | ||
| case _ => None | ||
| } | ||
| }) | ||
|
|
||
| filterSchema(merge.sourceTable.schema, Seq.empty) | ||
| } | ||
|
|
||
| // Helper method to extract the field path (sequence of name parts) from an Expression: the name parts of an UnresolvedAttribute, the single name of an AttributeReference, or the child's path plus the field name (falling back to col<ordinal>) for GetStructField. Any other expression yields an empty path. | ||
| private def extractFieldPath(expr: Expression): Seq[String] = expr match { | ||
| case UnresolvedAttribute(nameParts) => nameParts | ||
| case a: AttributeReference => Seq(a.name) | ||
| case GetStructField(child, ordinal, nameOpt) => | ||
| extractFieldPath(child) :+ nameOpt.getOrElse(s"col$ordinal") | ||
| case _ => Seq.empty | ||
| } | ||
|
|
||
| // Helper method to check if a given field path is a prefix of another path: `prefix` must not be longer than `path`, and each of its name parts must match the part of `path` at the same position according to SQLConf.get.resolver. An empty `prefix` is a prefix of every path. | ||
| private def isPrefix(prefix: Seq[String], path: Seq[String]): Boolean = | ||
| prefix.length <= path.length && prefix.zip(path).forall { | ||
| case (prefixNamePart, pathNamePart) => | ||
| SQLConf.get.resolver(prefixNamePart, pathNamePart) | ||
| } | ||
|
|
||
| // Helper method to check if a given field path is a suffix of another path, implemented by reversing both paths and delegating to isPrefix. | ||
| private def isSuffix(suffix: Seq[String], path: Seq[String]): Boolean = | ||
| isPrefix(suffix.reverse, path.reverse) | ||
|
|
||
| // Helper method to check if an assignment key is equal to a source column (same number of name parts as `path`, each part matching via isPrefix) | ||
| // and if the assignment value is the corresponding source column directly (`path` must be a suffix of the value's field path). | ||
| private def isEqual(assignment: Assignment, path: Seq[String]): Boolean = { | ||
| val assignmenKeyExpr = extractFieldPath(assignment.key) | ||
| val assignmentValueExpr = extractFieldPath(assignment.value) | ||
| // Valid assignments are: col = s.col or col.nestedField = s.col.nestedField. The suffix check lets the value carry extra leading parts such as the source qualifier `s`. NOTE(review): any leading parts are accepted, not only a table qualifier - confirm this is intended. | ||
| assignmenKeyExpr.length == path.length && isPrefix(assignmenKeyExpr, path) && | ||
| isSuffix(path, assignmentValueExpr) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this only to skip the source table qualifier? it seems wrong to trigger schema evolution for |
||
| } | ||
| } | ||
| } | ||
|
|
||
| sealed abstract class MergeAction extends Expression with Unevaluable { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.