Skip to content

Commit 8b4b875

Browse files
author
Yu Zhang
committed
add warning log for histogram value drop and add comments.
1 parent ca6ccbd commit 8b4b875

File tree

2 files changed

+18
-2
lines changed

2 files changed

+18
-2
lines changed

memory/src/main/scala/filodb.memory/format/NibblePack.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,15 @@ object NibblePack {
319319
val numElems = Math.min(numBuckets - i, 8)
320320
cforRange { 0 until numElems } { n =>
321321
if (data(n) < lastHistDeltas(i + n)) valueDropped = true
322+
// delta(n) represents the increase in the Nth bucket between measurements.
323+
// Each new delta should be greater than or equal to the previous one.
324+
// A decrease indicates a dropped or reset value, which is invalid.
325+
// Example of an invalid histogram due to a drop:
326+
// time1 = (0, 131, 131)
327+
// time2 = (433, 447, 447)
328+
// -> Bucket 1 increased by 131 at time1, but only by 14 at time2, violating the monotonicity rule.
329+
// Previous deltas: 0, 131, 0 // lastHistDeltas
330+
// Current deltas: 0, 14, 0 // data
322331
packArray(n) = data(n) - originalDeltas(i + n)
323332
}
324333
System.arraycopy(data, 0, lastHistDeltas, i, numElems)

memory/src/main/scala/filodb.memory/format/vectors/HistogramVector.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -626,7 +626,7 @@ case class EmptyHistogramException(message: String) extends IllegalArgumentExcep
626626
* A reader for SectDelta encoded histograms, including correction/drop functionality
627627
*/
628628
class SectDeltaHistogramReader(acc2: MemoryReader, histVect: Ptr.U8)
629-
extends RowHistogramReader(acc2, histVect) with CounterHistogramReader {
629+
extends RowHistogramReader(acc2, histVect) with CounterHistogramReader with StrictLogging {
630630
// baseHist is section base histogram; summedHist used to compute base + delta or other sums
631631
private val summedHist = LongHistogram.empty(buckets)
632632
private val baseHist = summedHist.copy
@@ -683,11 +683,18 @@ class SectDeltaHistogramReader(acc2: MemoryReader, histVect: Ptr.U8)
683683
// code to go through and build a list of corrections and corresponding index values.. (dropIndex, correction)
684684
private lazy val corrections = {
685685
var index = 0
686+
var dropped = false
686687
// Step 1: build an iterator of (starting-index, section) for each section
687-
iterateSections.map { case (s) => val o = (index, s); index += s.numElements(acc); o }.collect {
688+
val correctionData = iterateSections.map { case (s) => val o = (index, s); index += s.numElements(acc); o }.collect {
688689
case (i, s) if i > 0 && s.sectionType(acc) == Section.TypeDrop =>
690+
dropped = true
689691
(i, apply(i - 1).asInstanceOf[LongHistogram].copy)
690692
}.toBuffer
693+
if (dropped) {
694+
logger.warn(s"detected counter reset in histogram correction=${corrections}\n" +
695+
s"allSections=${dumpAllSections}")
696+
}
697+
correctionData
691698
}
692699

693700
def dropPositions(accNotUsed: MemoryReader, vectorNotUsed: BinaryVectorPtr): debox.Buffer[Int] = {

0 commit comments

Comments
 (0)