Commit e083d5d

Author: Yu Zhang (committed)

fix(query): fix the absent function for multi-partition namespaces.
1 parent 9896c30 commit e083d5d

File tree

6 files changed: +60, -5 lines


coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala

Lines changed: 0 additions & 1 deletion

@@ -1,6 +1,5 @@
 package filodb.coordinator.queryplanner
 
-
 import java.util.concurrent.ThreadLocalRandom
 
 import akka.serialization.SerializationExtension

coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala

Lines changed: 23 additions & 4 deletions

@@ -19,6 +19,7 @@ import filodb.core.query.Filter.{Equals, EqualsRegex}
 import filodb.grpc.GrpcCommonUtils
 import filodb.query._
 import filodb.query.LogicalPlan._
+import filodb.query.RangeFunctionId.AbsentOverTime
 import filodb.query.exec._
 
 //scalastyle:off file.size.limit
@@ -517,6 +518,22 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
         materializeForAggregateAssignment(aggregate, assignment, queryContext, timeRangeOverride)
       case binaryJoin: BinaryJoin =>
         materializeForBinaryJoinAssignment(binaryJoin, assignment, queryContext, timeRangeOverride)
+      case psw: PeriodicSeriesWithWindowing if psw.function == AbsentOverTime =>
+        val plans = proportionMap.map(entry => {
+          val partitionDetails = entry._2
+          materializeForPartition(logicalPlan, partitionDetails.partitionName,
+            partitionDetails.grpcEndPoint, partitionDetails.httpEndPoint, queryContext, timeRangeOverride)
+        }).toSeq
+        val dispatcher = PlannerUtil.pickDispatcher(plans)
+        // 0 present 1 absent => 01/10/00 are present. 11 is absent.
+        val reducer = MultiPartitionReduceAggregateExec(queryContext, dispatcher,
+          plans.sortWith((x, _) => !x.isInstanceOf[PromQlRemoteExec]), AggregationOperator.Absent, Nil)
+        if (!queryContext.plannerParams.skipAggregatePresent) {
+          val promQlQueryParams = queryContext.origQueryParams.asInstanceOf[PromQlQueryParams]
+          reducer.addRangeVectorTransformer(AggregatePresenter(AggregationOperator.Absent, Nil,
+            RangeParams(promQlQueryParams.startSecs, promQlQueryParams.stepSecs, promQlQueryParams.endSecs)))
+        }
+        reducer
       case _ =>
         val plans = proportionMap.map(entry => {
           val partitionDetails = entry._2
@@ -698,11 +715,13 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
   * @param logicalPlan the logic plan.
   * @return true if the binary join or aggregation has clauses.
   */
-  private def hasJoinClause(logicalPlan: LogicalPlan): Boolean = {
+  private def hasJoinOrAggClause(logicalPlan: LogicalPlan): Boolean = {
     logicalPlan match {
       case binaryJoin: BinaryJoin => binaryJoin.on.nonEmpty || binaryJoin.ignoring.nonEmpty
-      case aggregate: Aggregate => hasJoinClause(aggregate.vectors)
-      case nonLeaf: NonLeafLogicalPlan => nonLeaf.children.exists(hasJoinClause)
+      case aggregate: Aggregate => hasJoinOrAggClause(aggregate.vectors)
+      // AbsentOverTime is a special case that is converted to aggregation.
+      case psw: PeriodicSeriesWithWindowing if psw.function == AbsentOverTime => true
+      case nonLeaf: NonLeafLogicalPlan => nonLeaf.children.exists(hasJoinOrAggClause)
       case _ => false
     }
   }
@@ -714,7 +733,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
     val tschemaLabels = getTschemaLabelsIfCanPushdown(aggregate.vectors, queryContext)
     // TODO have a more accurate pushdown after location rule is define.
     // Right now do not push down any multi-partition namespace plans when on clause exists.
-    val canPushdown = !(hasMultiPartitionNamespace && hasJoinClause(aggregate)) &&
+    val canPushdown = !(hasMultiPartitionNamespace && hasJoinOrAggClause(aggregate)) &&
       canPushdownAggregate(aggregate, tschemaLabels, queryContext)
     val plan = if (!canPushdown) {
       val childPlanRes = walkLogicalPlanTree(aggregate.vectors, queryContext.copy(
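
Note on the new AbsentOverTime case above: each per-partition plan evaluates the absent function on its own slice of the data and encodes the result as 1.0 when the series is missing in that partition and NaN when it is present; the MultiPartitionReduceAggregateExec with AggregationOperator.Absent then reports "absent" only when every partition does, which is what the comment "0 present 1 absent => 01/10/00 are present. 11 is absent." describes. A minimal, self-contained Scala sketch of that reduction rule (illustrative only; the object and method names below are hypothetical and this is not FiloDB execution code):

// Per-partition absent() output: 1.0 = series absent in that partition, NaN = present.
object AbsentTruthTableSketch extends App {
  val AbsentVal  = 1.0
  val PresentVal = Double.NaN

  // Absent overall only when every partition reports absent (i.e. no NaN anywhere).
  def reduce(perPartition: Seq[Double]): Double =
    if (perPartition.forall(v => !v.isNaN)) 1.0 else Double.NaN

  println(reduce(Seq(PresentVal, PresentVal)).isNaN) // true -> present overall ("00")
  println(reduce(Seq(PresentVal, AbsentVal)).isNaN)  // true -> present overall ("01")
  println(reduce(Seq(AbsentVal, PresentVal)).isNaN)  // true -> present overall ("10")
  println(reduce(Seq(AbsentVal, AbsentVal)))         // 1.0  -> absent overall  ("11")
}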

coordinator/src/main/scala/filodb/coordinator/ProtoConverters.scala

Lines changed: 1 addition & 0 deletions

@@ -1434,6 +1434,7 @@ object ProtoConverters {
       case AggregationOperator.Stdvar => GrpcMultiPartitionQueryService.AggregationOperator.STDVAR
       case AggregationOperator.Quantile => GrpcMultiPartitionQueryService.AggregationOperator.QUANTILE
       case AggregationOperator.Max => GrpcMultiPartitionQueryService.AggregationOperator.MAX
+      case AggregationOperator.Absent => throw new UnsupportedOperationException("Absent is not supported")
     }
   }
 }

query/src/main/scala/filodb/query/PlanEnums.scala

Lines changed: 1 addition & 0 deletions

@@ -110,6 +110,7 @@ object AggregationOperator extends Enum[AggregationOperator] {
   case object BottomK extends AggregationOperator("bottomk")
   case object CountValues extends AggregationOperator("count_values")
   case object Quantile extends AggregationOperator("quantile")
+  case object Absent extends AggregationOperator("absent")
 }
 
 sealed abstract class BinaryOperator extends EnumEntry {
query/src/main/scala/filodb/query/exec/aggregator/AbsentRowAggregator.scala

Lines changed: 34 additions & 0 deletions

@@ -0,0 +1,34 @@
+package filodb.query.exec.aggregator
+
+import filodb.core.query._
+import filodb.memory.format.RowReader
+
+/**
+ * Map: Every sample is mapped to itself
+ * ReduceMappedRow: Same as ReduceAggregate since every row is mapped into an aggregate
+ * ReduceAggregate: Accumulator starts at 1.0 (absent) and becomes NaN once any reduced value is NaN (present).
+ * Present: The aggregate is presented as-is
+ */
+object AbsentRowAggregator extends RowAggregator {
+  class AbsentHolder(var timestamp: Long = 0L, var value: Double = 1.0) extends AggregateHolder {
+    val row = new TransientRow()
+    def toRowReader: MutableRowReader = { row.setValues(timestamp, value); row }
+    def resetToZero(): Unit = value = 1.0
+  }
+  type AggHolderType = AbsentHolder
+  def zero: AbsentHolder = new AbsentHolder()
+  def newRowToMapInto: MutableRowReader = new TransientRow()
+  def map(rvk: RangeVectorKey, item: RowReader, mapInto: MutableRowReader): RowReader = item
+  def reduceAggregate(acc: AbsentHolder, aggRes: RowReader): AbsentHolder = {
+    acc.timestamp = aggRes.getLong(0)
+    // NaN means the time series is present. 1.0 means absent.
+    if (aggRes.getDouble(1).isNaN) {
+      acc.value = Double.NaN
+    }
+    acc
+  }
+  def present(aggRangeVector: RangeVector, limit: Int,
+              rangeParams: RangeParams, queryStats: QueryStats): Seq[RangeVector] = Seq(aggRangeVector)
+  def reductionSchema(source: ResultSchema): ResultSchema = source
+  def presentationSchema(reductionSchema: ResultSchema): ResultSchema = reductionSchema
+}
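
The aggregator realizes the same rule as a fold over reduced rows: AbsentHolder starts at 1.0 (absent) and reduceAggregate latches it to NaN as soon as any incoming row carries NaN (present). A standalone sketch under simplified types (plain doubles instead of FiloDB's RowReader/TransientRow; the names below are hypothetical):

// Simplified stand-in for AbsentHolder: 1.0 = "still absent", NaN = "seen present".
object AbsentFoldSketch extends App {
  final case class Holder(value: Double = 1.0) {
    def reduce(partitionValue: Double): Holder =
      if (partitionValue.isNaN) copy(value = Double.NaN) else this
  }

  // Fold per-partition absent() outputs for a single timestamp into one value.
  def combine(perPartition: Seq[Double]): Double =
    perPartition.foldLeft(Holder())(_ reduce _).value

  println(combine(Seq(1.0, 1.0)))              // 1.0  -> absent in every partition
  println(combine(Seq(1.0, Double.NaN)).isNaN) // true -> present in at least one
}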

query/src/main/scala/filodb/query/exec/aggregator/RowAggregator.scala

Lines changed: 1 addition & 0 deletions

@@ -123,6 +123,7 @@ object RowAggregator {
   def apply(aggrOp: AggregationOperator, params: Seq[Any], schema: ResultSchema): RowAggregator = {
     val valColType = ResultSchema.valueColumnType(schema)
     aggrOp match {
+      case Absent if valColType != ColumnType.HistogramColumn => AbsentRowAggregator
       case Min if valColType != ColumnType.HistogramColumn => MinRowAggregator
       case Max if valColType != ColumnType.HistogramColumn => MaxRowAggregator
       case Sum if valColType == ColumnType.DoubleColumn => SumRowAggregator
