Skip to content

Commit d8e5039

Browse files
authored
feat: support topic sequence page (#492)
* feat: add support for topic sequence number and sequence page * chore: add tests
1 parent 9793c0c commit d8e5039

File tree

8 files changed

+131
-12
lines changed

8 files changed

+131
-12
lines changed

src/momento/internal/aio/_scs_pubsub_client.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,15 +76,22 @@ async def publish(self, cache_name: str, topic_name: str, value: str | bytes) ->
7676
self._log_request_error("publish", e)
7777
return TopicPublish.Error(convert_error(e, Service.TOPICS))
7878

79-
async def subscribe(self, cache_name: str, topic_name: str) -> TopicSubscribeResponse:
79+
async def subscribe(
80+
self,
81+
cache_name: str,
82+
topic_name: str,
83+
resume_at_topic_sequence_number: int = 0,
84+
resume_at_topic_sequence_page: int = 0,
85+
) -> TopicSubscribeResponse:
8086
try:
8187
_validate_cache_name(cache_name)
8288
_validate_topic_name(topic_name)
8389

8490
request = pubsub_pb._SubscriptionRequest(
8591
cache_name=cache_name,
8692
topic=topic_name,
87-
# TODO: resume_at_topic_sequence_number
93+
resume_at_topic_sequence_number=resume_at_topic_sequence_number,
94+
sequence_page=resume_at_topic_sequence_page,
8895
)
8996
stream = self._get_stream_stub().Subscribe( # type: ignore[misc]
9097
request,

src/momento/internal/synchronous/_scs_pubsub_client.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,15 +76,22 @@ def publish(self, cache_name: str, topic_name: str, value: str | bytes) -> Topic
7676
self._log_request_error("publish", e)
7777
return TopicPublish.Error(convert_error(e, Service.TOPICS))
7878

79-
def subscribe(self, cache_name: str, topic_name: str) -> TopicSubscribeResponse:
79+
def subscribe(
80+
self,
81+
cache_name: str,
82+
topic_name: str,
83+
resume_at_topic_sequence_number: int = 0,
84+
resume_at_topic_sequence_page: int = 0,
85+
) -> TopicSubscribeResponse:
8086
try:
8187
_validate_cache_name(cache_name)
8288
_validate_topic_name(topic_name)
8389

8490
request = pubsub_pb._SubscriptionRequest(
8591
cache_name=cache_name,
8692
topic=topic_name,
87-
# TODO: resume_at_topic_sequence_number
93+
resume_at_topic_sequence_number=resume_at_topic_sequence_number,
94+
sequence_page=resume_at_topic_sequence_page,
8895
)
8996
stream = self._get_stream_stub().Subscribe( # type: ignore[misc]
9097
request,

src/momento/responses/pubsub/subscribe.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,23 @@ class SubscriptionBase(TopicSubscribeResponse):
3737

3838
_logger = logs.logger
3939
_last_known_sequence_number: Optional[int] = None
40+
_last_known_sequence_page: Optional[int] = None
4041

4142
def _process_result(self, result: cachepubsub_pb2._SubscriptionItem) -> Optional[TopicSubscriptionItemResponse]:
4243
msg_type: str = result.WhichOneof("kind")
4344
if msg_type == "item":
4445
self._last_known_sequence_number = result.item.topic_sequence_number
46+
self._last_known_sequence_page = result.item.sequence_page
4547
value = result.item.value
4648
value_type: str = value.WhichOneof("kind")
4749
if value_type == "text":
48-
return TopicSubscriptionItem.Text(value.text)
50+
return TopicSubscriptionItem.Text(
51+
value.text, self._last_known_sequence_number, self._last_known_sequence_page
52+
)
4953
elif value_type == "binary":
50-
return TopicSubscriptionItem.Binary(value.binary)
54+
return TopicSubscriptionItem.Binary(
55+
value.binary, self._last_known_sequence_number, self._last_known_sequence_page
56+
)
5157
else:
5258
err = SdkException(
5359
f"Could not find matching TopicSubscriptionItem response for type: {value_type}",

src/momento/responses/pubsub/subscription_item.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,16 @@ class Text(TopicSubscriptionItemResponse):
3737
"""Indicates the request was successful and value will be returned as a string."""
3838

3939
value: str
40+
sequence_number: int
41+
sequence_page: int
4042

4143
@dataclass
4244
class Binary(TopicSubscriptionItemResponse):
4345
"""Indicates the request was successful and value will be returned as bytes."""
4446

4547
value: bytes
48+
sequence_number: int
49+
sequence_page: int
4650

4751
class Error(TopicSubscriptionItemResponse, ErrorResponseMixin):
4852
"""Contains information about an error returned from a request.

src/momento/topic_client.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,17 +81,29 @@ def publish(self, cache_name: str, topic_name: str, value: str | bytes) -> Topic
8181
"""
8282
return self._pubsub_client.publish(cache_name, topic_name, value)
8383

84-
def subscribe(self, cache_name: str, topic_name: str) -> TopicSubscribeResponse:
84+
def subscribe(
85+
self,
86+
cache_name: str,
87+
topic_name: str,
88+
resume_at_topic_sequence_number: int = 0,
89+
resume_at_topic_sequence_page: int = 0,
90+
) -> TopicSubscribeResponse:
8591
"""Subscribes to a topic.
8692
8793
Args:
8894
cache_name (str): The cache to subscribe to.
8995
topic_name (str): The topic to subscribe to.
96+
resume_at_topic_sequence_number (int): The sequence number to resume at. Omit or use 0 to start at the
97+
latest messages.
98+
resume_at_topic_sequence_page (int): The page number to resume at. Omit or use 0 to start at the
99+
latest messages.
90100
91101
Returns:
92102
TopicSubscribeResponse
93103
"""
94-
return self._pubsub_client.subscribe(cache_name, topic_name)
104+
return self._pubsub_client.subscribe(
105+
cache_name, topic_name, resume_at_topic_sequence_number, resume_at_topic_sequence_page
106+
)
95107

96108
def close(self) -> None:
97109
self._pubsub_client.close()

src/momento/topic_client_async.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,17 +81,29 @@ async def publish(self, cache_name: str, topic_name: str, value: str | bytes) ->
8181
"""
8282
return await self._pubsub_client.publish(cache_name, topic_name, value)
8383

84-
async def subscribe(self, cache_name: str, topic_name: str) -> TopicSubscribeResponse:
84+
async def subscribe(
85+
self,
86+
cache_name: str,
87+
topic_name: str,
88+
resume_at_topic_sequence_number: int = 0,
89+
resume_at_topic_sequence_page: int = 0,
90+
) -> TopicSubscribeResponse:
8591
"""Subscribes to a topic.
8692
8793
Args:
8894
cache_name (str): The cache to subscribe to.
8995
topic_name (str): The topic to subscribe to.
96+
resume_at_topic_sequence_number (int): The sequence number to resume at. Omit or use 0 to start at the
97+
latest messages.
98+
resume_at_topic_sequence_page (int): The page number to resume at. Omit or use 0 to start at the
99+
latest messages.
90100
91101
Returns:
92102
TopicSubscribeResponse
93103
"""
94-
return await self._pubsub_client.subscribe(cache_name, topic_name)
104+
return await self._pubsub_client.subscribe(
105+
cache_name, topic_name, resume_at_topic_sequence_number, resume_at_topic_sequence_page
106+
)
95107

96108
async def close(self) -> None:
97109
await self._pubsub_client.close()

tests/momento/topic_client/test_topics.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,45 @@ def subscribe_happy_path_string(client: CacheClient, topic_client: TopicClient,
5353
topic = uuid_str()
5454
value = uuid_str()
5555

56+
_ = topic_client.publish(cache_name, topic_name=topic, value=value)
57+
5658
subscribe_response = topic_client.subscribe(cache_name, topic_name=topic)
5759
assert isinstance(subscribe_response, TopicSubscribe.Subscription)
5860

61+
item_response = next(subscribe_response)
62+
assert isinstance(item_response, TopicSubscriptionItem.Text)
63+
assert item_response.value == value
64+
65+
def subscribe_happy_path_string_resume_at_sequence(
66+
client: CacheClient, topic_client: TopicClient, cache_name: str
67+
) -> None:
68+
topic = uuid_str()
69+
value = uuid_str()
70+
71+
_ = topic_client.publish(cache_name, topic_name=topic, value="foo")
72+
_ = topic_client.publish(cache_name, topic_name=topic, value=value)
73+
_ = topic_client.publish(cache_name, topic_name=topic, value="bar")
74+
75+
subscribe_response = topic_client.subscribe(cache_name, topic_name=topic, resume_at_topic_sequence_number=2)
76+
assert isinstance(subscribe_response, TopicSubscribe.Subscription)
77+
78+
item_response = next(subscribe_response)
79+
assert isinstance(item_response, TopicSubscriptionItem.Text)
80+
assert item_response.value == value
81+
82+
def subscribe_happy_path_string_resume_at_invalid_sequence(
83+
client: CacheClient, topic_client: TopicClient, cache_name: str
84+
) -> None:
85+
topic = uuid_str()
86+
value = uuid_str()
87+
5988
_ = topic_client.publish(cache_name, topic_name=topic, value=value)
6089

90+
subscribe_response = topic_client.subscribe(
91+
cache_name, topic_name=topic, resume_at_topic_sequence_number=300, resume_at_topic_sequence_page=5435435
92+
)
93+
assert isinstance(subscribe_response, TopicSubscribe.Subscription)
94+
6195
item_response = next(subscribe_response)
6296
assert isinstance(item_response, TopicSubscriptionItem.Text)
6397
assert item_response.value == value

tests/momento/topic_client/test_topics_async.py

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,50 @@ async def subscribe_happy_path_string(
5757
topic = uuid_str()
5858
value = uuid_str()
5959

60+
_ = await topic_client_async.publish(cache_name, topic_name=topic, value=value)
61+
6062
subscribe_response = await topic_client_async.subscribe(cache_name, topic_name=topic)
6163
assert isinstance(subscribe_response, TopicSubscribe.SubscriptionAsync)
6264

6365
item_task = subscribe_response.__anext__()
64-
publish_response = await topic_client_async.publish(cache_name, topic_name=topic, value=value)
66+
item_response = await item_task
67+
assert isinstance(item_response, TopicSubscriptionItem.Text)
68+
assert item_response.value == value
6569

66-
print(publish_response)
70+
async def subscribe_happy_path_string_resume_at_sequence(
71+
client: CacheClientAsync, topic_client_async: TopicClientAsync, cache_name: str
72+
) -> None:
73+
topic = uuid_str()
74+
value = uuid_str()
75+
76+
_ = await topic_client_async.publish(cache_name, topic_name=topic, value="foo")
77+
_ = await topic_client_async.publish(cache_name, topic_name=topic, value=value)
78+
_ = await topic_client_async.publish(cache_name, topic_name=topic, value="bar")
79+
80+
subscribe_response = await topic_client_async.subscribe(
81+
cache_name, topic_name=topic, resume_at_topic_sequence_number=2
82+
)
83+
assert isinstance(subscribe_response, TopicSubscribe.SubscriptionAsync)
84+
85+
item_task = subscribe_response.__anext__()
86+
item_response = await item_task
87+
assert isinstance(item_response, TopicSubscriptionItem.Text)
88+
assert item_response.value == value
89+
90+
async def subscribe_happy_path_string_resume_at_invalid_sequence(
91+
client: CacheClientAsync, topic_client_async: TopicClientAsync, cache_name: str
92+
) -> None:
93+
topic = uuid_str()
94+
value = uuid_str()
95+
96+
_ = await topic_client_async.publish(cache_name, topic_name=topic, value=value)
97+
98+
subscribe_response = await topic_client_async.subscribe(
99+
cache_name, topic_name=topic, resume_at_topic_sequence_number=300, resume_at_topic_sequence_page=5435435
100+
)
101+
assert isinstance(subscribe_response, TopicSubscribe.SubscriptionAsync)
102+
103+
item_task = subscribe_response.__anext__()
67104
item_response = await item_task
68105
assert isinstance(item_response, TopicSubscriptionItem.Text)
69106
assert item_response.value == value

0 commit comments

Comments
 (0)