Skip to content

Commit f50dc8c

Browse files
committed
Introduce deduplicator and reporter
* Extract AppSec::LRUCache into the Core
1 parent 0f5c9c2 commit f50dc8c

File tree

14 files changed

+373
-71
lines changed

14 files changed

+373
-71
lines changed
Lines changed: 2 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,11 @@
11
# frozen_string_literal: true
22

3-
require 'forwardable'
3+
require 'datadog/core/utils/lru_cache'
44

55
module Datadog
66
module AppSec
77
module APISecurity
8-
# An LRU (Least Recently Used) cache implementation that relies on the
9-
# Ruby 1.9+ `Hash` implementation that guarantees insertion order.
10-
#
11-
# WARNING: This implementation is NOT thread-safe and should be used
12-
# in a single-threaded context.
13-
class LRUCache
14-
extend Forwardable
15-
16-
def_delegators :@store, :clear, :empty?
17-
18-
def initialize(max_size)
19-
raise ArgumentError, 'max_size must be an Integer' unless max_size.is_a?(Integer)
20-
raise ArgumentError, 'max_size must be greater than 0' if max_size <= 0
21-
22-
@max_size = max_size
23-
@store = {}
24-
end
25-
26-
# NOTE: Accessing a key moves it to the end of the list.
27-
def [](key)
28-
if (entry = @store.delete(key))
29-
@store[key] = entry
30-
end
31-
end
32-
33-
def store(key, value)
34-
return @store[key] = value if @store.delete(key)
35-
36-
# NOTE: evict the oldest entry if store reached the maximum allowed size
37-
@store.shift if @store.size >= @max_size
38-
@store[key] = value
39-
end
40-
41-
# NOTE: If the key exists, it's moved to the end of the list and
42-
# if does not, the given block will be executed and the result
43-
# will be stored (which will add it to the end of the list).
44-
def fetch_or_store(key)
45-
if (entry = @store.delete(key))
46-
return @store[key] = entry
47-
end
48-
49-
# NOTE: evict the oldest entry if store reached the maximum allowed size
50-
@store.shift if @store.size >= @max_size
51-
@store[key] = yield
52-
end
53-
end
8+
LRUCache = Datadog::Core::Utils::LRUCache
549
end
5510
end
5611
end
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# frozen_string_literal: true
2+
3+
require 'forwardable'
4+
5+
module Datadog
6+
module Core
7+
module Utils
8+
# An LRU (Least Recently Used) cache implementation that relies on the
9+
# Ruby 1.9+ `Hash` implementation that guarantees insertion order.
10+
#
11+
# WARNING: This implementation is NOT thread-safe and should be used
12+
# in a single-threaded context.
13+
class LRUCache
14+
extend Forwardable
15+
16+
def_delegators :@store, :clear, :empty?
17+
18+
def initialize(max_size)
19+
raise ArgumentError, 'max_size must be an Integer' unless max_size.is_a?(Integer)
20+
raise ArgumentError, 'max_size must be greater than 0' if max_size <= 0
21+
22+
@max_size = max_size
23+
@store = {}
24+
end
25+
26+
# NOTE: Accessing a key moves it to the end of the list.
27+
def [](key)
28+
if (entry = @store.delete(key))
29+
@store[key] = entry
30+
end
31+
end
32+
33+
def store(key, value)
34+
return @store[key] = value if @store.delete(key)
35+
36+
# NOTE: evict the oldest entry if store reached the maximum allowed size
37+
@store.shift if @store.size >= @max_size
38+
@store[key] = value
39+
end
40+
41+
# NOTE: If the key exists, it's moved to the end of the list and
42+
# if does not, the given block will be executed and the result
43+
# will be stored (which will add it to the end of the list).
44+
def fetch_or_store(key)
45+
if (entry = @store.delete(key))
46+
return @store[key] = entry
47+
end
48+
49+
# NOTE: evict the oldest entry if store reached the maximum allowed size
50+
@store.shift if @store.size >= @max_size
51+
@store[key] = yield
52+
end
53+
end
54+
end
55+
end
56+
end

lib/datadog/open_feature/component.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ def initialize(settings, agent_settings, logger:, telemetry:)
3939
end
4040

4141
def shutdown!
42-
@reporter&.clear
42+
@reporter&.flush
4343
return unless defined?(@worker) && @worker
4444

4545
@worker.flush

lib/datadog/open_feature/exposures.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@ module Exposures
1212
require_relative 'exposures/batch'
1313
require_relative 'exposures/buffer'
1414
require_relative 'exposures/worker'
15+
require_relative 'exposures/deduplicator'
1516
require_relative 'exposures/reporter'
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# frozen_string_literal: true
2+
3+
require 'zlib'
4+
5+
require 'datadog/core/utils/lru_cache'
6+
7+
module Datadog
8+
module OpenFeature
9+
module Exposures
10+
class Deduplicator
11+
DEFAULT_CACHE_LIMIT = 1_000
12+
13+
def initialize(limit: DEFAULT_CACHE_LIMIT)
14+
@cache = Datadog::Core::Utils::LRUCache.new(limit)
15+
@mutex = Mutex.new
16+
end
17+
18+
def duplicate?(flag_key, targeting_key, allocation_key:, variation_key:)
19+
cache_key = digest(flag_key, targeting_key)
20+
cache_digest = digest(allocation_key, variation_key)
21+
22+
stored = @cache[cache_key]
23+
return true if stored == cache_digest
24+
25+
@mutex.synchronize { @cache.store(cache_key, cache_digest) }
26+
false
27+
end
28+
29+
private
30+
31+
def digest(left, right)
32+
Zlib.crc32(left + right)
33+
end
34+
end
35+
end
36+
end
37+
end
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
# frozen_string_literal: true
2+
3+
require 'zlib'
4+
require 'thread'
5+
6+
require 'datadog/appsec/api_security/lru_cache'
7+
8+
require_relative 'event'
9+
10+
module Datadog
11+
module OpenFeature
12+
module Exposures
13+
class Reporter
14+
DEFAULT_CACHE_LIMIT = 1_000
15+
16+
def initialize(worker:, cache: nil, logger: Datadog.logger, time_provider: Time)
17+
@worker = worker
18+
@logger = logger
19+
@time_provider = time_provider
20+
@cache = cache || Datadog::AppSec::APISecurity::LRUCache.new(DEFAULT_CACHE_LIMIT)
21+
@cache_mutex = Mutex.new
22+
end
23+
24+
def report(result:, context: nil)
25+
payload = normalize(result, context)
26+
return false if payload.nil?
27+
28+
cache_key = payload[:cache_key]
29+
digest = payload[:digest]
30+
31+
return false if cache_key && digest && duplicate?(cache_key, digest)
32+
33+
event = build_event(payload)
34+
return false if event.nil?
35+
36+
@worker.enqueue(event)
37+
rescue => e
38+
@logger.debug { "OpenFeature: Reporter failed to enqueue exposure: #{e.class}: #{e.message}" }
39+
false
40+
end
41+
42+
def flush
43+
@cache_mutex.synchronize { @cache.clear }
44+
end
45+
46+
private
47+
48+
def duplicate?(cache_key, digest)
49+
@cache_mutex.synchronize do
50+
stored = @cache[cache_key]
51+
return true if stored == digest
52+
53+
@cache.store(cache_key, digest)
54+
false
55+
end
56+
end
57+
58+
def normalize(result, context)
59+
data = ensure_hash(result)
60+
result_data = ensure_hash(data[:result] || data['result'])
61+
flag_metadata = ensure_hash(result_data[:flagMetadata] || result_data['flagMetadata'])
62+
63+
flag_key = data[:flag] || data['flag']
64+
subject_id = extract_subject_id(data, context)
65+
allocation_key = flag_metadata[:allocationKey] || flag_metadata['allocationKey']
66+
evaluation_key = result_data[:variant] || result_data['variant']
67+
68+
return nil if flag_key.nil? || subject_id.nil?
69+
70+
variant = evaluation_key || result_data[:value] || result_data['value']
71+
variant_key = variant.nil? ? nil : variant.to_s
72+
return nil if variant_key.nil?
73+
74+
{
75+
flag_key: flag_key,
76+
subject_id: subject_id,
77+
subject_type: nil,
78+
subject_attributes: extract_attributes(data, context),
79+
allocation_key: allocation_key,
80+
variant_key: variant_key,
81+
cache_key: cache_key(flag_key, subject_id),
82+
digest: allocation_key.nil? ? nil : digest(allocation_key, variant_key)
83+
}
84+
end
85+
86+
def build_event(payload)
87+
Event.new(
88+
timestamp: @time_provider.now,
89+
allocation_key: payload[:allocation_key],
90+
flag_key: payload[:flag_key],
91+
variant_key: payload[:variant_key],
92+
subject_id: payload[:subject_id],
93+
subject_type: payload[:subject_type],
94+
subject_attributes: payload[:subject_attributes]
95+
)
96+
end
97+
98+
def extract_subject_id(data, context)
99+
data[:targetingKey] || data['targetingKey'] || context_value(context, :targeting_key) ||
100+
context_value(context, :targetingKey) || context_value(context, :targetingkey)
101+
end
102+
103+
def extract_attributes(data, context)
104+
attributes = data[:attributes] || data['attributes']
105+
return attributes if attributes.is_a?(Hash)
106+
107+
context_value(context, :attributes).is_a?(Hash) ? context_value(context, :attributes) : nil
108+
end
109+
110+
def context_value(context, key)
111+
case context
112+
when Hash
113+
context[key] || context[key.to_s]
114+
else
115+
context.respond_to?(key) ? context.public_send(key) : nil
116+
end
117+
end
118+
119+
def ensure_hash(value)
120+
value.is_a?(Hash) ? value : {}
121+
end
122+
123+
def cache_key(flag_key, subject_id)
124+
return nil if flag_key.nil? || subject_id.nil?
125+
126+
"#{flag_key}:#{subject_id}"
127+
end
128+
129+
def digest(allocation_key, evaluation_key)
130+
Zlib.crc32("#{allocation_key}:#{evaluation_key}")
131+
end
132+
end
133+
end
134+
end
135+
end

sig/datadog/appsec/api_security/lru_cache.rbs

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,7 @@
11
module Datadog
22
module AppSec
33
module APISecurity
4-
class LRUCache
5-
extend Forwardable
6-
7-
@store: Hash[untyped, untyped]
8-
9-
@max_size: Integer
10-
11-
def initialize: (Integer max_size) -> void
12-
13-
def []: (untyped key) -> untyped?
14-
15-
def store: (untyped key, untyped value) -> untyped
16-
17-
def fetch_or_store: (untyped key) { () -> untyped } -> untyped
18-
19-
def clear: () -> void
20-
21-
def empty?: () -> bool
4+
class LRUCache < ::Datadog::Core::Utils::LRUCache
225
end
236
end
247
end
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
module Datadog
2+
module Core
3+
module Utils
4+
class LRUCache
5+
extend ::Forwardable
6+
7+
@store: ::Hash[untyped, untyped]
8+
9+
@max_size: ::Integer
10+
11+
def initialize: (::Integer max_size) -> void
12+
13+
def []: (untyped key) -> untyped?
14+
15+
def store: (untyped key, untyped value) -> untyped
16+
17+
def fetch_or_store: (untyped key) { () -> untyped } -> untyped
18+
19+
def clear: () -> void
20+
21+
def empty?: () -> bool
22+
end
23+
end
24+
end
25+
end
26+
27+
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
module Datadog
2+
module OpenFeature
3+
module Exposures
4+
class Deduplicator
5+
DEFAULT_CACHE_LIMIT: ::Integer
6+
7+
def initialize: (?limit: ::Integer) -> void
8+
9+
def duplicate?: (
10+
::String,
11+
::String,
12+
allocation_key: ::String,
13+
variation_key: ::String
14+
) -> bool
15+
16+
private
17+
18+
def key: (::String, ::String) -> ::String
19+
20+
def digest: (::String, ::String) -> ::Integer
21+
end
22+
end
23+
end
24+
end
25+
26+

sig/datadog/open_feature/exposures/reporter.rbs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ module Datadog
1313

1414
def report: (result: ::Hash[untyped, untyped], ?context: untyped) -> bool
1515

16-
def clear: () -> void
16+
def flush: () -> void
1717

1818
private
1919

0 commit comments

Comments
 (0)