Skip to content

Commit 44a669d

Browse files
Fetch fresh snapshot from peer when joining (#6700)
Co-authored-by: Amaury Chamayou <[email protected]>
1 parent 20b7a05 commit 44a669d

File tree

24 files changed

+1176
-216
lines changed

24 files changed

+1176
-216
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ jobs:
114114
# Build tools
115115
tdnf -y install build-essential clang cmake ninja-build which
116116
# Dependencies
117-
tdnf -y install openssl-devel libuv-devel
117+
tdnf -y install openssl-devel libuv-devel curl-devel
118118
# Test dependencies
119119
tdnf -y install libarrow-devel parquet-libs-devel lldb
120120
shell: bash

CMakeLists.txt

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,15 @@ elseif(COMPILE_TARGET STREQUAL "virtual")
255255
endif()
256256

257257
target_link_libraries(
258-
cchost PRIVATE uv ${TLS_LIBRARY} ${CMAKE_DL_LIBS} ${CMAKE_THREAD_LIBS_INIT}
259-
${LINK_LIBCXX} ccfcrypto.host
258+
cchost
259+
PRIVATE uv
260+
${TLS_LIBRARY}
261+
${CMAKE_DL_LIBS}
262+
${CMAKE_THREAD_LIBS_INIT}
263+
${LINK_LIBCXX}
264+
ccfcrypto.host
265+
curl
266+
http_parser.host
260267
)
261268

262269
install(TARGETS cchost DESTINATION bin)
@@ -711,6 +718,7 @@ if(BUILD_TESTS)
711718
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/contiguous_set.cpp
712719
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/unit_strings.cpp
713720
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/dl_list.cpp
721+
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/nonstd.cpp
714722
)
715723
target_link_libraries(ds_test PRIVATE ${CMAKE_THREAD_LIBS_INIT})
716724

doc/host_config_schema/cchost_config.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,11 @@
396396
"type": "boolean",
397397
"default": true,
398398
"description": "Whether to follow redirects to the primary node of the existing service to join"
399+
},
400+
"fetch_recent_snapshot": {
401+
"type": "boolean",
402+
"default": true,
403+
"description": "Whether to ask the target for a newer snapshot before joining. The node will ask the target what their latest snapshot is, and if that is later than what the node has locally, will fetch it via RPC before launching. Should generally only be turned off for specific test cases"
399404
}
400405
},
401406
"required": ["target_rpc_address"],

include/ccf/ds/nonstd.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,14 @@ namespace ccf::nonstd
185185
});
186186
}
187187

188+
static inline std::string_view trim(
189+
std::string_view s, std::string_view trim_chars = " \t\r\n")
190+
{
191+
const auto start = std::min(s.find_first_not_of(trim_chars), s.size());
192+
const auto end = std::min(s.find_last_not_of(trim_chars) + 1, s.size());
193+
return s.substr(start, end - start);
194+
}
195+
188196
/// Iterate through tuple, calling functor on each element
189197
template <size_t I = 0, typename F, typename... Ts>
190198
static void tuple_for_each(const std::tuple<Ts...>& t, const F& f)

include/ccf/node/startup_config.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,16 @@ namespace ccf
7676
bool operator==(const Attestation&) const = default;
7777
};
7878
Attestation attestation = {};
79+
80+
struct Snapshots
81+
{
82+
std::string directory = "snapshots";
83+
size_t tx_count = 10'000;
84+
std::optional<std::string> read_only_directory = std::nullopt;
85+
86+
bool operator==(const Snapshots&) const = default;
87+
};
88+
Snapshots snapshots = {};
7989
};
8090

8191
struct StartupConfig : CCFConfig

python/src/ccf/ledger.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,19 @@ def range_from_filename(filename: str) -> Tuple[int, Optional[int]]:
126126
raise ValueError(f"Could not read seqno range from ledger file {filename}")
127127

128128

129+
def snapshot_index_from_filename(filename: str) -> Tuple[int, int]:
130+
elements = (
131+
os.path.basename(filename)
132+
.replace(COMMITTED_FILE_SUFFIX, "")
133+
.replace("snapshot_", "")
134+
.split("_")
135+
)
136+
if len(elements) == 2:
137+
return (int(elements[0]), int(elements[1]))
138+
else:
139+
raise ValueError(f"Could not read snapshot index from file name {filename}")
140+
141+
129142
class GcmHeader:
130143
view: int
131144
seqno: int
@@ -851,6 +864,17 @@ def get_len(self) -> int:
851864
return self._file_size
852865

853866

867+
def latest_snapshot(snapshots_dir):
868+
best_name, best_seqno = None, None
869+
for s in os.listdir(snapshots_dir):
870+
with ccf.ledger.Snapshot(os.path.join(snapshots_dir, s)) as snapshot:
871+
snapshot_seqno = snapshot.get_public_domain().get_seqno()
872+
if best_seqno is None or snapshot_seqno > best_seqno:
873+
best_name = s
874+
best_seqno = snapshot_seqno
875+
return best_name
876+
877+
854878
class LedgerChunk:
855879
"""
856880
Class used to parse and iterate over :py:class:`ccf.ledger.Transaction` in a CCF ledger chunk.

src/common/configuration.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ namespace ccf
8484
snp_security_policy_file,
8585
snp_uvm_endorsements_file);
8686

87+
DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCFConfig::Snapshots);
88+
DECLARE_JSON_REQUIRED_FIELDS(CCFConfig::Snapshots);
89+
DECLARE_JSON_OPTIONAL_FIELDS(
90+
CCFConfig::Snapshots, directory, tx_count, read_only_directory);
91+
8792
DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCFConfig);
8893
DECLARE_JSON_REQUIRED_FIELDS(CCFConfig, network);
8994
DECLARE_JSON_OPTIONAL_FIELDS(
@@ -94,6 +99,7 @@ namespace ccf
9499
ledger_signatures,
95100
jwt,
96101
attestation,
102+
snapshots,
97103
node_to_node_message_limit,
98104
historical_cache_soft_limit);
99105

src/ds/test/nonstd.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,3 +265,19 @@ TEST_CASE("rsplit" * doctest::test_suite("nonstd"))
265265
}
266266
}
267267
}
268+
269+
TEST_CASE("trim" * doctest::test_suite("nonstd"))
270+
{
271+
REQUIRE(ccf::nonstd::trim(" hello world ") == "hello world");
272+
REQUIRE(
273+
ccf::nonstd::trim(" \r\n\t\nhello world\n\n\r\t\t\n\t \n\t") ==
274+
"hello world");
275+
REQUIRE(ccf::nonstd::trim("..hello..") == "..hello..");
276+
REQUIRE(ccf::nonstd::trim("..hello..", ".") == "hello");
277+
278+
REQUIRE(ccf::nonstd::trim("hello") == "hello");
279+
REQUIRE(ccf::nonstd::trim(" h") == "h");
280+
REQUIRE(ccf::nonstd::trim("h ") == "h");
281+
REQUIRE(ccf::nonstd::trim(" ") == "");
282+
REQUIRE(ccf::nonstd::trim("") == "");
283+
}

src/host/configuration.h

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -103,16 +103,6 @@ namespace host
103103
};
104104
Ledger ledger = {};
105105

106-
struct Snapshots
107-
{
108-
std::string directory = "snapshots";
109-
size_t tx_count = 10'000;
110-
std::optional<std::string> read_only_directory = std::nullopt;
111-
112-
bool operator==(const Snapshots&) const = default;
113-
};
114-
Snapshots snapshots = {};
115-
116106
struct Logging
117107
{
118108
ccf::LoggerLevel host_level = ccf::LoggerLevel::INFO;
@@ -155,6 +145,7 @@ namespace host
155145
ccf::NodeInfoNetwork::NetAddress target_rpc_address;
156146
ccf::ds::TimeString retry_timeout = {"1000ms"};
157147
bool follow_redirect = true;
148+
bool fetch_recent_snapshot = true;
158149

159150
bool operator==(const Join&) const = default;
160151
};
@@ -189,11 +180,6 @@ namespace host
189180
DECLARE_JSON_OPTIONAL_FIELDS(
190181
CCHostConfig::Ledger, directory, read_only_directories, chunk_size);
191182

192-
DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Snapshots);
193-
DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Snapshots);
194-
DECLARE_JSON_OPTIONAL_FIELDS(
195-
CCHostConfig::Snapshots, directory, tx_count, read_only_directory);
196-
197183
DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Logging);
198184
DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Logging);
199185
DECLARE_JSON_OPTIONAL_FIELDS(CCHostConfig::Logging, host_level, format);
@@ -216,7 +202,10 @@ namespace host
216202
DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Command::Join);
217203
DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Command::Join, target_rpc_address);
218204
DECLARE_JSON_OPTIONAL_FIELDS(
219-
CCHostConfig::Command::Join, retry_timeout, follow_redirect);
205+
CCHostConfig::Command::Join,
206+
retry_timeout,
207+
follow_redirect,
208+
fetch_recent_snapshot);
220209

221210
DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Command::Recover);
222211
DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Command::Recover);

src/host/main.cpp

Lines changed: 56 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
#include "process_launcher.h"
2525
#include "rpc_connections.h"
2626
#include "sig_term.h"
27-
#include "snapshots.h"
27+
#include "snapshots/fetch.h"
28+
#include "snapshots/snapshot_manager.h"
2829
#include "ticker.h"
2930
#include "time_updater.h"
3031

@@ -376,7 +377,7 @@ int main(int argc, char** argv)
376377
config.ledger.read_only_directories);
377378
ledger.register_message_handlers(bp.get_dispatcher());
378379

379-
asynchost::SnapshotManager snapshots(
380+
snapshots::SnapshotManager snapshots(
380381
config.snapshots.directory,
381382
writer_factory,
382383
config.snapshots.read_only_directory);
@@ -507,8 +508,6 @@ int main(int argc, char** argv)
507508

508509
ccf::StartupConfig startup_config(config);
509510

510-
startup_config.snapshot_tx_interval = config.snapshots.tx_count;
511-
512511
if (startup_config.attestation.snp_security_policy_file.has_value())
513512
{
514513
auto security_policy_file =
@@ -690,22 +689,62 @@ int main(int argc, char** argv)
690689
config.command.type == StartType::Join ||
691690
config.command.type == StartType::Recover)
692691
{
693-
auto latest_committed_snapshot =
694-
snapshots.find_latest_committed_snapshot();
695-
if (latest_committed_snapshot.has_value())
696-
{
697-
auto& [snapshot_dir, snapshot_file] = latest_committed_snapshot.value();
698-
startup_snapshot = files::slurp(snapshot_dir / snapshot_file);
692+
auto latest_local_snapshot = snapshots.find_latest_committed_snapshot();
699693

700-
LOG_INFO_FMT(
701-
"Found latest snapshot file: {} (size: {})",
702-
snapshot_dir / snapshot_file,
703-
startup_snapshot.size());
694+
if (
695+
config.command.type == StartType::Join &&
696+
config.command.join.fetch_recent_snapshot)
697+
{
698+
// Try to fetch a recent snapshot from peer
699+
const size_t latest_local_idx = latest_local_snapshot.has_value() ?
700+
snapshots::get_snapshot_idx_from_file_name(
701+
latest_local_snapshot->second) :
702+
0;
703+
auto latest_peer_snapshot = snapshots::fetch_from_peer(
704+
config.command.join.target_rpc_address,
705+
config.command.service_certificate_file,
706+
latest_local_idx);
707+
708+
if (latest_peer_snapshot.has_value())
709+
{
710+
LOG_INFO_FMT(
711+
"Received snapshot {} from peer (size: {}) - writing this to disk "
712+
"and using for join startup",
713+
latest_peer_snapshot->snapshot_name,
714+
latest_peer_snapshot->snapshot_data.size());
715+
716+
const auto dst_path = fs::path(config.snapshots.directory) /
717+
fs::path(latest_peer_snapshot->snapshot_name);
718+
if (files::exists(dst_path))
719+
{
720+
LOG_FATAL_FMT(
721+
"Unable to write peer snapshot - already have a file at {}. "
722+
"Exiting.",
723+
dst_path);
724+
return static_cast<int>(CLI::ExitCodes::FileError);
725+
}
726+
files::dump(latest_peer_snapshot->snapshot_data, dst_path);
727+
startup_snapshot = latest_peer_snapshot->snapshot_data;
728+
}
704729
}
705-
else
730+
731+
if (startup_snapshot.empty())
706732
{
707-
LOG_INFO_FMT(
708-
"No snapshot found: Node will replay all historical transactions");
733+
if (latest_local_snapshot.has_value())
734+
{
735+
auto& [snapshot_dir, snapshot_file] = latest_local_snapshot.value();
736+
startup_snapshot = files::slurp(snapshot_dir / snapshot_file);
737+
738+
LOG_INFO_FMT(
739+
"Found latest local snapshot file: {} (size: {})",
740+
snapshot_dir / snapshot_file,
741+
startup_snapshot.size());
742+
}
743+
else
744+
{
745+
LOG_INFO_FMT(
746+
"No snapshot found: Node will replay all historical transactions");
747+
}
709748
}
710749
}
711750

0 commit comments

Comments
 (0)