Skip to content

Commit 3e8113f

Browse files
authored
chore: Add support for FDv1 polling synchronizer (#368)
1 parent cde2f68 commit 3e8113f

File tree

11 files changed

+684
-32
lines changed

11 files changed

+684
-32
lines changed

ldclient/config.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,6 @@ class DataSystemConfig:
179179
data_store: Optional[FeatureStore] = None
180180
"""The (optional) persistent data store instance."""
181181

182-
# TODO(fdv2): Implement this synchronizer up and hook it up everywhere.
183-
# TODO(fdv2): Remove this when FDv2 is fully launched
184182
fdv1_fallback_synchronizer: Optional[Builder[Synchronizer]] = None
185183
"""An optional fallback synchronizer that will read from FDv1"""
186184

ldclient/impl/datasourcev2/polling.py

Lines changed: 130 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import urllib3
1515

1616
from ldclient.config import Config
17+
from ldclient.impl.datasource.feature_requester import LATEST_ALL_URI
1718
from ldclient.impl.datasystem import BasisResult, SelectorStore, Update
1819
from ldclient.impl.datasystem.protocolv2 import (
1920
Basis,
@@ -22,6 +23,8 @@
2223
DeleteObject,
2324
EventName,
2425
IntentCode,
26+
ObjectKind,
27+
Payload,
2528
PutObject,
2629
Selector,
2730
ServerIntent
@@ -43,6 +46,7 @@
4346
DataSourceErrorKind,
4447
DataSourceState
4548
)
49+
from ldclient.versioned_data_kind import FEATURES, SEGMENTS
4650

4751
POLLING_ENDPOINT = "/sdk/poll"
4852

@@ -123,6 +127,15 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
123127
),
124128
)
125129

130+
fallback = result.exception.headers.get("X-LD-FD-Fallback") == 'true'
131+
if fallback:
132+
yield Update(
133+
state=DataSourceState.OFF,
134+
error=error_info,
135+
revert_to_fdv1=True
136+
)
137+
break
138+
126139
status_code = result.exception.status
127140
if is_http_error_recoverable(status_code):
128141
# TODO(fdv2): Add support for environment ID
@@ -158,6 +171,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
158171
state=DataSourceState.VALID,
159172
change_set=change_set,
160173
environment_id=headers.get("X-LD-EnvID"),
174+
revert_to_fdv1=headers.get('X-LD-FD-Fallback') == 'true'
161175
)
162176

163177
if self._event.wait(self._poll_interval):
@@ -262,7 +276,7 @@ def fetch(self, selector: Optional[Selector]) -> PollingResult:
262276

263277
if response.status >= 400:
264278
return _Fail(
265-
f"HTTP error {response}", UnsuccessfulResponseException(response.status)
279+
f"HTTP error {response}", UnsuccessfulResponseException(response.status, response.headers)
266280
)
267281

268282
headers = response.headers
@@ -375,3 +389,118 @@ def build(self) -> PollingDataSource:
375389
return PollingDataSource(
376390
poll_interval=self._config.poll_interval, requester=requester
377391
)
392+
393+
394+
# pylint: disable=too-few-public-methods
395+
class Urllib3FDv1PollingRequester:
396+
"""
397+
Urllib3PollingRequesterFDv1 is a Requester that uses urllib3 to make HTTP
398+
requests.
399+
"""
400+
401+
def __init__(self, config: Config):
402+
self._etag = None
403+
self._http = _http_factory(config).create_pool_manager(1, config.base_uri)
404+
self._config = config
405+
self._poll_uri = config.base_uri + LATEST_ALL_URI
406+
407+
def fetch(self, selector: Optional[Selector]) -> PollingResult:
408+
"""
409+
Fetches the data for the given selector.
410+
Returns a Result containing a tuple of ChangeSet and any request headers,
411+
or an error if the data could not be retrieved.
412+
"""
413+
query_params = {}
414+
if self._config.payload_filter_key is not None:
415+
query_params["filter"] = self._config.payload_filter_key
416+
417+
uri = self._poll_uri
418+
if len(query_params) > 0:
419+
filter_query = parse.urlencode(query_params)
420+
uri += f"?{filter_query}"
421+
422+
hdrs = _headers(self._config)
423+
hdrs["Accept-Encoding"] = "gzip"
424+
425+
if self._etag is not None:
426+
hdrs["If-None-Match"] = self._etag
427+
428+
response = self._http.request(
429+
"GET",
430+
uri,
431+
headers=hdrs,
432+
timeout=urllib3.Timeout(
433+
connect=self._config.http.connect_timeout,
434+
read=self._config.http.read_timeout,
435+
),
436+
retries=1,
437+
)
438+
439+
if response.status >= 400:
440+
return _Fail(
441+
f"HTTP error {response}", UnsuccessfulResponseException(response.status, response.headers)
442+
)
443+
444+
headers = response.headers
445+
446+
if response.status == 304:
447+
return _Success(value=(ChangeSetBuilder.no_changes(), headers))
448+
449+
data = json.loads(response.data.decode("UTF-8"))
450+
etag = headers.get("ETag")
451+
452+
if etag is not None:
453+
self._etag = etag
454+
455+
log.debug(
456+
"%s response status:[%d] ETag:[%s]",
457+
uri,
458+
response.status,
459+
etag,
460+
)
461+
462+
changeset_result = fdv1_polling_payload_to_changeset(data)
463+
if isinstance(changeset_result, _Success):
464+
return _Success(value=(changeset_result.value, headers))
465+
466+
return _Fail(
467+
error=changeset_result.error,
468+
exception=changeset_result.exception,
469+
)
470+
471+
472+
# pylint: disable=too-many-branches,too-many-return-statements
473+
def fdv1_polling_payload_to_changeset(data: dict) -> _Result[ChangeSet, str]:
474+
"""
475+
Converts a fdv1 polling payload into a ChangeSet.
476+
"""
477+
builder = ChangeSetBuilder()
478+
builder.start(IntentCode.TRANSFER_FULL)
479+
selector = Selector.no_selector()
480+
481+
# FDv1 uses "flags" instead of "features", so we need to map accordingly
482+
# Map FDv1 JSON keys to ObjectKind enum values
483+
kind_mappings = [
484+
(ObjectKind.FLAG, "flags"),
485+
(ObjectKind.SEGMENT, "segments")
486+
]
487+
488+
for kind, fdv1_key in kind_mappings:
489+
kind_data = data.get(fdv1_key)
490+
if kind_data is None:
491+
continue
492+
if not isinstance(kind_data, dict):
493+
return _Fail(error=f"Invalid format: {fdv1_key} is not a dictionary")
494+
495+
for key in kind_data:
496+
flag_or_segment = kind_data.get(key)
497+
if flag_or_segment is None or not isinstance(flag_or_segment, dict):
498+
return _Fail(error=f"Invalid format: {key} is not a dictionary")
499+
500+
version = flag_or_segment.get('version')
501+
if version is None:
502+
return _Fail(error=f"Invalid format: {key} does not have a version set")
503+
504+
builder.add_put(kind, key, version, flag_or_segment)
505+
506+
return _Success(builder.finish(selector))

ldclient/impl/datasourcev2/streaming.py

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,12 @@
44
"""
55

66
import json
7-
from abc import abstractmethod
87
from time import time
9-
from typing import Callable, Generator, Iterable, Optional, Protocol, Tuple
8+
from typing import Callable, Generator, Optional, Tuple
109
from urllib import parse
1110

1211
from ld_eventsource import SSEClient
13-
from ld_eventsource.actions import Action, Event, Fault
12+
from ld_eventsource.actions import Event, Fault, Start
1413
from ld_eventsource.config import (
1514
ConnectStrategy,
1615
ErrorStrategy,
@@ -151,6 +150,15 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
151150
break
152151
continue
153152

153+
if isinstance(action, Start) and action.headers is not None:
154+
fallback = action.headers.get('X-LD-FD-Fallback') == 'true'
155+
if fallback:
156+
yield Update(
157+
state=DataSourceState.OFF,
158+
revert_to_fdv1=True
159+
)
160+
break
161+
154162
if not isinstance(action, Event):
155163
continue
156164

@@ -188,11 +196,6 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
188196
# if update is not None:
189197
# self._record_stream_init(False)
190198

191-
# if self._data_source_update_sink is not None:
192-
# self._data_source_update_sink.update_status(
193-
# DataSourceState.VALID, None
194-
# )
195-
196199
self._sse.close()
197200

198201
def stop(self):
@@ -288,6 +291,8 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
288291
289292
If an update is provided, it should be forward upstream, regardless of
290293
whether or not we are going to retry this failure.
294+
295+
The return should be thought of (update, should_continue)
291296
"""
292297
if not self._running:
293298
return (None, False) # don't retry if we've been deliberately stopped
@@ -315,12 +320,18 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
315320
str(error),
316321
)
317322

323+
if error.headers is not None and error.headers.get("X-LD-FD-Fallback") == 'true':
324+
update = Update(
325+
state=DataSourceState.OFF,
326+
error=error_info,
327+
revert_to_fdv1=True
328+
)
329+
return (update, False)
330+
318331
http_error_message_result = http_error_message(
319332
error.status, "stream connection"
320333
)
321-
322334
is_recoverable = is_http_error_recoverable(error.status)
323-
324335
update = Update(
325336
state=(
326337
DataSourceState.INTERRUPTED

ldclient/impl/datasystem/config.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from ldclient.impl.datasourcev2.polling import (
1010
PollingDataSource,
1111
PollingDataSourceBuilder,
12+
Urllib3FDv1PollingRequester,
1213
Urllib3PollingRequester
1314
)
1415
from ldclient.impl.datasourcev2.streaming import (
@@ -55,6 +56,17 @@ def synchronizers(
5556
self._secondary_synchronizer = secondary
5657
return self
5758

59+
def fdv1_compatible_synchronizer(
60+
self,
61+
fallback: Builder[Synchronizer]
62+
) -> "ConfigBuilder":
63+
"""
64+
Configures the SDK with a fallback synchronizer that is compatible with
65+
the Flag Delivery v1 API.
66+
"""
67+
self._fdv1_fallback_synchronizer = fallback
68+
return self
69+
5870
def data_store(self, data_store: FeatureStore, store_mode: DataStoreMode) -> "ConfigBuilder":
5971
"""
6072
Sets the data store configuration for the data system.
@@ -91,6 +103,17 @@ def builder(config: LDConfig) -> PollingDataSource:
91103
return builder
92104

93105

106+
def fdv1_fallback_ds_builder() -> Builder[PollingDataSource]:
107+
def builder(config: LDConfig) -> PollingDataSource:
108+
requester = Urllib3FDv1PollingRequester(config)
109+
polling_ds = PollingDataSourceBuilder(config)
110+
polling_ds.requester(requester)
111+
112+
return polling_ds.build()
113+
114+
return builder
115+
116+
94117
def streaming_ds_builder() -> Builder[StreamingDataSource]:
95118
def builder(config: LDConfig) -> StreamingDataSource:
96119
return StreamingDataSourceBuilder(config).build()
@@ -114,10 +137,12 @@ def default() -> ConfigBuilder:
114137

115138
polling_builder = polling_ds_builder()
116139
streaming_builder = streaming_ds_builder()
140+
fallback = fdv1_fallback_ds_builder()
117141

118142
builder = ConfigBuilder()
119143
builder.initializers([polling_builder])
120144
builder.synchronizers(streaming_builder, polling_builder)
145+
builder.fdv1_compatible_synchronizer(fallback)
121146

122147
return builder
123148

@@ -130,9 +155,11 @@ def streaming() -> ConfigBuilder:
130155
"""
131156

132157
streaming_builder = streaming_ds_builder()
158+
fallback = fdv1_fallback_ds_builder()
133159

134160
builder = ConfigBuilder()
135161
builder.synchronizers(streaming_builder)
162+
builder.fdv1_compatible_synchronizer(fallback)
136163

137164
return builder
138165

@@ -145,9 +172,11 @@ def polling() -> ConfigBuilder:
145172
"""
146173

147174
polling_builder: Builder[Synchronizer] = polling_ds_builder()
175+
fallback = fdv1_fallback_ds_builder()
148176

149177
builder = ConfigBuilder()
150178
builder.synchronizers(polling_builder)
179+
builder.fdv1_compatible_synchronizer(fallback)
151180

152181
return builder
153182

ldclient/impl/datasystem/fdv2.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -443,9 +443,13 @@ def _consume_synchronizer_results(
443443
# Update status
444444
self._data_source_status_provider.update_status(update.state, update.error)
445445

446+
# Check if we should revert to FDv1 immediately
447+
if update.revert_to_fdv1:
448+
return True, True
449+
446450
# Check for OFF state indicating permanent failure
447451
if update.state == DataSourceState.OFF:
448-
return True, update.revert_to_fdv1
452+
return True, False
449453

450454
# Check condition periodically
451455
current_status = self._data_source_status_provider.status

0 commit comments

Comments
 (0)