@@ -51,9 +51,7 @@ def initialize(interval: nil)
5151
5252 # Auto-start the processor by calling perform once
5353 # This kicks off the async polling loop via Workers::Async::Thread
54- Datadog . logger . debug ( "[DSM Processor] Initialization complete, interval=#{ interval } s, starting async worker..." )
5554 perform
56- Datadog . logger . debug ( "[DSM Processor] perform called, started=#{ started? rescue 'N/A' } , running=#{ running? rescue 'N/A' } " )
5755 end
5856
5957 # Track Kafka produce offset for lag monitoring
@@ -152,7 +150,6 @@ def track_kafka_consume(topic, partition, offset, now_sec = nil)
152150 # @yield [key, value] Block to inject context into carrier
153151 # @return [String, nil] Base64 encoded pathway context or nil if disabled
154152 def set_produce_checkpoint ( type :, destination :, manual_checkpoint : true , tags : [ ] , &block )
155- Datadog . logger . debug ( "[DSM Processor] set_produce_checkpoint called: type=#{ type } , destination=#{ destination } , enabled=#{ @enabled } " )
156153 return nil unless @enabled
157154
158155 checkpoint_tags = [ "type:#{ type } " , "topic:#{ destination } " , 'direction:out' ]
@@ -161,7 +158,6 @@ def set_produce_checkpoint(type:, destination:, manual_checkpoint: true, tags: [
161158
162159 span = Datadog ::Tracing . active_span
163160 pathway = set_checkpoint ( checkpoint_tags , nil , 0 , span )
164- Datadog . logger . debug ( "[DSM Processor] set_produce_checkpoint created pathway: #{ pathway ? 'yes' : 'no' } " )
165161
166162 yield ( PROPAGATION_KEY , pathway ) if pathway && block
167163
@@ -205,12 +201,9 @@ def stop(force_stop = false, timeout = 1)
205201
206202 # Called periodically by the worker to flush stats to the agent
207203 def perform
208- Datadog . logger . debug ( "[DSM Processor] perform called, enabled=#{ @enabled } " )
209204 return unless @enabled
210205
211- Datadog . logger . debug ( "[DSM Processor] flushing stats" )
212206 flush_stats
213- Datadog . logger . debug ( "[DSM Processor] flush complete" )
214207 true
215208 end
216209
@@ -306,12 +299,7 @@ def decode_pathway_b64(encoded_ctx)
306299 def flush_stats
307300 @stats_mutex . synchronize do
308301 # Check if we have data to send
309- if @buckets . empty? && @consumer_stats . empty?
310- Datadog . logger . debug ( "[DSM Processor] flush_stats: no data to send" )
311- return
312- end
313-
314- Datadog . logger . debug ( "[DSM Processor] flush_stats: sending data (#{ @buckets . size } buckets, #{ @consumer_stats . size } consumer stats)" )
302+ return if @buckets . empty? && @consumer_stats . empty?
315303
316304 # Build payload in agent format
317305 stats_buckets = serialize_buckets
@@ -525,8 +513,6 @@ def send_dsm_payload(data, headers)
525513 # Create HTTP request to DSM endpoint
526514 agent_host = Datadog . configuration . agent . host || 'localhost'
527515 agent_port = Datadog . configuration . agent . port || 8126
528- Datadog . logger . debug ( "[DSM Processor] Sending to agent: host=#{ agent_host } , port=#{ agent_port } " )
529-
530516 uri = URI ( "http://#{ agent_host } :#{ agent_port } /v0.1/pipeline_stats" )
531517
532518 http = Net ::HTTP . new ( uri . host , uri . port )
0 commit comments