diff --git a/ldclient/client.py b/ldclient/client.py index 71158291..3cd3b9be 100644 --- a/ldclient/client.py +++ b/ldclient/client.py @@ -53,7 +53,8 @@ DataStoreStatusProvider, DataStoreUpdateSink, FeatureStore, - FlagTracker + FlagTracker, + ReadOnlyStore ) from ldclient.migrations import OpTracker, Stage from ldclient.plugin import ( @@ -272,7 +273,7 @@ def __start_up(self, start_wait: float): self._data_system.data_source_status_provider ) self.__flag_tracker = self._data_system.flag_tracker - self._store: FeatureStore = self._data_system.store # type: ignore + self._store: ReadOnlyStore = self._data_system.store big_segment_store_manager = BigSegmentStoreManager(self._config.big_segments) self.__big_segment_store_manager = big_segment_store_manager diff --git a/ldclient/impl/datasystem/__init__.py b/ldclient/impl/datasystem/__init__.py index 57131c87..ec1fb9e0 100644 --- a/ldclient/impl/datasystem/__init__.py +++ b/ldclient/impl/datasystem/__init__.py @@ -16,7 +16,8 @@ DataSourceState, DataSourceStatusProvider, DataStoreStatusProvider, - FlagTracker + FlagTracker, + ReadOnlyStore ) @@ -141,6 +142,14 @@ def target_availability(self) -> DataAvailability: """ raise NotImplementedError + @property + @abstractmethod + def store(self) -> ReadOnlyStore: + """ + Returns the data store used by the data system. + """ + raise NotImplementedError + class SelectorStore(Protocol): """ diff --git a/ldclient/impl/datasystem/fdv1.py b/ldclient/impl/datasystem/fdv1.py index e45498e2..3e57ad34 100644 --- a/ldclient/impl/datasystem/fdv1.py +++ b/ldclient/impl/datasystem/fdv1.py @@ -24,6 +24,7 @@ DataStoreStatusProvider, FeatureStore, FlagTracker, + ReadOnlyStore, UpdateProcessor ) @@ -110,7 +111,7 @@ def stop(self): self._update_processor.stop() @property - def store(self) -> FeatureStore: + def store(self) -> ReadOnlyStore: return self._store_wrapper def set_flag_value_eval_fn(self, eval_fn): diff --git a/ldclient/impl/datasystem/fdv2.py b/ldclient/impl/datasystem/fdv2.py index 8dd8e5c7..8123237b 100644 --- a/ldclient/impl/datasystem/fdv2.py +++ b/ldclient/impl/datasystem/fdv2.py @@ -24,7 +24,8 @@ DataStoreStatus, DataStoreStatusProvider, FeatureStore, - FlagTracker + FlagTracker, + ReadOnlyStore ) from ldclient.versioned_data_kind import VersionedDataKind @@ -500,7 +501,7 @@ def _recovery_condition(self, status: DataSourceStatus) -> bool: return interrupted_at_runtime or healthy_for_too_long or cannot_initialize @property - def store(self) -> FeatureStore: + def store(self) -> ReadOnlyStore: """Get the underlying store for flag evaluation.""" return self._store.get_active_store() diff --git a/ldclient/impl/datasystem/protocolv2.py b/ldclient/impl/datasystem/protocolv2.py index 50cc0862..7feb8a81 100644 --- a/ldclient/impl/datasystem/protocolv2.py +++ b/ldclient/impl/datasystem/protocolv2.py @@ -458,9 +458,7 @@ class Change: kind: ObjectKind key: str version: int - object: Any = ( - None # TODO(fdv2): At some point, we should define a better type for this. - ) + object: Optional[dict] = None @dataclass(frozen=True) diff --git a/ldclient/impl/datasystem/store.py b/ldclient/impl/datasystem/store.py index dabd5d29..15bc432b 100644 --- a/ldclient/impl/datasystem/store.py +++ b/ldclient/impl/datasystem/store.py @@ -8,7 +8,7 @@ import threading from collections import defaultdict -from typing import Any, Callable, Dict, List, Mapping, Optional, Set +from typing import Any, Callable, Dict, List, Optional, Set from ldclient.impl.datasystem.protocolv2 import ( Change, @@ -24,15 +24,20 @@ from ldclient.impl.util import log from ldclient.interfaces import ( DataStoreStatusProvider, - DiagnosticDescription, FeatureStore, - FlagChange + FlagChange, + ReadOnlyStore ) from ldclient.versioned_data_kind import FEATURES, SEGMENTS, VersionedDataKind +Collections = Dict[VersionedDataKind, Dict[str, dict]] -class InMemoryFeatureStore(FeatureStore, DiagnosticDescription): - """The default feature store implementation, which holds all data in a thread-safe data structure in memory.""" + +class InMemoryFeatureStore(ReadOnlyStore): + """ + The default feature store implementation, which holds all data in a + thread-safe data structure in memory. + """ def __init__(self): """Constructs an instance of InMemoryFeatureStore.""" @@ -40,98 +45,131 @@ def __init__(self): self._initialized = False self._items = defaultdict(dict) - def is_monitoring_enabled(self) -> bool: - return False - - def is_available(self) -> bool: - return True - - def get(self, kind: VersionedDataKind, key: str, callback: Callable[[Any], Any] = lambda x: x) -> Any: - """ """ + def get( + self, + kind: VersionedDataKind, + key: str, + callback: Callable[[Any], Any] = lambda x: x, + ) -> Any: try: self._lock.rlock() items_of_kind = self._items[kind] item = items_of_kind.get(key) if item is None: - log.debug("Attempted to get missing key %s in '%s', returning None", key, kind.namespace) + log.debug( + "Attempted to get missing key %s in '%s', returning None", + key, + kind.namespace, + ) return callback(None) - if 'deleted' in item and item['deleted']: - log.debug("Attempted to get deleted key %s in '%s', returning None", key, kind.namespace) + if "deleted" in item and item["deleted"]: + log.debug( + "Attempted to get deleted key %s in '%s', returning None", + key, + kind.namespace, + ) return callback(None) return callback(item) finally: self._lock.runlock() - def all(self, kind, callback): - """ """ + def all(self, kind: VersionedDataKind, callback: Callable[[Any], Any] = lambda x: x) -> Any: try: self._lock.rlock() items_of_kind = self._items[kind] - return callback(dict((k, i) for k, i in items_of_kind.items() if ('deleted' not in i) or not i['deleted'])) + return callback( + dict( + (k, i) + for k, i in items_of_kind.items() + if ("deleted" not in i) or not i["deleted"] + ) + ) finally: self._lock.runlock() - def init(self, all_data): - """ """ - all_decoded = {} - for kind, items in all_data.items(): - items_decoded = {} - for key, item in items.items(): - items_decoded[key] = kind.decode(item) - all_decoded[kind] = items_decoded + def set_basis(self, collections: Collections) -> bool: + """ + Initializes the store with a full set of data, replacing any existing data. + """ + all_decoded = self.__decode_collection(collections) + if all_decoded is None: + return False + try: self._lock.lock() self._items.clear() self._items.update(all_decoded) self._initialized = True - for k in all_data: - log.debug("Initialized '%s' store with %d items", k.namespace, len(all_data[k])) + except Exception as e: + log.error("Failed applying set_basis", exc_info=e) + return False finally: self._lock.unlock() - # noinspection PyShadowingNames - def delete(self, kind, key: str, version: int): - """ """ + return True + + def apply_delta(self, collections: Collections) -> bool: + """ + Applies a delta update to the store. + """ + all_decoded = self.__decode_collection(collections) + if all_decoded is None: + return False + try: self._lock.lock() - items_of_kind = self._items[kind] - items_of_kind[key] = {'deleted': True, 'version': version} + for kind, kind_data in all_decoded.items(): + items_of_kind = self._items[kind] + kind_data = all_decoded[kind] + for key, item in kind_data.items(): + items_of_kind[key] = item + log.debug( + "Updated %s in '%s' to version %d", key, kind.namespace, item["version"] + ) + except Exception as e: + log.error("Failed applying apply_delta", exc_info=e) + return False finally: self._lock.unlock() - def upsert(self, kind, item): - """ """ - decoded_item = kind.decode(item) - key = item['key'] + return True + + def __decode_collection(self, collections: Collections) -> Optional[Dict[VersionedDataKind, Dict[str, Any]]]: try: - self._lock.lock() - items_of_kind = self._items[kind] - items_of_kind[key] = decoded_item - log.debug("Updated %s in '%s' to version %d", key, kind.namespace, item['version']) - finally: - self._lock.unlock() + all_decoded = {} + for kind in collections: + collection = collections[kind] + items_decoded = {} + for key in collection: + items_decoded[key] = kind.decode(collection[key]) + all_decoded[kind] = items_decoded + + return all_decoded + except Exception as e: + log.error("Failed decoding collection.", exc_info=e) + return None @property def initialized(self) -> bool: - """ """ + """ + Indicates whether the store has been initialized with data. + """ try: self._lock.rlock() return self._initialized finally: self._lock.runlock() - def describe_configuration(self, config): - return 'memory' - class Store: """ - Store is a dual-mode persistent/in-memory store that serves requests for data from the evaluation - algorithm. + Store is a dual-mode persistent/in-memory store that serves requests for + data from the evaluation algorithm. - At any given moment one of two stores is active: in-memory, or persistent. Once the in-memory - store has data (either from initializers or a synchronizer), the persistent store is no longer - read from. From that point forward, calls to get data will serve from the memory store. + At any given moment one of two stores is active: in-memory, or persistent. + Once the in-memory store has data (either from initializers or a + synchronizer), the persistent store is no longer read from. From that point + forward, calls to get data will serve from the memory store. """ def __init__( @@ -164,7 +202,7 @@ def __init__( self._persist = False # Points to the active store. Swapped upon initialization. - self._active_store: FeatureStore = self._memory_store + self._active_store: ReadOnlyStore = self._memory_store # Identifies the current data self._selector = Selector.no_selector() @@ -211,7 +249,7 @@ def close(self) -> Optional[Exception]: try: # Most FeatureStore implementations don't have close methods # but we'll try to call it if it exists - if hasattr(self._persistent_store, 'close'): + if hasattr(self._persistent_store, "close"): self._persistent_store.close() except Exception as e: return e @@ -225,12 +263,14 @@ def apply(self, change_set: ChangeSet, persist: bool) -> None: change_set: The changeset to apply persist: Whether the changes should be persisted to the persistent store """ + collections = self._changes_to_store_data(change_set.changes) + with self._lock: try: if change_set.intent_code == IntentCode.TRANSFER_FULL: - self._set_basis(change_set, persist) + self._set_basis(collections, change_set.selector, persist) elif change_set.intent_code == IntentCode.TRANSFER_CHANGES: - self._apply_delta(change_set, persist) + self._apply_delta(collections, change_set.selector, persist) elif change_set.intent_code == IntentCode.TRANSFER_NONE: # No-op, no changes to apply return @@ -240,9 +280,11 @@ def apply(self, change_set: ChangeSet, persist: bool) -> None: except Exception as e: # Log error but don't re-raise - matches Go behavior - log.error(f"Store: couldn't apply changeset: {e}") + log.error("Store: couldn't apply changeset: %s", str(e)) - def _set_basis(self, change_set: ChangeSet, persist: bool) -> None: + def _set_basis( + self, collections: Collections, selector: Optional[Selector], persist: bool + ) -> None: """ Set the basis of the store. Any existing data is discarded. @@ -251,39 +293,40 @@ def _set_basis(self, change_set: ChangeSet, persist: bool) -> None: persist: Whether to persist the data to the persistent store """ # Take snapshot for change detection if we have flag listeners - old_data: Optional[Mapping[VersionedDataKind, Mapping[str, dict]]] = None + old_data: Optional[Collections] = None if self._flag_change_listeners.has_listeners(): old_data = {} for kind in [FEATURES, SEGMENTS]: old_data[kind] = self._memory_store.all(kind, lambda x: x) - # Convert changes to the format expected by FeatureStore.init() - all_data = self._changes_to_store_data(change_set.changes) - - # Initialize memory store with new data - self._memory_store.init(all_data) + ok = self._memory_store.set_basis(collections) + if ok is False: + return # Update dependency tracker - self._reset_dependency_tracker(all_data) + self._reset_dependency_tracker(collections) # Send change events if we had listeners if old_data is not None: - affected_items = self._compute_changed_items_for_full_data_set(old_data, all_data) + affected_items = self._compute_changed_items_for_full_data_set( + old_data, collections + ) self._send_change_events(affected_items) # Update state self._persist = persist - if change_set.selector is not None: - self._selector = change_set.selector + self._selector = selector if selector is not None else Selector.no_selector() # Switch to memory store as active self._active_store = self._memory_store # Persist to persistent store if configured and writable if self._should_persist(): - self._persistent_store.init(all_data) # type: ignore + self._persistent_store.init(collections) # type: ignore - def _apply_delta(self, change_set: ChangeSet, persist: bool) -> None: + def _apply_delta( + self, collections: Collections, selector: Optional[Selector], persist: bool + ) -> None: """ Apply a delta update to the store. @@ -291,53 +334,39 @@ def _apply_delta(self, change_set: ChangeSet, persist: bool) -> None: change_set: The changeset containing the delta changes persist: Whether to persist the changes to the persistent store """ + ok = self._memory_store.apply_delta(collections) + if ok is False: + return + has_listeners = self._flag_change_listeners.has_listeners() affected_items: Set[KindAndKey] = set() - # Apply each change - for change in change_set.changes: - if change.action == ChangeType.PUT: - # Convert to VersionedDataKind - kind = FEATURES if change.kind == ObjectKind.FLAG else SEGMENTS - item = change.object - if item is not None: - self._memory_store.upsert(kind, item) - - # Update dependency tracking - self._dependency_tracker.update_dependencies_from(kind, change.key, item) - if has_listeners: - self._dependency_tracker.add_affected_items( - affected_items, KindAndKey(kind=kind, key=change.key) - ) - - # Persist to persistent store if configured - if self._should_persist(): - self._persistent_store.upsert(kind, item) # type: ignore - - elif change.action == ChangeType.DELETE: - # Convert to VersionedDataKind - kind = FEATURES if change.kind == ObjectKind.FLAG else SEGMENTS - self._memory_store.delete(kind, change.key, change.version) - - # Update dependency tracking - self._dependency_tracker.update_dependencies_from(kind, change.key, None) + for kind in collections: + collection = collections[kind] + for key in collection: + item = collection[key] + self._dependency_tracker.update_dependencies_from( + kind, key, item + ) if has_listeners: self._dependency_tracker.add_affected_items( - affected_items, KindAndKey(kind=kind, key=change.key) + affected_items, KindAndKey(kind=kind, key=key) ) - # Persist to persistent store if configured - if self._should_persist(): - self._persistent_store.delete(kind, change.key, change.version) # type: ignore - # Send change events if affected_items: self._send_change_events(affected_items) # Update state self._persist = persist - if change_set.selector is not None: - self._selector = change_set.selector + self._selector = selector if selector is not None else Selector.no_selector() + + if self._should_persist(): + for kind in collections: + kind_data: Dict[str, dict] = collections[kind] + for i in kind_data: + item = kind_data[i] + self._persistent_store.upsert(kind, item) # type: ignore def _should_persist(self) -> bool: """Returns whether data should be persisted to the persistent store.""" @@ -347,33 +376,31 @@ def _should_persist(self) -> bool: and self._persistent_store_writable ) - def _changes_to_store_data( - self, changes: List[Change] - ) -> Mapping[VersionedDataKind, Mapping[str, dict]]: + def _changes_to_store_data(self, changes: List[Change]) -> Collections: """ - Convert a list of Changes to the format expected by FeatureStore.init(). + Convert a list of Changes to the pre-existing format used by FeatureStore. Args: changes: List of changes to convert Returns: - Mapping suitable for FeatureStore.init() + Mapping suitable for FeatureStore operations. """ - all_data: Dict[VersionedDataKind, Dict[str, dict]] = { + all_data: Collections = { FEATURES: {}, SEGMENTS: {}, } for change in changes: + kind = FEATURES if change.kind == ObjectKind.FLAG else SEGMENTS if change.action == ChangeType.PUT and change.object is not None: - kind = FEATURES if change.kind == ObjectKind.FLAG else SEGMENTS all_data[kind][change.key] = change.object + if change.action == ChangeType.DELETE: + all_data[kind][change.key] = {'key': change.key, 'deleted': True, 'version': change.version} return all_data - def _reset_dependency_tracker( - self, all_data: Mapping[VersionedDataKind, Mapping[str, dict]] - ) -> None: + def _reset_dependency_tracker(self, all_data: Collections) -> None: """Reset dependency tracker with new full data set.""" self._dependency_tracker.reset() for kind, items in all_data.items(): @@ -388,8 +415,8 @@ def _send_change_events(self, affected_items: Set[KindAndKey]) -> None: def _compute_changed_items_for_full_data_set( self, - old_data: Mapping[VersionedDataKind, Mapping[str, dict]], - new_data: Mapping[VersionedDataKind, Mapping[str, dict]], + old_data: Collections, + new_data: Collections, ) -> Set[KindAndKey]: """Compute which items changed between old and new data sets.""" affected_items: Set[KindAndKey] = set() @@ -436,7 +463,7 @@ def commit(self) -> Optional[Exception]: return e return None - def get_active_store(self) -> FeatureStore: + def get_active_store(self) -> ReadOnlyStore: """Get the currently active store for reading data.""" with self._lock: return self._active_store diff --git a/ldclient/impl/dependency_tracker.py b/ldclient/impl/dependency_tracker.py index 1f6286b2..23d6b0d5 100644 --- a/ldclient/impl/dependency_tracker.py +++ b/ldclient/impl/dependency_tracker.py @@ -89,7 +89,7 @@ def compute_dependencies_from(from_kind: VersionedDataKind, from_item: Optional[ @param from_item [LaunchDarkly::Impl::Model::FeatureFlag, LaunchDarkly::Impl::Model::Segment] @return [Set] """ - if from_item is None: + if from_item is None or from_item.get('deleted', False): return set() from_item = from_kind.decode(from_item) if isinstance(from_item, dict) else from_item diff --git a/ldclient/interfaces.py b/ldclient/interfaces.py index cae5c237..307d5545 100644 --- a/ldclient/interfaces.py +++ b/ldclient/interfaces.py @@ -6,7 +6,7 @@ from abc import ABCMeta, abstractmethod, abstractproperty from enum import Enum -from typing import Any, Callable, Mapping, Optional +from typing import Any, Callable, Mapping, Optional, Protocol from ldclient.context import Context from ldclient.impl.listeners import Listeners @@ -39,6 +39,23 @@ class DataStoreMode(Enum): """ +class ReadOnlyStore(Protocol): + """ReadOnlyStore is a read-only interface for a feature store.""" + + @abstractmethod + def get(self, kind: VersionedDataKind, key: str, callback: Callable[[Any], Any] = lambda x: x) -> Any: + raise NotImplementedError + + @abstractmethod + def all(self, kind: VersionedDataKind, callback: Callable[[Any], Any] = lambda x: x) -> Any: + raise NotImplementedError + + @property + @abstractmethod + def initialized(self) -> bool: + raise NotImplementedError + + class FeatureStore: """ Interface for a versioned store for feature flags and related objects received from LaunchDarkly. diff --git a/ldclient/testing/impl/datasystem/test_fdv2_persistence.py b/ldclient/testing/impl/datasystem/test_fdv2_persistence.py index 34cbd4c9..f7898d58 100644 --- a/ldclient/testing/impl/datasystem/test_fdv2_persistence.py +++ b/ldclient/testing/impl/datasystem/test_fdv2_persistence.py @@ -20,7 +20,7 @@ class StubFeatureStore(FeatureStore): def __init__( self, initial_data: Optional[ - Mapping[VersionedDataKind, Mapping[str, Dict[Any, Any]]] + Dict[VersionedDataKind, Dict[str, Dict[Any, Any]]] ] = None, ): self._data: Dict[VersionedDataKind, Dict[str, dict]] = { @@ -433,8 +433,8 @@ def test_persistent_store_delete_operations(): store.apply(delete_changeset, True) # Verify delete was called on persistent store - assert len(persistent_store.delete_calls) > 0 - assert any(call[1] == "deletable-flag" for call in persistent_store.delete_calls) + assert len(persistent_store.upsert_calls) > 0 + assert any(call[1] == "deletable-flag" for call in persistent_store.upsert_calls) def test_data_store_status_provider(): diff --git a/pyproject.toml b/pyproject.toml index 6f6ae434..731f7bef 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,7 +33,7 @@ expiringdict = ">=1.1.4" pyRFC3339 = ">=1.0" semver = ">=2.10.2" urllib3 = ">=1.26.0,<3" -launchdarkly-eventsource = ">=1.2.4,<2.0.0" +launchdarkly-eventsource = ">=1.4.0,<2.0.0" redis = { version = ">=2.10.5", optional = true } python-consul = { version = ">=1.0.1", optional = true }