Skip to content

Commit e4a4cdc

Browse files
authored
refactor: O11Y-630 - Sampling and conditional logic for LOGS moved from exporters to processors (#288)
## Summary To avoid buffering logs that should be discarded due to the sampling config, the stop sampling logs in the exporter to do it in the processors. "Conditional logic" refers to the logic that was inside the ConditionLogRecordExporter (Used to allow different filtering rules for crashes vs normal logs) ## How did you test this change? Unit tests ## Are there any deployment considerations? No <!-- CURSOR_SUMMARY --> --- > [!NOTE] > Replaces exporter-based log sampling and crash/normal filtering with processor-based equivalents and updates the log pipeline and tests accordingly. > > - **Logs pipeline**: > - Introduces `ConditionalLogRecordProcessor` to filter crash (`io.opentelemetry.crash`) vs normal logs. > - Introduces `SamplingLogProcessor` to apply sampling before downstream processing. > - Removes exporter wrappers `ConditionalLogRecordExporter` and `SamplingLogExporter`. > - Updates `InstrumentationManager.createLoggerProcessor` to chain `SamplingLogProcessor` → `ConditionalLogRecordProcessor` → `RoutingLogRecordProcessor`, and simplifies `createLogExporter` to only build a composite in debug mode. > - **Tests**: > - Adds unit tests for `ConditionalLogRecordProcessor` and `SamplingLogProcessor`. > - Removes tests for deleted exporter classes. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 2067fb3. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 629bc39 commit e4a4cdc

File tree

9 files changed

+408
-535
lines changed

9 files changed

+408
-535
lines changed

sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/client/ConditionalLogRecordExporter.kt

Lines changed: 0 additions & 43 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package com.launchdarkly.observability.client
2+
3+
import io.opentelemetry.context.Context
4+
import io.opentelemetry.sdk.common.CompletableResultCode
5+
import io.opentelemetry.sdk.logs.LogRecordProcessor
6+
import io.opentelemetry.sdk.logs.ReadWriteLogRecord
7+
8+
/**
9+
* Filters log records before they reach downstream processors based on their instrumentation scope.
10+
*
11+
* Crash logs emitted by OpenTelemetry's crash instrumentation use the "io.opentelemetry.crash" scope.
12+
* This processor can drop those logs independently from normal application logs.
13+
*/
14+
class ConditionalLogRecordProcessor(
15+
private val delegate: LogRecordProcessor,
16+
private val allowNormalLogs: Boolean,
17+
private val allowCrashes: Boolean
18+
) : LogRecordProcessor {
19+
20+
override fun onEmit(context: Context, logRecord: ReadWriteLogRecord) {
21+
if (!shouldEmitLog(logRecord)) {
22+
return
23+
}
24+
25+
delegate.onEmit(context, logRecord)
26+
}
27+
28+
override fun forceFlush(): CompletableResultCode = delegate.forceFlush()
29+
30+
override fun shutdown(): CompletableResultCode = delegate.shutdown()
31+
32+
override fun close() {
33+
delegate.close()
34+
}
35+
36+
private fun shouldEmitLog(logRecord: ReadWriteLogRecord): Boolean {
37+
// Check if this is a crash log from OpenTelemetry's CrashReporterInstrumentation
38+
val instrumentationScopeName = logRecord.instrumentationScopeInfo.name
39+
val isCrashLog = instrumentationScopeName == CRASH_INSTRUMENTATION_SCOPE
40+
41+
return when {
42+
isCrashLog -> allowCrashes
43+
else -> allowNormalLogs
44+
}
45+
}
46+
47+
private companion object {
48+
const val CRASH_INSTRUMENTATION_SCOPE = "io.opentelemetry.crash"
49+
}
50+
}

sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/client/InstrumentationManager.kt

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import com.launchdarkly.observability.network.SamplingApiService
99
import com.launchdarkly.observability.sampling.CustomSampler
1010
import com.launchdarkly.observability.sampling.ExportSampler
1111
import com.launchdarkly.observability.sampling.SamplingConfig
12-
import com.launchdarkly.observability.sampling.SamplingLogExporter
12+
import com.launchdarkly.observability.sampling.SamplingLogProcessor
1313
import com.launchdarkly.observability.sampling.SamplingTraceExporter
1414
import com.launchdarkly.observability.coroutines.DispatcherProviderHolder
1515
import io.opentelemetry.android.OpenTelemetryRum
@@ -415,20 +415,30 @@ class InstrumentationManager(
415415

416416
val finalExporter = createLogExporter(
417417
primaryLogExporter,
418-
exportSampler,
419418
logger,
420419
telemetryInspector,
421420
options
422421
)
423-
val baseProcessor = createBatchLogRecordProcessor(finalExporter)
424-
425-
// Here we set up a routing log processor that will route logs with a matching scope name to the
426-
// respective instrumentation's log record processor. If the log's scope name does not match
427-
// an instrumentation's scope name, it will fall through to the base processor. This was
428-
// originally added to route replay instrumentation logs through a separate log processing
429-
// pipeline to provide instrumentation specific caching and export.
430-
val routingLogRecordProcessor =
431-
RoutingLogRecordProcessor(fallthroughProcessor = baseProcessor)
422+
423+
val samplingProcessor = SamplingLogProcessor(
424+
createBatchLogRecordProcessor(finalExporter),
425+
exportSampler
426+
)
427+
428+
val baseProcessor = ConditionalLogRecordProcessor(
429+
delegate = samplingProcessor,
430+
allowNormalLogs = !options.disableLogs,
431+
allowCrashes = !options.disableErrorTracking
432+
)
433+
434+
/*
435+
Here we set up a routing log processor that will route logs with a matching scope name to the
436+
respective instrumentation's log record processor. If the log's scope name does not match
437+
an instrumentation's scope name, it will fall through to the base processor. This was
438+
originally added to route replay instrumentation logs through a separate log processing
439+
pipeline to provide instrumentation specific caching and export.
440+
*/
441+
val routingLogRecordProcessor = RoutingLogRecordProcessor(fallthroughProcessor = baseProcessor)
432442
options.instrumentations.forEach { instrumentation ->
433443
instrumentation.getLogRecordProcessor(credential = sdkKey)?.let { processor ->
434444
instrumentation.getLoggerScopeName().let { scopeName ->
@@ -449,12 +459,11 @@ class InstrumentationManager(
449459

450460
private fun createLogExporter(
451461
primaryExporter: LogRecordExporter,
452-
exportSampler: ExportSampler,
453462
logger: LDLogger,
454463
telemetryInspector: TelemetryInspector?,
455464
options: Options
456465
): LogRecordExporter {
457-
val baseExporter = if (options.debug) {
466+
return if (options.debug) {
458467
LogRecordExporter.composite(
459468
buildList {
460469
add(primaryExporter)
@@ -465,14 +474,6 @@ class InstrumentationManager(
465474
} else {
466475
primaryExporter
467476
}
468-
469-
val conditionalExporter = ConditionalLogRecordExporter(
470-
delegate = baseExporter,
471-
allowNormalLogs = !options.disableLogs,
472-
allowCrashes = !options.disableErrorTracking
473-
)
474-
475-
return SamplingLogExporter(conditionalExporter, exportSampler)
476477
}
477478

478479
fun createBatchLogRecordProcessor(logRecordExporter: LogRecordExporter): BatchLogRecordProcessor {

sdk/@launchdarkly/observability-android/lib/src/main/kotlin/com/launchdarkly/observability/sampling/SamplingLogExporter.kt

Lines changed: 0 additions & 59 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package com.launchdarkly.observability.sampling
2+
3+
import io.opentelemetry.api.common.Attributes
4+
import io.opentelemetry.context.Context
5+
import io.opentelemetry.sdk.common.CompletableResultCode
6+
import io.opentelemetry.sdk.logs.LogRecordProcessor
7+
import io.opentelemetry.sdk.logs.ReadWriteLogRecord
8+
9+
/**
10+
* A [LogRecordProcessor] that applies sampling logic before delegating to another [LogRecordProcessor].
11+
*
12+
* This processor drops unsampled log records so they are never enqueued in downstream processors,
13+
* reducing buffering and export overhead for logs that would have been filtered out later.
14+
*
15+
* @param delegate The underlying [LogRecordProcessor] that should receive sampled log records.
16+
* @param sampler The sampler that decides whether a log record should be processed.
17+
*/
18+
class SamplingLogProcessor(
19+
private val delegate: LogRecordProcessor,
20+
private val sampler: ExportSampler
21+
) : LogRecordProcessor {
22+
23+
override fun onEmit(context: Context, logRecord: ReadWriteLogRecord) {
24+
if (!sampler.isSamplingEnabled()) {
25+
delegate.onEmit(context, logRecord)
26+
return
27+
}
28+
29+
val samplingResult = sampler.sampleLog(logRecord.toLogRecordData())
30+
if (!samplingResult.sample) {
31+
return
32+
}
33+
34+
samplingResult.attributes?.let { addSamplingAttributes(logRecord, it) }
35+
delegate.onEmit(context, logRecord)
36+
}
37+
38+
override fun shutdown(): CompletableResultCode {
39+
return delegate.shutdown()
40+
}
41+
42+
override fun forceFlush(): CompletableResultCode {
43+
return delegate.forceFlush()
44+
}
45+
46+
override fun close() {
47+
delegate.close()
48+
}
49+
50+
private fun addSamplingAttributes(
51+
logRecord: ReadWriteLogRecord,
52+
samplingAttributes: Attributes
53+
) {
54+
val mergedAttributes = Attributes.builder()
55+
.putAll(logRecord.attributes)
56+
.putAll(samplingAttributes)
57+
.build()
58+
logRecord.setAllAttributes(mergedAttributes)
59+
}
60+
}

0 commit comments

Comments
 (0)