Skip to content

Commit c05eb78

Browse files
Switch BucketManager to AppConnector (#4975)
2 parents b9b6654 + d774ea0 commit c05eb78

12 files changed

+86
-64
lines changed

src/bucket/BucketManager.cpp

Lines changed: 42 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ namespace stellar
5252
{
5353

5454
std::unique_ptr<BucketManager>
55-
BucketManager::create(Application& app)
55+
BucketManager::create(AppConnector& app)
5656
{
5757
auto bucketManagerPtr =
5858
std::unique_ptr<BucketManager>(new BucketManager(app));
@@ -99,7 +99,7 @@ BucketManager::initialize()
9999
mLiveBucketList = std::make_unique<LiveBucketList>();
100100
mHotArchiveBucketList = std::make_unique<HotArchiveBucketList>();
101101
mSnapshotManager = std::make_unique<BucketSnapshotManager>(
102-
mApp,
102+
mAppConnector,
103103
std::make_unique<BucketListSnapshot<LiveBucket>>(*mLiveBucketList,
104104
LedgerHeader()),
105105
std::make_unique<BucketListSnapshot<HotArchiveBucket>>(
@@ -131,40 +131,41 @@ BucketManager::getTmpDirManager()
131131
return *mTmpDirManager;
132132
}
133133

134-
BucketManager::BucketManager(Application& app)
135-
: mApp(app)
134+
BucketManager::BucketManager(AppConnector& appConnector)
135+
: mAppConnector(appConnector)
136136
, mLiveBucketList(nullptr)
137137
, mHotArchiveBucketList(nullptr)
138138
, mSnapshotManager(nullptr)
139139
, mTmpDirManager(nullptr)
140140
, mWorkDir(nullptr)
141141
, mLockedBucketDir(nullptr)
142-
, mBucketLiveObjectInsertBatch(app.getMetrics().NewMeter(
142+
, mBucketLiveObjectInsertBatch(appConnector.getMetrics().NewMeter(
143143
{"bucket", "batch", "objectsadded"}, "object"))
144-
, mBucketArchiveObjectInsertBatch(app.getMetrics().NewMeter(
144+
, mBucketArchiveObjectInsertBatch(appConnector.getMetrics().NewMeter(
145145
{"bucket", "batch-archive", "objectsadded"}, "object"))
146146
, mBucketAddLiveBatch(
147-
app.getMetrics().NewTimer({"bucket", "batch", "addtime"}))
148-
, mBucketAddArchiveBatch(
149-
app.getMetrics().NewTimer({"bucket", "batch-archive", "addtime"}))
150-
, mBucketSnapMerge(app.getMetrics().NewTimer({"bucket", "snap", "merge"}))
147+
appConnector.getMetrics().NewTimer({"bucket", "batch", "addtime"}))
148+
, mBucketAddArchiveBatch(appConnector.getMetrics().NewTimer(
149+
{"bucket", "batch-archive", "addtime"}))
150+
, mBucketSnapMerge(
151+
appConnector.getMetrics().NewTimer({"bucket", "snap", "merge"}))
151152
, mSharedBucketsSize(
152-
app.getMetrics().NewCounter({"bucket", "memory", "shared"}))
153+
appConnector.getMetrics().NewCounter({"bucket", "memory", "shared"}))
153154
, mLiveBucketListSizeCounter(
154-
app.getMetrics().NewCounter({"bucketlist", "size", "bytes"}))
155-
, mArchiveBucketListSizeCounter(
156-
app.getMetrics().NewCounter({"bucketlist-archive", "size", "bytes"}))
157-
, mCacheHitMeter(app.getMetrics().NewMeter({"bucketlistDB", "cache", "hit"},
158-
"bucketlistDB"))
159-
, mCacheMissMeter(app.getMetrics().NewMeter(
155+
appConnector.getMetrics().NewCounter({"bucketlist", "size", "bytes"}))
156+
, mArchiveBucketListSizeCounter(appConnector.getMetrics().NewCounter(
157+
{"bucketlist-archive", "size", "bytes"}))
158+
, mCacheHitMeter(appConnector.getMetrics().NewMeter(
159+
{"bucketlistDB", "cache", "hit"}, "bucketlistDB"))
160+
, mCacheMissMeter(appConnector.getMetrics().NewMeter(
160161
{"bucketlistDB", "cache", "miss"}, "bucketlistDB"))
161-
, mLiveBucketIndexCacheEntries(
162-
app.getMetrics().NewCounter({"bucketlistDB", "cache", "entries"}))
163-
, mLiveBucketIndexCacheBytes(
164-
app.getMetrics().NewCounter({"bucketlistDB", "cache", "bytes"}))
165-
, mBucketListEvictionCounters(app)
162+
, mLiveBucketIndexCacheEntries(appConnector.getMetrics().NewCounter(
163+
{"bucketlistDB", "cache", "entries"}))
164+
, mLiveBucketIndexCacheBytes(appConnector.getMetrics().NewCounter(
165+
{"bucketlistDB", "cache", "bytes"}))
166+
, mBucketListEvictionCounters(appConnector)
166167
, mEvictionStatistics(std::make_shared<EvictionStatistics>())
167-
, mConfig(app.getConfig())
168+
, mConfig(appConnector.getConfig())
168169
{
169170
for (uint32_t t =
170171
static_cast<uint32_t>(LedgerEntryTypeAndDurability::ACCOUNT);
@@ -174,10 +175,10 @@ BucketManager::BucketManager(Application& app)
174175
auto type = static_cast<LedgerEntryTypeAndDurability>(t);
175176
auto typeString = toString(type);
176177
mBucketListEntryCountCounters.emplace(
177-
type, app.getMetrics().NewCounter(
178+
type, appConnector.getMetrics().NewCounter(
178179
{"bucketlist", "entryCounts", typeString}));
179180
mBucketListEntrySizeCounters.emplace(
180-
type, app.getMetrics().NewCounter(
181+
type, appConnector.getMetrics().NewCounter(
181182
{"bucketlist", "entrySizes", typeString}));
182183
}
183184
}
@@ -334,7 +335,7 @@ medida::Meter&
334335
BucketManager::getBloomMissMeter() const
335336
{
336337
BUCKET_TYPE_ASSERT(BucketT);
337-
return mApp.getMetrics().NewMeter(
338+
return mAppConnector.getMetrics().NewMeter(
338339
{BucketT::METRIC_STRING, "bloom", "misses"}, "bloom");
339340
}
340341

@@ -343,7 +344,7 @@ medida::Meter&
343344
BucketManager::getBloomLookupMeter() const
344345
{
345346
BUCKET_TYPE_ASSERT(BucketT);
346-
return mApp.getMetrics().NewMeter(
347+
return mAppConnector.getMetrics().NewMeter(
347348
{BucketT::METRIC_STRING, "bloom", "lookups"}, "bloom");
348349
}
349350

@@ -843,8 +844,8 @@ BucketManager::getAllReferencedBuckets(HistoryArchiveState const& has) const
843844
}
844845

845846
// retain buckets that are referenced by a state in the publish queue.
846-
for (auto const& h :
847-
HistoryManager::getBucketsReferencedByPublishQueue(mApp.getConfig()))
847+
for (auto const& h : HistoryManager::getBucketsReferencedByPublishQueue(
848+
mAppConnector.getConfig()))
848849
{
849850
auto rhash = hexToBin256(h);
850851
auto rit = referenced.emplace(rhash);
@@ -1158,7 +1159,7 @@ BucketManager::startBackgroundEvictionScan(uint32_t ledgerSeq,
11581159
});
11591160

11601161
mEvictionFuture = task->get_future();
1161-
mApp.postOnEvictionBackgroundThread(
1162+
mAppConnector.postOnEvictionBackgroundThread(
11621163
bind(&task_t::operator(), task),
11631164
"SearchableLiveBucketListSnapshot: eviction scan");
11641165
}
@@ -1295,7 +1296,7 @@ BucketManager::checkForMissingBucketsFiles(HistoryArchiveState const& has)
12951296
}
12961297

12971298
void
1298-
BucketManager::assumeState(HistoryArchiveState const& has,
1299+
BucketManager::assumeState(Application& app, HistoryArchiveState const& has,
12991300
uint32_t maxProtocolVersion, bool restartMerges)
13001301
{
13011302
ZoneScoped;
@@ -1355,11 +1356,11 @@ BucketManager::assumeState(HistoryArchiveState const& has,
13551356

13561357
if (restartMerges)
13571358
{
1358-
mLiveBucketList->restartMerges(mApp, maxProtocolVersion,
1359+
mLiveBucketList->restartMerges(app, maxProtocolVersion,
13591360
has.currentLedger);
13601361
if (has.hasHotArchiveBuckets())
13611362
{
1362-
mHotArchiveBucketList->restartMerges(mApp, maxProtocolVersion,
1363+
mHotArchiveBucketList->restartMerges(app, maxProtocolVersion,
13631364
has.currentLedger);
13641365
}
13651366
}
@@ -1462,14 +1463,14 @@ BucketManager::loadCompleteLedgerState(HistoryArchiveState const& has)
14621463
}
14631464

14641465
std::shared_ptr<LiveBucket>
1465-
BucketManager::mergeBuckets(HistoryArchiveState const& has)
1466+
BucketManager::mergeBuckets(asio::io_context& ctx,
1467+
HistoryArchiveState const& has)
14661468
{
14671469
ZoneScoped;
14681470
releaseAssert(threadIsMain());
14691471
std::map<LedgerKey, LedgerEntry> ledgerMap = loadCompleteLedgerState(has);
14701472
BucketMetadata meta;
14711473
MergeCounters mc;
1472-
auto& ctx = mApp.getClock().getIOContext();
14731474
meta.ledgerVersion = mConfig.LEDGER_PROTOCOL_VERSION;
14741475
LiveBucketOutputIterator out(getTmpDir(), /*keepTombstoneEntries=*/false,
14751476
meta, mc, ctx, /*doFsync=*/true);
@@ -1656,7 +1657,7 @@ BucketManager::visitLedgerEntries(
16561657

16571658
std::shared_ptr<BasicWork>
16581659
BucketManager::scheduleVerifyReferencedBucketsWork(
1659-
HistoryArchiveState const& has)
1660+
Application& app, HistoryArchiveState const& has)
16601661
{
16611662
releaseAssert(threadIsMain());
16621663
std::set<Hash> hashes = getAllReferencedBuckets(has);
@@ -1699,7 +1700,7 @@ BucketManager::scheduleVerifyReferencedBucketsWork(
16991700

17001701
seq.emplace_back(
17011702
std::make_shared<VerifyBucketWork<HotArchiveBucket>>(
1702-
mApp, filename, hash, indexIter->second, nullptr));
1703+
app, filename, hash, indexIter->second, nullptr));
17031704
}
17041705
else
17051706
{
@@ -1708,10 +1709,10 @@ BucketManager::scheduleVerifyReferencedBucketsWork(
17081709
auto [indexIter, _] = liveIndexMap.emplace(i++, nullptr);
17091710

17101711
seq.emplace_back(std::make_shared<VerifyBucketWork<LiveBucket>>(
1711-
mApp, filename, hash, indexIter->second, nullptr));
1712+
app, filename, hash, indexIter->second, nullptr));
17121713
}
17131714
}
1714-
return mApp.getWorkScheduler().scheduleWork<WorkSequence>(
1715+
return app.getWorkScheduler().scheduleWork<WorkSequence>(
17151716
"verify-referenced-buckets", seq);
17161717
}
17171718

@@ -1734,7 +1735,7 @@ BucketManager::reportBucketEntryCountMetrics()
17341735
countCounter =
17351736
mBucketListEntryCountCounters
17361737
.emplace(type,
1737-
mApp.getMetrics().NewCounter(
1738+
mAppConnector.getMetrics().NewCounter(
17381739
{"bucketlist", "entryCounts", typeString}))
17391740
.first;
17401741
}
@@ -1747,7 +1748,7 @@ BucketManager::reportBucketEntryCountMetrics()
17471748
sizeCounter =
17481749
mBucketListEntrySizeCounters
17491750
.emplace(type,
1750-
mApp.getMetrics().NewCounter(
1751+
mAppConnector.getMetrics().NewCounter(
17511752
{"bucketlist", "entrySizes", typeString}))
17521753
.first;
17531754
}

src/bucket/BucketManager.h

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ namespace stellar
3232

3333
class TmpDir;
3434
class AbstractLedgerTxn;
35-
class Application;
35+
class AppConnector;
3636
class Bucket;
3737
class LiveBucketList;
3838
class HotArchiveBucketList;
@@ -74,12 +74,10 @@ class BucketManager : NonMovableOrCopyable
7474

7575
static std::string const kLockFilename;
7676

77-
// NB: ideally, BucketManager should have no access to mApp, as it's too
78-
// dangerous in the context of parallel application. BucketManager is quite
79-
// bloated, with lots of legacy code, so to ensure safety, annotate all
80-
// functions using mApp with `releaseAssert(threadIsMain())` and avoid
81-
// accessing mApp in the background.
82-
Application& mApp;
77+
// BucketManager uses AppConnector for thread-safe access to Application
78+
// services. AppConnector methods are either explicitly main-thread only
79+
// (with releaseAssert) or documented as thread-safe.
80+
AppConnector& mAppConnector;
8381
std::unique_ptr<LiveBucketList> mLiveBucketList;
8482
std::unique_ptr<HotArchiveBucketList> mHotArchiveBucketList;
8583
std::unique_ptr<BucketSnapshotManager> mSnapshotManager;
@@ -181,13 +179,13 @@ class BucketManager : NonMovableOrCopyable
181179
#endif
182180

183181
protected:
184-
BucketManager(Application& app);
182+
BucketManager(AppConnector& appConnector);
185183
void calculateSkipValues(LedgerHeader& currentHeader);
186184
std::string bucketFilename(std::string const& bucketHexHash);
187185
std::string bucketFilename(Hash const& hash);
188186

189187
public:
190-
static std::unique_ptr<BucketManager> create(Application& app);
188+
static std::unique_ptr<BucketManager> create(AppConnector& app);
191189
virtual ~BucketManager();
192190

193191
void initialize();
@@ -361,7 +359,7 @@ class BucketManager : NonMovableOrCopyable
361359

362360
// Assume state from `has` in BucketList: find and attach all buckets in
363361
// `has`, set current BL.
364-
void assumeState(HistoryArchiveState const& has,
362+
void assumeState(Application& app, HistoryArchiveState const& has,
365363
uint32_t maxProtocolVersion, bool restartMerges);
366364

367365
void shutdown();
@@ -382,7 +380,8 @@ class BucketManager : NonMovableOrCopyable
382380

383381
// Merge the bucket list of the provided HAS into a single "super bucket"
384382
// consisting of only live entries, and return it.
385-
std::shared_ptr<LiveBucket> mergeBuckets(HistoryArchiveState const& has);
383+
std::shared_ptr<LiveBucket> mergeBuckets(asio::io_context& ctx,
384+
HistoryArchiveState const& has);
386385

387386
// Visits all the active ledger entries or subset thereof.
388387
//
@@ -410,7 +409,8 @@ class BucketManager : NonMovableOrCopyable
410409
// Schedule a Work class that verifies the hashes of all referenced buckets
411410
// on background threads.
412411
std::shared_ptr<BasicWork>
413-
scheduleVerifyReferencedBucketsWork(HistoryArchiveState const& has);
412+
scheduleVerifyReferencedBucketsWork(Application& app,
413+
HistoryArchiveState const& has);
414414

415415
Config const& getConfig() const;
416416
void reportBucketEntryCountMetrics();

src/bucket/BucketSnapshotManager.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ copyHistoricalSnapshots(
3737
}
3838

3939
BucketSnapshotManager::BucketSnapshotManager(
40-
Application& app, SnapshotPtrT<LiveBucket>&& snapshot,
40+
AppConnector& app, SnapshotPtrT<LiveBucket>&& snapshot,
4141
SnapshotPtrT<HotArchiveBucket>&& hotArchiveSnapshot,
4242
uint32_t numLiveHistoricalSnapshots)
4343
: mAppConnector(app)

src/bucket/BucketSnapshotManager.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ class Timer;
2525
namespace stellar
2626
{
2727

28-
class Application;
2928
class LiveBucketList;
3029
template <class BucketT> class BucketListSnapshot;
3130
class SearchableLiveBucketListSnapshot;
@@ -75,7 +74,8 @@ class BucketSnapshotManager : NonMovableOrCopyable
7574
// numHistoricalLedgers is the number of historical snapshots that the
7675
// snapshot manager will maintain. If numHistoricalLedgers is 5, snapshots
7776
// will be capable of querying state from ledger [lcl, lcl - 5].
78-
BucketSnapshotManager(Application& app, SnapshotPtrT<LiveBucket>&& snapshot,
77+
BucketSnapshotManager(AppConnector& app,
78+
SnapshotPtrT<LiveBucket>&& snapshot,
7979
SnapshotPtrT<HotArchiveBucket>&& hotArchiveSnapshot,
8080
uint32_t numHistoricalLedgers);
8181

src/bucket/BucketUtils.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include "bucket/HotArchiveBucket.h"
77
#include "bucket/LiveBucket.h"
88
#include "ledger/LedgerTypeUtils.h"
9+
#include "main/AppConnector.h"
910
#include "main/Application.h"
1011
#include "util/types.h"
1112
#include "xdr/Stellar-ledger-entries.h"
@@ -167,7 +168,7 @@ EvictionResultCandidates::isValid(uint32_t currLedgerSeq,
167168
currSas.startingEvictionScanLevel;
168169
}
169170

170-
EvictionCounters::EvictionCounters(Application& app)
171+
EvictionCounters::EvictionCounters(AppConnector& app)
171172
: entriesEvicted(app.getMetrics().NewCounter(
172173
{"state-archival", "eviction", "entries-evicted"}))
173174
, bytesScannedForEviction(app.getMetrics().NewCounter(

src/bucket/BucketUtils.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ namespace stellar
2323
{
2424

2525
class Application;
26+
class AppConnector;
2627
class LiveBucket;
2728
class HotArchiveBucket;
2829
template <class BucketT> class BucketListSnapshot;
@@ -156,7 +157,7 @@ struct EvictionCounters
156157
medida::Counter& evictionCyclePeriod;
157158
medida::Counter& averageEvictedEntryAge;
158159

159-
EvictionCounters(Application& app);
160+
EvictionCounters(AppConnector& app);
160161
};
161162

162163
class EvictionStatistics

src/bucket/test/BucketManagerTests.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ TEST_CASE("skip list", "[bucket][bucketmanager]")
104104
class BucketManagerTest : public BucketManager
105105
{
106106
public:
107-
BucketManagerTest(Application& app) : BucketManager(app)
107+
BucketManagerTest(Application& app)
108+
: BucketManager(app.getAppConnector())
108109
{
109110
}
110111
void

src/catchup/AssumeStateWork.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ AssumeStateWork::doWork()
9292
&liveBuckets = mLiveBuckets,
9393
&hotArchiveBuckets =
9494
mHotArchiveBuckets](Application& app) {
95-
app.getBucketManager().assumeState(has, maxProtocolVersion,
95+
app.getBucketManager().assumeState(app, has, maxProtocolVersion,
9696
restartMerges);
9797

9898
// Drop bucket references once assume state complete since buckets

src/main/AppConnector.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,20 @@ AppConnector::postOnOverlayThread(std::function<void()>&& f,
9797
mApp.postOnOverlayThread(std::move(f), message);
9898
}
9999

100+
void
101+
AppConnector::postOnBackgroundThread(std::function<void()>&& f,
102+
std::string const& jobName)
103+
{
104+
mApp.postOnBackgroundThread(std::move(f), jobName);
105+
}
106+
107+
void
108+
AppConnector::postOnEvictionBackgroundThread(std::function<void()>&& f,
109+
std::string const& jobName)
110+
{
111+
mApp.postOnEvictionBackgroundThread(std::move(f), jobName);
112+
}
113+
100114
Config const&
101115
AppConnector::getConfig() const
102116
{

0 commit comments

Comments
 (0)