Skip to content

Commit 6e3ebf1

Browse files
committed
chore: Refactor FDv1 behind the datasystem interface
1 parent 9a21a7a commit 6e3ebf1

File tree

4 files changed

+201
-24
lines changed

4 files changed

+201
-24
lines changed

ldclient/client.py

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -249,26 +249,32 @@ def __start_up(self, start_wait: float):
249249
self.__hooks_lock = ReadWriteLock()
250250
self.__hooks = self._config.hooks + plugin_hooks # type: List[Hook]
251251

252-
data_store_listeners = Listeners()
253-
store_sink = DataStoreUpdateSinkImpl(data_store_listeners)
254-
store = _FeatureStoreClientWrapper(self._config.feature_store, store_sink)
255-
256-
self.__data_store_status_provider = DataStoreStatusProviderImpl(store, store_sink)
257-
258-
data_source_listeners = Listeners()
259-
flag_change_listeners = Listeners()
260-
261-
self.__flag_tracker = FlagTrackerImpl(flag_change_listeners, lambda key, context: self.variation(key, context, None))
252+
# Initialize data system (FDv1) to encapsulate v1 data plumbing
253+
from ldclient.impl.datasystem.fdv1 import ( # local import to avoid circular dependency
254+
FDv1
255+
)
262256

263-
self._config._data_source_update_sink = DataSourceUpdateSinkImpl(store, data_source_listeners, flag_change_listeners)
264-
self.__data_source_status_provider = DataSourceStatusProviderImpl(data_source_listeners, self._config._data_source_update_sink)
265-
self._store = store # type: FeatureStore
257+
self._data_system = FDv1(self._config)
258+
# Provide flag evaluation function for value-change tracking
259+
self._data_system.set_flag_value_eval_fn(
260+
lambda key, context: self.variation(key, context, None)
261+
)
262+
# Expose providers and store from data system
263+
self.__data_store_status_provider = self._data_system.data_store_status_provider
264+
self.__data_source_status_provider = (
265+
self._data_system.data_source_status_provider
266+
)
267+
self.__flag_tracker = self._data_system.flag_tracker
268+
self._store = self._data_system.store # type: FeatureStore
266269

267270
big_segment_store_manager = BigSegmentStoreManager(self._config.big_segments)
268271
self.__big_segment_store_manager = big_segment_store_manager
269272

270273
self._evaluator = Evaluator(
271-
lambda key: _get_store_item(store, FEATURES, key), lambda key: _get_store_item(store, SEGMENTS, key), lambda key: big_segment_store_manager.get_user_membership(key), log
274+
lambda key: _get_store_item(self._store, FEATURES, key),
275+
lambda key: _get_store_item(self._store, SEGMENTS, key),
276+
lambda key: big_segment_store_manager.get_user_membership(key),
277+
log,
272278
)
273279

274280
if self._config.offline:
@@ -282,8 +288,7 @@ def __start_up(self, start_wait: float):
282288
self.__register_plugins(environment_metadata)
283289

284290
update_processor_ready = threading.Event()
285-
self._update_processor = self._make_update_processor(self._config, self._store, update_processor_ready, diagnostic_accumulator)
286-
self._update_processor.start()
291+
self._data_system.start(update_processor_ready)
287292

288293
if not self._config.offline and not self._config.use_ldd:
289294
if start_wait > 60:
@@ -293,7 +298,7 @@ def __start_up(self, start_wait: float):
293298
log.info("Waiting up to " + str(start_wait) + " seconds for LaunchDarkly client to initialize...")
294299
update_processor_ready.wait(start_wait)
295300

296-
if self._update_processor.initialized() is True:
301+
if self.is_initialized() is True:
297302
log.info("Started LaunchDarkly Client: OK")
298303
else:
299304
log.warning("Initialization timeout exceeded for LaunchDarkly Client or an error occurred. " "Feature Flags may not yet be available.")
@@ -379,7 +384,7 @@ def close(self):
379384
"""
380385
log.info("Closing LaunchDarkly client..")
381386
self._event_processor.stop()
382-
self._update_processor.stop()
387+
self._data_system.stop()
383388
self.__big_segment_store_manager.stop()
384389

385390
# These magic methods allow a client object to be automatically cleaned up by the "with" scope operator
@@ -464,7 +469,14 @@ def is_initialized(self) -> bool:
464469
unsuccessful attempt, or it might have received an unrecoverable error (such as an invalid SDK key)
465470
and given up.
466471
"""
467-
return self.is_offline() or self._config.use_ldd or self._update_processor.initialized()
472+
if self.is_offline() or self._config.use_ldd:
473+
return True
474+
475+
return (
476+
self._data_system._update_processor.initialized()
477+
if self._data_system._update_processor
478+
else False
479+
)
468480

469481
def flush(self):
470482
"""Flushes all pending analytics events.

ldclient/impl/datasystem/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ def start(self, set_on_ready: Event):
6868
Starts the data system.
6969
7070
This method will return immediately. The provided `Event` will be set when the system
71-
has reached an initial state (either permanently faile, e.g. due to bad auth, or
71+
has reached an initial state (either permanently failed, e.g. due to bad auth, or
7272
succeeded)
7373
"""
7474
raise NotImplementedError

ldclient/impl/datasystem/fdv1.py

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
from threading import Event
2+
from typing import Optional
3+
4+
from ldclient.config import Config
5+
from ldclient.impl.datasource.feature_requester import FeatureRequesterImpl
6+
from ldclient.impl.datasource.polling import PollingUpdateProcessor
7+
from ldclient.impl.datasource.status import (
8+
DataSourceStatusProviderImpl,
9+
DataSourceUpdateSinkImpl
10+
)
11+
from ldclient.impl.datasource.streaming import StreamingUpdateProcessor
12+
from ldclient.impl.datastore.status import (
13+
DataStoreStatusProviderImpl,
14+
DataStoreUpdateSinkImpl
15+
)
16+
from ldclient.impl.datasystem import DataAvailability
17+
from ldclient.impl.flag_tracker import FlagTrackerImpl
18+
from ldclient.impl.listeners import Listeners
19+
from ldclient.impl.stubs import NullUpdateProcessor
20+
from ldclient.interfaces import (
21+
DataSourceState,
22+
DataSourceStatus,
23+
DataSourceStatusProvider,
24+
DataStoreStatusProvider,
25+
FeatureStore,
26+
FlagTracker,
27+
UpdateProcessor
28+
)
29+
30+
# Delayed import inside __init__ to avoid circular dependency with ldclient.client
31+
32+
33+
class FDv1:
34+
"""
35+
FDv1 wires the existing v1 data source and store behavior behind the
36+
generic DataSystem surface.
37+
"""
38+
39+
def __init__(self, config: Config):
40+
self._config = config
41+
42+
# Set up data store plumbing
43+
self._data_store_listeners = Listeners()
44+
self._data_store_update_sink = DataStoreUpdateSinkImpl(
45+
self._data_store_listeners
46+
)
47+
# Import here to avoid circular import
48+
from ldclient.client import _FeatureStoreClientWrapper
49+
50+
self._store_wrapper: FeatureStore = _FeatureStoreClientWrapper(
51+
self._config.feature_store, self._data_store_update_sink
52+
)
53+
self._data_store_status_provider_impl = DataStoreStatusProviderImpl(
54+
self._store_wrapper, self._data_store_update_sink
55+
)
56+
57+
# Set up data source plumbing
58+
self._data_source_listeners = Listeners()
59+
self._flag_change_listeners = Listeners()
60+
self._flag_tracker_impl = FlagTrackerImpl(
61+
self._flag_change_listeners,
62+
# TODO: What is going on here? This doesn't seem right.
63+
# It requires calling the set_flag_value_eval_fn method. We should see if we can fix that.
64+
lambda key, context: None, # Replaced by client to use its evaluation method
65+
)
66+
self._data_source_update_sink = DataSourceUpdateSinkImpl(
67+
self._store_wrapper,
68+
self._data_source_listeners,
69+
self._flag_change_listeners,
70+
)
71+
self._data_source_status_provider_impl = DataSourceStatusProviderImpl(
72+
self._data_source_listeners, self._data_source_update_sink
73+
)
74+
75+
# Ensure v1 processors can find the sink via config for status updates
76+
self._config._data_source_update_sink = self._data_source_update_sink
77+
78+
# Update processor created in start(), because it needs the ready Event
79+
self._update_processor: Optional[UpdateProcessor] = None
80+
81+
# Track current data availability
82+
# QUESTION: Why cached?
83+
self._data_availability: DataAvailability = (
84+
DataAvailability.CACHED
85+
if getattr(self._store_wrapper, "initialized", False)
86+
else DataAvailability.DEFAULTS
87+
)
88+
89+
# React to data source status updates to adjust availability
90+
def _on_status_change(status: DataSourceStatus):
91+
if status.state == DataSourceState.VALID:
92+
self._data_availability = DataAvailability.REFRESHED
93+
94+
self._data_source_status_provider_impl.add_listener(_on_status_change)
95+
96+
def start(self, set_on_ready: Event):
97+
"""
98+
Starts the v1 update processor and returns immediately. The provided
99+
Event is set by the processor upon first successful initialization or
100+
upon permanent failure.
101+
"""
102+
update_processor = self._make_update_processor(
103+
self._config, self._store_wrapper, set_on_ready
104+
)
105+
self._update_processor = update_processor
106+
update_processor.start()
107+
108+
def stop(self):
109+
if self._update_processor is not None:
110+
self._update_processor.stop()
111+
112+
@property
113+
def store(self) -> FeatureStore:
114+
return self._store_wrapper
115+
116+
def set_flag_value_eval_fn(self, eval_fn):
117+
"""
118+
Injects the flag value evaluation function used by the flag tracker to
119+
compute FlagValueChange events. The function signature should be
120+
(key: str, context: Context) -> Any.
121+
"""
122+
self._flag_tracker_impl = FlagTrackerImpl(self._flag_change_listeners, eval_fn)
123+
124+
@property
125+
def data_source_status_provider(self) -> DataSourceStatusProvider:
126+
return self._data_source_status_provider_impl
127+
128+
@property
129+
def data_store_status_provider(self) -> DataStoreStatusProvider:
130+
return self._data_store_status_provider_impl
131+
132+
@property
133+
def flag_tracker(self) -> FlagTracker:
134+
return self._flag_tracker_impl
135+
136+
@property
137+
def data_availability(self) -> DataAvailability:
138+
return self._data_availability
139+
140+
@property
141+
def target_availability(self) -> DataAvailability:
142+
if self._config.offline:
143+
return DataAvailability.DEFAULTS
144+
# In LDD mode or normal connected modes, the ideal is to be refreshed
145+
return DataAvailability.REFRESHED
146+
147+
def _make_update_processor(self, config: Config, store: FeatureStore, ready: Event):
148+
# Mirrors LDClient._make_update_processor but scoped for FDv1
149+
if config.update_processor_class:
150+
return config.update_processor_class(config, store, ready)
151+
152+
if config.offline or config.use_ldd:
153+
return NullUpdateProcessor(config, store, ready)
154+
155+
if config.stream:
156+
# Diagnostic accumulator is handled in client; pass None here
157+
return StreamingUpdateProcessor(config, store, ready, None)
158+
159+
# Polling mode
160+
feature_requester = (
161+
config.feature_requester_class(config)
162+
if config.feature_requester_class is not None
163+
else FeatureRequesterImpl(config)
164+
)
165+
return PollingUpdateProcessor(config, feature_requester, store, ready)

ldclient/testing/test_ldclient.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,24 +59,24 @@ def count_events(c):
5959

6060
def test_client_has_null_update_processor_in_offline_mode():
6161
with make_offline_client() as client:
62-
assert isinstance(client._update_processor, NullUpdateProcessor)
62+
assert isinstance(client._data_system._update_processor, NullUpdateProcessor)
6363

6464

6565
def test_client_has_null_update_processor_in_ldd_mode():
6666
with make_ldd_client() as client:
67-
assert isinstance(client._update_processor, NullUpdateProcessor)
67+
assert isinstance(client._data_system._update_processor, NullUpdateProcessor)
6868

6969

7070
def test_client_has_streaming_processor_by_default():
7171
config = Config(sdk_key="secret", base_uri=unreachable_uri, stream_uri=unreachable_uri, send_events=False)
7272
with LDClient(config=config, start_wait=0) as client:
73-
assert isinstance(client._update_processor, StreamingUpdateProcessor)
73+
assert isinstance(client._data_system._update_processor, StreamingUpdateProcessor)
7474

7575

7676
def test_client_has_polling_processor_if_streaming_is_disabled():
7777
config = Config(sdk_key="secret", stream=False, base_uri=unreachable_uri, stream_uri=unreachable_uri, send_events=False)
7878
with LDClient(config=config, start_wait=0) as client:
79-
assert isinstance(client._update_processor, PollingUpdateProcessor)
79+
assert isinstance(client._data_system._update_processor, PollingUpdateProcessor)
8080

8181

8282
def test_toggle_offline():

0 commit comments

Comments
 (0)