Skip to content

Commit 2b9b824

Browse files
committed
WIP: DataStreams processor and Kafka consumer updates
1 parent eac0317 commit 2b9b824

File tree

2 files changed

+18
-18
lines changed

2 files changed

+18
-18
lines changed

lib/datadog/tracing/contrib/kafka/instrumentation/consumer.rb

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@ def each_message(**kwargs, &block)
2020
proc do |message|
2121
# DSM: Create checkpoint for consumed message
2222
Datadog.logger.debug { "Kafka each_message: DSM enabled for topic #{message.topic}" }
23-
23+
2424
processor = Datadog.configuration.tracing.data_streams.processor
25-
25+
2626
# Extract pathway context from message headers if available
2727
headers = message.headers || {}
2828
processor.set_consume_checkpoint('kafka', message.topic) { |key| headers[key] }
29-
29+
3030
# Call the original block if provided
3131
block.call(message) if block_given?
3232
end
@@ -45,13 +45,13 @@ def each_batch(**kwargs, &block)
4545
proc do |batch|
4646
# DSM: Create checkpoint for consumed batch
4747
Datadog.logger.debug { "Kafka each_batch: DSM enabled for topic #{batch.topic}" }
48-
48+
4949
processor = Datadog.configuration.tracing.data_streams.processor
50-
50+
5151
# For batch processing, we don't have individual message headers
5252
# so we create a consume checkpoint without pathway context
5353
processor.set_consume_checkpoint('kafka', batch.topic)
54-
54+
5555
# Call the original block if provided
5656
block.call(batch) if block_given?
5757
end

spec/datadog/tracing/data_streams/processor_spec.rb

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -187,25 +187,25 @@
187187
it 'creates produce checkpoint with correct tags' do
188188
carrier_set = double('carrier_set')
189189
expect(carrier_set).to receive(:call).with('dd-pathway-ctx-base64', anything)
190-
190+
191191
result = processor.set_produce_checkpoint('kafka', 'orders-topic') { |key, value| carrier_set.call(key, value) }
192-
192+
193193
expect(result).to be_a(String)
194194
expect(result).not_to be_empty
195195
end
196196

197197
it 'works without block' do
198198
result = processor.set_produce_checkpoint('kafka', 'orders-topic')
199-
199+
200200
expect(result).to be_a(String)
201201
expect(result).not_to be_empty
202202
end
203203

204204
it 'returns nil when processor disabled' do
205205
processor.enabled = false
206-
206+
207207
result = processor.set_produce_checkpoint('kafka', 'orders-topic')
208-
208+
209209
expect(result).to be_nil
210210
end
211211
end
@@ -214,9 +214,9 @@
214214
it 'creates consume checkpoint with correct tags' do
215215
carrier_get = double('carrier_get')
216216
expect(carrier_get).to receive(:call).with('dd-pathway-ctx-base64').and_return(nil)
217-
217+
218218
result = processor.set_consume_checkpoint('kafka', 'orders-topic') { |key| carrier_get.call(key) }
219-
219+
220220
expect(result).to be_a(String)
221221
expect(result).not_to be_empty
222222
end
@@ -226,29 +226,29 @@
226226
carrier_get = double('carrier_get')
227227
expect(carrier_get).to receive(:call).with('dd-pathway-ctx-base64').and_return(encoded_context)
228228
expect(processor).to receive(:decode_pathway_b64).with(encoded_context)
229-
229+
230230
processor.set_consume_checkpoint('kafka', 'orders-topic') { |key| carrier_get.call(key) }
231231
end
232232

233233
it 'works without block' do
234234
result = processor.set_consume_checkpoint('kafka', 'orders-topic')
235-
235+
236236
expect(result).to be_a(String)
237237
expect(result).not_to be_empty
238238
end
239239

240240
it 'respects manual_checkpoint parameter' do
241241
result = processor.set_consume_checkpoint('kafka', 'orders-topic', manual_checkpoint: false)
242-
242+
243243
expect(result).to be_a(String)
244244
expect(result).not_to be_empty
245245
end
246246

247247
it 'returns nil when processor disabled' do
248248
processor.enabled = false
249-
249+
250250
result = processor.set_consume_checkpoint('kafka', 'orders-topic')
251-
251+
252252
expect(result).to be_nil
253253
end
254254
end

0 commit comments

Comments
 (0)