Skip to content

Commit 9176330

Browse files
committed
Send telemetry in forked children
1 parent 7b333e9 commit 9176330

File tree

25 files changed

+302
-56
lines changed

25 files changed

+302
-56
lines changed

lib/datadog/core/buffer/random.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,12 @@ def closed?
8484
@closed
8585
end
8686

87+
# Discards the contents of the buffer.
88+
def clear
89+
@items = []
90+
nil
91+
end
92+
8793
protected
8894

8995
# Segment items into two segments: underflow and overflow.

lib/datadog/core/buffer/thread_safe.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ def close
4949
synchronize { super }
5050
end
5151

52+
def clear
53+
synchronize { super }
54+
end
55+
5256
def synchronize(&block)
5357
@mutex.synchronize(&block)
5458
end

lib/datadog/core/telemetry/component.rb

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,19 @@
1414
module Datadog
1515
module Core
1616
module Telemetry
17-
# Telemetry entrypoint, coordinates sending telemetry events at various points in app lifecycle.
18-
# Note: Telemetry does not spawn its worker thread in fork processes, thus no telemetry is sent in forked processes.
17+
# Telemetry entry point, coordinates sending telemetry events at
18+
# various points in application lifecycle.
1919
#
2020
# @api private
2121
class Component
2222
ENDPOINT_COLLECTION_MESSAGE_LIMIT = 300
2323

2424
attr_reader :enabled, :logger, :transport, :worker
2525

26+
# Alias for consistency with other components.
27+
# TODO Remove +enabled+ method
28+
alias_method :enabled?, :enabled
29+
2630
include Core::Utils::Forking
2731
include Telemetry::Logging
2832

@@ -104,13 +108,17 @@ def initialize( # standard:disable Metrics/MethodLength
104108
@agent_settings = agent_settings
105109
end
106110

111+
attr_reader :settings
112+
attr_reader :agent_settings
113+
attr_reader :logger
114+
107115
def disable!
108116
@enabled = false
109117
@worker&.enabled = false
110118
end
111119

112120
def start(initial_event_is_change = false, components:)
113-
return if !@enabled
121+
return unless enabled?
114122

115123
initial_event = if initial_event_is_change
116124
Event::SynthAppClientConfigurationChange.new(
@@ -136,19 +144,19 @@ def shutdown!
136144
end
137145

138146
def emit_closing!
139-
return if !@enabled || forked?
147+
return unless enabled?
140148

141149
@worker.enqueue(Event::AppClosing.new)
142150
end
143151

144152
def integrations_change!
145-
return if !@enabled || forked?
153+
return unless enabled?
146154

147155
@worker.enqueue(Event::AppIntegrationsChange.new)
148156
end
149157

150158
def log!(event)
151-
return if !@enabled || forked? || !@log_collection_enabled
159+
return unless enabled? && @log_collection_enabled
152160

153161
@worker.enqueue(event)
154162
end
@@ -159,21 +167,21 @@ def log!(event)
159167
#
160168
# @api private
161169
def flush
162-
return if !@enabled || forked?
170+
return unless enabled?
163171

164172
@worker.flush
165173
end
166174

167175
# Report configuration changes caused by Remote Configuration.
168176
def client_configuration_change!(changes)
169-
return if !@enabled || forked?
177+
return unless enabled?
170178

171179
@worker.enqueue(Event::AppClientConfigurationChange.new(changes, 'remote_config'))
172180
end
173181

174182
# Report application endpoints
175183
def app_endpoints_loaded(endpoints, page_size: ENDPOINT_COLLECTION_MESSAGE_LIMIT)
176-
return if !@enabled || forked?
184+
return unless enabled?
177185

178186
endpoints.each_slice(page_size).with_index do |endpoints_slice, i|
179187
@worker.enqueue(Event::AppEndpointsLoaded.new(endpoints_slice, is_first: i.zero?))

lib/datadog/core/telemetry/event/app_started.rb

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@ class AppStarted < Base
1111
def initialize(components:)
1212
# To not hold a reference to the component tree, generate
1313
# the event payload here in the constructor.
14+
#
15+
# Important: do not store data that contains (or is derived from)
16+
# the runtime_id or sequence numbers.
17+
# This event is reused when a process forks, but in the
18+
# child process the runtime_id would be different and sequence
19+
# number would obviously also be different.
1420
@configuration = configuration(components.settings, components.agent_settings)
1521
@install_signature = install_signature(components.settings)
1622
@products = products(components)
@@ -30,6 +36,15 @@ def payload
3036
}
3137
end
3238

39+
# Whether the event is actually the app-started event.
40+
# For the app-started event we follow up by sending
41+
# app-dependencies-loaded; if the event is
42+
# app-client-configuration-change we don't send
43+
# app-dependencies-loaded.
44+
def app_started?
45+
true
46+
end
47+
3348
private
3449

3550
def products(components)

lib/datadog/core/telemetry/event/synth_app_client_configuration_change.rb

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,36 @@ module Event
2828
# and app-closing events.
2929
class SynthAppClientConfigurationChange < AppStarted
3030
def type
31-
'app-client-configuration-change'
31+
if reset?
32+
super
33+
else
34+
'app-client-configuration-change'
35+
end
3236
end
3337

3438
def payload
35-
{
36-
configuration: @configuration,
37-
}
39+
if reset?
40+
super
41+
else
42+
{
43+
configuration: @configuration,
44+
}
45+
end
46+
end
47+
48+
def app_started?
49+
reset?
50+
end
51+
52+
# Revert this event to a "regular" AppStarted event.
53+
#
54+
# Used in after_fork to send the AppStarted event in child processes.
55+
def reset!
56+
@reset = true
57+
end
58+
59+
def reset?
60+
!!@reset
3861
end
3962
end
4063
end

lib/datadog/core/telemetry/worker.rb

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@
99
module Datadog
1010
module Core
1111
module Telemetry
12-
# Accumulates events and sends them to the API at a regular interval, including heartbeat event.
12+
# Accumulates events and sends them to the API at a regular interval,
13+
# including the heartbeat event.
14+
#
15+
# @api private
1316
class Worker
1417
include Core::Workers::Queue
1518
include Core::Workers::Polling
@@ -40,7 +43,7 @@ def initialize(
4043
self.enabled = enabled
4144
# Workers::IntervalLoop settings
4245
self.loop_base_interval = metrics_aggregation_interval_seconds
43-
self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_STOP
46+
self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_RESTART
4447

4548
@shutdown_timeout = shutdown_timeout
4649
@buffer_size = buffer_size
@@ -53,12 +56,13 @@ def initialize(
5356
attr_reader :logger
5457
attr_reader :initial_event_once
5558
attr_reader :initial_event
59+
attr_reader :emitter
5660

5761
# Returns true if worker thread is successfully started,
5862
# false if worker thread was not started but telemetry is enabled,
5963
# nil if telemetry is disabled.
6064
def start(initial_event)
61-
return if !enabled? || forked?
65+
return unless enabled?
6266

6367
@initial_event = initial_event
6468

@@ -79,7 +83,21 @@ def stop(force_stop = false, timeout = @shutdown_timeout)
7983
# for not enqueueing event (presently) is that telemetry is disabled
8084
# altogether, and in this case other methods return nil.
8185
def enqueue(event)
82-
return if !enabled? || forked?
86+
return unless enabled?
87+
88+
# Start the worker if needed, including in forked children.
89+
# Needs to be done before pushing to buffer since perform
90+
# may invoke after_fork handler which resets the buffer.
91+
#
92+
# Telemetry is special in that it permits events to be submitted
93+
# to the worker with the worker not running, and the worker is
94+
# explicitly started later (to maintain proper initialization order).
95+
# Thus here we can't just call perform unconditionally and must
96+
# check if the worker is supposed to be running, and only call
97+
# perform in that case.
98+
if worker && !worker.alive?
99+
perform
100+
end
83101

84102
buffer.push(event)
85103
true
@@ -133,7 +151,7 @@ def flush
133151
private
134152

135153
def perform(*events)
136-
return if !enabled? || forked?
154+
return unless enabled?
137155

138156
if need_initial_event?
139157
started!
@@ -189,7 +207,9 @@ def started!
189207
# dependencies and send the new ones.
190208
# System tests demand only one instance of this event per
191209
# dependency.
192-
send_event(Event::AppDependenciesLoaded.new) if @dependency_collection && initial_event.class.eql?(Telemetry::Event::AppStarted) # standard:disable Style/ClassEqualityComparison:
210+
if @dependency_collection && initial_event.app_started?
211+
send_event(Event::AppDependenciesLoaded.new)
212+
end
193213

194214
true
195215
else
@@ -240,6 +260,28 @@ def disable_on_not_found!(response)
240260
disable!
241261
end
242262

263+
# Stop the worker after fork without sending closing event.
264+
# The closing event will be (or should be) sent by the worker
265+
# in the parent process.
266+
# Also, discard any accumulated events since they will be sent by
267+
# the parent.
268+
def after_fork
269+
# If telemetry is disabled, we still reset the state to avoid
270+
# having wrong state. It is possible that in the future telemetry
271+
# will be re-enabled after errors.
272+
buffer.clear
273+
initial_event_once.reset
274+
# In the child process, we get a new runtime_id.
275+
# As such we need to send AppStarted event.
276+
# In the parent process, the event may have been the
277+
# SynthAppClientConfigurationChange instead of AppStarted,
278+
# and in that case we need to convert it to the "regular"
279+
# AppStarted event.
280+
if @initial_event.is_a?(Event::SynthAppClientConfigurationChange)
281+
@initial_event.reset! # steep:ignore
282+
end
283+
end
284+
243285
# Deduplicate logs by counting the number of repeated occurrences of the same log
244286
# entry and replacing them with a single entry with the calculated `count` value.
245287
# Non-log events are unchanged.

lib/datadog/core/utils/only_once.rb

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,7 @@ def ran?
3131
@mutex.synchronize { @ran_once }
3232
end
3333

34-
private
35-
36-
def reset_ran_once_state_for_tests
34+
def reset
3735
@mutex.synchronize { @ran_once = false }
3836
end
3937
end

lib/datadog/core/utils/only_once_successful.rb

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,14 @@ def failed?
6565
@mutex.synchronize { @ran_once && @failed }
6666
end
6767

68+
def reset
69+
@mutex.synchronize do
70+
@ran_once = false
71+
@failed = false
72+
@retries = 0
73+
end
74+
end
75+
6876
private
6977

7078
def check_limit!
@@ -77,14 +85,6 @@ def check_limit!
7785
def limited?
7886
!@limit.nil?
7987
end
80-
81-
def reset_ran_once_state_for_tests
82-
@mutex.synchronize do
83-
@ran_once = false
84-
@failed = false
85-
@retries = 0
86-
end
87-
end
8888
end
8989
end
9090
end

sig/datadog/core/buffer/random.rbs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ module Datadog
3535
def push: (Object) -> Object?
3636

3737
def replace!: (Object) -> Object?
38+
39+
def clear: -> void
3840
end
3941
end
4042
end

sig/datadog/core/telemetry/event/synth_app_client_configuration_change.rbs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,13 @@ module Datadog
33
module Telemetry
44
module Event
55
class SynthAppClientConfigurationChange < AppStarted
6-
def type: () -> "app-client-configuration-change"
6+
def type: -> "app-client-configuration-change"
77

8-
def payload: () -> { configuration: untyped }
8+
def payload: () -> { ?products: untyped, configuration: untyped, ?install_signature: untyped }
9+
10+
def reset?: -> bool
11+
12+
def reset!: -> void
913
end
1014
end
1115
end

0 commit comments

Comments
 (0)