Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 25 additions & 15 deletions ldclient/impl/datasourcev2/polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
from ldclient.impl.http import _http_factory
from ldclient.impl.repeating_task import RepeatingTask
from ldclient.impl.util import (
_LD_ENVID_HEADER,
_LD_FD_FALLBACK_HEADER,
UnsuccessfulResponseException,
_Fail,
_headers,
Expand Down Expand Up @@ -117,6 +119,13 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
while self._stop.is_set() is False:
result = self._requester.fetch(ss.selector())
if isinstance(result, _Fail):
fallback = None
envid = None

if result.headers is not None:
fallback = result.headers.get(_LD_FD_FALLBACK_HEADER) == 'true'
envid = result.headers.get(_LD_ENVID_HEADER)

if isinstance(result.exception, UnsuccessfulResponseException):
error_info = DataSourceErrorInfo(
kind=DataSourceErrorKind.ERROR_RESPONSE,
Expand All @@ -127,28 +136,28 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
),
)

fallback = result.exception.headers.get("X-LD-FD-Fallback") == 'true'
if fallback:
yield Update(
state=DataSourceState.OFF,
error=error_info,
revert_to_fdv1=True
revert_to_fdv1=True,
environment_id=envid,
)
break

status_code = result.exception.status
if is_http_error_recoverable(status_code):
# TODO(fdv2): Add support for environment ID
yield Update(
state=DataSourceState.INTERRUPTED,
error=error_info,
environment_id=envid,
)
continue

# TODO(fdv2): Add support for environment ID
yield Update(
state=DataSourceState.OFF,
error=error_info,
environment_id=envid,
)
break

Expand All @@ -159,19 +168,18 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
message=result.error,
)

# TODO(fdv2): Go has a designation here to handle JSON decoding separately.
# TODO(fdv2): Add support for environment ID
yield Update(
state=DataSourceState.INTERRUPTED,
error=error_info,
environment_id=envid,
)
else:
(change_set, headers) = result.value
yield Update(
state=DataSourceState.VALID,
change_set=change_set,
environment_id=headers.get("X-LD-EnvID"),
revert_to_fdv1=headers.get('X-LD-FD-Fallback') == 'true'
environment_id=headers.get(_LD_ENVID_HEADER),
revert_to_fdv1=headers.get(_LD_FD_FALLBACK_HEADER) == 'true'
)

if self._event.wait(self._poll_interval):
Expand Down Expand Up @@ -208,7 +216,7 @@ def _poll(self, ss: SelectorStore) -> BasisResult:

(change_set, headers) = result.value

env_id = headers.get("X-LD-EnvID")
env_id = headers.get(_LD_ENVID_HEADER)
if not isinstance(env_id, str):
env_id = None

Expand Down Expand Up @@ -273,14 +281,14 @@ def fetch(self, selector: Optional[Selector]) -> PollingResult:
),
retries=1,
)
headers = response.headers

if response.status >= 400:
return _Fail(
f"HTTP error {response}", UnsuccessfulResponseException(response.status, response.headers)
f"HTTP error {response}", UnsuccessfulResponseException(response.status),
headers=headers,
)

headers = response.headers

if response.status == 304:
return _Success(value=(ChangeSetBuilder.no_changes(), headers))

Expand All @@ -304,6 +312,7 @@ def fetch(self, selector: Optional[Selector]) -> PollingResult:
return _Fail(
error=changeset_result.error,
exception=changeset_result.exception,
headers=headers, # type: ignore
)


Expand Down Expand Up @@ -436,13 +445,13 @@ def fetch(self, selector: Optional[Selector]) -> PollingResult:
retries=1,
)

headers = response.headers
if response.status >= 400:
return _Fail(
f"HTTP error {response}", UnsuccessfulResponseException(response.status, response.headers)
f"HTTP error {response}", UnsuccessfulResponseException(response.status),
headers=headers
)

headers = response.headers

if response.status == 304:
return _Success(value=(ChangeSetBuilder.no_changes(), headers))

Expand All @@ -466,6 +475,7 @@ def fetch(self, selector: Optional[Selector]) -> PollingResult:
return _Fail(
error=changeset_result.error,
exception=changeset_result.exception,
headers=headers,
)


Expand Down
44 changes: 27 additions & 17 deletions ldclient/impl/datasourcev2/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
)
from ldclient.impl.http import HTTPFactory, _http_factory
from ldclient.impl.util import (
_LD_ENVID_HEADER,
_LD_FD_FALLBACK_HEADER,
http_error_message,
is_http_error_recoverable,
log
Expand All @@ -58,7 +60,6 @@

STREAMING_ENDPOINT = "/sdk/stream"


SseClientBuilder = Callable[[Config, SelectorStore], SSEClient]


Expand Down Expand Up @@ -154,29 +155,35 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
if action.error is None:
continue

(update, should_continue) = self._handle_error(action.error)
envid = action.headers.get(_LD_ENVID_HEADER) if action.headers is not None else None

(update, should_continue) = self._handle_error(action.error, envid)
if update is not None:
yield update

if not should_continue:
break
continue

envid = None
if isinstance(action, Start) and action.headers is not None:
fallback = action.headers.get('X-LD-FD-Fallback') == 'true'
fallback = action.headers.get(_LD_FD_FALLBACK_HEADER) == 'true'
envid = action.headers.get(_LD_ENVID_HEADER)

if fallback:
self._record_stream_init(True)
yield Update(
state=DataSourceState.OFF,
revert_to_fdv1=True
revert_to_fdv1=True,
environment_id=envid,
)
break

if not isinstance(action, Event):
continue

try:
update = self._process_message(action, change_set_builder)
update = self._process_message(action, change_set_builder, envid)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Environment ID Fails to Persist

The envid variable is reset to None at the start of each loop iteration, causing all Event actions to lose the environment ID that was captured from the initial Start action. The environment ID should persist across the stream session, but resetting it means only the Start action itself retains the correct envid, while all subsequent events processed by _process_message receive None instead of the actual environment ID.

Fix in Cursor Fix in Web

if update is not None:
self._record_stream_init(False)
self._connection_attempt_start_time = None
Expand All @@ -187,7 +194,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
)
self._sse.interrupt()

(update, should_continue) = self._handle_error(e)
(update, should_continue) = self._handle_error(e, envid)
if update is not None:
yield update
if not should_continue:
Expand All @@ -204,7 +211,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
DataSourceErrorKind.UNKNOWN, 0, time(), str(e)
),
revert_to_fdv1=False,
environment_id=None, # TODO(sdk-1410)
environment_id=envid,
)

self._sse.close()
Expand All @@ -226,7 +233,7 @@ def _record_stream_init(self, failed: bool):

# pylint: disable=too-many-return-statements
def _process_message(
self, msg: Event, change_set_builder: ChangeSetBuilder
self, msg: Event, change_set_builder: ChangeSetBuilder, envid: Optional[str]
) -> Optional[Update]:
"""
Processes a single message from the SSE stream and returns an Update
Expand All @@ -247,7 +254,7 @@ def _process_message(
change_set_builder.expect_changes()
return Update(
state=DataSourceState.VALID,
environment_id=None, # TODO(sdk-1410)
environment_id=envid,
)
return None

Expand Down Expand Up @@ -293,13 +300,13 @@ def _process_message(
return Update(
state=DataSourceState.VALID,
change_set=change_set,
environment_id=None, # TODO(sdk-1410)
environment_id=envid,
)

log.info("Unexpected event found in stream: %s", msg.event)
return None

def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optional[Update], bool]:
"""
This method handles errors that occur during the streaming process.

Expand Down Expand Up @@ -328,7 +335,7 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
DataSourceErrorKind.INVALID_DATA, 0, time(), str(error)
),
revert_to_fdv1=False,
environment_id=None, # TODO(sdk-1410)
environment_id=envid,
)
return (update, True)

Expand All @@ -344,11 +351,15 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
str(error),
)

if error.headers is not None and error.headers.get("X-LD-FD-Fallback") == 'true':
if envid is None and error.headers is not None:
envid = error.headers.get(_LD_ENVID_HEADER)

if error.headers is not None and error.headers.get(_LD_FD_FALLBACK_HEADER) == 'true':
update = Update(
state=DataSourceState.OFF,
error=error_info,
revert_to_fdv1=True
revert_to_fdv1=True,
environment_id=envid,
)
return (update, False)

Expand All @@ -364,7 +375,7 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
),
error=error_info,
revert_to_fdv1=False,
environment_id=None, # TODO(sdk-1410)
environment_id=envid,
)

if not is_recoverable:
Expand All @@ -386,7 +397,7 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
DataSourceErrorKind.UNKNOWN, 0, time(), str(error)
),
revert_to_fdv1=False,
environment_id=None, # TODO(sdk-1410)
environment_id=envid,
)
# no stacktrace here because, for a typical connection error, it'll
# just be a lengthy tour of urllib3 internals
Expand All @@ -411,5 +422,4 @@ def __init__(self, config: Config):

def build(self) -> StreamingDataSource:
"""Builds a StreamingDataSource instance with the configured parameters."""
# TODO(fdv2): Add in the other controls here.
return StreamingDataSource(self._config)
15 changes: 0 additions & 15 deletions ldclient/impl/datasystem/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,18 +210,3 @@ def persistent_store(store: FeatureStore) -> ConfigBuilder:
although it will keep it up-to-date.
"""
return default().data_store(store, DataStoreMode.READ_WRITE)


# TODO(fdv2): Implement these methods
#
# WithEndpoints configures the data system with custom endpoints for
# LaunchDarkly's streaming and polling synchronizers. This method is not
# necessary for most use-cases, but can be useful for testing or custom
# network configurations.
#
# Any endpoint that is not specified (empty string) will be treated as the
# default LaunchDarkly SaaS endpoint for that service.

# WithRelayProxyEndpoints configures the data system with a single endpoint
# for LaunchDarkly's streaming and polling synchronizers. The endpoint
# should be Relay Proxy's base URI, for example http://localhost:8123.
15 changes: 7 additions & 8 deletions ldclient/impl/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import time
from dataclasses import dataclass
from datetime import timedelta
from typing import Any, Dict, Generic, Optional, TypeVar, Union
from typing import Any, Dict, Generic, Mapping, Optional, TypeVar, Union
from urllib.parse import urlparse, urlunparse

from ldclient.impl.http import _base_headers
Expand Down Expand Up @@ -35,6 +35,9 @@ def timedelta_millis(delta: timedelta) -> float:
# Compiled regex pattern for valid characters in application values and SDK keys
_VALID_CHARACTERS_REGEX = re.compile(r"[^a-zA-Z0-9._-]")

_LD_ENVID_HEADER = 'X-LD-EnvID'
_LD_FD_FALLBACK_HEADER = 'X-LD-FD-Fallback'


def validate_application_info(application: dict, logger: logging.Logger) -> dict:
return {
Expand Down Expand Up @@ -117,23 +120,18 @@ def __str__(self, *args, **kwargs):


class UnsuccessfulResponseException(Exception):
def __init__(self, status, headers={}):
def __init__(self, status):
super(UnsuccessfulResponseException, self).__init__("HTTP error %d" % status)
self._status = status
self._headers = headers

@property
def status(self):
return self._status

@property
def headers(self):
return self._headers


def throw_if_unsuccessful_response(resp):
if resp.status >= 400:
raise UnsuccessfulResponseException(resp.status, resp.headers)
raise UnsuccessfulResponseException(resp.status)


def is_http_error_recoverable(status):
Expand Down Expand Up @@ -290,6 +288,7 @@ class _Success(Generic[T]):
class _Fail(Generic[E]):
error: E
exception: Optional[Exception] = None
headers: Optional[Mapping[str, Any]] = None


# TODO(breaking): Replace the above Result class with an improved generic
Expand Down
12 changes: 8 additions & 4 deletions ldclient/integrations/test_datav2.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,17 +551,21 @@ class TestDataV2:
::

from ldclient.impl.datasystem import config as datasystem_config
from ldclient.integrations.test_datav2 import TestDataV2


td = TestDataV2.data_source()
td.update(td.flag('flag-key-1').variation_for_all(True))

# Configure the data system with TestDataV2 as both initializer and synchronizer
data_config = datasystem_config.custom()
data_config.initializers([lambda: td.build_initializer()])
data_config.synchronizers(lambda: td.build_synchronizer())
data_config.initializers([td.build_initializer])
data_config.synchronizers(td.build_synchronizer)

# TODO(fdv2): This will be integrated with the main Config in a future version
# For now, TestDataV2 is primarily intended for unit testing scenarios
config = Config(
sdk_key,
datasystem_config=data_config.build(),
)

# flags can be updated at any time:
td.update(td.flag('flag-key-1').
Expand Down