Skip to content

Commit 203a033

Browse files
committed
chore: Support x-ld-envid in updates
1 parent 383a396 commit 203a033

File tree

5 files changed

+64
-57
lines changed

5 files changed

+64
-57
lines changed

ldclient/impl/datasourcev2/polling.py

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
from ldclient.impl.http import _http_factory
3333
from ldclient.impl.repeating_task import RepeatingTask
3434
from ldclient.impl.util import (
35+
_LD_ENVID_HEADER,
36+
_LD_FD_FALLBACK_HEADER,
3537
UnsuccessfulResponseException,
3638
_Fail,
3739
_headers,
@@ -117,6 +119,13 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
117119
while self._stop.is_set() is False:
118120
result = self._requester.fetch(ss.selector())
119121
if isinstance(result, _Fail):
122+
fallback = None
123+
envid = None
124+
125+
if result.headers is not None:
126+
fallback = result.headers.get(_LD_FD_FALLBACK_HEADER) == 'true'
127+
envid = result.headers.get(_LD_ENVID_HEADER)
128+
120129
if isinstance(result.exception, UnsuccessfulResponseException):
121130
error_info = DataSourceErrorInfo(
122131
kind=DataSourceErrorKind.ERROR_RESPONSE,
@@ -127,28 +136,28 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
127136
),
128137
)
129138

130-
fallback = result.exception.headers.get("X-LD-FD-Fallback") == 'true'
131139
if fallback:
132140
yield Update(
133141
state=DataSourceState.OFF,
134142
error=error_info,
135-
revert_to_fdv1=True
143+
revert_to_fdv1=True,
144+
environment_id=envid,
136145
)
137146
break
138147

139148
status_code = result.exception.status
140149
if is_http_error_recoverable(status_code):
141-
# TODO(fdv2): Add support for environment ID
142150
yield Update(
143151
state=DataSourceState.INTERRUPTED,
144152
error=error_info,
153+
environment_id=envid,
145154
)
146155
continue
147156

148-
# TODO(fdv2): Add support for environment ID
149157
yield Update(
150158
state=DataSourceState.OFF,
151159
error=error_info,
160+
environment_id=envid,
152161
)
153162
break
154163

@@ -159,19 +168,18 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
159168
message=result.error,
160169
)
161170

162-
# TODO(fdv2): Go has a designation here to handle JSON decoding separately.
163-
# TODO(fdv2): Add support for environment ID
164171
yield Update(
165172
state=DataSourceState.INTERRUPTED,
166173
error=error_info,
174+
environment_id=envid,
167175
)
168176
else:
169177
(change_set, headers) = result.value
170178
yield Update(
171179
state=DataSourceState.VALID,
172180
change_set=change_set,
173-
environment_id=headers.get("X-LD-EnvID"),
174-
revert_to_fdv1=headers.get('X-LD-FD-Fallback') == 'true'
181+
environment_id=headers.get(_LD_ENVID_HEADER),
182+
revert_to_fdv1=headers.get(_LD_FD_FALLBACK_HEADER) == 'true'
175183
)
176184

177185
if self._event.wait(self._poll_interval):
@@ -208,7 +216,7 @@ def _poll(self, ss: SelectorStore) -> BasisResult:
208216

209217
(change_set, headers) = result.value
210218

211-
env_id = headers.get("X-LD-EnvID")
219+
env_id = headers.get(_LD_ENVID_HEADER)
212220
if not isinstance(env_id, str):
213221
env_id = None
214222

@@ -273,14 +281,14 @@ def fetch(self, selector: Optional[Selector]) -> PollingResult:
273281
),
274282
retries=1,
275283
)
284+
headers = response.headers
276285

277286
if response.status >= 400:
278287
return _Fail(
279-
f"HTTP error {response}", UnsuccessfulResponseException(response.status, response.headers)
288+
f"HTTP error {response}", UnsuccessfulResponseException(response.status),
289+
headers=headers,
280290
)
281291

282-
headers = response.headers
283-
284292
if response.status == 304:
285293
return _Success(value=(ChangeSetBuilder.no_changes(), headers))
286294

@@ -304,6 +312,7 @@ def fetch(self, selector: Optional[Selector]) -> PollingResult:
304312
return _Fail(
305313
error=changeset_result.error,
306314
exception=changeset_result.exception,
315+
headers=headers, # type: ignore
307316
)
308317

309318

@@ -438,7 +447,7 @@ def fetch(self, selector: Optional[Selector]) -> PollingResult:
438447

439448
if response.status >= 400:
440449
return _Fail(
441-
f"HTTP error {response}", UnsuccessfulResponseException(response.status, response.headers)
450+
f"HTTP error {response}", UnsuccessfulResponseException(response.status)
442451
)
443452

444453
headers = response.headers

ldclient/impl/datasourcev2/streaming.py

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
)
3939
from ldclient.impl.http import HTTPFactory, _http_factory
4040
from ldclient.impl.util import (
41+
_LD_ENVID_HEADER,
42+
_LD_FD_FALLBACK_HEADER,
4143
http_error_message,
4244
is_http_error_recoverable,
4345
log
@@ -58,7 +60,6 @@
5860

5961
STREAMING_ENDPOINT = "/sdk/stream"
6062

61-
6263
SseClientBuilder = Callable[[Config, SelectorStore], SSEClient]
6364

6465

@@ -154,29 +155,35 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
154155
if action.error is None:
155156
continue
156157

157-
(update, should_continue) = self._handle_error(action.error)
158+
envid = action.headers.get(_LD_ENVID_HEADER) if action.headers is not None else None
159+
160+
(update, should_continue) = self._handle_error(action.error, envid)
158161
if update is not None:
159162
yield update
160163

161164
if not should_continue:
162165
break
163166
continue
164167

168+
envid = None
165169
if isinstance(action, Start) and action.headers is not None:
166-
fallback = action.headers.get('X-LD-FD-Fallback') == 'true'
170+
fallback = action.headers.get(_LD_FD_FALLBACK_HEADER) == 'true'
171+
envid = action.headers.get(_LD_ENVID_HEADER)
172+
167173
if fallback:
168174
self._record_stream_init(True)
169175
yield Update(
170176
state=DataSourceState.OFF,
171-
revert_to_fdv1=True
177+
revert_to_fdv1=True,
178+
environment_id=envid,
172179
)
173180
break
174181

175182
if not isinstance(action, Event):
176183
continue
177184

178185
try:
179-
update = self._process_message(action, change_set_builder)
186+
update = self._process_message(action, change_set_builder, envid)
180187
if update is not None:
181188
self._record_stream_init(False)
182189
self._connection_attempt_start_time = None
@@ -187,7 +194,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
187194
)
188195
self._sse.interrupt()
189196

190-
(update, should_continue) = self._handle_error(e)
197+
(update, should_continue) = self._handle_error(e, envid)
191198
if update is not None:
192199
yield update
193200
if not should_continue:
@@ -204,7 +211,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
204211
DataSourceErrorKind.UNKNOWN, 0, time(), str(e)
205212
),
206213
revert_to_fdv1=False,
207-
environment_id=None, # TODO(sdk-1410)
214+
environment_id=envid,
208215
)
209216

210217
self._sse.close()
@@ -226,7 +233,7 @@ def _record_stream_init(self, failed: bool):
226233

227234
# pylint: disable=too-many-return-statements
228235
def _process_message(
229-
self, msg: Event, change_set_builder: ChangeSetBuilder
236+
self, msg: Event, change_set_builder: ChangeSetBuilder, envid: Optional[str]
230237
) -> Optional[Update]:
231238
"""
232239
Processes a single message from the SSE stream and returns an Update
@@ -247,7 +254,7 @@ def _process_message(
247254
change_set_builder.expect_changes()
248255
return Update(
249256
state=DataSourceState.VALID,
250-
environment_id=None, # TODO(sdk-1410)
257+
environment_id=envid,
251258
)
252259
return None
253260

@@ -293,13 +300,13 @@ def _process_message(
293300
return Update(
294301
state=DataSourceState.VALID,
295302
change_set=change_set,
296-
environment_id=None, # TODO(sdk-1410)
303+
environment_id=envid,
297304
)
298305

299306
log.info("Unexpected event found in stream: %s", msg.event)
300307
return None
301308

302-
def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
309+
def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optional[Update], bool]:
303310
"""
304311
This method handles errors that occur during the streaming process.
305312
@@ -328,7 +335,7 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
328335
DataSourceErrorKind.INVALID_DATA, 0, time(), str(error)
329336
),
330337
revert_to_fdv1=False,
331-
environment_id=None, # TODO(sdk-1410)
338+
environment_id=envid,
332339
)
333340
return (update, True)
334341

@@ -344,11 +351,15 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
344351
str(error),
345352
)
346353

347-
if error.headers is not None and error.headers.get("X-LD-FD-Fallback") == 'true':
354+
if envid is None and error.headers is not None:
355+
envid = error.headers.get(_LD_ENVID_HEADER)
356+
357+
if error.headers is not None and error.headers.get(_LD_FD_FALLBACK_HEADER) == 'true':
348358
update = Update(
349359
state=DataSourceState.OFF,
350360
error=error_info,
351-
revert_to_fdv1=True
361+
revert_to_fdv1=True,
362+
environment_id=envid,
352363
)
353364
return (update, False)
354365

@@ -364,7 +375,7 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
364375
),
365376
error=error_info,
366377
revert_to_fdv1=False,
367-
environment_id=None, # TODO(sdk-1410)
378+
environment_id=envid,
368379
)
369380

370381
if not is_recoverable:
@@ -386,7 +397,7 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
386397
DataSourceErrorKind.UNKNOWN, 0, time(), str(error)
387398
),
388399
revert_to_fdv1=False,
389-
environment_id=None, # TODO(sdk-1410)
400+
environment_id=envid,
390401
)
391402
# no stacktrace here because, for a typical connection error, it'll
392403
# just be a lengthy tour of urllib3 internals
@@ -411,5 +422,4 @@ def __init__(self, config: Config):
411422

412423
def build(self) -> StreamingDataSource:
413424
"""Builds a StreamingDataSource instance with the configured parameters."""
414-
# TODO(fdv2): Add in the other controls here.
415425
return StreamingDataSource(self._config)

ldclient/impl/datasystem/config.py

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -210,18 +210,3 @@ def persistent_store(store: FeatureStore) -> ConfigBuilder:
210210
although it will keep it up-to-date.
211211
"""
212212
return default().data_store(store, DataStoreMode.READ_WRITE)
213-
214-
215-
# TODO(fdv2): Implement these methods
216-
#
217-
# WithEndpoints configures the data system with custom endpoints for
218-
# LaunchDarkly's streaming and polling synchronizers. This method is not
219-
# necessary for most use-cases, but can be useful for testing or custom
220-
# network configurations.
221-
#
222-
# Any endpoint that is not specified (empty string) will be treated as the
223-
# default LaunchDarkly SaaS endpoint for that service.
224-
225-
# WithRelayProxyEndpoints configures the data system with a single endpoint
226-
# for LaunchDarkly's streaming and polling synchronizers. The endpoint
227-
# should be Relay Proxy's base URI, for example http://localhost:8123.

ldclient/impl/util.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import time
55
from dataclasses import dataclass
66
from datetime import timedelta
7-
from typing import Any, Dict, Generic, Optional, TypeVar, Union
7+
from typing import Any, Dict, Generic, Mapping, Optional, TypeVar, Union
88
from urllib.parse import urlparse, urlunparse
99

1010
from ldclient.impl.http import _base_headers
@@ -35,6 +35,9 @@ def timedelta_millis(delta: timedelta) -> float:
3535
# Compiled regex pattern for valid characters in application values and SDK keys
3636
_VALID_CHARACTERS_REGEX = re.compile(r"[^a-zA-Z0-9._-]")
3737

38+
_LD_ENVID_HEADER = 'X-LD-EnvID'
39+
_LD_FD_FALLBACK_HEADER = 'X-LD-FD-Fallback'
40+
3841

3942
def validate_application_info(application: dict, logger: logging.Logger) -> dict:
4043
return {
@@ -117,23 +120,18 @@ def __str__(self, *args, **kwargs):
117120

118121

119122
class UnsuccessfulResponseException(Exception):
120-
def __init__(self, status, headers={}):
123+
def __init__(self, status):
121124
super(UnsuccessfulResponseException, self).__init__("HTTP error %d" % status)
122125
self._status = status
123-
self._headers = headers
124126

125127
@property
126128
def status(self):
127129
return self._status
128130

129-
@property
130-
def headers(self):
131-
return self._headers
132-
133131

134132
def throw_if_unsuccessful_response(resp):
135133
if resp.status >= 400:
136-
raise UnsuccessfulResponseException(resp.status, resp.headers)
134+
raise UnsuccessfulResponseException(resp.status)
137135

138136

139137
def is_http_error_recoverable(status):
@@ -290,6 +288,7 @@ class _Success(Generic[T]):
290288
class _Fail(Generic[E]):
291289
error: E
292290
exception: Optional[Exception] = None
291+
headers: Optional[Mapping[str, Any]] = None
293292

294293

295294
# TODO(breaking): Replace the above Result class with an improved generic

ldclient/integrations/test_datav2.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -551,17 +551,21 @@ class TestDataV2:
551551
::
552552
553553
from ldclient.impl.datasystem import config as datasystem_config
554+
from ldclient.integrations.test_datav2 import TestDataV2
555+
554556
555557
td = TestDataV2.data_source()
556558
td.update(td.flag('flag-key-1').variation_for_all(True))
557559
558560
# Configure the data system with TestDataV2 as both initializer and synchronizer
559561
data_config = datasystem_config.custom()
560-
data_config.initializers([lambda: td.build_initializer()])
561-
data_config.synchronizers(lambda: td.build_synchronizer())
562+
data_config.initializers([td.build_initializer])
563+
data_config.synchronizers(td.build_synchronizer)
562564
563-
# TODO(fdv2): This will be integrated with the main Config in a future version
564-
# For now, TestDataV2 is primarily intended for unit testing scenarios
565+
config = Config(
566+
sdk_key,
567+
datasystem_config=data_config.build(),
568+
)
565569
566570
# flags can be updated at any time:
567571
td.update(td.flag('flag-key-1').

0 commit comments

Comments
 (0)