Skip to content

Commit 0808070

Browse files
authored
DI: notify about failed probe condition evaluation, with rate limiting (#4979)
1 parent 9819144 commit 0808070

15 files changed

+584
-157
lines changed

benchmarks/di_instrument.rb

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@
4343
# Need to require datadog/di explicitly because dynamic instrumentation is not
4444
# currently integrated into the Ruby tracer due to being under development.
4545
require 'datadog/di'
46+
begin
47+
require 'datadog/di/proc_responder'
48+
rescue LoadError
49+
# Old tree
50+
end
4651

4752
class DIInstrumentBenchmark
4853
class Target
@@ -106,9 +111,15 @@ def run_benchmark
106111
calls = 0
107112
probe = Datadog::DI::Probe.new(id: 1, type: :log,
108113
type_name: 'DIInstrumentBenchmark::Target', method_name: 'test_method')
109-
rv = instrumenter.hook_method(probe) do
114+
executed_proc = lambda do |context|
110115
calls += 1
111116
end
117+
if defined?(Datadog::DI::ProcResponder)
118+
responder = Datadog::DI::ProcResponder.new(executed_proc)
119+
rv = instrumenter.hook_method(probe, responder)
120+
else
121+
rv = instrumenter.hook_method(probe, &executed_proc)
122+
end
112123
unless rv
113124
raise "Method probe was not successfully installed"
114125
end
@@ -149,8 +160,11 @@ def run_benchmark
149160
calls = 0
150161
probe = Datadog::DI::Probe.new(id: 1, type: :log,
151162
file: file, line_no: line + 1)
152-
rv = instrumenter.hook_line(probe) do
153-
calls += 1
163+
if defined?(Datadog::DI::ProcResponder)
164+
responder = Datadog::DI::ProcResponder.new(executed_proc)
165+
rv = instrumenter.hook_line(probe, responder)
166+
else
167+
rv = instrumenter.hook_line(probe, &executed_proc)
154168
end
155169
unless rv
156170
raise "Line probe (in method) was not successfully installed"
@@ -198,8 +212,10 @@ def run_benchmark
198212
calls = 0
199213
probe = Datadog::DI::Probe.new(id: 1, type: :log,
200214
file: targeted_file, line_no: targeted_line + 1)
201-
rv = instrumenter.hook_line(probe) do
202-
calls += 1
215+
rv = if defined?(Datadog::DI::ProcResponder)
216+
instrumenter.hook_line(probe, responder)
217+
else
218+
instrumenter.hook_line(probe, &executed_proc)
203219
end
204220
unless rv
205221
raise "Line probe (targeted) was not successfully installed"

lib/datadog/di/instrumenter.rb

Lines changed: 72 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,7 @@ def initialize(settings, serializer, logger, code_tracker: nil, telemetry: nil)
8989
# from the method but from outside of the method).
9090
Location = Struct.new(:path, :lineno, :label)
9191

92-
def hook_method(probe, &block)
93-
unless block
94-
raise ArgumentError, 'block is required'
95-
end
96-
92+
def hook_method(probe, responder)
9793
lock.synchronize do
9894
if probe.instrumentation_module
9995
# Already instrumented, warn?
@@ -130,10 +126,34 @@ def hook_method(probe, &block)
130126
caller_locations: caller_locations,
131127
)
132128
continue = condition.satisfied?(context)
133-
rescue
134-
raise if settings.dynamic_instrumentation.internal.propagate_all_exceptions
129+
rescue => exc
130+
# Evaluation error exception can be raised for "expected"
131+
# errors, we probably need another setting to control whether
132+
# these exceptions are propagated.
133+
raise if settings.dynamic_instrumentation.internal.propagate_all_exceptions &&
134+
!exc.is_a?(DI::Error::ExpressionEvaluationError)
135+
136+
if context
137+
# We want to report evaluation errors for conditions
138+
# as probe snapshots. However, if we failed to create
139+
# the context, we won't be able to report anything as
140+
# the probe notifier builder requires a context.
141+
begin
142+
responder.probe_condition_evaluation_failed_callback(context, exc)
143+
rescue
144+
raise if settings.dynamic_instrumentation.internal.propagate_all_exceptions
145+
146+
# TODO log / report via telemetry?
147+
end
148+
else
149+
_ = 42 # stop standard from wrecking this code
150+
151+
raise if settings.dynamic_instrumentation.internal.propagate_all_exceptions
152+
153+
# TODO log / report via telemetry?
154+
# If execution gets here, there is probably a bug in the tracer.
155+
end
135156

136-
# TODO log / report via telemetry?
137157
continue = false
138158
end
139159
end
@@ -195,8 +215,7 @@ def hook_method(probe, &block)
195215
caller_locations: caller_locs,
196216
return_value: rv, duration: duration, exception: exc,)
197217

198-
# & is to stop steep complaints, block is always present here.
199-
block&.call(context)
218+
responder.probe_executed_callback(context)
200219
if exc
201220
raise exc
202221
else
@@ -258,11 +277,7 @@ def unhook_method(probe)
258277
# not for eval'd code, unless the eval'd code is associated with
259278
# a file name and client invokes this method with the correct
260279
# file name for the eval'd code.
261-
def hook_line(probe, &block)
262-
unless block
263-
raise ArgumentError, 'No block given to hook_line'
264-
end
265-
280+
def hook_line(probe, responder)
266281
lock.synchronize do
267282
if probe.instrumentation_trace_point
268283
# Already instrumented, warn?
@@ -367,14 +382,44 @@ def hook_line(probe, &block)
367382

368383
if continue
369384
if condition = probe.condition
370-
context = Context.new(
371-
locals: Instrumenter.get_local_variables(tp),
372-
target_self: tp.self,
373-
probe: probe, settings: settings, serializer: serializer,
374-
path: tp.path,
375-
caller_locations: caller_locations,
376-
)
377-
continue = condition.satisfied?(context)
385+
begin
386+
context = Context.new(
387+
locals: Instrumenter.get_local_variables(tp),
388+
target_self: tp.self,
389+
probe: probe, settings: settings, serializer: serializer,
390+
path: tp.path,
391+
caller_locations: caller_locations,
392+
)
393+
continue = condition.satisfied?(context)
394+
rescue => exc
395+
# Evaluation error exception can be raised for "expected"
396+
# errors, we probably need another setting to control whether
397+
# these exceptions are propagated.
398+
raise if settings.dynamic_instrumentation.internal.propagate_all_exceptions &&
399+
!exc.is_a?(DI::Error::ExpressionEvaluationError)
400+
401+
continue = false
402+
if context
403+
# We want to report evaluation errors for conditions
404+
# as probe snapshots. However, if we failed to create
405+
# the context, we won't be able to report anything as
406+
# the probe notifier builder requires a context.
407+
begin
408+
responder.probe_condition_evaluation_failed_callback(context, condition, exc)
409+
rescue
410+
raise if settings.dynamic_instrumentation.internal.propagate_all_exceptions
411+
412+
# TODO log / report via telemetry?
413+
end
414+
else
415+
_ = 42 # stop standard from wrecking this code
416+
417+
raise if settings.dynamic_instrumentation.internal.propagate_all_exceptions
418+
419+
# TODO log / report via telemetry?
420+
# If execution gets here, there is probably a bug in the tracer.
421+
end
422+
end
378423
end
379424
end
380425

@@ -393,8 +438,7 @@ def hook_line(probe, &block)
393438
caller_locations: caller_locations,
394439
)
395440

396-
# & is to stop steep complaints, block is always present here.
397-
block&.call(context)
441+
responder.probe_executed_callback(context)
398442
end
399443
rescue => exc
400444
raise if settings.dynamic_instrumentation.internal.propagate_all_exceptions
@@ -445,11 +489,11 @@ def unhook_line(probe)
445489
end
446490
end
447491

448-
def hook(probe, &block)
492+
def hook(probe, responder)
449493
if probe.method?
450-
hook_method(probe, &block)
494+
hook_method(probe, responder)
451495
elsif probe.line?
452-
hook_line(probe, &block)
496+
hook_line(probe, responder)
453497
else
454498
# TODO add test coverage for this path
455499
logger.debug { "di: unknown probe type to hook: #{probe}" }

lib/datadog/di/probe.rb

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,16 @@ def initialize(id:, type:,
8686
@rate_limit = rate_limit || (@capture_snapshot ? 1 : 5000)
8787
@rate_limiter = Datadog::Core::TokenBucket.new(@rate_limit)
8888

89+
# At most one report per second.
90+
# We create the rate limiter here even though it may never be used,
91+
# to avoid having to synchronize the creation since method probes
92+
# can be executed on multiple threads concurrently (even if line
93+
# probes are never executed concurrently since those are done in a
94+
# trace point).
95+
if condition
96+
@condition_evaluation_failed_rate_limiter = Datadog::Core::TokenBucket.new(1)
97+
end
98+
8999
@emitting_notified = false
90100
end
91101

@@ -115,6 +125,16 @@ def initialize(id:, type:,
115125
# Rate limiter object. For internal DI use only.
116126
attr_reader :rate_limiter
117127

128+
# Rate limiter object for sending snapshots with evaluation errors
129+
# for when probe condition evaluation fails.
130+
# This rate limit is separate from the "base" rate limit for the probe
131+
# because when the condition evaluation succeeds we want the "base"
132+
# rate limit applied, not tainted by any evaluation errors
133+
# (for example, the condition can be highly selective, and when it
134+
# does not hold the evaluation may fail - we don't want to use up the
135+
# probe rate limit for the errors).
136+
attr_reader :condition_evaluation_failed_rate_limiter
137+
118138
def capture_snapshot?
119139
@capture_snapshot
120140
end

lib/datadog/di/probe_manager.rb

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ def add_probe(probe)
101101
end
102102

103103
begin
104-
instrumenter.hook(probe, &method(:probe_executed_callback))
104+
instrumenter.hook(probe, self)
105105

106106
@installed_probes[probe.id] = probe
107107
payload = probe_notification_builder.build_installed(probe)
@@ -184,7 +184,7 @@ def remove_other_probes(probe_ids)
184184
begin
185185
# TODO is it OK to hook from trace point handler?
186186
# TODO the class is now defined, but can hooking still fail?
187-
instrumenter.hook(probe, &method(:probe_executed_callback))
187+
instrumenter.hook(probe, self)
188188
@pending_probes.delete(probe.id)
189189
break
190190
rescue Error::DITargetNotDefined
@@ -242,6 +242,14 @@ def probe_executed_callback(context)
242242
probe_notifier_worker.add_snapshot(payload)
243243
end
244244

245+
def probe_condition_evaluation_failed_callback(context, expr, exc)
246+
probe = context.probe
247+
if probe.condition_evaluation_failed_rate_limiter&.allow?
248+
payload = probe_notification_builder.build_condition_evaluation_failed(context, expr, exc)
249+
probe_notifier_worker.add_snapshot(payload)
250+
end
251+
end
252+
245253
# Class/module definition trace point (:end type).
246254
# Used to install hooks when the target classes/modules aren't yet
247255
# defined when the hook request is received.

lib/datadog/di/probe_notification_builder.rb

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,32 @@ def build_snapshot(context)
8787
end
8888
end
8989

90+
message = nil
91+
evaluation_errors = []
92+
if segments = probe.template_segments
93+
message, evaluation_errors = evaluate_template(segments, context)
94+
end
95+
build_snapshot_base(context,
96+
evaluation_errors: evaluation_errors, message: message,
97+
captures: captures)
98+
end
99+
100+
def build_condition_evaluation_failed(context, expression, exception)
101+
error = {
102+
message: "#{exception.class}: #{exception}",
103+
expr: expression.dsl_expr,
104+
}
105+
build_snapshot_base(context, evaluation_errors: [error])
106+
end
107+
108+
private
109+
110+
def build_snapshot_base(context, evaluation_errors: [], captures: nil, message: nil)
111+
probe = context.probe
112+
113+
timestamp = timestamp_now
114+
duration = context.duration
115+
90116
location = if probe.line?
91117
{
92118
file: context.path,
@@ -103,13 +129,6 @@ def build_snapshot(context)
103129
format_caller_locations(caller_locations)
104130
end
105131

106-
timestamp = timestamp_now
107-
message = nil
108-
evaluation_errors = []
109-
if segments = probe.template_segments
110-
message, evaluation_errors = evaluate_template(segments, context)
111-
end
112-
duration = context.duration
113132
{
114133
service: settings.service,
115134
"debugger.snapshot": {
@@ -132,7 +151,7 @@ def build_snapshot(context)
132151
host: nil,
133152
logger: {
134153
name: probe.file,
135-
method: probe.method_name || 'no_method',
154+
method: probe.method_name,
136155
thread_name: Thread.current.name,
137156
# Dynamic instrumentation currently does not need thread_id for
138157
# anything. It can be sent if a customer requests it at which point
@@ -150,8 +169,6 @@ def build_snapshot(context)
150169
}
151170
end
152171

153-
private
154-
155172
def build_status(probe, message:, status:)
156173
{
157174
service: settings.service,

lib/datadog/di/proc_responder.rb

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# frozen_string_literal: true
2+
3+
module Datadog
4+
module DI
5+
# An adapter to convert procs to responders.
6+
#
7+
# Used in test suite and benchmarks.
8+
#
9+
# @api private
10+
class ProcResponder
11+
def initialize(executed_proc, failed_proc = nil)
12+
@executed_proc = executed_proc
13+
@failed_proc = failed_proc
14+
end
15+
16+
attr_reader :executed_proc
17+
attr_reader :failed_proc
18+
19+
def probe_executed_callback(context)
20+
executed_proc.call(context)
21+
end
22+
23+
def probe_condition_evaluation_failed_callback(context, exc)
24+
if failed_proc.nil?
25+
raise NotImplementedError, "Failed proc not provided"
26+
end
27+
28+
failed_proc.call(context, exc)
29+
end
30+
end
31+
end
32+
end

sig/datadog/di/instrumenter.rbs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,14 @@ module Datadog
3333

3434
attr_reader telemetry: Core::Telemetry::Component?
3535

36-
def hook_method: (Probe probe) ?{ (?) -> untyped } -> void
36+
def hook_method: (Probe probe, untyped responder) -> void
3737

3838
def unhook_method: (Probe probe) -> void
39-
def hook_line: (Probe probe) ?{ (?) -> untyped } -> void
39+
def hook_line: (Probe probe, untyped responder) -> void
4040

4141
def unhook_line: (Probe probe) -> void
4242

43-
def hook: (Probe probe) { (?) -> untyped } -> void
43+
def hook: (Probe probe, untyped responder) -> void
4444

4545
def unhook: (Probe probe) -> void
4646

0 commit comments

Comments
 (0)