Align metering with namespace quotas #2007
base: develop
Changes from all commits
4267bcf
49b8e45
f0c6bbd
e1f4ccc
a2947f1
9b789df
f0bfb11
6ed0b20
faef879
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,9 +29,10 @@ import filodb.query.exec.TsCardExec._ | |
| * @param coordActorProducer produces a single actor to ask a query. Actors are | ||
| * queried in the order they're returned from this function. | ||
| */ | ||
| case class TenantIngestionMetering(settings: FilodbSettings, | ||
| dsIterProducer: () => Iterator[DatasetRef], | ||
| coordActorProducer: () => ActorRef) extends StrictLogging{ | ||
| case class TenantIngestionMetering( | ||
| settings: FilodbSettings, | ||
| dsIterProducer: () => Iterator[DatasetRef], | ||
| coordActorProducer: () => ActorRef) extends StrictLogging { | ||
|
|
||
| private val ASK_TIMEOUT = FiniteDuration( | ||
| settings.config.getDuration("metering-query-interval").toSeconds, | ||
|
|
@@ -45,6 +46,10 @@ case class TenantIngestionMetering(settings: FilodbSettings, | |
| private val METRIC_ACTIVE = "tsdb_metering_active_timeseries" | ||
| private val METRIC_TOTAL = "tsdb_metering_total_timeseries" | ||
| private val METRIC_LONGTERM = "tsdb_metering_longterm_timeseries" | ||
| // Quota-aligned metrics | ||
| private val METRIC_SAMPLES_INGESTED = "tsdb_metering_samples_ingested_per_min" | ||
| private val METRIC_QUERY_BYTES_SCANNED = "tsdb_metering_query_samples_scanned_per_min" | ||
| private val METRIC_RETAINED_TIMESERIES = "tsdb_metering_retained_timeseries" | ||
|
Contributor
What does "retained" mean?
||
|
|
||
| def schedulePeriodicPublishJob() : Unit = { | ||
| // NOTE: the FiniteDuration overload of scheduleWithFixedDelay | ||
|
|
@@ -65,6 +70,63 @@ case class TenantIngestionMetering(settings: FilodbSettings, | |
| } | ||
| } | ||
|
|
||
| private def publishMetrics(dsRef: DatasetRef, ws: String, ns: String, data: RowData): Unit = { | ||
| val tags = Map( | ||
| "metric_ws" -> ws, | ||
| "metric_ns" -> ns, | ||
| "dataset" -> Option(dsRef.dataset).getOrElse("unknown"), | ||
| "cluster_type" -> CLUSTER_TYPE, | ||
| "_ws_" -> ws, | ||
| "_ns_" -> ns | ||
| ) | ||
|
|
||
| if (CLUSTER_TYPE == "downsample") { | ||
| Kamon.gauge(METRIC_LONGTERM).withTags(TagSet.from(tags)).update(data.counts.longTerm.toDouble) | ||
| } | ||
| else { | ||
| Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble) | ||
| Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.shortTerm.toDouble) | ||
|
|
||
| // Update retained timeseries count - directly maps to active timeseries | ||
| Kamon.gauge(METRIC_RETAINED_TIMESERIES) | ||
| .withTags(TagSet.from(tags)) | ||
| .update(data.counts.active.toDouble) | ||
|
Comment on lines +90 to +93
Contributor
Isn't this the same thing as
Contributor
I think these names are from the design doc. Need Alex Goodwin to confirm.
||
|
|
||
| // Update samples ingested per minute based on active timeseries and ingestion resolution | ||
| val ingestResolutionMillis = settings.config.getLong("ingest-resolution-millis") | ||
| val samplesPerMin = data.counts.active * (60000.0 / ingestResolutionMillis) | ||
| Kamon.gauge(METRIC_SAMPLES_INGESTED) | ||
| .withTags(TagSet.from(tags)) | ||
| .update(samplesPerMin) | ||
|
|
||
| // Update query bytes scanned per minute based on active timeseries and configured max data per shard | ||
| val maxDataPerShardQuery = settings.config.getBytes("max-data-per-shard-query").longValue() | ||
| val avgBytesPerTs = maxDataPerShardQuery / 1000000 // Convert to MB for better readability | ||
|
Contributor
not sure about this. The name is bytes but the value is MB?
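A worked example of the unit question, using the configuration values from the test file further down; the numbers are illustrative only and not part of the PR:

```scala
// Illustrative arithmetic only, based on the test config values below.
val maxDataPerShardQuery = 1048576L                 // "max-data-per-shard-query", configured in bytes
val avgBytesPerTs = maxDataPerShardQuery / 1000000  // = 1, i.e. the value is now in MB, not bytes
val scannedPerMin = 100L * avgBytesPerTs            // = 100, published under a bytes/"samples"-named metric
val rawBytesPerMin = 100L * maxDataPerShardQuery    // = 104857600, the figure the test below expects
```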
||
| val MegabytesScannedPerMin = data.counts.active * avgBytesPerTs | ||
| Kamon.gauge(METRIC_QUERY_BYTES_SCANNED) | ||
| .withTags(TagSet.from(tags)) | ||
| .update(MegabytesScannedPerMin) | ||
|
|
||
| logger.debug(s"Published quota metrics for ws=$ws ns=$ns: " + | ||
| s"retained=${data.counts.active}, " + | ||
| s"samples_per_min=${samplesPerMin}, " + | ||
| s"bytes_scanned_per_min=${MegabytesScannedPerMin}") | ||
| } | ||
| } | ||
|
|
||
| private def handleQueryResponse(dsRef: DatasetRef, response: Any): Unit = response match { | ||
| case QueryResult(_, _, rv, _, _, _, _) => | ||
| rv.foreach(_.rows().foreach { rr => | ||
| val data = RowData.fromRowReader(rr) | ||
| val prefix = data.group.toString.split(PREFIX_DELIM) | ||
| val ws = prefix(0) | ||
| val ns = prefix(1) | ||
| publishMetrics(dsRef, ws, ns, data) | ||
| }) | ||
| case QueryError(_, _, t) => logger.warn("QueryError: " + t.getMessage) | ||
| case _ => throw new IllegalArgumentException("should never reach here; attempted to match: " + response) | ||
| } | ||
|
|
||
| /** | ||
| * For each dataset, ask a Coordinator with a TsCardinalities LogicalPlan. | ||
| * Schedules a job to publish the Coordinator's response. | ||
|
|
@@ -82,29 +144,9 @@ case class TenantIngestionMetering(settings: FilodbSettings, | |
| ), | ||
| ASK_TIMEOUT) | ||
| fut.onComplete { | ||
| case Success(QueryResult(_, _, rv, _, _, _, _)) => | ||
| rv.foreach(_.rows().foreach{ rr => | ||
| // publish a cardinality metric for each namespace | ||
| val data = RowData.fromRowReader(rr) | ||
| val prefix = data.group.toString.split(PREFIX_DELIM) | ||
| val tags = Map("metric_ws" -> prefix(0), | ||
| "metric_ns" -> prefix(1), | ||
| "dataset" -> dsRef.dataset, | ||
| "cluster_type" -> CLUSTER_TYPE) | ||
|
|
||
| if (CLUSTER_TYPE == "downsample") { | ||
| Kamon.gauge(METRIC_LONGTERM).withTags(TagSet.from(tags)).update(data.counts.longTerm.toDouble) | ||
| } | ||
| else { | ||
| Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble) | ||
| Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.shortTerm.toDouble) | ||
| } | ||
| }) | ||
| case Success(QueryError(_, _, t)) => logger.warn("QueryError: " + t.getMessage) | ||
| case Success(result) => handleQueryResponse(dsRef, result) | ||
| case Failure(t) => logger.warn("Failure: " + t.getMessage) | ||
| // required to compile | ||
| case _ => throw new IllegalArgumentException("should never reach here; attempted to match: " + fut) | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
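As a reference for the review comments above, here is a minimal, self-contained sketch of the arithmetic the new publishMetrics code performs. The function name and signature are illustrative only; in the actual diff the inputs come from the cardinality row and from settings.config.

```scala
// Illustrative sketch only; mirrors the derivations in publishMetrics above.
// `active` stands in for data.counts.active; the other two parameters stand in for
// the "ingest-resolution-millis" and "max-data-per-shard-query" config values.
def quotaAlignedValues(active: Long,
                       ingestResolutionMillis: Long,
                       maxDataPerShardQuery: Long): (Double, Double, Long) = {
  // tsdb_metering_retained_timeseries: published as the active count itself
  val retained = active.toDouble
  // tsdb_metering_samples_ingested_per_min: one sample per series per resolution interval
  val samplesPerMin = active * (60000.0 / ingestResolutionMillis)
  // tsdb_metering_query_samples_scanned_per_min: active series times the per-shard
  // query budget, scaled down by 1e6 in the diff (the unit question raised above)
  val scannedPerMin = active * (maxDataPerShardQuery / 1000000)
  (retained, samplesPerMin, scannedPerMin)
}

// With the values used in the test below (100 active series, 10s resolution, 1 MiB budget):
// quotaAlignedValues(100, 10000, 1048576) == (100.0, 600.0, 100)
```

Note that the test in the second file expects raw bytes (100 * 1048576) for the scanned metric, while the code above divides by 1,000,000 first; that mismatch is what the bytes-vs-MB comment points at.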
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,201 @@ | ||
| package filodb.coordinator | ||
|
|
||
| import scala.util.Success | ||
| import scala.concurrent.duration._ | ||
|
|
||
| import akka.actor.{Actor, ActorSystem, Props} | ||
| import akka.testkit.{TestKit, TestProbe} | ||
| import com.typesafe.config.{Config, ConfigFactory} | ||
| import kamon.Kamon | ||
| import kamon.tag.TagSet | ||
| import org.scalatest.BeforeAndAfterAll | ||
| import org.scalatest.concurrent.Eventually | ||
| import org.scalatest.matchers.should.Matchers | ||
| import org.scalatest.time.{Seconds, Span} | ||
| import org.scalatest.wordspec.AnyWordSpecLike | ||
|
|
||
| import filodb.coordinator.client.QueryCommands.LogicalPlan2Query | ||
| import filodb.core.DatasetRef | ||
| import filodb.core.query.{ | ||
| QueryStats, | ||
| QueryWarnings, | ||
| RangeVector, | ||
| RangeVectorCursor, | ||
| RangeVectorKey, | ||
| ResultSchema, | ||
| RvRange | ||
| } | ||
| import filodb.memory.format.{RowReader, ZeroCopyUTF8String} | ||
| import filodb.query.{QueryResult, TsCardinalities} | ||
| import filodb.query.exec.TsCardExec._ | ||
|
|
||
| class TenantIngestionMeteringSpec extends TestKit(ActorSystem("TenantIngestionMeteringSpec")) | ||
| with AnyWordSpecLike with Matchers with BeforeAndAfterAll with Eventually { | ||
|
|
||
|
|
||
| implicit override val patienceConfig = PatienceConfig( | ||
| timeout = Span(30, Seconds), | ||
| interval = Span(1, Seconds) | ||
| ) | ||
|
|
||
| override def beforeAll(): Unit = { | ||
| super.beforeAll() | ||
| Kamon.init() | ||
| } | ||
|
|
||
| override def afterAll(): Unit = { | ||
| Kamon.stopModules() | ||
| TestKit.shutdownActorSystem(system) | ||
| super.afterAll() | ||
| } | ||
|
|
||
| val testConfig: Config = ConfigFactory.parseString(""" | ||
| filodb { | ||
| metering-query-interval = 1 second | ||
| cluster-type = "raw" | ||
| partition = "test-partition" | ||
| ingest-resolution-millis = 10000 | ||
| max-data-per-shard-query = 1048576 | ||
| } | ||
| """) | ||
|
|
||
| val settings = new FilodbSettings(testConfig) | ||
| val testDataset = DatasetRef("test-dataset") | ||
| val testWs = "test-ws" | ||
| val testNs = "test-ns" | ||
|
|
||
| val mockRowReader = new RowReader { | ||
| override def notNull(columnNo: Int): Boolean = true | ||
| override def getBoolean(columnNo: Int): Boolean = throw new UnsupportedOperationException | ||
| override def getInt(columnNo: Int): Int = throw new UnsupportedOperationException | ||
| override def getLong(columnNo: Int): Long = columnNo match { | ||
| case 1 => 100L // active | ||
| case 2 => 150L // shortTerm | ||
| case 3 => 0L // longTerm | ||
| case _ => throw new IllegalArgumentException(s"Invalid index $columnNo") | ||
| } | ||
| override def getDouble(columnNo: Int): Double = throw new UnsupportedOperationException | ||
| override def getFloat(columnNo: Int): Float = throw new UnsupportedOperationException | ||
| override def getString(columnNo: Int): String = throw new UnsupportedOperationException | ||
| override def getAny(columnNo: Int): Any = columnNo match { | ||
| case 0 => ZeroCopyUTF8String(s"${testWs}${PREFIX_DELIM}${testNs}") | ||
| case _ => throw new IllegalArgumentException(s"Invalid index $columnNo") | ||
| } | ||
| override def getBlobBase(columnNo: Int): Any = throw new UnsupportedOperationException | ||
| override def getBlobOffset(columnNo: Int): Long = throw new UnsupportedOperationException | ||
| override def getBlobNumBytes(columnNo: Int): Int = throw new UnsupportedOperationException | ||
| } | ||
|
|
||
| class MockCoordinator(probe: TestProbe) extends Actor { | ||
| def receive: Receive = { | ||
| case msg @ LogicalPlan2Query(dsRef, logicalPlan: TsCardinalities, qContext, _) => | ||
| probe.ref forward msg | ||
| val rangeVector = new RangeVector { | ||
| override def key: RangeVectorKey = new RangeVectorKey { | ||
| override def labelValues: Map[ZeroCopyUTF8String, ZeroCopyUTF8String] = Map.empty | ||
| override def sourceShards: Seq[Int] = Seq.empty | ||
| override def partIds: Seq[Int] = Seq.empty | ||
| override def schemaNames: Seq[String] = Seq.empty | ||
| override def keySize: Int = 0 | ||
| } | ||
| override def rows: RangeVectorCursor = new RangeVectorCursor { | ||
| private var hasNextValue = true | ||
| override def hasNext: Boolean = hasNextValue | ||
| override def next(): RowReader = { | ||
| if (!hasNextValue) throw new NoSuchElementException | ||
| hasNextValue = false | ||
| mockRowReader | ||
| } | ||
| override def close(): Unit = {} | ||
| } | ||
| override def outputRange: Option[RvRange] = None | ||
| override def numRows: Option[Int] = Some(1) | ||
| } | ||
|
|
||
| val queryResult = QueryResult( | ||
| id = dsRef.dataset, | ||
| resultSchema = ResultSchema.empty, | ||
| result = Seq(rangeVector), | ||
| queryStats = QueryStats(), | ||
| warnings = QueryWarnings(), | ||
| mayBePartial = false, | ||
| partialResultReason = None | ||
| ) | ||
| sender() ! Success(queryResult) | ||
| } | ||
| } | ||
|
|
||
| def createMetering(probe: TestProbe): TenantIngestionMetering = { | ||
| val dsIterProducer = () => Iterator(testDataset) | ||
| val coordActor = system.actorOf(Props(new MockCoordinator(probe))) | ||
| val coordActorProducer = () => coordActor | ||
| TenantIngestionMetering(settings, dsIterProducer, coordActorProducer) | ||
| } | ||
|
|
||
| "TenantIngestionMetering" should { | ||
| "publish metrics correctly for raw cluster type" in { | ||
| val probe = TestProbe() | ||
| val metering = createMetering(probe) | ||
|
|
||
| // Trigger metric publishing | ||
| metering.schedulePeriodicPublishJob() | ||
|
|
||
| // Verify the query is sent with correct parameters | ||
| probe.expectMsgPF(10.seconds) { | ||
| case LogicalPlan2Query(dsRef, logicalPlan: TsCardinalities, qContext, _) => | ||
| dsRef shouldBe testDataset | ||
| logicalPlan.shardKeyPrefix shouldBe Nil | ||
| logicalPlan.numGroupByFields shouldBe 2 | ||
| logicalPlan.overrideClusterName shouldBe "raw" | ||
| qContext.traceInfo should contain("filodb.partition" -> "test-partition") | ||
| } | ||
|
|
||
| // Wait and verify metrics were published with correct values | ||
| val tags = Map( | ||
| "metric_ws" -> testWs, | ||
| "metric_ns" -> testNs, | ||
| "dataset" -> testDataset.dataset, | ||
| "cluster_type" -> "raw", | ||
| "_ws_" -> testWs, | ||
| "_ns_" -> testNs | ||
| ) | ||
|
|
||
| eventually { | ||
| // Verify active timeseries metric | ||
| val activeGauge = Kamon.gauge("tsdb_metering_active_timeseries") | ||
| val activeValue = activeGauge.withTags(TagSet.from(tags)) | ||
| println(s"Active timeseries - Expected: 100.0, Actual: $activeValue") | ||
| activeValue shouldBe 100.0 +- 0.1 | ||
|
|
||
| // Verify total timeseries metric | ||
| val totalGauge = Kamon.gauge("tsdb_metering_total_timeseries") | ||
| val totalValue = totalGauge.withTags(TagSet.from(tags)) | ||
| println(s"Total timeseries - Expected: 150.0, Actual: $totalValue") | ||
| totalValue shouldBe 150.0 +- 0.1 | ||
|
|
||
| // Verify retained timeseries metric | ||
| val retainedGauge = Kamon.gauge("tsdb_metering_retained_timeseries") | ||
| val retainedValue = retainedGauge.withTags(TagSet.from(tags)) | ||
| println(s"Retained timeseries - Expected: 100.0, Actual: $retainedValue") | ||
| retainedValue shouldBe 100.0 +- 0.1 | ||
|
|
||
| // Verify samples ingested per minute (based on 10s resolution) | ||
| val expectedSamplesPerMin = 100.0 * (60000.0 / 10000) // 600.0 | ||
| val samplesGauge = Kamon.gauge("tsdb_metering_samples_ingested_per_min") | ||
| val samplesValue = samplesGauge.withTags(TagSet.from(tags)) | ||
| println(s"Samples ingested per min - Expected: $expectedSamplesPerMin, Actual: $samplesValue") | ||
| samplesValue shouldBe expectedSamplesPerMin +- 0.1 | ||
|
|
||
| // Verify query bytes scanned per minute | ||
| val expectedBytesPerMin = 100.0 * 1048576.0 // Using raw bytes now | ||
| val bytesGauge = Kamon.gauge("tsdb_metering_query_samples_scanned_per_min") | ||
| val bytesValue = bytesGauge.withTags(TagSet.from(tags)) | ||
| println(s"Query bytes scanned per min - Expected: $expectedBytesPerMin, Actual: $bytesValue") | ||
| bytesValue shouldBe expectedBytesPerMin +- 0.1 | ||
|
|
||
| // Print all tags being used | ||
| println(s"Using tags: $tags") | ||
| } | ||
| } | ||
| } | ||
| } |
Should this constant be named SAMPLES_SCANNED?
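For context, this comment presumably refers to the following constant from the first file of the diff, where the Scala name, the metric string, and the published unit each use a different term:

```scala
// As introduced in the diff: the constant name says "BYTES", the metric name says
// "samples", and the published value is scaled to MB.
private val METRIC_QUERY_BYTES_SCANNED = "tsdb_metering_query_samples_scanned_per_min"
```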