39 changes: 33 additions & 6 deletions ldclient/impl/datasourcev2/streaming.py
@@ -18,7 +18,13 @@
from ld_eventsource.errors import HTTPStatusError

from ldclient.config import Config
from ldclient.impl.datasystem import SelectorStore, Synchronizer, Update
from ldclient.impl.datasystem import (
DiagnosticAccumulator,
DiagnosticSource,
SelectorStore,
Synchronizer,
Update
)
from ldclient.impl.datasystem.protocolv2 import (
ChangeSetBuilder,
DeleteObject,
@@ -98,7 +104,7 @@ def query_params() -> dict[str, str]:
)


class StreamingDataSource(Synchronizer):
class StreamingDataSource(Synchronizer, DiagnosticSource):
"""
StreamingSynchronizer is a specific type of Synchronizer that handles
streaming data sources.
@@ -112,6 +118,11 @@ def __init__(self, config: Config):
self._config = config
self._sse: Optional[SSEClient] = None
self._running = False
self._diagnostic_accumulator: Optional[DiagnosticAccumulator] = None
self._connection_attempt_start_time: Optional[float] = None

def set_diagnostic_accumulator(self, diagnostic_accumulator: DiagnosticAccumulator):
self._diagnostic_accumulator = diagnostic_accumulator

@property
def name(self) -> str:
@@ -133,6 +144,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:

change_set_builder = ChangeSetBuilder()
self._running = True
self._connection_attempt_start_time = time()

for action in self._sse.all:
if isinstance(action, Fault):
@@ -153,6 +165,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
if isinstance(action, Start) and action.headers is not None:
fallback = action.headers.get('X-LD-FD-Fallback') == 'true'
if fallback:
self._record_stream_init(True)
yield Update(
state=DataSourceState.OFF,
revert_to_fdv1=True
@@ -165,6 +178,8 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
try:
update = self._process_message(action, change_set_builder)
if update is not None:
self._record_stream_init(False)
self._connection_attempt_start_time = None
yield update
except json.decoder.JSONDecodeError as e:
log.info(
@@ -192,10 +207,6 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
environment_id=None, # TODO(sdk-1410)
)

# TODO(sdk-1408)
# if update is not None:
# self._record_stream_init(False)

self._sse.close()

def stop(self):
@@ -207,6 +218,12 @@ def stop(self):
if self._sse:
self._sse.close()

def _record_stream_init(self, failed: bool):
if self._diagnostic_accumulator and self._connection_attempt_start_time:
current_time = int(time() * 1000)
elapsed = current_time - int(self._connection_attempt_start_time * 1000)
self._diagnostic_accumulator.record_stream_init(current_time, elapsed if elapsed >= 0 else 0, failed)

# pylint: disable=too-many-return-statements
def _process_message(
self, msg: Event, change_set_builder: ChangeSetBuilder
@@ -301,6 +318,9 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:

if isinstance(error, json.decoder.JSONDecodeError):
log.error("Unexpected error on stream connection: %s, will retry", error)
self._record_stream_init(True)
self._connection_attempt_start_time = time() + \
self._sse.next_retry_delay # type: ignore
Review comment:
Bug: Ensure Accurate Connection Attempt Timing

After recording a failed stream initialization, _connection_attempt_start_time is set directly to a future time without first setting it to None. This differs from the v1 implementation and could cause incorrect elapsed time calculations if _record_stream_init is called again before the next connection attempt starts. The v1 pattern sets it to None first to prevent using the future timestamp in subsequent diagnostic recordings.

Member Author:
It only seems that way. If you look carefully, you will see that the v1 streaming implementation sets this to None for the JSONDecodeError branch, but eventually falls through to the line that does

        self._connection_attempt_start_time = time.time() + self._sse.next_retry_delay

I just cut out the middle assignment.
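
As a reference for the discussion above, here is a minimal sketch (not part of the diff; the function and parameter names are illustrative) of how the clamp in _record_stream_init behaves when the stored start time has been pushed into the future by a retry delay:

    from time import time

    def record_stream_init_sketch(start_time: float, failed: bool, accumulator) -> None:
        # start_time may be time() + next_retry_delay, i.e. slightly in the future,
        # when a failure was just recorded and the next attempt has not begun yet.
        current_time_ms = int(time() * 1000)
        elapsed_ms = current_time_ms - int(start_time * 1000)
        # A negative elapsed value is clamped to zero, mirroring the
        # "elapsed if elapsed >= 0 else 0" guard in the diff above.
        accumulator.record_stream_init(current_time_ms, max(elapsed_ms, 0), failed)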


update = Update(
state=DataSourceState.INTERRUPTED,
@@ -313,6 +333,10 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
return (update, True)

if isinstance(error, HTTPStatusError):
self._record_stream_init(True)
self._connection_attempt_start_time = time() + \
self._sse.next_retry_delay # type: ignore

error_info = DataSourceErrorInfo(
DataSourceErrorKind.ERROR_RESPONSE,
error.status,
@@ -344,6 +368,7 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
)

if not is_recoverable:
self._connection_attempt_start_time = None
log.error(http_error_message_result)
self.stop()
return (update, False)
@@ -352,6 +377,8 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
return (update, True)

log.warning("Unexpected error on stream connection: %s, will retry", error)
self._record_stream_init(True)
self._connection_attempt_start_time = time() + self._sse.next_retry_delay # type: ignore

update = Update(
state=DataSourceState.INTERRUPTED,
23 changes: 22 additions & 1 deletion ldclient/impl/datasystem/__init__.py
@@ -7,7 +7,7 @@
from dataclasses import dataclass
from enum import Enum
from threading import Event
from typing import Callable, Generator, Optional, Protocol
from typing import Generator, Optional, Protocol, runtime_checkable

from ldclient.impl.datasystem.protocolv2 import Basis, ChangeSet, Selector
from ldclient.impl.util import _Result
@@ -151,6 +151,27 @@ def store(self) -> ReadOnlyStore:
raise NotImplementedError


class DiagnosticAccumulator(Protocol):
def record_stream_init(self, timestamp, duration, failed):
raise NotImplementedError

def record_events_in_batch(self, events_in_batch):
raise NotImplementedError

def create_event_and_reset(self, dropped_events, deduplicated_users):
raise NotImplementedError


@runtime_checkable
class DiagnosticSource(Protocol):
@abstractmethod
def set_diagnostic_accumulator(self, diagnostic_accumulator: DiagnosticAccumulator):
"""
Set the diagnostic_accumulator to be used for reporting diagnostic events.
"""
raise NotImplementedError


class SelectorStore(Protocol):
"""
SelectorStore represents a component capable of providing Selectors
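A minimal sketch (hypothetical class name; not part of the diff) showing how a component can satisfy the runtime_checkable DiagnosticSource protocol without inheriting from it, which is what enables the isinstance checks used elsewhere in this PR:

    from typing import Optional

    from ldclient.impl.datasystem import DiagnosticAccumulator, DiagnosticSource

    class CustomSynchronizer:
        """Hypothetical synchronizer that opts in to diagnostic reporting."""

        def __init__(self) -> None:
            self._diagnostic_accumulator: Optional[DiagnosticAccumulator] = None

        def set_diagnostic_accumulator(self, diagnostic_accumulator: DiagnosticAccumulator):
            self._diagnostic_accumulator = diagnostic_accumulator

    # Because DiagnosticSource is runtime_checkable, this structural check succeeds
    # even though CustomSynchronizer never subclasses the protocol.
    assert isinstance(CustomSynchronizer(), DiagnosticSource)
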
6 changes: 3 additions & 3 deletions ldclient/impl/datasystem/fdv1.py
@@ -13,7 +13,7 @@
DataStoreStatusProviderImpl,
DataStoreUpdateSinkImpl
)
from ldclient.impl.datasystem import DataAvailability
from ldclient.impl.datasystem import DataAvailability, DiagnosticAccumulator
from ldclient.impl.flag_tracker import FlagTrackerImpl
from ldclient.impl.listeners import Listeners
from ldclient.impl.stubs import NullUpdateProcessor
@@ -78,7 +78,7 @@ def __init__(self, config: Config):
self._update_processor: Optional[UpdateProcessor] = None

# Diagnostic accumulator provided by client for streaming metrics
self._diagnostic_accumulator = None
self._diagnostic_accumulator: Optional[DiagnosticAccumulator] = None

# Track current data availability
self._data_availability: DataAvailability = (
@@ -122,7 +122,7 @@ def set_flag_value_eval_fn(self, eval_fn):
"""
self._flag_tracker_impl = FlagTrackerImpl(self._flag_change_listeners, eval_fn)

def set_diagnostic_accumulator(self, diagnostic_accumulator):
def set_diagnostic_accumulator(self, diagnostic_accumulator: DiagnosticAccumulator):
"""
Sets the diagnostic accumulator for streaming initialization metrics.
This should be called before start() to ensure metrics are collected.
21 changes: 13 additions & 8 deletions ldclient/impl/datasystem/fdv2.py
@@ -9,7 +9,12 @@
DataSourceStatusProviderImpl,
DataStoreStatusProviderImpl
)
from ldclient.impl.datasystem import DataAvailability, Synchronizer
from ldclient.impl.datasystem import (
DataAvailability,
DiagnosticAccumulator,
DiagnosticSource,
Synchronizer
)
from ldclient.impl.datasystem.store import Store
from ldclient.impl.flag_tracker import FlagTrackerImpl
from ldclient.impl.listeners import Listeners
@@ -173,9 +178,7 @@ def __init__(
self._disabled = self._config.offline

# Diagnostic accumulator provided by client for streaming metrics
# TODO(fdv2): Either we need to use this, or we need to provide it to
# the streaming synchronizers
self._diagnostic_accumulator = None
self._diagnostic_accumulator: Optional[DiagnosticAccumulator] = None

# Set up event listeners
self._flag_change_listeners = Listeners()
@@ -261,7 +264,7 @@ def stop(self):
# Close the store
self._store.close()

def set_diagnostic_accumulator(self, diagnostic_accumulator):
def set_diagnostic_accumulator(self, diagnostic_accumulator: DiagnosticAccumulator):
"""
Sets the diagnostic accumulator for streaming initialization metrics.
This should be called before start() to ensure metrics are collected.
@@ -334,6 +337,8 @@ def synchronizer_loop(self: 'FDv2'):
try:
self._lock.lock()
primary_sync = self._primary_synchronizer_builder(self._config)
if isinstance(primary_sync, DiagnosticSource) and self._diagnostic_accumulator is not None:
primary_sync.set_diagnostic_accumulator(self._diagnostic_accumulator)
self._active_synchronizer = primary_sync
self._lock.unlock()

@@ -367,6 +372,8 @@ def synchronizer_loop(self: 'FDv2'):

self._lock.lock()
secondary_sync = self._secondary_synchronizer_builder(self._config)
if isinstance(secondary_sync, DiagnosticSource) and self._diagnostic_accumulator is not None:
secondary_sync.set_diagnostic_accumulator(self._diagnostic_accumulator)
log.info("Secondary synchronizer %s is starting", secondary_sync.name)
self._active_synchronizer = secondary_sync
self._lock.unlock()
@@ -386,7 +393,6 @@ def synchronizer_loop(self: 'FDv2'):
DataSourceState.OFF,
self._data_source_status_provider.status.error
)
# TODO: WE might need to also set that threading.Event here
break

log.info("Recovery condition met, returning to primary synchronizer")
@@ -398,8 +404,7 @@ def synchronizer_loop(self: 'FDv2'):
log.error("Error in synchronizer loop: %s", e)
finally:
# Ensure we always set the ready event when exiting
if not set_on_ready.is_set():
set_on_ready.set()
set_on_ready.set()
self._lock.lock()
if self._active_synchronizer is not None:
self._active_synchronizer.stop()
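The wiring pattern used in synchronizer_loop above can be summarized by a small hypothetical helper (not in the PR), which builds a synchronizer and attaches the accumulator only when the synchronizer opts in via DiagnosticSource:

    from typing import Callable, Optional

    from ldclient.config import Config
    from ldclient.impl.datasystem import (
        DiagnosticAccumulator,
        DiagnosticSource,
        Synchronizer
    )

    def build_with_diagnostics(
        builder: Callable[[Config], Synchronizer],
        config: Config,
        accumulator: Optional[DiagnosticAccumulator],
    ) -> Synchronizer:
        sync = builder(config)
        # Attach diagnostics only to synchronizers that implement DiagnosticSource,
        # so the Synchronizer protocol itself stays unchanged.
        if isinstance(sync, DiagnosticSource) and accumulator is not None:
            sync.set_diagnostic_accumulator(accumulator)
        return sync
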
13 changes: 11 additions & 2 deletions ldclient/impl/datasystem/protocolv2.py
@@ -6,10 +6,13 @@
from abc import abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any, List, Optional, Protocol
from typing import TYPE_CHECKING, Generator, List, Optional, Protocol

from ldclient.impl.util import Result

if TYPE_CHECKING:
from ldclient.impl.datasystem import SelectorStore, Update


class EventName(str, Enum):
"""
@@ -502,7 +505,13 @@ def name(self) -> str:
"""Returns the name of the initializer."""
raise NotImplementedError

# TODO(fdv2): Need sync method
def sync(self, ss: "SelectorStore") -> "Generator[Update, None, None]":
"""
sync should begin the synchronization process for the data source, yielding
Update objects until the connection is closed or an unrecoverable error
occurs.
"""
raise NotImplementedError

def close(self):
"""
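A minimal sketch (hypothetical; the import paths and the Update constructor defaults are assumptions based on this diff) of a synchronizer whose sync generator follows the contract in the new docstring, yielding Update values until the source is closed or an unrecoverable error occurs:

    from typing import Generator

    from ldclient.impl.datasystem import SelectorStore, Update
    from ldclient.interfaces import DataSourceState

    class OneShotSynchronizer:
        """Hypothetical synchronizer that yields a single Update and then finishes."""

        @property
        def name(self) -> str:
            return "one-shot"

        def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
            # A real implementation would keep yielding status updates as data
            # arrives; this sketch reports a single VALID state and returns.
            yield Update(state=DataSourceState.VALID)

        def close(self):
            pass
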
@@ -53,6 +53,10 @@ def __init__(
def all(self) -> Iterable[Action]:
return self._events

@property
def next_retry_delay(self):
return 1

def interrupt(self):
pass
