2 changes: 1 addition & 1 deletion dev_requirements.txt
@@ -4,7 +4,7 @@ invoke==2.2.0
 mock
 packaging>=20.4
 pytest
-pytest-asyncio>=0.23.0
+pytest-asyncio>=0.24.0
 pytest-cov
 pytest-profiling==1.8.1
 pytest-timeout
2 changes: 1 addition & 1 deletion redis/__init__.py
@@ -46,7 +46,7 @@ def int_or_str(value):
         return value


-__version__ = "6.2.0"
+__version__ = "6.4.0"
 VERSION = tuple(map(int_or_str, __version__.split(".")))

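Since `int_or_str` falls back to the raw string for any non-numeric segment, the bumped version still parses into a clean tuple, and a hypothetical pre-release tag would too. A quick, self-contained illustration of the `VERSION` expression above:

```python
# Same helper as in redis/__init__.py; the inputs below are just examples.
def int_or_str(value):
    try:
        return int(value)
    except ValueError:
        return value

print(tuple(map(int_or_str, "6.4.0".split("."))))     # (6, 4, 0)
print(tuple(map(int_or_str, "6.4.0rc1".split("."))))  # (6, 4, '0rc1') -- hypothetical tag
```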
263 changes: 133 additions & 130 deletions redis/asyncio/cluster.py
@@ -912,7 +912,7 @@ async def _execute_command(
                     # Reset the counter
                     self.reinitialize_counter = 0
                 else:
-                    self.nodes_manager._moved_exception = e
+                    self.nodes_manager.move_slot(e)
                 moved = True
             except AskError as e:
                 redirect_addr = get_node_name(host=e.host, port=e.port)
@@ -1266,12 +1266,13 @@ async def _mock(self, error: RedisError):
 class NodesManager:
     __slots__ = (
         "_dynamic_startup_nodes",
-        "_moved_exception",
         "_event_dispatcher",
         "connection_kwargs",
         "default_node",
         "nodes_cache",
+        "_epoch",
         "read_load_balancer",
+        "_initialize_lock",
         "require_full_coverage",
         "slots_cache",
         "startup_nodes",
@@ -1295,10 +1296,11 @@ def __init__(
         self.default_node: "ClusterNode" = None
         self.nodes_cache: Dict[str, "ClusterNode"] = {}
         self.slots_cache: Dict[int, List["ClusterNode"]] = {}
+        self._epoch: int = 0
         self.read_load_balancer = LoadBalancer()
+        self._initialize_lock: asyncio.Lock = asyncio.Lock()

         self._dynamic_startup_nodes: bool = dynamic_startup_nodes
-        self._moved_exception: MovedError = None
         if event_dispatcher is None:
             self._event_dispatcher = EventDispatcher()
         else:
@@ -1340,11 +1342,7 @@ def set_nodes(
                 task = asyncio.create_task(old[name].disconnect())  # noqa
             old[name] = node

-    def update_moved_exception(self, exception):
-        self._moved_exception = exception
-
-    def _update_moved_slots(self) -> None:
-        e = self._moved_exception
+    def move_slot(self, e: AskError | MovedError):
         redirected_node = self.get_node(host=e.host, port=e.port)
         if redirected_node:
             # The node already exists
@@ -1378,18 +1376,13 @@ def _update_moved_slots(self) -> None:
             # shard. We need to remove all current nodes from the slot's list
             # (including replications) and add just the new node.
             self.slots_cache[e.slot_id] = [redirected_node]
-        # Reset moved_exception
-        self._moved_exception = None

     def get_node_from_slot(
         self,
         slot: int,
         read_from_replicas: bool = False,
         load_balancing_strategy=None,
     ) -> "ClusterNode":
-        if self._moved_exception:
-            self._update_moved_slots()
-
         if read_from_replicas is True and load_balancing_strategy is None:
             load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN

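With `update_moved_exception`/`_update_moved_slots` folded into a single `move_slot`, a MOVED or ASK redirect now updates the slot cache at the point the error is handled, instead of stashing the exception and patching the cache lazily on the next `get_node_from_slot` call. Below is a minimal, self-contained sketch of that eager update, using hypothetical stand-in types rather than the library's classes; the real method above also covers the case where the redirected node already exists, e.g. as a replica of the same shard.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Node:                 # stand-in for ClusterNode
    host: str
    port: int

@dataclass
class Redirect:             # stand-in for MovedError/AskError: slot_id, host, port
    slot_id: int
    host: str
    port: int

def move_slot(slots_cache: dict, nodes_cache: dict, e: Redirect) -> None:
    """Eagerly repoint a hash slot at the node named in a MOVED/ASK redirect."""
    name = f"{e.host}:{e.port}"
    node = nodes_cache.get(name)
    if node is None:
        # The redirect points at a node we have not seen yet; register it.
        node = Node(e.host, e.port)
        nodes_cache[name] = node
    # The very next command routed for this slot goes to the redirected node.
    slots_cache[e.slot_id] = [node]

# Usage: slot 42 moves from port 7000 to 7001 (made-up addresses).
slots = {42: [Node("127.0.0.1", 7000)]}
nodes = {"127.0.0.1:7000": Node("127.0.0.1", 7000)}
move_slot(slots, nodes, Redirect(42, "127.0.0.1", 7001))
assert slots[42] == [Node("127.0.0.1", 7001)]
```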
@@ -1423,135 +1416,147 @@ async def initialize(self) -> None:
         startup_nodes_reachable = False
         fully_covered = False
         exception = None
-        # Convert to tuple to prevent RuntimeError if self.startup_nodes
-        # is modified during iteration
-        for startup_node in tuple(self.startup_nodes.values()):
-            try:
-                # Make sure cluster mode is enabled on this node
+        epoch = self._epoch
+
+        async with self._initialize_lock:
+            if self._epoch != epoch:
+                # another initialize call has already reinitialized the
+                # nodes since we started waiting for the lock;
+                # we don't need to do it again.
+                return
+
+            # Convert to tuple to prevent RuntimeError if self.startup_nodes
+            # is modified during iteration
+            for startup_node in tuple(self.startup_nodes.values()):
+                try:
-                    self._event_dispatcher.dispatch(
-                        AfterAsyncClusterInstantiationEvent(
-                            self.nodes_cache,
-                            self.connection_kwargs.get("credential_provider", None),
+                    # Make sure cluster mode is enabled on this node
+                    try:
+                        self._event_dispatcher.dispatch(
+                            AfterAsyncClusterInstantiationEvent(
+                                self.nodes_cache,
+                                self.connection_kwargs.get("credential_provider", None),
+                            )
+                        )
-                    )
-                    cluster_slots = await startup_node.execute_command("CLUSTER SLOTS")
-                except ResponseError:
-                    raise RedisClusterException(
-                        "Cluster mode is not enabled on this node"
-                    )
-                startup_nodes_reachable = True
-            except Exception as e:
-                # Try the next startup node.
-                # The exception is saved and raised only if we have no more nodes.
-                exception = e
-                continue
-
-            # CLUSTER SLOTS command results in the following output:
-            # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
-            # where each node contains the following list: [IP, port, node_id]
-            # Therefore, cluster_slots[0][2][0] will be the IP address of the
-            # primary node of the first slot section.
-            # If there's only one server in the cluster, its ``host`` is ''
-            # Fix it to the host in startup_nodes
-            if (
-                len(cluster_slots) == 1
-                and not cluster_slots[0][2][0]
-                and len(self.startup_nodes) == 1
-            ):
-                cluster_slots[0][2][0] = startup_node.host
-
-            for slot in cluster_slots:
-                for i in range(2, len(slot)):
-                    slot[i] = [str_if_bytes(val) for val in slot[i]]
-                primary_node = slot[2]
-                host = primary_node[0]
-                if host == "":
-                    host = startup_node.host
-                port = int(primary_node[1])
-                host, port = self.remap_host_port(host, port)
-
-                nodes_for_slot = []
-
-                target_node = tmp_nodes_cache.get(get_node_name(host, port))
-                if not target_node:
-                    target_node = ClusterNode(
-                        host, port, PRIMARY, **self.connection_kwargs
-                    )
-                # add this node to the nodes cache
-                tmp_nodes_cache[target_node.name] = target_node
-                nodes_for_slot.append(target_node)
-
-                replica_nodes = slot[3:]
-                for replica_node in replica_nodes:
-                    host = replica_node[0]
-                    port = replica_node[1]
+                        cluster_slots = await startup_node.execute_command(
+                            "CLUSTER SLOTS"
+                        )
+                    except ResponseError:
+                        raise RedisClusterException(
+                            "Cluster mode is not enabled on this node"
+                        )
+                    startup_nodes_reachable = True
+                except Exception as e:
+                    # Try the next startup node.
+                    # The exception is saved and raised only if we have no more nodes.
+                    exception = e
+                    continue
+
+                # CLUSTER SLOTS command results in the following output:
+                # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
+                # where each node contains the following list: [IP, port, node_id]
+                # Therefore, cluster_slots[0][2][0] will be the IP address of the
+                # primary node of the first slot section.
+                # If there's only one server in the cluster, its ``host`` is ''
+                # Fix it to the host in startup_nodes
+                if (
+                    len(cluster_slots) == 1
+                    and not cluster_slots[0][2][0]
+                    and len(self.startup_nodes) == 1
+                ):
+                    cluster_slots[0][2][0] = startup_node.host
+
+                for slot in cluster_slots:
+                    for i in range(2, len(slot)):
+                        slot[i] = [str_if_bytes(val) for val in slot[i]]
+                    primary_node = slot[2]
+                    host = primary_node[0]
+                    if host == "":
+                        host = startup_node.host
+                    port = int(primary_node[1])
+                    host, port = self.remap_host_port(host, port)
+
-                    target_replica_node = tmp_nodes_cache.get(get_node_name(host, port))
-                    if not target_replica_node:
-                        target_replica_node = ClusterNode(
-                            host, port, REPLICA, **self.connection_kwargs
+                    nodes_for_slot = []
+
+                    target_node = tmp_nodes_cache.get(get_node_name(host, port))
+                    if not target_node:
+                        target_node = ClusterNode(
+                            host, port, PRIMARY, **self.connection_kwargs
+                        )
+                    # add this node to the nodes cache
-                    tmp_nodes_cache[target_replica_node.name] = target_replica_node
-                    nodes_for_slot.append(target_replica_node)
+                    tmp_nodes_cache[target_node.name] = target_node
+                    nodes_for_slot.append(target_node)
+
-                for i in range(int(slot[0]), int(slot[1]) + 1):
-                    if i not in tmp_slots:
-                        tmp_slots[i] = nodes_for_slot
-                    else:
-                        # Validate that 2 nodes want to use the same slot cache
-                        # setup
-                        tmp_slot = tmp_slots[i][0]
-                        if tmp_slot.name != target_node.name:
-                            disagreements.append(
-                                f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
-                            )
+                    replica_nodes = slot[3:]
+                    for replica_node in replica_nodes:
+                        host = replica_node[0]
+                        port = replica_node[1]
+                        host, port = self.remap_host_port(host, port)
+
-                            if len(disagreements) > 5:
-                                raise RedisClusterException(
-                                    f"startup_nodes could not agree on a valid "
-                                    f"slots cache: {', '.join(disagreements)}"
+                        target_replica_node = tmp_nodes_cache.get(
+                            get_node_name(host, port)
+                        )
+                        if not target_replica_node:
+                            target_replica_node = ClusterNode(
+                                host, port, REPLICA, **self.connection_kwargs
+                            )
+                        # add this node to the nodes cache
+                        tmp_nodes_cache[target_replica_node.name] = target_replica_node
+                        nodes_for_slot.append(target_replica_node)
+
+                    for i in range(int(slot[0]), int(slot[1]) + 1):
+                        if i not in tmp_slots:
+                            tmp_slots[i] = nodes_for_slot
+                        else:
+                            # Validate that 2 nodes want to use the same slot cache
+                            # setup
+                            tmp_slot = tmp_slots[i][0]
+                            if tmp_slot.name != target_node.name:
+                                disagreements.append(
+                                    f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
+                                )
+
-            # Validate if all slots are covered or if we should try next startup node
-            fully_covered = True
-            for i in range(REDIS_CLUSTER_HASH_SLOTS):
-                if i not in tmp_slots:
-                    fully_covered = False
+                                if len(disagreements) > 5:
+                                    raise RedisClusterException(
+                                        f"startup_nodes could not agree on a valid "
+                                        f"slots cache: {', '.join(disagreements)}"
+                                    )
+
+                # Validate if all slots are covered or if we should try next startup node
+                fully_covered = True
+                for i in range(REDIS_CLUSTER_HASH_SLOTS):
+                    if i not in tmp_slots:
+                        fully_covered = False
+                        break
+                if fully_covered:
+                    break
-            if fully_covered:
-                break
-
-        if not startup_nodes_reachable:
-            raise RedisClusterException(
-                f"Redis Cluster cannot be connected. Please provide at least "
-                f"one reachable node: {str(exception)}"
-            ) from exception
-
-        # Check if the slots are not fully covered
-        if not fully_covered and self.require_full_coverage:
-            # Despite the requirement that the slots be covered, there
-            # isn't a full coverage
-            raise RedisClusterException(
-                f"All slots are not covered after query all startup_nodes. "
-                f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
-                f"covered..."
-            )
+            if not startup_nodes_reachable:
+                raise RedisClusterException(
+                    f"Redis Cluster cannot be connected. Please provide at least "
+                    f"one reachable node: {str(exception)}"
+                ) from exception
+
+            # Check if the slots are not fully covered
+            if not fully_covered and self.require_full_coverage:
+                # Despite the requirement that the slots be covered, there
+                # isn't a full coverage
+                raise RedisClusterException(
+                    f"All slots are not covered after query all startup_nodes. "
+                    f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
+                    f"covered..."
+                )
+
-        # Set the tmp variables to the real variables
-        self.slots_cache = tmp_slots
-        self.set_nodes(self.nodes_cache, tmp_nodes_cache, remove_old=True)
+            # Set the tmp variables to the real variables
+            self.slots_cache = tmp_slots
+            self.set_nodes(self.nodes_cache, tmp_nodes_cache, remove_old=True)
+
-        if self._dynamic_startup_nodes:
-            # Populate the startup nodes with all discovered nodes
-            self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True)
+            if self._dynamic_startup_nodes:
+                # Populate the startup nodes with all discovered nodes
+                self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True)
+
-        # Set the default node
-        self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
-        # If initialize was called after a MovedError, clear it
-        self._moved_exception = None
+            # Set the default node
+            self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
+            self._epoch += 1

     async def aclose(self, attr: str = "nodes_cache") -> None:
         self.default_node = None
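The rebuilt `initialize()` is now serialized behind `_initialize_lock`, and `_epoch` is bumped once a rebuild completes, so a coroutine that was queued on the lock while another caller already reinitialized returns without repeating the work. A minimal, self-contained sketch of that epoch-guarded pattern follows; the class name and rebuild body are placeholders, not redis-py code.

```python
import asyncio

class SlotCacheRefresher:
    """Sketch: only the first of N concurrent callers performs the rebuild."""

    def __init__(self) -> None:
        self._epoch = 0
        self._lock = asyncio.Lock()
        self.rebuilds = 0

    async def initialize(self) -> None:
        epoch = self._epoch            # generation observed before waiting
        async with self._lock:
            if self._epoch != epoch:
                # Another call finished a rebuild while we waited on the lock,
                # so the cached topology is already fresh; skip the work.
                return
            await self._rebuild()      # placeholder for querying CLUSTER SLOTS
            self._epoch += 1           # publish the new generation

    async def _rebuild(self) -> None:
        await asyncio.sleep(0.01)      # stand-in for the expensive refresh
        self.rebuilds += 1

async def main() -> None:
    refresher = SlotCacheRefresher()
    # Ten concurrent failures all request a refresh, but only one rebuild runs.
    await asyncio.gather(*(refresher.initialize() for _ in range(10)))
    assert refresher.rebuilds == 1

asyncio.run(main())
```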
@@ -2255,9 +2260,7 @@ async def _reinitialize_on_error(self, error):
                 self.reinitialize_counter = 0
             else:
                 if isinstance(error, AskError):
-                    self._pipe.cluster_client.nodes_manager.update_moved_exception(
-                        error
-                    )
+                    self._pipe.cluster_client.nodes_manager.move_slot(error)

         self._executing = False

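For reference, the parsing loop in `initialize()` above expects the `CLUSTER SLOTS` reply to be a list of `[start_slot, end_slot, primary, replica, ...]` entries, each node being `[host, port, node_id, ...]`. A made-up three-shard reply of that shape (addresses and IDs are invented):

```python
# Illustrative shape only; not output from a real cluster.
cluster_slots = [
    [0, 5460, ["127.0.0.1", 7000, "id-a"], ["127.0.0.1", 7003, "id-d"]],
    [5461, 10922, ["127.0.0.1", 7001, "id-b"], ["127.0.0.1", 7004, "id-e"]],
    [10923, 16383, ["127.0.0.1", 7002, "id-c"], ["127.0.0.1", 7005, "id-f"]],
]
# cluster_slots[0][2][0] is the first range's primary host, which is why
# initialize() patches it from startup_nodes when a single node reports "".
```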