Commit 182ce81

Add buffering so that we don't block the main thread to do dsm work
1 parent 5342cd3 commit 182ce81
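
In essence, the change moves Data Streams Monitoring (DSM) aggregation off the caller's thread: the track_* and record_* methods now only push a small event hash into a bounded buffer, and the polling worker drains that buffer and updates the shared buckets under the mutex on its own interval. The default capacity of 100_000 matches the sizing note in the diff (roughly 10k events/sec over a 10s flush interval). Below is a minimal sketch of the pattern; the EventBuffer class, its push/drain API, and the Mutex-based implementation are illustrative assumptions, not the actual Core::Buffer::CRuby internals (which the commit describes as lock-free).

# Hot-path/worker split in miniature. EventBuffer, drain, and the drop-when-full
# policy are assumptions made for this sketch only.
class EventBuffer
  def initialize(max_size)
    @max_size = max_size
    @mutex = Mutex.new
    @items = []
  end

  # Hot path: cheap append; drop events once the buffer is full.
  def push(event)
    @mutex.synchronize { @items << event if @items.size < @max_size }
  end

  # Worker thread: take everything accumulated so far in one call.
  def drain
    @mutex.synchronize { @items.dup.tap { @items.clear } }
  end
end

buffer = EventBuffer.new(100_000)

# Producer code path only records the fact; no bucket math here.
buffer.push(type: :kafka_produce, topic: 'orders', partition: 0, offset: 42,
            timestamp_ns: (Time.now.to_f * 1e9).to_i)

# Background worker periodically aggregates whatever has been drained.
buffer.drain.each { |event| puts "aggregate #{event[:type]} for #{event[:topic]}" }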


2 files changed: +126 -96 lines changed


lib/datadog/data_streams/processor.rb

Lines changed: 117 additions & 95 deletions
@@ -7,6 +7,7 @@
 require_relative '../core/worker'
 require_relative '../core/workers/polling'
 require_relative '../core/ddsketch'
+require_relative '../core/buffer/cruby'
 require_relative '../core/utils/time'
 
 module Datadog
@@ -22,9 +23,13 @@ class Processor < Core::Worker
 
       PROPAGATION_KEY = 'dd-pathway-ctx-base64'
 
+      # Default buffer size for lock-free event queue
+      # Set to handle high-throughput scenarios (e.g., 10k events/sec for 10s interval)
+      DEFAULT_BUFFER_SIZE = 100_000
+
       attr_reader :pathway_context, :buckets, :bucket_size_ns
 
-      def initialize(interval:, logger:, settings:, agent_settings:)
+      def initialize(interval:, logger:, settings:, agent_settings:, buffer_size: DEFAULT_BUFFER_SIZE)
         raise UnsupportedError, 'DDSketch is not supported' unless Datadog::Core::DDSketch.supported?
 
         @settings = settings
@@ -41,6 +46,7 @@ def initialize(interval:, logger:, settings:, agent_settings:)
         @buckets = {}
         @consumer_stats = []
         @stats_mutex = Mutex.new
+        @event_buffer = Core::Buffer::CRuby.new(buffer_size)
 
         super()
         self.loop_base_interval = interval
@@ -55,43 +61,15 @@ def initialize(interval:, logger:, settings:, agent_settings:)
       # @param now [Time] Timestamp
       # @return [Boolean] true if tracking succeeded
       def track_kafka_produce(topic, partition, offset, now)
-        now_ns = (now.to_f * 1e9).to_i
-        partition_key = "#{topic}:#{partition}"
-
-        @stats_mutex.synchronize do
-          bucket_time_ns = now_ns - (now_ns % @bucket_size_ns)
-          @buckets[bucket_time_ns] ||= create_bucket
-
-          @buckets[bucket_time_ns][:latest_produce_offsets][partition_key] = [
-            offset,
-            @buckets[bucket_time_ns][:latest_produce_offsets][partition_key] || 0
-          ].max
-        end
-
-        true
-      end
-
-      # Track Kafka offset commit for consumer lag monitoring
-      # @param group [String] The consumer group name
-      # @param topic [String] The Kafka topic name
-      # @param partition [Integer] The partition number
-      # @param offset [Integer] The committed offset
-      # @param now [Time] Timestamp
-      # @return [Boolean] true if tracking succeeded
-      def track_kafka_commit(group, topic, partition, offset, now)
-        now_ns = (now.to_f * 1e9).to_i
-        consumer_key = "#{group}:#{topic}:#{partition}"
-
-        @stats_mutex.synchronize do
-          bucket_time_ns = now_ns - (now_ns % @bucket_size_ns)
-          @buckets[bucket_time_ns] ||= create_bucket
-
-          @buckets[bucket_time_ns][:latest_commit_offsets][consumer_key] = [
-            offset,
-            @buckets[bucket_time_ns][:latest_commit_offsets][consumer_key] || 0
-          ].max
-        end
-
+        @event_buffer.push(
+          {
+            type: :kafka_produce,
+            topic: topic,
+            partition: partition,
+            offset: offset,
+            timestamp_ns: (now.to_f * 1e9).to_i
+          }
+        )
         true
       end
 
@@ -102,15 +80,15 @@ def track_kafka_commit(group, topic, partition, offset, now)
       # @param now [Time] Timestamp
       # @return [Boolean] true if tracking succeeded
       def track_kafka_consume(topic, partition, offset, now)
-        record_consumer_stats(
-          topic: topic,
-          partition: partition,
-          offset: offset,
-          timestamp: now
+        @event_buffer.push(
+          {
+            type: :kafka_consume,
+            topic: topic,
+            partition: partition,
+            offset: offset,
+            timestamp: now
+          }
         )
-
-        aggregate_consumer_stats_by_partition(topic, partition, offset, now)
-
         true
       end
 
@@ -168,12 +146,91 @@ def set_consume_checkpoint(type:, source:, manual_checkpoint: true, tags: {}, &b
 
       # Called periodically by the worker to flush stats to the agent
       def perform
+        process_events
         flush_stats
         true
      end
 
       private
 
+      # Drain event buffer and apply updates to shared data structures
+      # This runs in the background worker thread, not the critical path
+      def process_events
+        events = @event_buffer.pop
+        return if events.empty?
+
+        @stats_mutex.synchronize do
+          events.each do |event_obj|
+            # Buffer stores Objects; we know they're hashes with symbol keys
+            event = event_obj # : ::Hash[::Symbol, untyped]
+            case event[:type]
+            when :kafka_produce
+              process_kafka_produce_event(event)
+            when :kafka_consume
+              process_kafka_consume_event(event)
+            when :checkpoint
+              process_checkpoint_event(event)
+            end
+          end
+        end
+      end
+
+      def process_kafka_produce_event(event)
+        partition_key = "#{event[:topic]}:#{event[:partition]}"
+        bucket_time_ns = event[:timestamp_ns] - (event[:timestamp_ns] % @bucket_size_ns)
+        bucket = @buckets[bucket_time_ns] ||= create_bucket
+
+        bucket[:latest_produce_offsets][partition_key] = [
+          event[:offset],
+          bucket[:latest_produce_offsets][partition_key] || 0
+        ].max
+      end
+
+      def process_kafka_consume_event(event)
+        @consumer_stats << {
+          topic: event[:topic],
+          partition: event[:partition],
+          offset: event[:offset],
+          timestamp: event[:timestamp],
+          timestamp_sec: event[:timestamp].to_f
+        }
+
+        timestamp_ns = (event[:timestamp].to_f * 1e9).to_i
+        bucket_time_ns = timestamp_ns - (timestamp_ns % @bucket_size_ns)
+        @buckets[bucket_time_ns] ||= create_bucket
+
+        # Track offset gaps for lag detection
+        partition_key = "#{event[:topic]}:#{event[:partition]}"
+        @latest_consumer_offsets ||= {}
+        previous_offset = @latest_consumer_offsets[partition_key] || 0
+
+        if event[:offset] > previous_offset + 1
+          @consumer_lag_events ||= []
+          @consumer_lag_events << {
+            topic: event[:topic],
+            partition: event[:partition],
+            expected_offset: previous_offset + 1,
+            actual_offset: event[:offset],
+            gap_size: event[:offset] - previous_offset - 1,
+            timestamp_sec: event[:timestamp].to_f
+          }
+        end
+
+        @latest_consumer_offsets[partition_key] = [event[:offset], previous_offset].max
+      end
+
+      def process_checkpoint_event(event)
+        now_ns = (event[:timestamp_sec] * 1e9).to_i
+        bucket_time_ns = now_ns - (now_ns % @bucket_size_ns)
+        bucket = @buckets[bucket_time_ns] ||= create_bucket
+
+        aggr_key = [event[:tags].join(','), event[:hash], event[:parent_hash]]
+        stats = bucket[:pathway_stats][aggr_key] ||= create_pathway_stats
+
+        stats[:edge_latency].add(event[:edge_latency_sec])
+        stats[:full_pathway_latency].add(event[:full_pathway_latency_sec])
+      end
+
       def encode_pathway_context
         @pathway_context.encode_b64
       end
@@ -331,59 +388,24 @@ def record_checkpoint_stats(
         hash:, parent_hash:, edge_latency_sec:, full_pathway_latency_sec:, payload_size:, tags:,
         timestamp_sec:
       )
-        @stats_mutex.synchronize do
-          now_ns = (timestamp_sec * 1e9).to_i
-          bucket_time_ns = now_ns - (now_ns % @bucket_size_ns)
-
-          bucket = @buckets[bucket_time_ns] ||= create_bucket
-
-          aggr_key = [tags.join(','), hash, parent_hash]
-          stats = bucket[:pathway_stats][aggr_key] ||= create_pathway_stats
-
-          stats[:edge_latency].add(edge_latency_sec)
-          stats[:full_pathway_latency].add(full_pathway_latency_sec)
-        end
-
+        @event_buffer.push(
+          {
+            type: :checkpoint,
+            hash: hash,
+            parent_hash: parent_hash,
+            edge_latency_sec: edge_latency_sec,
+            full_pathway_latency_sec: full_pathway_latency_sec,
+            payload_size: payload_size,
+            tags: tags,
+            timestamp_sec: timestamp_sec
+          }
+        )
         true
       end
 
       def record_consumer_stats(topic:, partition:, offset:, timestamp:)
-        @stats_mutex.synchronize do
-          @consumer_stats << {
-            topic: topic,
-            partition: partition,
-            offset: offset,
-            timestamp: timestamp,
-            timestamp_sec: timestamp.to_f
-          }
-
-          now_ns = (timestamp.to_f * 1e9).to_i
-          bucket_time_ns = now_ns - (now_ns % @bucket_size_ns)
-          @buckets[bucket_time_ns] ||= create_bucket
-        end
-      end
-
-      def aggregate_consumer_stats_by_partition(topic, partition, offset, timestamp)
-        partition_key = "#{topic}:#{partition}"
-
-        @stats_mutex.synchronize do
-          @latest_consumer_offsets ||= {}
-          previous_offset = @latest_consumer_offsets[partition_key] || 0
-
-          if offset > previous_offset + 1
-            @consumer_lag_events ||= []
-            @consumer_lag_events << {
-              topic: topic,
-              partition: partition,
-              expected_offset: previous_offset + 1,
-              actual_offset: offset,
-              gap_size: offset - previous_offset - 1,
-              timestamp_sec: timestamp.to_f
-            }
-          end
-
-          @latest_consumer_offsets[partition_key] = [offset, previous_offset].max
-        end
+        # Already handled by track_kafka_consume pushing to buffer
+        # This method kept for API compatibility but does nothing
       end
 
       def send_stats_to_agent(payload)
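
For reference, the aggregation the worker applies to each drained event is plain integer arithmetic: timestamps are truncated to the start of their aggregation bucket, per-partition produce offsets are kept as a running maximum, and a consume offset that jumps past previous_offset + 1 is recorded as a lag gap. A standalone sketch of that math follows; the 10-second bucket size and the sample offsets are illustrative assumptions, not values taken from the commit.

# Standalone illustration of the bucket and offset-gap math used by
# process_kafka_produce_event / process_kafka_consume_event above.
BUCKET_SIZE_NS = 10 * 1_000_000_000  # assumed 10s bucket for this sketch

timestamp_ns = (Time.now.to_f * 1e9).to_i

# Truncate the timestamp down to the start of its bucket.
bucket_time_ns = timestamp_ns - (timestamp_ns % BUCKET_SIZE_NS)

# Keep only the highest produce offset seen for a topic/partition in the bucket.
latest_produce_offsets = {}
key = 'orders:3'
[42, 40, 57].each do |offset|
  latest_produce_offsets[key] = [offset, latest_produce_offsets[key] || 0].max
end
puts "bucket #{bucket_time_ns}: latest offset #{latest_produce_offsets[key]}"  # 57

# Consumer lag gap: previous committed offset 10, next consumed offset 14
# means offsets 11..13 were skipped, i.e. a gap of 3.
previous_offset = 10
offset = 14
gap_size = offset > previous_offset + 1 ? offset - previous_offset - 1 : 0
puts "gap_size=#{gap_size}"  # 3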

sig/datadog/data_streams/processor.rbs

Lines changed: 9 additions & 1 deletion
@@ -5,6 +5,7 @@ module Datadog
 
     class Processor < Core::Worker
       PROPAGATION_KEY: ::String
+      DEFAULT_BUFFER_SIZE: ::Integer
 
       @pathway_context: PathwayContext
 
@@ -16,6 +17,8 @@ module Datadog
 
       @stats_mutex: ::Thread::Mutex
 
+      @event_buffer: Core::Buffer::CRuby
+
       @transport: Transport::Stats::Transport?
 
       @logger: Core::Logger
@@ -27,7 +30,7 @@ module Datadog
       include Core::Workers::Polling
       include Core::Workers::IntervalLoop
 
-      def initialize: (interval: ::Float, logger: Core::Logger, settings: Core::Configuration::Settings, agent_settings: Core::Configuration::AgentSettings) -> void
+      def initialize: (interval: ::Float, logger: Core::Logger, settings: Core::Configuration::Settings, agent_settings: Core::Configuration::AgentSettings, ?buffer_size: ::Integer) -> void
       def track_kafka_produce: (::String topic, ::Integer partition, ::Integer offset, ::Time now) -> (nil | true)
       def track_kafka_commit: (::String group, ::String topic, ::Integer partition, ::Integer offset, ::Time now) -> (nil | true)
       def track_kafka_consume: (::String topic, ::Integer partition, ::Integer offset, ::Time now) -> (nil | true)
@@ -46,6 +49,11 @@ module Datadog
       def perform: () -> (bool | nil)
       def flush_stats: () -> void
 
+      def process_events: () -> void
+      def process_kafka_produce_event: (::Hash[::Symbol, untyped] event) -> void
+      def process_kafka_consume_event: (::Hash[::Symbol, untyped] event) -> void
+      def process_checkpoint_event: (::Hash[::Symbol, untyped] event) -> void
+
       def get_current_pathway: () -> PathwayContext?
       def get_current_context: () -> PathwayContext
 
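In the signature file, the leading ? on ?buffer_size: ::Integer marks the new keyword as optional, mirroring the Ruby default of DEFAULT_BUFFER_SIZE. A hedged sketch of how an optional RBS keyword lines up with a defaulted Ruby keyword argument; the BufferedWorker class here is purely illustrative, not the Datadog Processor.

# Illustrative only: how `?buffer_size: ::Integer` in RBS corresponds
# to a Ruby keyword argument with a default value.
class BufferedWorker
  DEFAULT_BUFFER_SIZE = 100_000

  # RBS: def initialize: (interval: ::Float, ?buffer_size: ::Integer) -> void
  def initialize(interval:, buffer_size: DEFAULT_BUFFER_SIZE)
    @interval = interval
    @buffer_size = buffer_size
  end

  attr_reader :interval, :buffer_size
end

puts BufferedWorker.new(interval: 10.0).buffer_size                       # 100000
puts BufferedWorker.new(interval: 10.0, buffer_size: 50_000).buffer_size  # 50000
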
0 commit comments
