Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,13 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
*/
def recacheByPlan(spark: SparkSession, plan: LogicalPlan): Unit = {
val normalized = QueryExecution.normalize(spark, plan)
recacheByCondition(spark, _.plan.exists(_.sameResult(normalized)))
// Only use fresh plan for entries that directly match it (not dependent entries like views)
val directMatcher = (cachedPlan: LogicalPlan) => normalized.sameResult(cachedPlan)
recacheByCondition(
spark,
cd => cd.plan.exists(_.sameResult(normalized)),
freshPlan = Some(normalized),
directMatchCondition = Some(directMatcher))
}

/**
Expand All @@ -331,15 +337,55 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
def shouldInvalidate(entry: CachedData): Boolean = {
entry.plan.exists(isMatchedTableOrView(_, name, spark.sessionState.conf, includeTimeTravel))
}
recacheByCondition(spark, shouldInvalidate)

// For V2 tables, resolve a fresh plan to get updated table metadata
val freshPlanOpt = resolveFreshPlan(spark, name)

val directMatchCondition = freshPlanOpt.map { _ =>
val resolver = spark.sessionState.conf.resolver
def isSameName(nameInCache: Seq[String]): Boolean = {
nameInCache.length == name.length && nameInCache.zip(name).forall(resolver.tupled)
}

(plan: LogicalPlan) => EliminateSubqueryAliases(plan) match {
case r: DataSourceV2Relation if r.catalog.isDefined && r.identifier.isDefined =>
val nameInCache = r.identifier.get.toQualifiedNameParts(r.catalog.get)
isSameName(nameInCache)
case r: LogicalRelation if r.catalogTable.isDefined =>
isSameName(r.catalogTable.get.identifier.nameParts)
case h: HiveTableRelation =>
isSameName(h.tableMeta.identifier.nameParts)
case _ => false
}
}

recacheByCondition(spark, shouldInvalidate, freshPlanOpt, directMatchCondition)
}

private def resolveFreshPlan(spark: SparkSession, name: Seq[String]): Option[LogicalPlan] = {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
try {
val tableName = name.quoted
val relation = spark.table(tableName).queryExecution.analyzed
Some(QueryExecution.normalize(spark, relation))
} catch {
case _: Exception => None
}
}

/**
* Re-caches all the cache entries that satisfies the given `condition`.
* @param spark SparkSession
* @param condition Condition to filter cache entries to recache
* @param freshPlan Optional fresh plan to use for re-execution and as the new cached plan.
* @param directMatchCondition Optional condition to determine if an entry should use the fresh
* plan. If None, all matched entries use the fresh plan.
*/
private def recacheByCondition(
spark: SparkSession,
condition: CachedData => Boolean): Unit = {
condition: CachedData => Boolean,
freshPlan: Option[LogicalPlan] = None,
directMatchCondition: Option[LogicalPlan => Boolean] = None): Unit = {
val needToRecache = cachedData.filter(condition)
this.synchronized {
// Remove the cache entry before creating a new ones.
Expand All @@ -348,11 +394,18 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
needToRecache.foreach { cd =>
cd.cachedRepresentation.cacheBuilder.clearCache()
val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)

// Use fresh plan if: (1) freshPlan provided, AND (2) no directMatchCondition OR it matches
val useFresh = freshPlan.isDefined && directMatchCondition.forall(_(cd.plan))

val newCache = sessionWithConfigsOff.withActive {
val qe = sessionWithConfigsOff.sessionState.executePlan(cd.plan)
val planToExecute = if (useFresh) freshPlan.get else cd.plan
val qe = sessionWithConfigsOff.sessionState.executePlan(planToExecute)
InMemoryRelation(cd.cachedRepresentation.cacheBuilder, qe)
}
val recomputedPlan = cd.copy(cachedRepresentation = newCache)
val recomputedPlan = cd.copy(
plan = if (useFresh) freshPlan.get else cd.plan,
cachedRepresentation = newCache)
this.synchronized {
if (lookupCachedDataInternal(recomputedPlan.plan).nonEmpty) {
logWarning("While recaching, data was already added to cache.")
Expand Down
Loading