Skip to content

Commit 9802ee3

Browse files
Retries for fetching snapshots (#7317)
Co-authored-by: Amaury Chamayou <[email protected]>
1 parent b502534 commit 9802ee3

File tree

7 files changed

+125
-8
lines changed

7 files changed

+125
-8
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
1515
- Added logging of the initial node attestation value ("Initial node attestation...") (#7256).
1616
- Improved handling of socket errors in curlm callbacks (#7308)
1717
- Accept UVM endorsements with SVNs encoded as integers (#7316)
18+
- Joining nodes now retry fetching a recent snapshot from the target node when an attempt fails. This is controlled with `command.join.fetch_snapshot_max_attempts` (default `3`) and `command.join.fetch_snapshot_retry_interval` (default `1000ms`). (#7317)
1819

1920
### Fixed
2021

doc/host_config_schema/cchost_config.json

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,17 @@
382382
"type": "boolean",
383383
"default": true,
384384
"description": "Whether to ask the target for a newer snapshot before joining. The node will ask the target what their latest snapshot is, and if that is later than what the node has locally, will fetch it via RPC before launching. Should generally only be turned off for specific test cases"
385+
},
386+
"fetch_snapshot_max_attempts": {
387+
"type": "integer",
388+
"default": 3,
389+
"description": "Maximum number of attempts to fetch a recent snapshot from the target node",
390+
"minimum": 1
391+
},
392+
"fetch_snapshot_retry_interval": {
393+
"type": "string",
394+
"default": "1000ms",
395+
"description": "Interval (time string) between retries to fetch a recent snapshot from the target node"
385396
}
386397
},
387398
"required": ["target_rpc_address"],

src/host/configuration.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ namespace host
107107
ccf::ds::TimeString retry_timeout = {"1000ms"};
108108
bool follow_redirect = true;
109109
bool fetch_recent_snapshot = true;
110+
size_t fetch_snapshot_max_attempts = 3;
111+
ccf::ds::TimeString fetch_snapshot_retry_interval = {"1000ms"};
110112

111113
bool operator==(const Join&) const = default;
112114
};
@@ -160,7 +162,9 @@ namespace host
160162
CCHostConfig::Command::Join,
161163
retry_timeout,
162164
follow_redirect,
163-
fetch_recent_snapshot);
165+
fetch_recent_snapshot,
166+
fetch_snapshot_max_attempts,
167+
fetch_snapshot_retry_interval);
164168

165169
DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Command::Recover);
166170
DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Command::Recover);

src/host/run.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -861,7 +861,9 @@ namespace ccf
861861
auto latest_peer_snapshot = snapshots::fetch_from_peer(
862862
config.command.join.target_rpc_address,
863863
config.command.service_certificate_file,
864-
latest_local_idx);
864+
latest_local_idx,
865+
config.command.join.fetch_snapshot_max_attempts,
866+
config.command.join.fetch_snapshot_retry_interval.count_ms());
865867

866868
if (latest_peer_snapshot.has_value())
867869
{

src/snapshots/fetch.h

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ namespace snapshots
3939
std::vector<uint8_t> snapshot_data;
4040
};
4141

42-
static std::optional<SnapshotResponse> fetch_from_peer(
42+
static std::optional<SnapshotResponse> try_fetch_from_peer(
4343
const std::string& peer_address,
4444
const std::string& path_to_peer_cert,
4545
size_t latest_local_snapshot)
@@ -259,4 +259,36 @@ namespace snapshots
259259
return std::nullopt;
260260
}
261261
}
262+
263+
static std::optional<SnapshotResponse> fetch_from_peer(
264+
const std::string& peer_address,
265+
const std::string& path_to_peer_cert,
266+
size_t latest_local_snapshot,
267+
size_t max_attempts,
268+
size_t retry_delay_ms)
269+
{
270+
for (size_t attempt = 0; attempt < max_attempts; ++attempt)
271+
{
272+
LOG_INFO_FMT(
273+
"Fetching snapshot from {} (attempt {}/{})",
274+
peer_address,
275+
attempt + 1,
276+
max_attempts);
277+
278+
if (attempt > 0)
279+
{
280+
std::this_thread::sleep_for(std::chrono::milliseconds(retry_delay_ms));
281+
}
282+
283+
auto response = try_fetch_from_peer(
284+
peer_address, path_to_peer_cert, latest_local_snapshot);
285+
if (response.has_value())
286+
{
287+
return response;
288+
}
289+
}
290+
LOG_INFO_FMT(
291+
"Exceeded maximum snapshot fetch retries ({}), giving up", max_attempts);
292+
return std::nullopt;
293+
}
262294
}

tests/e2e_operations.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import infra.concurrency
3434
from collections import defaultdict
3535
import ccf.read_ledger
36+
import re
3637

3738
from loguru import logger as LOG
3839

@@ -1749,6 +1750,67 @@ def test_error_message_on_failure_to_read_aci_sec_context(args):
17491750
), f"Did not find expected log messages: {expected_log_messages}"
17501751

17511752

1753+
def test_error_message_on_failure_to_fetch_snapshot(const_args):
    """
    Stop the primary, try to join a new node through it, and check that the
    joining node's log shows every snapshot fetch attempt followed by the
    "giving up" message.
    """
    args = copy.deepcopy(const_args)
    args.nodes = infra.e2e_args.min_nodes(args, 0)
    with infra.network.network(
        args.nodes,
        args.binary_dir,
        args.debug_nodes,
        pdb=args.pdb,
    ) as network:
        network.start_and_open(args)

        primary, _ = network.find_primary()

        new_node = network.create_node("local://localhost")

        # Shut down primary to cause snapshot fetch to fail
        primary.remote.stop()

        failed = False
        try:
            LOG.info("Starting join")
            network.join_node(
                new_node,
                args.package,
                args,
                target_node=primary,
                timeout=10,
                from_snapshot=False,
                wait_for_node_in_store=False,
            )
            new_node.wait_for_node_to_join(timeout=5)
        except Exception as e:
            # Any failure counts: the target node is down, so the join
            # is not supposed to succeed
            LOG.info(f"Joining node could not join as expected {e}")
            failed = True

        # This message is only shown when the join did NOT fail
        assert failed, "Joining node unexpectedly joined via a stopped target node"

        # The attempt count in these patterns (3) is hard-coded here
        expected_log_messages = [
            re.compile(r"Fetching snapshot from .* \(attempt 1/3\)"),
            re.compile(r"Fetching snapshot from .* \(attempt 2/3\)"),
            re.compile(r"Fetching snapshot from .* \(attempt 3/3\)"),
            re.compile(
                r"Exceeded maximum snapshot fetch retries \([0-9]+\), giving up"
            ),
        ]

        out_path, _ = new_node.get_logs()
        # Context manager so the log file handle is always released
        with open(out_path, "r", encoding="utf-8") as log_file:
            for line in log_file:
                # Collect the matches before removing anything: removing from
                # a list while iterating over it skips the following element
                found = [
                    pattern
                    for pattern in expected_log_messages
                    if pattern.search(line)
                ]
                for pattern in found:
                    expected_log_messages.remove(pattern)
                    LOG.info(f"Found expected log message: {line}")
                if not expected_log_messages:
                    break

        assert (
            len(expected_log_messages) == 0
        ), f"Did not find expected log messages: {expected_log_messages}"
1813+
17521814
def run(args):
17531815
run_max_uncommitted_tx_count(args)
17541816
run_file_operations(args)

tests/infra/network.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -344,10 +344,11 @@ def _setup_node(
344344
**kwargs,
345345
):
346346
# Contact primary if no target node is set
347-
primary, _ = self.find_primary(
348-
timeout=args.ledger_recovery_timeout if recovery else 10
349-
)
350-
target_node = target_node or primary
347+
if target_node is None:
348+
primary, _ = self.find_primary(
349+
timeout=args.ledger_recovery_timeout if recovery else 10
350+
)
351+
target_node = primary
351352
LOG.info(f"Joining from target node {target_node.local_node_id}")
352353

353354
committed_ledger_dirs = read_only_ledger_dirs or []
@@ -359,6 +360,9 @@ def _setup_node(
359360
if from_snapshot:
360361
# Only retrieve snapshot from primary if the snapshot directory is not specified
361362
if snapshots_dir is None:
363+
primary, _ = self.find_primary(
364+
timeout=args.ledger_recovery_timeout if recovery else 10
365+
)
362366
read_only_snapshots_dir = self.get_committed_snapshots(primary)
363367
if os.listdir(snapshots_dir) or os.listdir(read_only_snapshots_dir):
364368
LOG.info(
@@ -1028,10 +1032,11 @@ def join_node(
10281032
target_node=None,
10291033
timeout=JOIN_TIMEOUT,
10301034
stop_on_error=False,
1035+
wait_for_node_in_store=True,
10311036
**kwargs,
10321037
):
10331038
self.setup_join_node(node, lib_name, args, target_node, **kwargs)
1034-
self.run_join_node(node, timeout, stop_on_error)
1039+
self.run_join_node(node, timeout, stop_on_error, wait_for_node_in_store)
10351040

10361041
def trust_node(
10371042
self,

0 commit comments

Comments
 (0)