99module Datadog
1010 module Core
1111 module Telemetry
12- # Accumulates events and sends them to the API at a regular interval, including heartbeat event.
12+ # Accumulates events and sends them to the API at a regular interval,
13+ # including the heartbeat event.
14+ #
15+ # @api private
1316 class Worker
1417 include Core ::Workers ::Queue
1518 include Core ::Workers ::Polling
@@ -40,11 +43,15 @@ def initialize(
4043 self . enabled = enabled
4144 # Workers::IntervalLoop settings
4245 self . loop_base_interval = metrics_aggregation_interval_seconds
43- self . fork_policy = Core ::Workers ::Async ::Thread ::FORK_POLICY_STOP
46+ self . fork_policy = Core ::Workers ::Async ::Thread ::FORK_POLICY_RESTART
4447
4548 @shutdown_timeout = shutdown_timeout
4649 @buffer_size = buffer_size
4750
51+ initialize_state
52+ end
53+
54+ private def initialize_state
4855 self . buffer = buffer_klass . new ( @buffer_size )
4956
5057 @initial_event_once = Utils ::OnlyOnceSuccessful . new ( APP_STARTED_EVENT_RETRIES )
@@ -53,12 +60,13 @@ def initialize(
5360 attr_reader :logger
5461 attr_reader :initial_event_once
5562 attr_reader :initial_event
63+ attr_reader :emitter
5664
5765 # Returns true if worker thread is successfully started,
5866 # false if worker thread was not started but telemetry is enabled,
5967 # nil if telemetry is disabled.
6068 def start ( initial_event )
61- return if ! enabled? || forked ?
69+ return unless enabled?
6270
6371 @initial_event = initial_event
6472
@@ -79,7 +87,21 @@ def stop(force_stop = false, timeout = @shutdown_timeout)
7987 # for not enqueueing event (presently) is that telemetry is disabled
8088 # altogether, and in this case other methods return nil.
8189 def enqueue ( event )
82- return if !enabled? || forked?
90+ return unless enabled?
91+
92+ # Start the worker if needed, including in forked children.
93+ # Needs to be done before pushing to buffer since perform
94+ # may invoke after_fork handler which resets the buffer.
95+ #
96+ # Telemetry is special in that it permits events to be submitted
97+ # to the worker with the worker not running, and the worker is
98+ # explicitly started later (to maintain proper initialization order).
99+ # Thus here we can't just call perform unconditionally and must
100+ # check if the worker is supposed to be running, and only call
101+ # perform in that case.
102+ if worker && !worker . alive?
103+ perform
104+ end
83105
84106 buffer . push ( event )
85107 true
@@ -133,7 +155,7 @@ def flush
133155 private
134156
135157 def perform ( *events )
136- return if ! enabled? || forked ?
158+ return unless enabled?
137159
138160 if need_initial_event?
139161 started!
@@ -189,7 +211,9 @@ def started!
189211 # dependencies and send the new ones.
190212 # System tests demand only one instance of this event per
191213 # dependency.
192- send_event ( Event ::AppDependenciesLoaded . new ) if @dependency_collection && initial_event . class . eql? ( Telemetry ::Event ::AppStarted ) # standard:disable Style/ClassEqualityComparison:
214+ if @dependency_collection && initial_event . app_started?
215+ send_event ( Event ::AppDependenciesLoaded . new )
216+ end
193217
194218 true
195219 else
@@ -240,6 +264,27 @@ def disable_on_not_found!(response)
240264 disable!
241265 end
242266
267+ # After-fork handler: resets this worker's state in the forked child
268+ # without sending the closing event. The closing event will be (or
269+ # should be) sent by the worker in the parent process.
270+ # Any accumulated events are also discarded, since they will be sent
271+ # by the parent.
272+ def after_fork
273+ # Reset the state even if telemetry is disabled, so that no stale
274+ # state is left behind. It is possible that in the future telemetry
275+ # will be re-enabled after errors.
276+ initialize_state
277+ # The child process gets a new runtime_id, and therefore
278+ # needs to send an AppStarted event of its own.
279+ # In the parent process, the initial event may have been
280+ # SynthAppClientConfigurationChange instead of AppStarted;
281+ # in that case it needs to be converted to the "regular"
282+ # AppStarted event.
283+ if @initial_event . is_a? ( Event ::SynthAppClientConfigurationChange )
284+ @initial_event . reset! # steep:ignore
285+ end
286+ end
287+
243288 # Deduplicate logs by counting the number of repeated occurrences of the same log
244289 # entry and replacing them with a single entry with the calculated `count` value.
245290 # Non-log events are unchanged.
0 commit comments