Skip to content

Commit 1e262cb

Browse files
committed
Rework Reporter and the data model
1 parent f50dc8c commit 1e262cb

File tree

15 files changed

+464
-338
lines changed

15 files changed

+464
-338
lines changed

lib/datadog/open_feature/component.rb

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,13 @@ def initialize(settings, agent_settings, logger:, telemetry:)
3434

3535
transport = Transport::HTTP.exposures(agent_settings: agent_settings, logger: logger)
3636
@worker = Exposures::Worker.new(transport: transport, logger: logger)
37-
@reporter = Exposures::Reporter.new(worker: @worker, logger: logger)
37+
@reporter = Exposures::Reporter.new(@worker, telemetry: telemetry, logger: logger)
3838
@engine = EvaluationEngine.new(@reporter, telemetry: telemetry, logger: logger)
3939
end
4040

4141
def shutdown!
42-
@reporter&.flush
43-
return unless defined?(@worker) && @worker
44-
45-
@worker.flush
46-
@worker.stop(true)
42+
@worker&.flush
43+
@worker&.stop(true)
4744
end
4845
end
4946
end

lib/datadog/open_feature/exposures.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ module Exposures
77
end
88
end
99

10-
require_relative 'exposures/event'
1110
require_relative 'exposures/context'
1211
require_relative 'exposures/batch'
1312
require_relative 'exposures/buffer'

lib/datadog/open_feature/exposures/deduplicator.rb

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
# frozen_string_literal: true
22

33
require 'zlib'
4-
5-
require 'datadog/core/utils/lru_cache'
4+
require_relative '../../core/utils/lru_cache'
65

76
module Datadog
87
module OpenFeature
@@ -15,9 +14,9 @@ def initialize(limit: DEFAULT_CACHE_LIMIT)
1514
@mutex = Mutex.new
1615
end
1716

18-
def duplicate?(flag_key, targeting_key, allocation_key:, variation_key:)
19-
cache_key = digest(flag_key, targeting_key)
20-
cache_digest = digest(allocation_key, variation_key)
17+
def duplicate?(event)
18+
cache_key = digest(event.flag_key, event.targeting_key)
19+
cache_digest = digest(event.allocation_key, event.variation_key)
2120

2221
stored = @cache[cache_key]
2322
return true if stored == cache_digest

lib/datadog/open_feature/exposures/event.rb

Lines changed: 0 additions & 78 deletions
This file was deleted.
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# frozen_string_literal: true
2+
3+
module Datadog
4+
module OpenFeature
5+
module Exposures
6+
module Models
7+
end
8+
end
9+
end
10+
end
11+
12+
require_relative 'models/event'
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
# frozen_string_literal: true
2+
3+
require_relative '../../../core/utils/time'
4+
5+
module Datadog
6+
module OpenFeature
7+
module Exposures
8+
module Models
9+
# A data model for an exposure event.
10+
class Event
11+
TARGETING_KEY_FIELD = 'targeting_key'
12+
ALLOWED_FIELD_TYPES = [
13+
String,
14+
Integer,
15+
Float,
16+
TrueClass,
17+
FalseClass
18+
].freeze
19+
20+
# NOTE: The result is a Hash-like structure like this
21+
#
22+
# {
23+
# "flag": "boolean-one-of-matches",
24+
# "variationType": "INTEGER",
25+
# "defaultValue": 0,
26+
# "targetingKey": "haley",
27+
# "attributes": {
28+
# "not_matches_flag": "False"
29+
# },
30+
# "result": {
31+
# "value": 4,
32+
# "variant": "4",
33+
# "flagMetadata": {
34+
# "allocationKey": "4-for-not-matches",
35+
# "variationType": "number",
36+
# "doLog": true
37+
# }
38+
# }
39+
# }
40+
class << self
41+
def build(result, context:)
42+
payload = {
43+
timestamp: current_timestamp_ms,
44+
allocation: {
45+
key: result.dig('result', 'flagMetadata', 'allocationKey').to_s
46+
},
47+
flag: {
48+
key: result['flag'].to_s
49+
},
50+
variant: {
51+
key: result.dig('result', 'variant').to_s
52+
},
53+
subject: {
54+
id: result['targetingKey'].to_s,
55+
attributes: extract_attributes(context)
56+
}
57+
}
58+
59+
new(payload)
60+
end
61+
62+
private
63+
64+
def extract_attributes(context)
65+
context.fields.select do |key, value|
66+
next false if key == TARGETING_KEY_FIELD
67+
next true if ALLOWED_FIELD_TYPES.include?(value.class)
68+
69+
false
70+
end
71+
end
72+
73+
def current_timestamp_ms
74+
(Datadog::Core::Utils::Time.now.to_f * 1000).to_i
75+
end
76+
end
77+
78+
def initialize(payload)
79+
@payload = payload
80+
end
81+
82+
def flag_key
83+
@payload.dig(:flag, :key).to_s
84+
end
85+
86+
def targeting_key
87+
@payload.dig(:subject, :id).to_s
88+
end
89+
90+
def allocation_key
91+
@payload.dig(:allocation, :key).to_s
92+
end
93+
94+
def variation_key
95+
@payload.dig(:variant, :key).to_s
96+
end
97+
98+
# NOTE: The schema is this
99+
# https://github.com/DataDog/dd-source/blob/c10946901aaa103db960883c20161833a664e093/domains/evp-workers/apps/exposures-worker/schemas/exposure.json
100+
def to_h
101+
@payload
102+
end
103+
end
104+
end
105+
end
106+
end
107+
end

lib/datadog/open_feature/exposures/reporter.rb

Lines changed: 10 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -1,133 +1,30 @@
11
# frozen_string_literal: true
22

3-
require 'zlib'
4-
require 'thread'
5-
6-
require 'datadog/appsec/api_security/lru_cache'
7-
8-
require_relative 'event'
3+
require_relative 'models/event'
4+
require_relative 'deduplicator'
95

106
module Datadog
117
module OpenFeature
128
module Exposures
139
class Reporter
14-
DEFAULT_CACHE_LIMIT = 1_000
15-
16-
def initialize(worker:, cache: nil, logger: Datadog.logger, time_provider: Time)
10+
def initialize(worker, telemetry:, logger: Datadog.logger)
1711
@worker = worker
1812
@logger = logger
19-
@time_provider = time_provider
20-
@cache = cache || Datadog::AppSec::APISecurity::LRUCache.new(DEFAULT_CACHE_LIMIT)
21-
@cache_mutex = Mutex.new
13+
@telemetry = telemetry
14+
@deduplicator = Deduplicator.new
2215
end
2316

24-
def report(result:, context: nil)
25-
payload = normalize(result, context)
26-
return false if payload.nil?
27-
28-
cache_key = payload[:cache_key]
29-
digest = payload[:digest]
30-
31-
return false if cache_key && digest && duplicate?(cache_key, digest)
17+
def report(result, context:)
18+
return false unless result.dig('result', 'flagMetadata', 'doLog')
3219

33-
event = build_event(payload)
34-
return false if event.nil?
20+
event = Models::Event.build(result, context: context)
21+
return false if @deduplicator.duplicate?(event)
3522

3623
@worker.enqueue(event)
3724
rescue => e
3825
@logger.debug { "OpenFeature: Reporter failed to enqueue exposure: #{e.class}: #{e.message}" }
39-
false
40-
end
41-
42-
def flush
43-
@cache_mutex.synchronize { @cache.clear }
44-
end
45-
46-
private
47-
48-
def duplicate?(cache_key, digest)
49-
@cache_mutex.synchronize do
50-
stored = @cache[cache_key]
51-
return true if stored == digest
52-
53-
@cache.store(cache_key, digest)
54-
false
55-
end
56-
end
57-
58-
def normalize(result, context)
59-
data = ensure_hash(result)
60-
result_data = ensure_hash(data[:result] || data['result'])
61-
flag_metadata = ensure_hash(result_data[:flagMetadata] || result_data['flagMetadata'])
62-
63-
flag_key = data[:flag] || data['flag']
64-
subject_id = extract_subject_id(data, context)
65-
allocation_key = flag_metadata[:allocationKey] || flag_metadata['allocationKey']
66-
evaluation_key = result_data[:variant] || result_data['variant']
67-
68-
return nil if flag_key.nil? || subject_id.nil?
69-
70-
variant = evaluation_key || result_data[:value] || result_data['value']
71-
variant_key = variant.nil? ? nil : variant.to_s
72-
return nil if variant_key.nil?
7326

74-
{
75-
flag_key: flag_key,
76-
subject_id: subject_id,
77-
subject_type: nil,
78-
subject_attributes: extract_attributes(data, context),
79-
allocation_key: allocation_key,
80-
variant_key: variant_key,
81-
cache_key: cache_key(flag_key, subject_id),
82-
digest: allocation_key.nil? ? nil : digest(allocation_key, variant_key)
83-
}
84-
end
85-
86-
def build_event(payload)
87-
Event.new(
88-
timestamp: @time_provider.now,
89-
allocation_key: payload[:allocation_key],
90-
flag_key: payload[:flag_key],
91-
variant_key: payload[:variant_key],
92-
subject_id: payload[:subject_id],
93-
subject_type: payload[:subject_type],
94-
subject_attributes: payload[:subject_attributes]
95-
)
96-
end
97-
98-
def extract_subject_id(data, context)
99-
data[:targetingKey] || data['targetingKey'] || context_value(context, :targeting_key) ||
100-
context_value(context, :targetingKey) || context_value(context, :targetingkey)
101-
end
102-
103-
def extract_attributes(data, context)
104-
attributes = data[:attributes] || data['attributes']
105-
return attributes if attributes.is_a?(Hash)
106-
107-
context_value(context, :attributes).is_a?(Hash) ? context_value(context, :attributes) : nil
108-
end
109-
110-
def context_value(context, key)
111-
case context
112-
when Hash
113-
context[key] || context[key.to_s]
114-
else
115-
context.respond_to?(key) ? context.public_send(key) : nil
116-
end
117-
end
118-
119-
def ensure_hash(value)
120-
value.is_a?(Hash) ? value : {}
121-
end
122-
123-
def cache_key(flag_key, subject_id)
124-
return nil if flag_key.nil? || subject_id.nil?
125-
126-
"#{flag_key}:#{subject_id}"
127-
end
128-
129-
def digest(allocation_key, evaluation_key)
130-
Zlib.crc32("#{allocation_key}:#{evaluation_key}")
27+
false
13128
end
13229
end
13330
end

0 commit comments

Comments
 (0)