Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions lib/datadog/core/buffer/random.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ def closed?
@closed
end

# Discards the contents of the buffer.
def clear
@items = []
nil
end

protected

# Segment items into two segments: underflow and overflow.
Expand Down
4 changes: 4 additions & 0 deletions lib/datadog/core/buffer/thread_safe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ def close
synchronize { super }
end

def clear
synchronize { super }
end

def synchronize(&block)
@mutex.synchronize(&block)
end
Expand Down
29 changes: 19 additions & 10 deletions lib/datadog/core/telemetry/component.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,23 @@
module Datadog
module Core
module Telemetry
# Telemetry entrypoint, coordinates sending telemetry events at various points in app lifecycle.
# Note: Telemetry does not spawn its worker thread in fork processes, thus no telemetry is sent in forked processes.
# Telemetry entry point, coordinates sending telemetry events at
# various points in application lifecycle.
#
# @api private
class Component
ENDPOINT_COLLECTION_MESSAGE_LIMIT = 300

attr_reader :enabled, :logger, :transport, :worker
attr_reader :enabled
attr_reader :logger
attr_reader :transport
attr_reader :worker
attr_reader :settings
attr_reader :agent_settings

# Alias for consistency with other components.
# TODO Remove +enabled+ method
alias_method :enabled?, :enabled

include Core::Utils::Forking
include Telemetry::Logging
Expand Down Expand Up @@ -110,7 +119,7 @@ def disable!
end

def start(initial_event_is_change = false, components:)
return if !@enabled
return unless enabled?

initial_event = if initial_event_is_change
Event::SynthAppClientConfigurationChange.new(
Expand All @@ -136,19 +145,19 @@ def shutdown!
end

def emit_closing!
return if !@enabled || forked?
return unless enabled?

@worker.enqueue(Event::AppClosing.new)
end

def integrations_change!
return if !@enabled || forked?
return unless enabled?

@worker.enqueue(Event::AppIntegrationsChange.new)
end

def log!(event)
return if !@enabled || forked? || !@log_collection_enabled
return unless enabled? && @log_collection_enabled

@worker.enqueue(event)
end
Expand All @@ -159,21 +168,21 @@ def log!(event)
#
# @api private
def flush
return if !@enabled || forked?
return unless enabled?

@worker.flush
end

# Report configuration changes caused by Remote Configuration.
def client_configuration_change!(changes)
return if !@enabled || forked?
return unless enabled?

@worker.enqueue(Event::AppClientConfigurationChange.new(changes, 'remote_config'))
end

# Report application endpoints
def app_endpoints_loaded(endpoints, page_size: ENDPOINT_COLLECTION_MESSAGE_LIMIT)
return if !@enabled || forked?
return unless enabled?

endpoints.each_slice(page_size).with_index do |endpoints_slice, i|
@worker.enqueue(Event::AppEndpointsLoaded.new(endpoints_slice, is_first: i.zero?))
Expand Down
15 changes: 15 additions & 0 deletions lib/datadog/core/telemetry/event/app_started.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ class AppStarted < Base
def initialize(components:)
# To not hold a reference to the component tree, generate
# the event payload here in the constructor.
#
# Important: do not store data that contains (or is derived from)
# the runtime_id oor sequence numbers.
# This event is reused when a process forks, but in the
# child process the runtime_id would be different and sequence
# number would obviously also be different.
@configuration = configuration(components.settings, components.agent_settings)
@install_signature = install_signature(components.settings)
@products = products(components)
Expand All @@ -30,6 +36,15 @@ def payload
}
end

# Whether the event is actually the app-started event.
# For the app-started event we follow up by sending
# app-dependencies-loaded, if the event is
# app-client-configuration-change we don't send
# app-dependencies-loaded.
def app_started?
true
end

private

def products(components)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,36 @@ module Event
# and app-closing events.
class SynthAppClientConfigurationChange < AppStarted
def type
'app-client-configuration-change'
if reset?
super
else
'app-client-configuration-change'
end
end

def payload
{
configuration: @configuration,
}
if reset?
super
else
{
configuration: @configuration,
}
end
end

def app_started?
reset?
end

# Revert this event to a "regular" AppStarted event.
#
# Used in after_fork to send the AppStarted event in child processes.
def reset!
@reset = true
end

def reset?
!!@reset
end
end
end
Expand Down
54 changes: 48 additions & 6 deletions lib/datadog/core/telemetry/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
module Datadog
module Core
module Telemetry
# Accumulates events and sends them to the API at a regular interval, including heartbeat event.
# Accumulates events and sends them to the API at a regular interval,
# including heartbeat event.
#
# @api private
class Worker
include Core::Workers::Queue
include Core::Workers::Polling
Expand Down Expand Up @@ -40,7 +43,7 @@ def initialize(
self.enabled = enabled
# Workers::IntervalLoop settings
self.loop_base_interval = metrics_aggregation_interval_seconds
self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_STOP
self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_RESTART

@shutdown_timeout = shutdown_timeout
@buffer_size = buffer_size
Expand All @@ -53,12 +56,13 @@ def initialize(
attr_reader :logger
attr_reader :initial_event_once
attr_reader :initial_event
attr_reader :emitter

# Returns true if worker thread is successfully started,
# false if worker thread was not started but telemetry is enabled,
# nil if telemetry is disabled.
def start(initial_event)
return if !enabled? || forked?
return unless enabled?

@initial_event = initial_event

Expand All @@ -79,7 +83,21 @@ def stop(force_stop = false, timeout = @shutdown_timeout)
# for not enqueueing event (presently) is that telemetry is disabled
# altogether, and in this case other methods return nil.
def enqueue(event)
return if !enabled? || forked?
return unless enabled?

# Start the worker if needed, including in forked children.
# Needs to be done before pushing to buffer since perform
# may invoke after_fork handler which resets the buffer.
#
# Telemetry is special in that it permits events to be submitted
# to the worker with the worker not running, and the worker is
# explicitly started later (to maintain proper initialization order).
# Thus here we can't just call perform unconditionally and must
# check if the worker is supposed to be running, and only call
# perform in that case.
if worker && !worker.alive?
perform
end

buffer.push(event)
true
Expand Down Expand Up @@ -133,7 +151,7 @@ def flush
private

def perform(*events)
return if !enabled? || forked?
return unless enabled?

if need_initial_event?
started!
Expand Down Expand Up @@ -189,7 +207,9 @@ def started!
# dependencies and send the new ones.
# System tests demand only one instance of this event per
# dependency.
send_event(Event::AppDependenciesLoaded.new) if @dependency_collection && initial_event.class.eql?(Telemetry::Event::AppStarted) # standard:disable Style/ClassEqualityComparison:
if @dependency_collection && initial_event.app_started?
send_event(Event::AppDependenciesLoaded.new)
end

true
else
Expand Down Expand Up @@ -240,6 +260,28 @@ def disable_on_not_found!(response)
disable!
end

# Stop the worker after fork without sending closing event.
# The closing event will be (or should be) sent by the worker
# in the parent process.
# Also, discard any accumulated events since they will be sent by
# the parent.
def after_fork
# If telemetry is disabled, we still reset the state to avoid
# having wrong state. It is possible that in the future telemetry
# will be re-enabled after errors.
buffer.clear
initial_event_once.reset
# In the child process, we get a new runtime_id.
# As such we need to send AppStarted event.
# In the parent process, the event may have been the
# SynthAppClientConfigurationChange instead of AppStarted,
# and in that case we need to convert it to the "regular"
# AppStarted event.
if @initial_event.is_a?(Event::SynthAppClientConfigurationChange)
@initial_event.reset! # steep:ignore
end
end

# Deduplicate logs by counting the number of repeated occurrences of the same log
# entry and replacing them with a single entry with the calculated `count` value.
# Non-log events are unchanged.
Expand Down
4 changes: 1 addition & 3 deletions lib/datadog/core/utils/only_once.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ def ran?
@mutex.synchronize { @ran_once }
end

private

def reset_ran_once_state_for_tests
def reset
@mutex.synchronize { @ran_once = false }
end
Comment on lines -34 to 36
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is the right way to go -- I think this is bending the OnlyOnce in a really weird direction generally only to support a specific use-case of one of its users.

Rather than resetting the only once, consider creating a new instance?

end
Expand Down
16 changes: 8 additions & 8 deletions lib/datadog/core/utils/only_once_successful.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ def failed?
@mutex.synchronize { @ran_once && @failed }
end

def reset
@mutex.synchronize do
@ran_once = false
@failed = false
@retries = 0
end
end

private

def check_limit!
Expand All @@ -77,14 +85,6 @@ def check_limit!
def limited?
[email protected]?
end

def reset_ran_once_state_for_tests
@mutex.synchronize do
@ran_once = false
@failed = false
@retries = 0
end
end
end
end
end
Expand Down
2 changes: 2 additions & 0 deletions sig/datadog/core/buffer/random.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ module Datadog
def push: (Object) -> Object?

def replace!: (Object) -> Object?

def clear: -> void
end
end
end
Expand Down
2 changes: 2 additions & 0 deletions sig/datadog/core/telemetry/component.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ module Datadog
def self.build: (untyped settings, Datadog::Core::Configuration::AgentSettings agent_settings, Datadog::Core::Logger logger) -> Component

def initialize: (logger: Core::Logger, settings: untyped, agent_settings: Datadog::Core::Configuration::AgentSettings, enabled: true | false) -> void

def enabled?: -> bool

def disable!: () -> void

Expand Down
2 changes: 2 additions & 0 deletions sig/datadog/core/telemetry/event/app_started.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ module Datadog
def type: () -> "app-started"

def payload: () -> { products: untyped, configuration: untyped, install_signature: untyped }

def app_started?: -> bool

private

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ module Datadog
module Telemetry
module Event
class SynthAppClientConfigurationChange < AppStarted
def type: () -> "app-client-configuration-change"
def type: -> ("app-client-configuration-change" | "app-started")

def payload: () -> { configuration: untyped }
def payload: () -> { ?products: untyped, configuration: untyped, ?install_signature: untyped }

def reset?: -> bool

def reset!: -> void
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion sig/datadog/core/telemetry/worker.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ module Datadog
@logger: ::Logger

attr_reader logger: ::Logger
attr_reader initial_event: Telemetry::Event::Base
attr_reader initial_event: Telemetry::Event::AppStarted
attr_reader initial_event_once: Datadog::Core::Utils::OnlyOnceSuccessful

def initialize: (?enabled: bool, heartbeat_interval_seconds: Float, metrics_aggregation_interval_seconds: Float, emitter: Emitter, metrics_manager: MetricsManager, ?shutdown_timeout: Float | Integer, ?buffer_size: Integer, dependency_collection: bool, logger: ::Logger) -> void
Expand Down
4 changes: 1 addition & 3 deletions sig/datadog/core/utils/only_once.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ module Datadog

def ran?: () -> untyped

private

def reset_ran_once_state_for_tests: () -> untyped
def reset: -> void
end
end
end
Expand Down
Loading