Skip to content

Commit 0d9c1ce

Browse files
committed
Move data_streams so that its not namespaced by tracing
1 parent 7cb55ff commit 0d9c1ce

File tree

25 files changed

+927
-976
lines changed

25 files changed

+927
-976
lines changed

lib/datadog/core/configuration.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,12 @@ def shutdown!
191191
end
192192
end
193193

194+
# Access the Data Streams Monitoring processor
195+
# @return [Datadog::DataStreams::Processor, nil] The processor instance, or nil if not enabled
196+
def data_streams
197+
components.data_streams_processor
198+
end
199+
194200
protected
195201

196202
def components(allow_initialization: true)

lib/datadog/core/configuration/components.rb

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
require_relative '../crashtracking/component'
2020
require_relative '../environment/agent_info'
2121
require_relative '../process_discovery'
22+
require_relative '../../data_streams/processor'
2223

2324
module Datadog
2425
module Core
@@ -75,6 +76,12 @@ def build_crashtracker(settings, agent_settings, logger:)
7576

7677
Datadog::Core::Crashtracking::Component.build(settings, agent_settings, logger: logger)
7778
end
79+
80+
def build_data_streams_processor(settings)
81+
return unless settings.data_streams.enabled
82+
83+
Datadog::DataStreams::Processor.new
84+
end
7885
end
7986

8087
attr_reader \
@@ -90,7 +97,8 @@ def build_crashtracker(settings, agent_settings, logger:)
9097
:error_tracking,
9198
:dynamic_instrumentation,
9299
:appsec,
93-
:agent_info
100+
:agent_info,
101+
:data_streams_processor
94102

95103
def initialize(settings)
96104
@settings = settings
@@ -126,6 +134,7 @@ def initialize(settings)
126134
@appsec = Datadog::AppSec::Component.build_appsec_component(settings, telemetry: telemetry)
127135
@dynamic_instrumentation = Datadog::DI::Component.build(settings, agent_settings, @logger, telemetry: telemetry)
128136
@error_tracking = Datadog::ErrorTracking::Component.build(settings, @tracer, @logger)
137+
@data_streams_processor = self.class.build_data_streams_processor(settings)
129138
@environment_logger_extra[:dynamic_instrumentation_enabled] = !!@dynamic_instrumentation
130139

131140
# Configure non-privileged components.
@@ -154,9 +163,6 @@ def startup!(settings, old_state: nil)
154163
end
155164
end
156165

157-
# Start Data Streams Monitoring processor if enabled
158-
settings.tracing.data_streams.processor.start if settings.tracing.data_streams.enabled
159-
160166
if settings.remote.enabled && old_state&.remote_started?
161167
# The library was reconfigured and previously it already started
162168
# the remote component (i.e., it received at least one request
@@ -199,7 +205,7 @@ def shutdown!(replacement = nil)
199205
runtime_metrics.stop(true, close_metrics: false)
200206

201207
# Shutdown Data Streams Monitoring processor
202-
settings.tracing.data_streams.processor.stop(true, 1) if settings.tracing.data_streams.enabled
208+
data_streams_processor&.stop(true, 1)
203209

204210
# Shutdown the old metrics, unless they are still being used.
205211
# (e.g. custom Statsd instances.)

lib/datadog/core/configuration/settings.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
require_relative '../../profiling/ext'
1313

1414
require_relative '../../tracing/configuration/settings'
15+
require_relative '../../data_streams/configuration/settings'
1516

1617
module Datadog
1718
module Core
@@ -1030,6 +1031,7 @@ def initialize(*_)
10301031
# TODO: Tracing should manage its own settings.
10311032
# Keep this extension here for now to keep things working.
10321033
extend Datadog::Tracing::Configuration::Settings
1034+
extend Datadog::DataStreams::Configuration::Settings
10331035
end
10341036
# standard:enable Metrics/BlockLength
10351037
end
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# frozen_string_literal: true
2+
3+
require_relative '../../core/environment/variable_helpers'
4+
require_relative '../processor'
5+
6+
module Datadog
7+
module DataStreams
8+
module Configuration
9+
# Configuration settings for Data Streams Monitoring.
10+
# @public_api
11+
module Settings
12+
def self.extended(base)
13+
base.class_eval do
14+
# Data Streams Monitoring configuration
15+
# @public_api
16+
settings :data_streams do
17+
# Whether Data Streams Monitoring is enabled. When enabled, the library will
18+
# collect and report data lineage information for messaging systems.
19+
#
20+
# @default `DD_DATA_STREAMS_ENABLED` environment variable, otherwise `false`.
21+
# @return [Boolean]
22+
option :enabled do |o|
23+
o.type :bool
24+
o.env 'DD_DATA_STREAMS_ENABLED'
25+
o.default false
26+
end
27+
end
28+
end
29+
end
30+
end
31+
end
32+
end
33+
end
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
# frozen_string_literal: true
2+
3+
require 'stringio'
4+
require 'datadog/core/utils/base64'
5+
6+
module Datadog
7+
module DataStreams
8+
# Represents a pathway context for data streams monitoring
9+
class PathwayContext
10+
attr_accessor :hash,
11+
:pathway_start_sec,
12+
:current_edge_start_sec,
13+
:parent_hash,
14+
:previous_direction,
15+
:closest_opposite_direction_hash,
16+
:closest_opposite_direction_edge_start
17+
18+
def initialize(hash_value:, pathway_start_sec:, current_edge_start_sec:)
19+
@hash = hash_value
20+
@pathway_start_sec = pathway_start_sec
21+
@current_edge_start_sec = current_edge_start_sec
22+
@parent_hash = nil
23+
24+
@previous_direction = ''
25+
@closest_opposite_direction_hash = 0
26+
@closest_opposite_direction_edge_start = current_edge_start_sec
27+
end
28+
29+
def encode
30+
# Format:
31+
# - 8 bytes: hash value (little-endian)
32+
# - VarInt: pathway start time (milliseconds)
33+
# - VarInt: current edge start time (milliseconds)
34+
[@hash].pack('Q') <<
35+
encode_var_int_64((@pathway_start_sec * 1000).to_i) <<
36+
encode_var_int_64((@current_edge_start_sec * 1000).to_i)
37+
end
38+
39+
def encode_b64
40+
Core::Utils::Base64.strict_encode64(encode)
41+
end
42+
43+
# Decode pathway context from base64 encoded string
44+
def self.decode_b64(encoded_ctx)
45+
return nil unless encoded_ctx && !encoded_ctx.empty?
46+
47+
begin
48+
binary_data = Core::Utils::Base64.strict_decode64(encoded_ctx)
49+
decode(binary_data)
50+
rescue
51+
# Invalid base64 or decode error
52+
nil
53+
end
54+
end
55+
56+
# Decode pathway context from binary data
57+
def self.decode(binary_data)
58+
return nil unless binary_data && binary_data.bytesize >= 8
59+
60+
reader = StringIO.new(binary_data)
61+
62+
# Extract 8-byte hash (little-endian)
63+
hash_bytes = reader.read(8)
64+
return nil unless hash_bytes
65+
66+
hash_value = hash_bytes.unpack1('Q')
67+
68+
# Extract pathway start time (VarInt milliseconds)
69+
pathway_start_ms = decode_varint(reader)
70+
return nil unless pathway_start_ms
71+
72+
# Extract current edge start time (VarInt milliseconds)
73+
current_edge_start_ms = decode_varint(reader)
74+
return nil unless current_edge_start_ms
75+
76+
# Convert milliseconds to seconds
77+
pathway_start_sec = pathway_start_ms / 1000.0
78+
current_edge_start_sec = current_edge_start_ms / 1000.0
79+
80+
new(
81+
hash_value: hash_value,
82+
pathway_start_sec: pathway_start_sec,
83+
current_edge_start_sec: current_edge_start_sec
84+
)
85+
rescue EOFError
86+
# Not enough data in binary stream
87+
nil
88+
end
89+
90+
private
91+
92+
def encode_var_int_64(value)
93+
bytes = []
94+
while value >= 0x80
95+
bytes << ((value & 0x7F) | 0x80)
96+
value >>= 7
97+
end
98+
bytes << value
99+
bytes.pack('C*')
100+
end
101+
102+
# Decode VarInt from IO stream using Ruby-idiomatic approach
103+
#
104+
# VarInt format: Each byte uses 7 bits for data, 1 bit for continuation
105+
# - High bit set = more bytes follow
106+
# - High bit clear = final byte
107+
# - Data bits accumulated in little-endian order
108+
def self.decode_varint(io)
109+
value = 0
110+
shift = 0
111+
112+
loop do
113+
byte = io.readbyte
114+
115+
# Add this byte's 7 data bits to our value
116+
value |= (byte & 0x7F) << shift
117+
118+
# If high bit is clear, we're done
119+
return value unless (byte & 0x80).nonzero?
120+
121+
shift += 7
122+
123+
# Safety: prevent infinite decoding
124+
raise EOFError if shift >= 64
125+
end
126+
rescue EOFError
127+
nil
128+
end
129+
private_class_method :decode_varint
130+
end
131+
end
132+
end

0 commit comments

Comments
 (0)