Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions ldclient/impl/datasourcev2/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,77 @@
"""
This module houses FDv2 types and implementations of synchronizers and
initializers for the datasystem.

All types and implementations in this module are considered internal
and are not part of the public API of the LaunchDarkly Python SDK.
They are subject to change without notice and should not be used directly
by users of the SDK.

You have been warned.
"""

from abc import abstractmethod
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving the shared types to this top level. In my next bit of work, I'm going to be re-arranging the polling stuff, so just focus on streaming for now.

from dataclasses import dataclass
from typing import Generator, Iterable, Mapping, Optional, Protocol, Tuple

from ldclient.impl.datasystem.protocolv2 import ChangeSet, Selector
from ldclient.impl.util import _Result
from ldclient.interfaces import DataSourceErrorInfo, DataSourceState

PollingResult = _Result[Tuple[ChangeSet, Mapping], str]


class PollingRequester(Protocol): # pylint: disable=too-few-public-methods
"""
PollingRequester allows PollingDataSource to delegate fetching data to
another component.

This is useful for testing the PollingDataSource without needing to set up
a test HTTP server.
"""

@abstractmethod
def fetch(self, selector: Optional[Selector]) -> PollingResult:
"""
Fetches the data for the given selector.
Returns a Result containing a tuple of ChangeSet and any request headers,
or an error if the data could not be retrieved.
"""
raise NotImplementedError


@dataclass(frozen=True)
class Update:
"""
Update represents the results of a synchronizer's ongoing sync
method.
"""

state: DataSourceState
change_set: Optional[ChangeSet] = None
error: Optional[DataSourceErrorInfo] = None
revert_to_fdv1: bool = False
environment_id: Optional[str] = None


class Synchronizer(Protocol): # pylint: disable=too-few-public-methods
"""
Synchronizer represents a component capable of synchronizing data from an external
data source, such as a streaming or polling API.

It is responsible for yielding Update objects that represent the current state
of the data source, including any changes that have occurred since the last
synchronization.
"""

@abstractmethod
def sync(self) -> Generator[Update, None, None]:
"""
sync should begin the synchronization process for the data source, yielding
Update objects until the connection is closed or an unrecoverable error
occurs.
"""
raise NotImplementedError


__all__: list[str] = []
29 changes: 4 additions & 25 deletions ldclient/impl/datasourcev2/polling.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
"""
Default implementation of the polling synchronizer and initializer.
This module contains the implementations of a polling synchronizer and
initializer, along with any required supporting classes and protocols.
"""

import json
from abc import abstractmethod
from collections import namedtuple
from collections.abc import Mapping
from typing import Optional, Protocol, Tuple
from typing import Iterable, Optional
from urllib import parse

import urllib3

from ldclient.impl.datasourcev2 import PollingRequester, PollingResult, Update
from ldclient.impl.datasystem.protocolv2 import (
Basis,
ChangeSet,
Expand Down Expand Up @@ -38,27 +38,6 @@

POLLING_ENDPOINT = "/sdk/poll"

PollingResult = _Result[Tuple[ChangeSet, Mapping], str]


class PollingRequester(Protocol): # pylint: disable=too-few-public-methods
"""
PollingRequester allows PollingDataSource to delegate fetching data to
another component.

This is useful for testing the PollingDataSource without needing to set up
a test HTTP server.
"""

@abstractmethod
def fetch(self, selector: Optional[Selector]) -> PollingResult:
"""
Fetches the data for the given selector.
Returns a Result containing a tuple of ChangeSet and any request headers,
or an error if the data could not be retrieved.
"""
raise NotImplementedError


CacheEntry = namedtuple("CacheEntry", ["data", "etag"])

Expand Down
Loading