Skip to content

Commit af0f248

Browse files
2 parents 2bca68e + 3c97c40 commit af0f248

19 files changed

+292
-18
lines changed

src/momento/config/configurations.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def v1() -> Configurations.Laptop:
3535
This configuration is guaranteed not to change in future releases of the Momento Python SDK.
3636
"""
3737
return Configurations.Laptop(
38-
StaticTransportStrategy(StaticGrpcConfiguration(timedelta(seconds=15))),
38+
StaticTransportStrategy(StaticGrpcConfiguration(deadline=timedelta(seconds=15))),
3939
FixedCountRetryStrategy(max_attempts=3),
4040
)
4141

@@ -49,7 +49,14 @@ def latest() -> Configurations.Lambda:
4949
This configuration will be updated every time there is a new version of the Lambda configuration.
5050
"""
5151
return Configurations.Lambda(
52-
StaticTransportStrategy(StaticGrpcConfiguration(timedelta(milliseconds=1100))),
52+
StaticTransportStrategy(
53+
StaticGrpcConfiguration(
54+
deadline=timedelta(milliseconds=1100),
55+
keepalive_permit_without_calls=False,
56+
keepalive_time=None,
57+
keepalive_timeout=None,
58+
)
59+
),
5360
FixedCountRetryStrategy(max_attempts=3),
5461
)
5562

@@ -81,7 +88,7 @@ def v1() -> Configurations.InRegion.Default:
8188
This configuration is guaranteed not to change in future releases of the Momento Python SDK.
8289
"""
8390
return Configurations.InRegion.Default(
84-
StaticTransportStrategy(StaticGrpcConfiguration(timedelta(milliseconds=1100))),
91+
StaticTransportStrategy(StaticGrpcConfiguration(deadline=timedelta(milliseconds=1100))),
8592
FixedCountRetryStrategy(max_attempts=3),
8693
)
8794

@@ -108,6 +115,6 @@ def v1() -> Configurations.InRegion.LowLatency:
108115
This configuration is guaranteed not to change in future releases of the Momento Python SDK.
109116
"""
110117
return Configurations.InRegion.LowLatency(
111-
StaticTransportStrategy(StaticGrpcConfiguration(timedelta(milliseconds=500))),
118+
StaticTransportStrategy(StaticGrpcConfiguration(deadline=timedelta(milliseconds=500))),
112119
FixedCountRetryStrategy(max_attempts=3),
113120
)

src/momento/config/transport/grpc_configuration.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,23 @@ def with_root_certificates_pem(self, root_certificates_pem_path: Path) -> GrpcCo
2222
@abstractmethod
2323
def get_root_certificates_pem(self) -> Optional[bytes]:
2424
pass
25+
26+
@abstractmethod
27+
def get_max_send_message_length(self) -> Optional[int]:
28+
pass
29+
30+
@abstractmethod
31+
def get_max_receive_message_length(self) -> Optional[int]:
32+
pass
33+
34+
@abstractmethod
35+
def get_keepalive_permit_without_calls(self) -> Optional[int]:
36+
pass
37+
38+
@abstractmethod
39+
def get_keepalive_time(self) -> Optional[timedelta]:
40+
pass
41+
42+
@abstractmethod
43+
def get_keepalive_timeout(self) -> Optional[timedelta]:
44+
pass

src/momento/config/transport/transport_strategy.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,23 @@ def with_client_timeout(self, client_timeout: timedelta) -> TransportStrategy:
4848

4949

5050
class StaticGrpcConfiguration(GrpcConfiguration):
51-
def __init__(self, deadline: timedelta, root_certificates_pem: Optional[bytes] = None):
51+
def __init__(
52+
self,
53+
deadline: timedelta,
54+
root_certificates_pem: Optional[bytes] = None,
55+
max_send_message_length: Optional[int] = None,
56+
max_receive_message_length: Optional[int] = None,
57+
keepalive_permit_without_calls: Optional[bool] = True,
58+
keepalive_time: Optional[timedelta] = timedelta(milliseconds=5000),
59+
keepalive_timeout: Optional[timedelta] = timedelta(milliseconds=1000),
60+
):
5261
self._deadline = deadline
5362
self._root_certificates_pem = root_certificates_pem
63+
self._max_send_message_length = max_send_message_length
64+
self._max_receive_message_length = max_receive_message_length
65+
self._keepalive_permit_without_calls = keepalive_permit_without_calls
66+
self._keepalive_time = keepalive_time
67+
self._keepalive_timeout = keepalive_timeout
5468

5569
def get_deadline(self) -> timedelta:
5670
return self._deadline
@@ -71,6 +85,21 @@ def with_root_certificates_pem(self, root_certificates_pem_path: Path) -> GrpcCo
7185
def get_root_certificates_pem(self) -> Optional[bytes]:
7286
return self._root_certificates_pem
7387

88+
def get_max_send_message_length(self) -> Optional[int]:
89+
return self._max_send_message_length
90+
91+
def get_max_receive_message_length(self) -> Optional[int]:
92+
return self._max_receive_message_length
93+
94+
def get_keepalive_permit_without_calls(self) -> Optional[int]:
95+
return int(self._keepalive_permit_without_calls) if self._keepalive_permit_without_calls is not None else None
96+
97+
def get_keepalive_time(self) -> Optional[timedelta]:
98+
return self._keepalive_time
99+
100+
def get_keepalive_timeout(self) -> Optional[timedelta]:
101+
return self._keepalive_timeout
102+
74103

75104
class StaticTransportStrategy(TransportStrategy):
76105
def __init__(self, grpc_configuration: GrpcConfiguration):

src/momento/config/vector_index_configurations.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,6 @@ def latest() -> VectorIndexConfigurations.Default:
2525
This configuration will be updated every time there is a new version of the laptop configuration.
2626
"""
2727
return VectorIndexConfigurations.Default(
28-
# This is high to account for time-intensive adds.
29-
StaticTransportStrategy(StaticGrpcConfiguration(timedelta(seconds=120)))
28+
# The deadline is high to account for time-intensive adds.
29+
StaticTransportStrategy(StaticGrpcConfiguration(deadline=timedelta(seconds=120)))
3030
)

src/momento/internal/_utilities/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
_validate_ttl,
1515
)
1616
from ._momento_version import momento_version
17+
from ._time import _timedelta_to_ms
1718
from ._vector_index_validation import (
1819
_validate_index_name,
1920
_validate_num_dimensions,
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
from __future__ import annotations
2+
3+
from typing import Sequence, Tuple, Union
4+
5+
from momento.config.transport.grpc_configuration import GrpcConfiguration
6+
from momento.config.transport.transport_strategy import StaticGrpcConfiguration
7+
from momento.internal._utilities import _timedelta_to_ms
8+
9+
DEFAULT_MAX_MESSAGE_SIZE = 5_243_000 # bytes
10+
11+
ChannelArguments = Sequence[Tuple[str, Union[int, None]]]
12+
13+
14+
def grpc_data_channel_options_from_grpc_config(grpc_config: GrpcConfiguration) -> ChannelArguments:
15+
"""Create gRPC channel options from a GrpcConfiguration.
16+
17+
Args:
18+
grpc_config (GrpcConfiguration): the gRPC configuration.
19+
20+
Returns:
21+
grpc.aio.ChannelArgumentType: a list of gRPC channel options as key-value tuples.
22+
"""
23+
channel_options = []
24+
25+
max_send_length = grpc_config.get_max_send_message_length()
26+
channel_options.append(
27+
("grpc.max_send_message_length", max_send_length if max_send_length is not None else DEFAULT_MAX_MESSAGE_SIZE)
28+
)
29+
30+
max_receive_length = grpc_config.get_max_receive_message_length()
31+
channel_options.append(
32+
(
33+
"grpc.max_receive_message_length",
34+
max_receive_length if max_receive_length is not None else DEFAULT_MAX_MESSAGE_SIZE,
35+
)
36+
)
37+
38+
keepalive_permit = grpc_config.get_keepalive_permit_without_calls()
39+
if keepalive_permit is not None:
40+
channel_options.append(("grpc.keepalive_permit_without_calls", keepalive_permit))
41+
42+
keepalive_time = grpc_config.get_keepalive_time()
43+
if keepalive_time is not None:
44+
channel_options.append(("grpc.keepalive_time_ms", _timedelta_to_ms(keepalive_time)))
45+
46+
keepalive_timeout = grpc_config.get_keepalive_timeout()
47+
if keepalive_timeout is not None:
48+
channel_options.append(("grpc.keepalive_timeout_ms", _timedelta_to_ms(keepalive_timeout)))
49+
50+
return channel_options
51+
52+
53+
def grpc_control_channel_options_from_grpc_config(grpc_config: GrpcConfiguration) -> ChannelArguments:
54+
"""Create gRPC channel options from a GrpcConfiguration, but disable keepalives.
55+
56+
Args:
57+
grpc_config (GrpcConfiguration): the gRPC configuration.
58+
59+
Returns:
60+
grpc.aio.ChannelArgumentType: a list of gRPC channel options as key-value tuples.
61+
"""
62+
# Override the keepalive options to disable keepalives
63+
control_grpc_config = StaticGrpcConfiguration(
64+
deadline=grpc_config.get_deadline(),
65+
root_certificates_pem=grpc_config.get_root_certificates_pem(),
66+
max_send_message_length=grpc_config.get_max_send_message_length(),
67+
max_receive_message_length=grpc_config.get_max_receive_message_length(),
68+
keepalive_permit_without_calls=None,
69+
keepalive_time=None,
70+
keepalive_timeout=None,
71+
)
72+
return grpc_data_channel_options_from_grpc_config(control_grpc_config)
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from datetime import timedelta
2+
3+
4+
def _timedelta_to_ms(delta: timedelta) -> int:
5+
"""Expresses a timedelta as milliseconds.
6+
7+
Note: truncates the microseconds.
8+
9+
Args:
10+
delta (timedelta): The timedelta to convert.
11+
12+
Returns:
13+
int: The total duration of the timedelta in milliseconds.
14+
"""
15+
return int(delta.total_seconds() * 1000)

src/momento/internal/aio/_scs_data_client.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
_gen_dictionary_items_as_bytes,
1717
_gen_list_as_bytes,
1818
_gen_set_input_as_bytes,
19+
_timedelta_to_ms,
1920
_validate_cache_name,
2021
_validate_dictionary_name,
2122
_validate_list_name,
@@ -154,7 +155,7 @@ def __init__(self, configuration: Configuration, credential_provider: Credential
154155
self._endpoint = endpoint
155156

156157
default_deadline: timedelta = configuration.get_transport_strategy().get_grpc_configuration().get_deadline()
157-
self._default_deadline_seconds = int(default_deadline.total_seconds())
158+
self._default_deadline_seconds = default_deadline.total_seconds()
158159

159160
self._grpc_manager = _DataGrpcManager(configuration, credential_provider)
160161
_validate_ttl(default_ttl)
@@ -1107,7 +1108,7 @@ def _ttl_or_default_milliseconds(self, ttl: Optional[timedelta]) -> int:
11071108
if ttl is not None:
11081109
which_ttl = ttl
11091110

1110-
return int(which_ttl.total_seconds() * 1000)
1111+
return _timedelta_to_ms(which_ttl)
11111112

11121113
def _build_stub(self) -> cache_grpc.ScsStub:
11131114
return self._grpc_manager.async_stub()

src/momento/internal/aio/_scs_grpc_manager.py

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import asyncio
4+
from datetime import timedelta
45
from typing import Optional
56

67
import grpc
@@ -10,10 +11,15 @@
1011

1112
from momento.auth import CredentialProvider
1213
from momento.config import Configuration, TopicConfiguration
14+
from momento.config.transport.transport_strategy import StaticGrpcConfiguration
1315
from momento.internal._utilities import momento_version
1416
from momento.internal._utilities._channel_credentials import (
1517
channel_credentials_from_root_certs_or_default,
1618
)
19+
from momento.internal._utilities._grpc_channel_options import (
20+
grpc_control_channel_options_from_grpc_config,
21+
grpc_data_channel_options_from_grpc_config,
22+
)
1723
from momento.retry import RetryStrategy
1824

1925
from ... import logs
@@ -35,6 +41,9 @@ def __init__(self, configuration: Configuration, credential_provider: Credential
3541
target=credential_provider.control_endpoint,
3642
credentials=channel_credentials_from_root_certs_or_default(configuration),
3743
interceptors=_interceptors(credential_provider.auth_token, configuration.get_retry_strategy()),
44+
options=grpc_control_channel_options_from_grpc_config(
45+
grpc_config=configuration.get_transport_strategy().get_grpc_configuration(),
46+
),
3847
)
3948

4049
async def close(self) -> None:
@@ -64,11 +73,14 @@ def __init__(self, configuration: Configuration, credential_provider: Credential
6473
# https://grpc.github.io/grpc/python/grpc.html
6574
# https://grpc.github.io/grpc/python/glossary.html#term-channel_arguments
6675
# https://github.com/grpc/grpc/blob/v1.46.x/include/grpc/impl/codegen/grpc_types.h#L140
67-
options=[
68-
# ('grpc.max_concurrent_streams', 1000),
69-
# ('grpc.use_local_subchannel_pool', 1),
70-
# (experimental.ChannelOptions.SingleThreadedUnaryStream, 1)
71-
],
76+
# options=[
77+
# ('grpc.max_concurrent_streams', 1000),
78+
# ('grpc.use_local_subchannel_pool', 1),
79+
# (experimental.ChannelOptions.SingleThreadedUnaryStream, 1)
80+
# ],
81+
options=grpc_data_channel_options_from_grpc_config(
82+
configuration.get_transport_strategy().get_grpc_configuration()
83+
),
7284
)
7385

7486
async def eagerly_connect(self, timeout_seconds: float) -> None:
@@ -79,6 +91,9 @@ async def eagerly_connect(self, timeout_seconds: float) -> None:
7991
await asyncio.wait_for(self.wait_for_ready(), timeout_seconds)
8092
except Exception as error:
8193
self._logger.debug(f"Failed to connect to the server within the given timeout. {error}")
94+
raise RuntimeError(
95+
f"Failed to connect to Momento's server within given eager connection timeout {error}"
96+
) from error
8297

8398
async def wait_for_ready(self) -> None:
8499
latest_state = self._secure_channel.get_state(True) # try_to_connect
@@ -117,10 +132,14 @@ class _PubsubGrpcManager:
117132
version = momento_version
118133

119134
def __init__(self, configuration: TopicConfiguration, credential_provider: CredentialProvider):
135+
# NOTE: This is hard-coded for now but we may want to expose it via TopicConfiguration in the future, as we do with some of the other clients.
136+
grpc_config = StaticGrpcConfiguration(deadline=timedelta(milliseconds=1100))
137+
120138
self._secure_channel = grpc.aio.secure_channel(
121139
target=credential_provider.cache_endpoint,
122140
credentials=grpc.ssl_channel_credentials(),
123141
interceptors=_interceptors(credential_provider.auth_token, None),
142+
options=grpc_data_channel_options_from_grpc_config(grpc_config),
124143
)
125144

126145
async def close(self) -> None:
@@ -136,10 +155,14 @@ class _PubsubGrpcStreamManager:
136155
version = momento_version
137156

138157
def __init__(self, configuration: TopicConfiguration, credential_provider: CredentialProvider):
158+
# NOTE: This is hard-coded for now but we may want to expose it via TopicConfiguration in the future, as we do with some of the other clients.
159+
grpc_config = StaticGrpcConfiguration(deadline=timedelta(milliseconds=1100))
160+
139161
self._secure_channel = grpc.aio.secure_channel(
140162
target=credential_provider.cache_endpoint,
141163
credentials=grpc.ssl_channel_credentials(),
142164
interceptors=_stream_interceptors(credential_provider.auth_token),
165+
options=grpc_data_channel_options_from_grpc_config(grpc_config),
143166
)
144167

145168
async def close(self) -> None:

src/momento/internal/aio/_vector_index_data_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def __init__(self, configuration: VectorIndexConfiguration, credential_provider:
4545
self._endpoint = endpoint
4646

4747
default_deadline: timedelta = configuration.get_transport_strategy().get_grpc_configuration().get_deadline()
48-
self._default_deadline_seconds = int(default_deadline.total_seconds())
48+
self._default_deadline_seconds = default_deadline.total_seconds()
4949

5050
self._grpc_manager = _VectorIndexDataGrpcManager(configuration, credential_provider)
5151

0 commit comments

Comments
 (0)