Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 34 additions & 19 deletions ldclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,26 +249,32 @@ def __start_up(self, start_wait: float):
self.__hooks_lock = ReadWriteLock()
self.__hooks = self._config.hooks + plugin_hooks # type: List[Hook]

data_store_listeners = Listeners()
store_sink = DataStoreUpdateSinkImpl(data_store_listeners)
store = _FeatureStoreClientWrapper(self._config.feature_store, store_sink)

self.__data_store_status_provider = DataStoreStatusProviderImpl(store, store_sink)

data_source_listeners = Listeners()
flag_change_listeners = Listeners()

self.__flag_tracker = FlagTrackerImpl(flag_change_listeners, lambda key, context: self.variation(key, context, None))
# Initialize data system (FDv1) to encapsulate v1 data plumbing
from ldclient.impl.datasystem.fdv1 import ( # local import to avoid circular dependency
FDv1
)

self._config._data_source_update_sink = DataSourceUpdateSinkImpl(store, data_source_listeners, flag_change_listeners)
self.__data_source_status_provider = DataSourceStatusProviderImpl(data_source_listeners, self._config._data_source_update_sink)
self._store = store # type: FeatureStore
self._data_system = FDv1(self._config)
# Provide flag evaluation function for value-change tracking
self._data_system.set_flag_value_eval_fn(
lambda key, context: self.variation(key, context, None)
)
# Expose providers and store from data system
self.__data_store_status_provider = self._data_system.data_store_status_provider
self.__data_source_status_provider = (
self._data_system.data_source_status_provider
)
self.__flag_tracker = self._data_system.flag_tracker
self._store = self._data_system.store # type: FeatureStore

big_segment_store_manager = BigSegmentStoreManager(self._config.big_segments)
self.__big_segment_store_manager = big_segment_store_manager

self._evaluator = Evaluator(
lambda key: _get_store_item(store, FEATURES, key), lambda key: _get_store_item(store, SEGMENTS, key), lambda key: big_segment_store_manager.get_user_membership(key), log
lambda key: _get_store_item(self._store, FEATURES, key),
lambda key: _get_store_item(self._store, SEGMENTS, key),
lambda key: big_segment_store_manager.get_user_membership(key),
log,
)

if self._config.offline:
Expand All @@ -279,11 +285,13 @@ def __start_up(self, start_wait: float):

diagnostic_accumulator = self._set_event_processor(self._config)

# Pass diagnostic accumulator to data system for streaming metrics
self._data_system.set_diagnostic_accumulator(diagnostic_accumulator)

self.__register_plugins(environment_metadata)

update_processor_ready = threading.Event()
self._update_processor = self._make_update_processor(self._config, self._store, update_processor_ready, diagnostic_accumulator)
self._update_processor.start()
self._data_system.start(update_processor_ready)

if not self._config.offline and not self._config.use_ldd:
if start_wait > 60:
Expand All @@ -293,7 +301,7 @@ def __start_up(self, start_wait: float):
log.info("Waiting up to " + str(start_wait) + " seconds for LaunchDarkly client to initialize...")
update_processor_ready.wait(start_wait)

if self._update_processor.initialized() is True:
if self.is_initialized() is True:
log.info("Started LaunchDarkly Client: OK")
else:
log.warning("Initialization timeout exceeded for LaunchDarkly Client or an error occurred. " "Feature Flags may not yet be available.")
Expand Down Expand Up @@ -379,7 +387,7 @@ def close(self):
"""
log.info("Closing LaunchDarkly client..")
self._event_processor.stop()
self._update_processor.stop()
self._data_system.stop()
self.__big_segment_store_manager.stop()

# These magic methods allow a client object to be automatically cleaned up by the "with" scope operator
Expand Down Expand Up @@ -464,7 +472,14 @@ def is_initialized(self) -> bool:
unsuccessful attempt, or it might have received an unrecoverable error (such as an invalid SDK key)
and given up.
"""
return self.is_offline() or self._config.use_ldd or self._update_processor.initialized()
if self.is_offline() or self._config.use_ldd:
return True

return (
self._data_system._update_processor.initialized()
if self._data_system._update_processor
else False
)

def flush(self):
"""Flushes all pending analytics events.
Expand Down
2 changes: 1 addition & 1 deletion ldclient/impl/datasystem/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def start(self, set_on_ready: Event):
Starts the data system.

This method will return immediately. The provided `Event` will be set when the system
has reached an initial state (either permanently faile, e.g. due to bad auth, or
has reached an initial state (either permanently failed, e.g. due to bad auth, or
succeeded)
"""
raise NotImplementedError
Expand Down
171 changes: 171 additions & 0 deletions ldclient/impl/datasystem/fdv1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
from threading import Event
from typing import Optional

from ldclient.config import Config
from ldclient.impl.datasource.feature_requester import FeatureRequesterImpl
from ldclient.impl.datasource.polling import PollingUpdateProcessor
from ldclient.impl.datasource.status import (
DataSourceStatusProviderImpl,
DataSourceUpdateSinkImpl
)
from ldclient.impl.datasource.streaming import StreamingUpdateProcessor
from ldclient.impl.datastore.status import (
DataStoreStatusProviderImpl,
DataStoreUpdateSinkImpl
)
from ldclient.impl.datasystem import DataAvailability
from ldclient.impl.flag_tracker import FlagTrackerImpl
from ldclient.impl.listeners import Listeners
from ldclient.impl.stubs import NullUpdateProcessor
from ldclient.interfaces import (
DataSourceState,
DataSourceStatus,
DataSourceStatusProvider,
DataStoreStatusProvider,
FeatureStore,
FlagTracker,
UpdateProcessor
)

# Delayed import inside __init__ to avoid circular dependency with ldclient.client


class FDv1:
    """
    FDv1 wires the existing v1 data source and store behavior behind the
    generic DataSystem surface.

    It owns the feature-store wrapper, the data source / data store status
    providers, the flag tracker, and the v1 update processor, and exposes
    them through the properties the client reads after construction.
    """

    def __init__(self, config: Config):
        self._config = config

        # Set up data store plumbing: the wrapper forwards store status
        # changes to the sink, which notifies the listeners.
        self._data_store_listeners = Listeners()
        self._data_store_update_sink = DataStoreUpdateSinkImpl(
            self._data_store_listeners
        )
        # Import here to avoid circular import with ldclient.client
        from ldclient.client import _FeatureStoreClientWrapper

        self._store_wrapper: FeatureStore = _FeatureStoreClientWrapper(
            self._config.feature_store, self._data_store_update_sink
        )
        self._data_store_status_provider_impl = DataStoreStatusProviderImpl(
            self._store_wrapper, self._data_store_update_sink
        )

        # Set up data source plumbing
        self._data_source_listeners = Listeners()
        self._flag_change_listeners = Listeners()
        self._flag_tracker_impl = FlagTrackerImpl(
            self._flag_change_listeners,
            lambda key, context: None,  # Replaced by client to use its evaluation method
        )
        self._data_source_update_sink = DataSourceUpdateSinkImpl(
            self._store_wrapper,
            self._data_source_listeners,
            self._flag_change_listeners,
        )
        self._data_source_status_provider_impl = DataSourceStatusProviderImpl(
            self._data_source_listeners, self._data_source_update_sink
        )

        # Ensure v1 processors can find the sink via config for status updates
        self._config._data_source_update_sink = self._data_source_update_sink

        # Update processor created in start(), because it needs the ready Event
        self._update_processor: Optional[UpdateProcessor] = None

        # Diagnostic accumulator provided by client for streaming metrics
        self._diagnostic_accumulator = None

        # Track current data availability: CACHED when the wrapped store
        # already reports itself initialized (e.g. a pre-populated persistent
        # store), otherwise DEFAULTS.
        self._data_availability: DataAvailability = (
            DataAvailability.CACHED
            if getattr(self._store_wrapper, "initialized", False)
            else DataAvailability.DEFAULTS
        )

        # React to data source status updates to adjust availability. Note
        # that availability is only ever promoted to REFRESHED here; this
        # listener never downgrades it on later status changes.
        def _on_status_change(status: DataSourceStatus):
            if status.state == DataSourceState.VALID:
                self._data_availability = DataAvailability.REFRESHED

        self._data_source_status_provider_impl.add_listener(_on_status_change)

    def start(self, set_on_ready: Event):
        """
        Starts the v1 update processor and returns immediately. The provided
        Event is set by the processor upon first successful initialization or
        upon permanent failure.
        """
        update_processor = self._make_update_processor(
            self._config, self._store_wrapper, set_on_ready
        )
        self._update_processor = update_processor
        update_processor.start()

    def stop(self):
        """Stops the update processor, if start() has created one."""
        if self._update_processor is not None:
            self._update_processor.stop()

    @property
    def store(self) -> FeatureStore:
        """The client-facing feature store wrapper owned by this data system."""
        return self._store_wrapper

    def set_flag_value_eval_fn(self, eval_fn):
        """
        Injects the flag value evaluation function used by the flag tracker to
        compute FlagValueChange events. The function signature should be
        (key: str, context: Context) -> Any.

        This replaces the flag tracker instance; the shared flag-change
        Listeners object carries over, so call this before handing the
        tracker to consumers.
        """
        self._flag_tracker_impl = FlagTrackerImpl(self._flag_change_listeners, eval_fn)

    def set_diagnostic_accumulator(self, diagnostic_accumulator):
        """
        Sets the diagnostic accumulator for streaming initialization metrics.
        This should be called before start() to ensure metrics are collected.
        """
        self._diagnostic_accumulator = diagnostic_accumulator

    @property
    def data_source_status_provider(self) -> DataSourceStatusProvider:
        """Provider for observing data source (connection) status."""
        return self._data_source_status_provider_impl

    @property
    def data_store_status_provider(self) -> DataStoreStatusProvider:
        """Provider for observing data store availability status."""
        return self._data_store_status_provider_impl

    @property
    def flag_tracker(self) -> FlagTracker:
        """Tracker for flag-change and flag-value-change notifications."""
        return self._flag_tracker_impl

    @property
    def data_availability(self) -> DataAvailability:
        """Current availability of flag data (DEFAULTS, CACHED, or REFRESHED)."""
        return self._data_availability

    @property
    def target_availability(self) -> DataAvailability:
        """The best availability this configuration can reach."""
        if self._config.offline:
            return DataAvailability.DEFAULTS
        # In LDD mode or normal connected modes, the ideal is to be refreshed
        return DataAvailability.REFRESHED

    def _make_update_processor(self, config: Config, store: FeatureStore, ready: Event):
        # Mirrors LDClient._make_update_processor but scoped for FDv1.
        # Note: reads self._diagnostic_accumulator, so the client must call
        # set_diagnostic_accumulator() before start() for streaming metrics.
        if config.update_processor_class:
            return config.update_processor_class(config, store, ready)

        if config.offline or config.use_ldd:
            return NullUpdateProcessor(config, store, ready)

        if config.stream:
            return StreamingUpdateProcessor(config, store, ready, self._diagnostic_accumulator)

        # Polling mode
        feature_requester = (
            config.feature_requester_class(config)
            if config.feature_requester_class is not None
            else FeatureRequesterImpl(config)
        )
        return PollingUpdateProcessor(config, feature_requester, store, ready)
8 changes: 4 additions & 4 deletions ldclient/testing/test_ldclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,24 +59,24 @@ def count_events(c):

def test_client_has_null_update_processor_in_offline_mode():
with make_offline_client() as client:
assert isinstance(client._update_processor, NullUpdateProcessor)
assert isinstance(client._data_system._update_processor, NullUpdateProcessor)


def test_client_has_null_update_processor_in_ldd_mode():
with make_ldd_client() as client:
assert isinstance(client._update_processor, NullUpdateProcessor)
assert isinstance(client._data_system._update_processor, NullUpdateProcessor)


def test_client_has_streaming_processor_by_default():
config = Config(sdk_key="secret", base_uri=unreachable_uri, stream_uri=unreachable_uri, send_events=False)
with LDClient(config=config, start_wait=0) as client:
assert isinstance(client._update_processor, StreamingUpdateProcessor)
assert isinstance(client._data_system._update_processor, StreamingUpdateProcessor)


def test_client_has_polling_processor_if_streaming_is_disabled():
config = Config(sdk_key="secret", stream=False, base_uri=unreachable_uri, stream_uri=unreachable_uri, send_events=False)
with LDClient(config=config, start_wait=0) as client:
assert isinstance(client._update_processor, PollingUpdateProcessor)
assert isinstance(client._data_system._update_processor, PollingUpdateProcessor)


def test_toggle_offline():
Expand Down