Skip to content

Commit 383a396

Browse files
committed
chore: Add support for diagnostic events
1 parent 3e8113f commit 383a396

File tree

6 files changed

+86
-20
lines changed

6 files changed

+86
-20
lines changed

ldclient/impl/datasourcev2/streaming.py

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,13 @@
1818
from ld_eventsource.errors import HTTPStatusError
1919

2020
from ldclient.config import Config
21-
from ldclient.impl.datasystem import SelectorStore, Synchronizer, Update
21+
from ldclient.impl.datasystem import (
22+
DiagnosticAccumulator,
23+
DiagnosticSource,
24+
SelectorStore,
25+
Synchronizer,
26+
Update
27+
)
2228
from ldclient.impl.datasystem.protocolv2 import (
2329
ChangeSetBuilder,
2430
DeleteObject,
@@ -98,7 +104,7 @@ def query_params() -> dict[str, str]:
98104
)
99105

100106

101-
class StreamingDataSource(Synchronizer):
107+
class StreamingDataSource(Synchronizer, DiagnosticSource):
102108
"""
103109
StreamingSynchronizer is a specific type of Synchronizer that handles
104110
streaming data sources.
@@ -112,6 +118,11 @@ def __init__(self, config: Config):
112118
self._config = config
113119
self._sse: Optional[SSEClient] = None
114120
self._running = False
121+
self._diagnostic_accumulator: Optional[DiagnosticAccumulator] = None
122+
self._connection_attempt_start_time: Optional[float] = None
123+
124+
def set_diagnostic_accumulator(self, diagnostic_accumulator: DiagnosticAccumulator):
125+
self._diagnostic_accumulator = diagnostic_accumulator
115126

116127
@property
117128
def name(self) -> str:
@@ -133,6 +144,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
133144

134145
change_set_builder = ChangeSetBuilder()
135146
self._running = True
147+
self._connection_attempt_start_time = time()
136148

137149
for action in self._sse.all:
138150
if isinstance(action, Fault):
@@ -153,6 +165,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
153165
if isinstance(action, Start) and action.headers is not None:
154166
fallback = action.headers.get('X-LD-FD-Fallback') == 'true'
155167
if fallback:
168+
self._record_stream_init(True)
156169
yield Update(
157170
state=DataSourceState.OFF,
158171
revert_to_fdv1=True
@@ -165,6 +178,8 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
165178
try:
166179
update = self._process_message(action, change_set_builder)
167180
if update is not None:
181+
self._record_stream_init(False)
182+
self._connection_attempt_start_time = None
168183
yield update
169184
except json.decoder.JSONDecodeError as e:
170185
log.info(
@@ -192,10 +207,6 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
192207
environment_id=None, # TODO(sdk-1410)
193208
)
194209

195-
# TODO(sdk-1408)
196-
# if update is not None:
197-
# self._record_stream_init(False)
198-
199210
self._sse.close()
200211

201212
def stop(self):
@@ -207,6 +218,12 @@ def stop(self):
207218
if self._sse:
208219
self._sse.close()
209220

221+
def _record_stream_init(self, failed: bool):
222+
if self._diagnostic_accumulator and self._connection_attempt_start_time:
223+
current_time = int(time() * 1000)
224+
elapsed = current_time - int(self._connection_attempt_start_time * 1000)
225+
self._diagnostic_accumulator.record_stream_init(current_time, elapsed if elapsed >= 0 else 0, failed)
226+
210227
# pylint: disable=too-many-return-statements
211228
def _process_message(
212229
self, msg: Event, change_set_builder: ChangeSetBuilder
@@ -301,6 +318,9 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
301318

302319
if isinstance(error, json.decoder.JSONDecodeError):
303320
log.error("Unexpected error on stream connection: %s, will retry", error)
321+
self._record_stream_init(True)
322+
self._connection_attempt_start_time = time() + \
323+
self._sse.next_retry_delay # type: ignore
304324

305325
update = Update(
306326
state=DataSourceState.INTERRUPTED,
@@ -313,6 +333,10 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
313333
return (update, True)
314334

315335
if isinstance(error, HTTPStatusError):
336+
self._record_stream_init(True)
337+
self._connection_attempt_start_time = time() + \
338+
self._sse.next_retry_delay # type: ignore
339+
316340
error_info = DataSourceErrorInfo(
317341
DataSourceErrorKind.ERROR_RESPONSE,
318342
error.status,
@@ -344,6 +368,7 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
344368
)
345369

346370
if not is_recoverable:
371+
self._connection_attempt_start_time = None
347372
log.error(http_error_message_result)
348373
self.stop()
349374
return (update, False)
@@ -352,6 +377,8 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
352377
return (update, True)
353378

354379
log.warning("Unexpected error on stream connection: %s, will retry", error)
380+
self._record_stream_init(True)
381+
self._connection_attempt_start_time = time() + self._sse.next_retry_delay # type: ignore
355382

356383
update = Update(
357384
state=DataSourceState.INTERRUPTED,

ldclient/impl/datasystem/__init__.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from dataclasses import dataclass
88
from enum import Enum
99
from threading import Event
10-
from typing import Callable, Generator, Optional, Protocol
10+
from typing import Generator, Optional, Protocol, runtime_checkable
1111

1212
from ldclient.impl.datasystem.protocolv2 import Basis, ChangeSet, Selector
1313
from ldclient.impl.util import _Result
@@ -151,6 +151,27 @@ def store(self) -> ReadOnlyStore:
151151
raise NotImplementedError
152152

153153

154+
class DiagnosticAccumulator(Protocol):
155+
def record_stream_init(self, timestamp, duration, failed):
156+
raise NotImplementedError
157+
158+
def record_events_in_batch(self, events_in_batch):
159+
raise NotImplementedError
160+
161+
def create_event_and_reset(self, dropped_events, deduplicated_users):
162+
raise NotImplementedError
163+
164+
165+
@runtime_checkable
166+
class DiagnosticSource(Protocol):
167+
@abstractmethod
168+
def set_diagnostic_accumulator(self, diagnostic_accumulator: DiagnosticAccumulator):
169+
"""
170+
Set the diagnostic_accumulator to be used for reporting diagnostic events.
171+
"""
172+
raise NotImplementedError
173+
174+
154175
class SelectorStore(Protocol):
155176
"""
156177
SelectorStore represents a component capable of providing Selectors

ldclient/impl/datasystem/fdv1.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
DataStoreStatusProviderImpl,
1414
DataStoreUpdateSinkImpl
1515
)
16-
from ldclient.impl.datasystem import DataAvailability
16+
from ldclient.impl.datasystem import DataAvailability, DiagnosticAccumulator
1717
from ldclient.impl.flag_tracker import FlagTrackerImpl
1818
from ldclient.impl.listeners import Listeners
1919
from ldclient.impl.stubs import NullUpdateProcessor
@@ -78,7 +78,7 @@ def __init__(self, config: Config):
7878
self._update_processor: Optional[UpdateProcessor] = None
7979

8080
# Diagnostic accumulator provided by client for streaming metrics
81-
self._diagnostic_accumulator = None
81+
self._diagnostic_accumulator: Optional[DiagnosticAccumulator] = None
8282

8383
# Track current data availability
8484
self._data_availability: DataAvailability = (
@@ -122,7 +122,7 @@ def set_flag_value_eval_fn(self, eval_fn):
122122
"""
123123
self._flag_tracker_impl = FlagTrackerImpl(self._flag_change_listeners, eval_fn)
124124

125-
def set_diagnostic_accumulator(self, diagnostic_accumulator):
125+
def set_diagnostic_accumulator(self, diagnostic_accumulator: DiagnosticAccumulator):
126126
"""
127127
Sets the diagnostic accumulator for streaming initialization metrics.
128128
This should be called before start() to ensure metrics are collected.

ldclient/impl/datasystem/fdv2.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,12 @@
99
DataSourceStatusProviderImpl,
1010
DataStoreStatusProviderImpl
1111
)
12-
from ldclient.impl.datasystem import DataAvailability, Synchronizer
12+
from ldclient.impl.datasystem import (
13+
DataAvailability,
14+
DiagnosticAccumulator,
15+
DiagnosticSource,
16+
Synchronizer
17+
)
1318
from ldclient.impl.datasystem.store import Store
1419
from ldclient.impl.flag_tracker import FlagTrackerImpl
1520
from ldclient.impl.listeners import Listeners
@@ -173,9 +178,7 @@ def __init__(
173178
self._disabled = self._config.offline
174179

175180
# Diagnostic accumulator provided by client for streaming metrics
176-
# TODO(fdv2): Either we need to use this, or we need to provide it to
177-
# the streaming synchronizers
178-
self._diagnostic_accumulator = None
181+
self._diagnostic_accumulator: Optional[DiagnosticAccumulator] = None
179182

180183
# Set up event listeners
181184
self._flag_change_listeners = Listeners()
@@ -261,7 +264,7 @@ def stop(self):
261264
# Close the store
262265
self._store.close()
263266

264-
def set_diagnostic_accumulator(self, diagnostic_accumulator):
267+
def set_diagnostic_accumulator(self, diagnostic_accumulator: DiagnosticAccumulator):
265268
"""
266269
Sets the diagnostic accumulator for streaming initialization metrics.
267270
This should be called before start() to ensure metrics are collected.
@@ -334,6 +337,8 @@ def synchronizer_loop(self: 'FDv2'):
334337
try:
335338
self._lock.lock()
336339
primary_sync = self._primary_synchronizer_builder(self._config)
340+
if isinstance(primary_sync, DiagnosticSource) and self._diagnostic_accumulator is not None:
341+
primary_sync.set_diagnostic_accumulator(self._diagnostic_accumulator)
337342
self._active_synchronizer = primary_sync
338343
self._lock.unlock()
339344

@@ -367,6 +372,8 @@ def synchronizer_loop(self: 'FDv2'):
367372

368373
self._lock.lock()
369374
secondary_sync = self._secondary_synchronizer_builder(self._config)
375+
if isinstance(secondary_sync, DiagnosticSource) and self._diagnostic_accumulator is not None:
376+
secondary_sync.set_diagnostic_accumulator(self._diagnostic_accumulator)
370377
log.info("Secondary synchronizer %s is starting", secondary_sync.name)
371378
self._active_synchronizer = secondary_sync
372379
self._lock.unlock()
@@ -386,7 +393,6 @@ def synchronizer_loop(self: 'FDv2'):
386393
DataSourceState.OFF,
387394
self._data_source_status_provider.status.error
388395
)
389-
# TODO: WE might need to also set that threading.Event here
390396
break
391397

392398
log.info("Recovery condition met, returning to primary synchronizer")
@@ -398,8 +404,7 @@ def synchronizer_loop(self: 'FDv2'):
398404
log.error("Error in synchronizer loop: %s", e)
399405
finally:
400406
# Ensure we always set the ready event when exiting
401-
if not set_on_ready.is_set():
402-
set_on_ready.set()
407+
set_on_ready.set()
403408
self._lock.lock()
404409
if self._active_synchronizer is not None:
405410
self._active_synchronizer.stop()

ldclient/impl/datasystem/protocolv2.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,13 @@
66
from abc import abstractmethod
77
from dataclasses import dataclass
88
from enum import Enum
9-
from typing import Any, List, Optional, Protocol
9+
from typing import TYPE_CHECKING, Generator, List, Optional, Protocol
1010

1111
from ldclient.impl.util import Result
1212

13+
if TYPE_CHECKING:
14+
from ldclient.impl.datasystem import SelectorStore, Update
15+
1316

1417
class EventName(str, Enum):
1518
"""
@@ -502,7 +505,13 @@ def name(self) -> str:
502505
"""Returns the name of the initializer."""
503506
raise NotImplementedError
504507

505-
# TODO(fdv2): Need sync method
508+
def sync(self, ss: "SelectorStore") -> "Generator[Update, None, None]":
509+
"""
510+
sync should begin the synchronization process for the data source, yielding
511+
Update objects until the connection is closed or an unrecoverable error
512+
occurs.
513+
"""
514+
raise NotImplementedError
506515

507516
def close(self):
508517
"""

ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ def __init__(
5353
def all(self) -> Iterable[Action]:
5454
return self._events
5555

56+
@property
57+
def next_retry_delay(self):
58+
return 1
59+
5660
def interrupt(self):
5761
pass
5862

0 commit comments

Comments
 (0)