Commit 8c4a8c4

Handle changes to MutableSettings and ExporterSettings without rebuilding (#7724)
## Summary of changes

- This is the big one
- Update services to dynamically update when mutable settings or exporter settings change
- Stop rebuilding everything when there's manual/remote configuration

## Reason for change

This is the "endpoint" that we've been heading for - services only being disposed/rebuilt at the end of the app, and otherwise only rebuilding the _necessary_ parts. For example - we don't need to tear down all the API factories when a customer changes a global tag via remote config; they only need to change if the `ExporterSettings` change.

The hope is that overall this reduces the overhead of using configuration in code and/or remote configuration, while also reducing the number of issues due to managing disposal of services.

## Implementation details

Overall, this PR is kind of a pain. Moving from "rebuild everything" to "reconfigure each service" couldn't be done piecemeal, so this is the one-shot PR. What's more, different services need different patterns (though we can probably consolidate some of them, this has taken a _lot_ of work and I likely changed patterns unnecessarily in some places).

In general, there are a couple of patterns:

- CI Vis doesn't let you change settings at runtime, so it _never_ needs to respond to changes. It always just uses the "initial" settings
- Debugger _today_ doesn't respond to changes at runtime (except its own dynamic config), so for now we ignore Debugger too as it's not really a regression. I hope we can fix this soon though.
- I've introduced the concept of `Managed*` versions of some services
  - These services generally "wrap" the existing type, delegating access to the underlying service, and handling settings changes
- Many services only care about a subset of mutable settings, so they only update if they need to
- Somewhat annoyingly, setting updates occur on a background thread, so we need to be careful about thread safety. Where necessary (most places) I've made sure access to a now-mutable service is done using `Volatile.Read()` (to ensure changes are visible), and the value is generally cached to a local variable (as the underlying field may be updated in the background). See the sketch below.

## Test coverage

In the vast majority of places, this should be covered by existing tests. I plan to add some additional integration tests around reconfiguring and a bunch of manual testing to make sure I'm confident.

## Other details

I strongly recommend reviewing commit-by-commit. The commits are generally self-contained, and hopefully simple enough to understand one at a time.

https://datadoghq.atlassian.net/browse/LANGPLAT-819

Part of a config stack:

- #7522
- #7525
- #7530
- #7532
- #7543
- #7544
- #7721
- #7722
- #7695
- #7723
- #7724 👈
- #7796

This isn't the final PR in the stack, as there will be a bunch of cleaning up to do, but it's the final "implementation" PR.
1 parent 9ac42fd commit 8c4a8c4
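The "subscribe to changes, react only to the relevant subset, and publish/read the value with `Volatile`" pattern described above can be sketched roughly as follows. All of the types here (`ISettingsManager`, `SettingsChange`, `MutableSettings`, `MetricsConsumer`) are simplified stand-ins for illustration, not the actual `Datadog.Trace` types added in this PR:

```csharp
// Minimal sketch of the "reconfigure instead of rebuild" pattern; illustrative names only.
using System;
using System.Threading;

internal record MutableSettings(bool TracerMetricsEnabled);

internal record SettingsChange(MutableSettings PreviousMutable, MutableSettings? UpdatedMutable);

internal interface ISettingsManager
{
    MutableSettings InitialMutableSettings { get; }

    void SubscribeToChanges(Action<SettingsChange> onChange);
}

internal class MetricsConsumer
{
    private bool _traceMetricsEnabled;

    public MetricsConsumer(ISettingsManager manager)
    {
        // Seed from the initial settings, then react only to the one setting we care about.
        _traceMetricsEnabled = manager.InitialMutableSettings.TracerMetricsEnabled;
        manager.SubscribeToChanges(changes =>
        {
            if (changes.UpdatedMutable is { } mutable
                && mutable.TracerMetricsEnabled != changes.PreviousMutable.TracerMetricsEnabled)
            {
                // Updates arrive on a background thread, so publish with a volatile write.
                Volatile.Write(ref _traceMetricsEnabled, mutable.TracerMetricsEnabled);
            }
        });
    }

    public void OnTraceEnqueued()
    {
        // Read once into a local so the value can't change part-way through the method.
        var enabled = Volatile.Read(ref _traceMetricsEnabled);
        if (enabled)
        {
            // emit metrics here
        }
    }
}
```

The local-variable cache is the important part: the field may be swapped by the background settings thread at any time, so each call site reads it exactly once and works off that snapshot.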

File tree

137 files changed: +3963 -1507 lines

tracer/missing-nullability-files.csv

Lines changed: 0 additions & 2 deletions
```diff
@@ -34,10 +34,8 @@ src/Datadog.Trace/Tracer.cs
 src/Datadog.Trace/TracerConstants.cs
 src/Datadog.Trace/TracerManager.cs
 src/Datadog.Trace/TracerManagerFactory.cs
-src/Datadog.Trace/Agent/AgentWriter.cs
 src/Datadog.Trace/Agent/Api.cs
 src/Datadog.Trace/Agent/ClientStatsPayload.cs
-src/Datadog.Trace/Agent/IAgentWriter.cs
 src/Datadog.Trace/Agent/IApi.cs
 src/Datadog.Trace/Agent/IApiRequest.cs
 src/Datadog.Trace/Agent/IApiRequestFactory.cs
```

tracer/src/Datadog.Trace.Tools.Runner/Utils.cs

Lines changed: 4 additions & 4 deletions
```diff
@@ -417,9 +417,9 @@ public static async Task<AgentConfiguration> CheckAgentConnectionAsync(string ag
 
             var settings = new TracerSettings(configurationSource, new ConfigurationTelemetry(), new OverrideErrorLog());
 
-            Log.Debug("Creating DiscoveryService for: {AgentUri}", settings.Exporter.AgentUri);
-            var discoveryService = DiscoveryService.Create(
-                settings.Exporter,
+            Log.Debug("Creating DiscoveryService for: {AgentUri}", settings.Manager.InitialExporterSettings.AgentUri);
+            var discoveryService = DiscoveryService.CreateUnmanaged(
+                settings.Manager.InitialExporterSettings,
                 tcpTimeout: TimeSpan.FromSeconds(5),
                 initialRetryDelayMs: 200,
                 maxRetryDelayMs: 1000,
@@ -433,7 +433,7 @@ public static async Task<AgentConfiguration> CheckAgentConnectionAsync(string ag
             using (cts.Token.Register(
                 () =>
                 {
-                    WriteError($"Error connecting to the Datadog Agent at {settings.Exporter.AgentUri}.");
+                    WriteError($"Error connecting to the Datadog Agent at {settings.Manager.InitialExporterSettings.AgentUri}.");
                     tcs.TrySetResult(null);
                 }))
             {
```

tracer/src/Datadog.Trace/Agent/AgentWriter.cs

Lines changed: 51 additions & 26 deletions
```diff
@@ -3,6 +3,8 @@
 // This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
 // </copyright>
 
+#nullable enable
+
 using System;
 using System.Collections.Concurrent;
 using System.Collections.Generic;
@@ -12,10 +14,8 @@
 using Datadog.Trace.Configuration;
 using Datadog.Trace.DogStatsd;
 using Datadog.Trace.Logging;
-using Datadog.Trace.Tagging;
 using Datadog.Trace.Telemetry;
 using Datadog.Trace.Telemetry.Metrics;
-using Datadog.Trace.Vendors.StatsdClient;
 
 namespace Datadog.Trace.Agent
 {
@@ -25,10 +25,10 @@ internal class AgentWriter : IAgentWriter
 
         private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor<AgentWriter>();
 
-        private static readonly ArraySegment<byte> EmptyPayload = new(new byte[] { 0x90 });
+        private static readonly ArraySegment<byte> EmptyPayload = new([0x90]);
 
         private readonly ConcurrentQueue<WorkItem> _pendingTraces = new ConcurrentQueue<WorkItem>();
-        private readonly IDogStatsd _statsd;
+        private readonly IStatsdManager _statsd;
         private readonly Task _flushTask;
         private readonly Task _serializationTask;
         private readonly TaskCompletionSource<bool> _processExit = new TaskCompletionSource<bool>();
@@ -43,7 +43,7 @@ internal class AgentWriter : IAgentWriter
         private readonly int _batchInterval;
         private readonly IKeepRateCalculator _traceKeepRateCalculator;
 
-        private readonly IStatsAggregator _statsAggregator;
+        private readonly IStatsAggregator? _statsAggregator;
 
         private readonly bool _apmTracingEnabled;
 
@@ -65,17 +65,28 @@ internal class AgentWriter : IAgentWriter
 
        private long _droppedTraces;
 
-        public AgentWriter(IApi api, IStatsAggregator statsAggregator, IDogStatsd statsd, TracerSettings settings)
-            : this(api, statsAggregator, statsd, maxBufferSize: settings.TraceBufferSize, batchInterval: settings.TraceBatchInterval, apmTracingEnabled: settings.ApmTracingEnabled)
+        private bool _traceMetricsEnabled;
+
+        public AgentWriter(IApi api, IStatsAggregator? statsAggregator, IStatsdManager statsd, TracerSettings settings)
+            : this(api, statsAggregator, statsd, maxBufferSize: settings.TraceBufferSize, batchInterval: settings.TraceBatchInterval, apmTracingEnabled: settings.ApmTracingEnabled, initialTracerMetricsEnabled: settings.Manager.InitialMutableSettings.TracerMetricsEnabled)
         {
+            settings.Manager.SubscribeToChanges(changes =>
+            {
+                if (changes.UpdatedMutable is { } mutable
+                    && mutable.TracerMetricsEnabled != changes.PreviousMutable.TracerMetricsEnabled)
+                {
+                    Volatile.Write(ref _traceMetricsEnabled, mutable.TracerMetricsEnabled);
+                    _statsd.SetRequired(StatsdConsumer.AgentWriter, mutable.TracerMetricsEnabled);
+                }
+            });
        }
 
-        public AgentWriter(IApi api, IStatsAggregator statsAggregator, IDogStatsd statsd, bool automaticFlush = true, int maxBufferSize = 1024 * 1024 * 10, int batchInterval = 100, bool apmTracingEnabled = true)
-            : this(api, statsAggregator, statsd, MovingAverageKeepRateCalculator.CreateDefaultKeepRateCalculator(), automaticFlush, maxBufferSize, batchInterval, apmTracingEnabled)
+        public AgentWriter(IApi api, IStatsAggregator? statsAggregator, IStatsdManager statsd, bool automaticFlush = true, int maxBufferSize = 1024 * 1024 * 10, int batchInterval = 100, bool apmTracingEnabled = true, bool initialTracerMetricsEnabled = false)
+            : this(api, statsAggregator, statsd, MovingAverageKeepRateCalculator.CreateDefaultKeepRateCalculator(), automaticFlush, maxBufferSize, batchInterval, apmTracingEnabled, initialTracerMetricsEnabled)
        {
        }
 
-        internal AgentWriter(IApi api, IStatsAggregator statsAggregator, IDogStatsd statsd, IKeepRateCalculator traceKeepRateCalculator, bool automaticFlush, int maxBufferSize, int batchInterval, bool apmTracingEnabled)
+        internal AgentWriter(IApi api, IStatsAggregator? statsAggregator, IStatsdManager statsd, IKeepRateCalculator traceKeepRateCalculator, bool automaticFlush, int maxBufferSize, int batchInterval, bool apmTracingEnabled, bool initialTracerMetricsEnabled)
        {
            _statsAggregator = statsAggregator;
 
@@ -92,18 +103,20 @@ internal AgentWriter(IApi api, IStatsAggregator statsAggregator, IDogStatsd stat
            _backBuffer = new SpanBuffer(maxBufferSize, formatterResolver);
            _activeBuffer = _frontBuffer;
 
+            _apmTracingEnabled = apmTracingEnabled;
+            _traceMetricsEnabled = initialTracerMetricsEnabled;
+            _statsd.SetRequired(StatsdConsumer.AgentWriter, initialTracerMetricsEnabled);
+
            _serializationTask = automaticFlush ? Task.Factory.StartNew(SerializeTracesLoop, TaskCreationOptions.LongRunning) : Task.CompletedTask;
            _serializationTask.ContinueWith(t => Log.Error(t.Exception, "Error in serialization task"), TaskContinuationOptions.OnlyOnFaulted);
 
            _flushTask = automaticFlush ? Task.Run(FlushBuffersTaskLoopAsync) : Task.CompletedTask;
            _flushTask.ContinueWith(t => Log.Error(t.Exception, "Error in flush task"), TaskContinuationOptions.OnlyOnFaulted);
 
            _backBufferFlushTask = _frontBufferFlushTask = Task.CompletedTask;
-
-            _apmTracingEnabled = apmTracingEnabled;
        }
 
-        internal event Action Flushed;
+        internal event Action? Flushed;
 
        internal SpanBuffer ActiveBuffer => _activeBuffer;
 
@@ -138,10 +151,14 @@ public void WriteTrace(ArraySegment<Span> trace)
                }
            }
 
-            if (_statsd != null)
+            if (Volatile.Read(ref _traceMetricsEnabled))
            {
-                _statsd.Increment(TracerMetricNames.Queue.EnqueuedTraces);
-                _statsd.Increment(TracerMetricNames.Queue.EnqueuedSpans, trace.Count);
+                using var lease = _statsd.TryGetClientLease();
+                if (lease.Client is { } statsd)
+                {
+                    statsd.Increment(TracerMetricNames.Queue.EnqueuedTraces);
+                    statsd.Increment(TracerMetricNames.Queue.EnqueuedSpans, trace.Count);
+                }
            }
        }
 
@@ -244,7 +261,7 @@ private async Task FlushBuffersTaskLoopAsync()
                {
                    tasks[2] = Task.Delay(TimeSpan.FromSeconds(1));
                    await Task.WhenAny(tasks).ConfigureAwait(false);
-                    tasks[2] = null;
+                    tasks[2] = null!;
 
                    if (_forceFlush.Task.IsCompleted)
                    {
@@ -314,10 +331,14 @@ async Task InternalBufferFlush()
 
            try
            {
-                if (_statsd != null)
+                if (Volatile.Read(ref _traceMetricsEnabled))
                {
-                    _statsd.Increment(TracerMetricNames.Queue.DequeuedTraces, buffer.TraceCount);
-                    _statsd.Increment(TracerMetricNames.Queue.DequeuedSpans, buffer.SpanCount);
+                    using var lease = _statsd.TryGetClientLease();
+                    if (lease.Client is { } statsd)
+                    {
+                        statsd.Increment(TracerMetricNames.Queue.DequeuedTraces, buffer.TraceCount);
+                        statsd.Increment(TracerMetricNames.Queue.DequeuedSpans, buffer.SpanCount);
+                    }
                }
 
                var droppedTraces = Interlocked.Exchange(ref _droppedTraces, 0);
@@ -336,7 +357,7 @@ async Task InternalBufferFlush()
                {
                    droppedP0Traces = Interlocked.Exchange(ref _droppedP0Traces, 0);
                    droppedP0Spans = Interlocked.Exchange(ref _droppedP0Spans, 0);
-                    Log.Debug<int, int, long, long>("Flushing {Spans} spans across {Traces} traces. CanComputeStats is enabled with {DroppedP0Traces} droppedP0Traces and {DroppedP0Spans} droppedP0Spans", buffer.SpanCount, buffer.TraceCount, droppedP0Traces, droppedP0Spans);
+                    Log.Debug("Flushing {Spans} spans across {Traces} traces. CanComputeStats is enabled with {DroppedP0Traces} droppedP0Traces and {DroppedP0Spans} droppedP0Spans", buffer.SpanCount, buffer.TraceCount, droppedP0Traces, droppedP0Spans);
                    // Metrics for unsampled traces/spans already recorded
                }
                else
@@ -377,7 +398,7 @@ async Task InternalBufferFlush()
        private void SerializeTrace(ArraySegment<Span> spans)
        {
            // Declaring as inline method because only safe to invoke in the context of SerializeTrace
-            SpanBuffer SwapBuffers()
+            SpanBuffer? SwapBuffers()
            {
                if (_activeBuffer == _frontBuffer)
                {
@@ -512,10 +533,14 @@ private void DropTrace(ArraySegment<Span> spans)
            TelemetryFactory.Metrics.RecordCountSpanDropped(MetricTags.DropReason.OverfullBuffer, spans.Count);
            TelemetryFactory.Metrics.RecordCountTraceChunkDropped(MetricTags.DropReason.OverfullBuffer);
 
-            if (_statsd != null)
+            if (Volatile.Read(ref _traceMetricsEnabled))
            {
-                _statsd.Increment(TracerMetricNames.Queue.DroppedTraces);
-                _statsd.Increment(TracerMetricNames.Queue.DroppedSpans, spans.Count);
+                using var lease = _statsd.TryGetClientLease();
+                if (lease.Client is { } statsd)
+                {
+                    statsd.Increment(TracerMetricNames.Queue.DroppedTraces);
+                    statsd.Increment(TracerMetricNames.Queue.DroppedSpans, spans.Count);
+                }
            }
        }
 
@@ -578,7 +603,7 @@ private void SerializeTracesLoop()
        private readonly struct WorkItem
        {
            public readonly ArraySegment<Span> Trace;
-            public readonly Action Callback;
+            public readonly Action? Callback;
 
            public WorkItem(ArraySegment<Span> trace)
            {
```
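The statsd calls above now go through a lease: `AgentWriter` checks its cached `_traceMetricsEnabled` flag, asks the `IStatsdManager` for a lease, and only emits metrics when the lease actually carries a client. The sketch below approximates that shape with illustrative stand-in types (`IStatsdClient`, `StatsdLease`, `QueueMetricsExample`); the PR's real lease type may differ.

```csharp
// Illustrative stand-ins only; the PR's IStatsdManager/lease types may differ.
using System;
using System.Threading;

internal interface IStatsdClient
{
    void Increment(string statName, int value = 1);
}

internal readonly struct StatsdLease : IDisposable
{
    private readonly Action? _release;

    public StatsdLease(IStatsdClient? client, Action? release)
    {
        Client = client;
        _release = release;
    }

    // Null when metrics are disabled or no client is currently available.
    public IStatsdClient? Client { get; }

    public void Dispose() => _release?.Invoke();
}

internal class QueueMetricsExample
{
    private readonly Func<StatsdLease> _tryGetClientLease;
    private bool _traceMetricsEnabled;

    public QueueMetricsExample(Func<StatsdLease> tryGetClientLease, bool initiallyEnabled)
    {
        _tryGetClientLease = tryGetClientLease;
        _traceMetricsEnabled = initiallyEnabled;
    }

    // Called from the settings-change callback on a background thread.
    public void SetEnabled(bool enabled) => Volatile.Write(ref _traceMetricsEnabled, enabled);

    public void RecordEnqueued(int spanCount)
    {
        // Same shape as the guarded call sites in AgentWriter: flag check, lease, null check.
        if (!Volatile.Read(ref _traceMetricsEnabled))
        {
            return;
        }

        using var lease = _tryGetClientLease();
        if (lease.Client is { } statsd)
        {
            statsd.Increment("queue.enqueued_traces");            // placeholder metric names
            statsd.Increment("queue.enqueued_spans", spanCount);
        }
    }
}
```

Because the lease is scoped by `using`, the underlying dogstatsd client can be torn down and rebuilt by the manager when exporter settings change without any call site holding a stale reference.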

tracer/src/Datadog.Trace/Agent/Api.cs

Lines changed: 22 additions & 6 deletions
```diff
@@ -5,8 +5,10 @@
 
 using System;
 using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
 using System.IO;
 using System.Net.Sockets;
+using System.Threading;
 using System.Threading.Tasks;
 using Datadog.Trace.Agent.Transports;
 using Datadog.Trace.DogStatsd;
@@ -32,7 +34,7 @@ internal class Api : IApi
 
        private readonly IDatadogLogger _log;
        private readonly IApiRequestFactory _apiRequestFactory;
-        private readonly IDogStatsd _statsd;
+        private readonly IStatsdManager _statsd;
        private readonly string _containerId;
        private readonly string _entityId;
        private readonly Uri _tracesEndpoint;
@@ -43,12 +45,14 @@ internal class Api : IApi
        private readonly SendCallback<SendTracesState> _sendTraces;
        private string _cachedResponse;
        private string _agentVersion;
+        private bool _healthMetricsEnabled;
 
        public Api(
            IApiRequestFactory apiRequestFactory,
-            IDogStatsd statsd,
+            IStatsdManager statsd,
            Action<Dictionary<string, float>> updateSampleRates,
            bool partialFlushEnabled,
+            bool healthMetricsEnabled,
            IDatadogLogger log = null)
        {
            // optionally injecting a log instance in here for testing purposes
@@ -58,10 +62,12 @@ public Api(
            _sendTraces = SendTracesAsyncImpl;
            _updateSampleRates = updateSampleRates;
            _statsd = statsd;
+            ToggleTracerHealthMetrics(healthMetricsEnabled);
            _containerId = ContainerMetadata.GetContainerId();
            _entityId = ContainerMetadata.GetEntityId();
            _apiRequestFactory = apiRequestFactory;
            _partialFlushEnabled = partialFlushEnabled;
+            _healthMetricsEnabled = healthMetricsEnabled;
            _tracesEndpoint = _apiRequestFactory.GetEndpoint(TracesPath);
            _log.Debug("Using traces endpoint {TracesEndpoint}", _tracesEndpoint.ToString());
            _statsEndpoint = _apiRequestFactory.GetEndpoint(StatsPath);
@@ -77,6 +83,13 @@ private enum SendResult
            Failed_DontRetry,
        }
 
+        [MemberNotNull(nameof(_statsd))]
+        public void ToggleTracerHealthMetrics(bool enabled)
+        {
+            Volatile.Write(ref _healthMetricsEnabled, enabled);
+            _statsd.SetRequired(StatsdConsumer.TraceApi, enabled);
+        }
+
        public Task<bool> SendStatsAsync(StatsBuffer stats, long bucketDuration)
        {
            _log.Debug("Sending stats to the Datadog Agent.");
@@ -299,10 +312,13 @@ private async Task<SendResult> SendTracesAsyncImpl(IApiRequest request, bool fin
 
            try
            {
+                var healthMetricsEnabled = Volatile.Read(ref _healthMetricsEnabled);
+                using var lease = healthMetricsEnabled ? _statsd.TryGetClientLease() : default;
+                var healthStats = healthMetricsEnabled ? lease.Client : null;
                try
                {
                    TelemetryFactory.Metrics.RecordCountTraceApiRequests();
-                    _statsd?.Increment(TracerMetricNames.Api.Requests);
+                    healthStats?.Increment(TracerMetricNames.Api.Requests);
                    response = await request.PostAsync(traces, MimeTypes.MsgPack).ConfigureAwait(false);
                }
                catch (Exception ex)
@@ -311,17 +327,17 @@ private async Task<SendResult> SendTracesAsyncImpl(IApiRequest request, bool fin
                    // (which are handled below)
                    var tag = ex is TimeoutException ? MetricTags.ApiError.Timeout : MetricTags.ApiError.NetworkError;
                    TelemetryFactory.Metrics.RecordCountTraceApiErrors(tag);
-                    _statsd?.Increment(TracerMetricNames.Api.Errors);
+                    healthStats?.Increment(TracerMetricNames.Api.Errors);
                    throw;
                }
 
-                if (_statsd != null)
+                if (healthStats != null)
                {
                    // don't bother creating the tags array if trace metrics are disabled
                    string[] tags = { $"status:{response.StatusCode}" };
 
                    // count every response, grouped by status code
-                    _statsd?.Increment(TracerMetricNames.Api.Responses, tags: tags);
+                    healthStats.Increment(TracerMetricNames.Api.Responses, tags: tags);
                }
 
                TelemetryFactory.Metrics.RecordCountTraceApiResponses(response.GetTelemetryStatusCodeMetricTag());
```
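`Api` now takes the initial `healthMetricsEnabled` value in its constructor and exposes `ToggleTracerHealthMetrics`, so the owning layer can flip health metrics in place when `MutableSettings` change instead of rebuilding the API stack. Roughly how such wiring might look, as hypothetical glue code (this is not the PR's actual `Managed*` wrapper or subscription API):

```csharp
using System;

// Hypothetical glue code only; the real subscription plumbing in the PR may differ.
internal interface ITracerHealthMetricsToggle
{
    // Mirrors the new Api.ToggleTracerHealthMetrics(bool) entry point.
    void ToggleTracerHealthMetrics(bool enabled);
}

internal static class HealthMetricsWiringExample
{
    public static IDisposable Wire(
        ITracerHealthMetricsToggle api,
        Func<Action<bool>, IDisposable> subscribeToTracerMetricsEnabled)
    {
        // Re-apply only the one setting this service cares about; everything else about
        // the Api (endpoints, request factory, container metadata) stays untouched.
        return subscribeToTracerMetricsEnabled(enabled => api.ToggleTracerHealthMetrics(enabled));
    }
}
```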

tracer/src/Datadog.Trace/Agent/ClientStatsPayload.cs

Lines changed: 14 additions & 6 deletions
```diff
@@ -4,21 +4,29 @@
 // </copyright>
 
 using System.Threading;
+using Datadog.Trace.Configuration;
 
 namespace Datadog.Trace.Agent
 {
-    internal class ClientStatsPayload
+    internal class ClientStatsPayload(MutableSettings settings)
    {
+        private AppSettings _settings = CreateSettings(settings);
        private long _sequence;
 
-        public string HostName { get; set; }
+        public string HostName { get; init; }
 
-        public string Environment { get; set; }
+        public AppSettings Details => _settings;
 
-        public string Version { get; set; }
-
-        public string ProcessTags { get; set; }
+        public string ProcessTags { get; init; }
 
        public long GetSequenceNumber() => Interlocked.Increment(ref _sequence);
+
+        public void UpdateDetails(MutableSettings settings)
+            => Interlocked.Exchange(ref _settings, CreateSettings(settings));
+
+        private static AppSettings CreateSettings(MutableSettings settings)
+            => new(settings.Environment, settings.ServiceVersion);
+
+        internal record AppSettings(string Environment, string Version);
    }
 }
```
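The `ClientStatsPayload` change replaces the separately settable `Environment`/`Version` properties with a single immutable `AppSettings` record that is swapped as a unit when `MutableSettings` change. A minimal sketch of why that is thread-safe (simplified names, not the exact PR code):

```csharp
// Atomic-snapshot sketch: the mutable pair lives in one immutable record, swapped as a unit,
// so readers never observe a half-updated Environment/Version combination.
using System.Threading;

internal record AppSettings(string Environment, string Version);

internal class StatsHeaderExample
{
    private AppSettings _settings = new("none", "0.0.0");

    // A single reference read: callers always see Environment and Version from the same update.
    public AppSettings Details => _settings;

    // Called from the settings-change callback (background thread): publish a whole new record
    // rather than mutating two separate string properties, so readers never see a torn pair.
    public void UpdateDetails(string environment, string version)
        => Interlocked.Exchange(ref _settings, new AppSettings(environment, version));
}
```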
