@@ -17,22 +17,22 @@ module InstanceMethods
1717 def each_message ( **kwargs , &block )
1818 # Wrap the block to add DSM processing for each message
1919 wrapped_block = if Datadog . configuration . tracing . data_streams . enabled
20- proc do |message |
21- # DSM: Create checkpoint for consumed message
22- Datadog . logger . debug { "Kafka each_message: DSM enabled for topic #{ message . topic } " }
20+ proc do |message |
21+ # DSM: Create checkpoint for consumed message
22+ Datadog . logger . debug { "Kafka each_message: DSM enabled for topic #{ message . topic } " }
2323
24- processor = Datadog . configuration . tracing . data_streams . processor
24+ processor = Datadog . configuration . tracing . data_streams . processor
2525
26- # Extract pathway context from message headers if available
27- headers = message . headers || { }
28- processor . set_consume_checkpoint ( 'kafka' , message . topic ) { |key | headers [ key ] }
26+ # Extract pathway context from message headers if available
27+ headers = message . headers || { }
28+ processor . set_consume_checkpoint ( 'kafka' , message . topic ) { |key | headers [ key ] }
2929
30- # Call the original block if provided
31- block . call ( message ) if block_given?
32- end
33- else
34- block
35- end
30+ # Call the original block if provided
31+ yield ( message ) if block
32+ end
33+ else
34+ block
35+ end
3636
3737 # Call the original method with wrapped block
3838 super ( **kwargs , &wrapped_block )
@@ -42,22 +42,22 @@ def each_message(**kwargs, &block)
4242 def each_batch ( **kwargs , &block )
4343 # Wrap the block to add DSM processing for each batch
4444 wrapped_block = if Datadog . configuration . tracing . data_streams . enabled
45- proc do |batch |
46- # DSM: Create checkpoint for consumed batch
47- Datadog . logger . debug { "Kafka each_batch: DSM enabled for topic #{ batch . topic } " }
45+ proc do |batch |
46+ # DSM: Create checkpoint for consumed batch
47+ Datadog . logger . debug { "Kafka each_batch: DSM enabled for topic #{ batch . topic } " }
4848
49- processor = Datadog . configuration . tracing . data_streams . processor
49+ processor = Datadog . configuration . tracing . data_streams . processor
5050
51- # For batch processing, we don't have individual message headers
52- # so we create a consume checkpoint without pathway context
53- processor . set_consume_checkpoint ( 'kafka' , batch . topic )
51+ # For batch processing, we don't have individual message headers
52+ # so we create a consume checkpoint without pathway context
53+ processor . set_consume_checkpoint ( 'kafka' , batch . topic )
5454
55- # Call the original block if provided
56- block . call ( batch ) if block_given?
57- end
58- else
59- block
60- end
55+ # Call the original block if provided
56+ yield ( batch ) if block
57+ end
58+ else
59+ block
60+ end
6161
6262 # Call the original method with wrapped block
6363 super ( **kwargs , &wrapped_block )
0 commit comments