
Commit de93989

dranikpg and abhijat authored and committed
fix(tiering): Basic stash backpressure (#5973)
Provide basic backpressure for tiered storage stashes.

Signed-off-by: Abhijat Malviya <[email protected]>
1 parent faa34a8 commit de93989
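
In short: when a shard is past its offloading threshold, TryStash now registers a future keyed by (db index, key) and hands it back to the SET path, which waits on it briefly (GetFor(5ms)) before replying; stash completion, cancellation, and shutdown all resolve pending futures. The short sketches after the string_family.cc, tiered_storage.cc, and entry_map.h diffs below illustrate these patterns with standard-library stand-ins.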

10 files changed: +101 −66 lines changed

.github/actions/repeat/action.yml

Lines changed: 2 additions & 41 deletions

@@ -44,44 +44,5 @@ runs:
       shell: bash
       run: |
         ls -l ${GITHUB_WORKSPACE}/
-        cd ${GITHUB_WORKSPACE}/tests
-        echo "Current commit is ${{github.sha}}"
-        pip3 install -r dragonfly/requirements.txt
-        # used by PyTests
-        export DRAGONFLY_PATH="${GITHUB_WORKSPACE}/${{inputs.build-folder-name}}/${{inputs.dfly-executable}}"
-        export UBSAN_OPTIONS=print_stacktrace=1:halt_on_error=1 # to crash on errors
-        if [[ "${{ inputs.epoll }}" == "epoll" ]]; then
-          FORCE_EPOLL="--df force_epoll=true"
-        else
-          FORCE_EPOLL=""
-        fi
-        if [[ $"{{ inputs.vmodule_expression }}" != "" ]]; then
-          VMOD="--df vmodule=${{ inputs.vmodule_expression }}"
-        else
-          VMOD=""
-        fi
-        echo Running command: timeout ${{ inputs.timeout }} pytest ${{ inputs.expression }} --drop-data-after-each-test ${FORCE_EPOLL} ${VMOD} --color=yes --json-report --json-report-file=report.json --log-cli-level=DEBUG --count=${{ inputs.count }}
-        timeout ${{ inputs.timeout }} pytest ${{ inputs.expression }} --drop-data-after-each-test ${FORCE_EPOLL} ${VMOD} --color=yes --json-report --json-report-file=report.json --log-cli-level=DEBUG --count=${{ inputs.count }} || code=$?
-        # timeout returns 124 if we exceeded the timeout duration
-        if [[ $code -eq 124 ]]; then
-          # Add an extra new line here because when tests timeout the first line below continues from the test failure name
-          echo "\n"
-          echo "🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑"
-          echo "🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 TESTS TIMEDOUT 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑"
-          echo "🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑 🛑"
-          # Copy the last log file because we timedout and pytest did not copy it over
-          # the /tmp/failed/ folder
-          cat /tmp/last_test_log_dir.txt | xargs -I {} mv {}/ /tmp/failed/
-          exit 1
-        fi
-
-        # when a test fails in pytest it returns 1 but there are other return codes as well so we just check if the code is non zero
-        if [[ $code -ne 0 ]]; then
-          exit 1
-        fi
-      env:
-        # Add environment variables to enable the S3 snapshot test.
-        DRAGONFLY_S3_BUCKET: ${{ inputs.s3-bucket }}
-        AWS_ACCESS_KEY_ID: ${{ inputs.aws-access-key-id }}
-        AWS_SECRET_ACCESS_KEY: ${{ inputs.aws-secret-access-key }}
-        AWS_REGION: us-east-1
+        cd ${GITHUB_WORKSPACE}/build
+        ./dragonfly_test --logtostderr --gtest_filter=DflyEngineTest.ZeroAllocationEviction --v 3 --vmodule=uring_proactor=0,main_service=0,generic_family=0,scheduler=0,string_family=0,proactor_base=0 --gtest_repeat=1000 --gtest_break_on_failure

.github/workflows/repeat-tests.yml

Lines changed: 2 additions & 2 deletions

@@ -82,10 +82,10 @@ jobs:
       if: ${{ inputs.use_release != 'yes' }}
       run: |
         # -no-pie to disable address randomization so we could symbolize stacktraces
-        cmake -B ${GITHUB_WORKSPACE}/build -DCMAKE_BUILD_TYPE=${{matrix.build-type}} -GNinja \
+        cmake -B ${GITHUB_WORKSPACE}/build -DCMAKE_BUILD_TYPE=Release -GNinja \
           -DCMAKE_CXX_COMPILER_LAUNCHER=ccache -DPRINT_STACKTRACES_ON_SIGNAL=ON \
           -DCMAKE_CXX_FLAGS=-no-pie -DHELIO_STACK_CHECK:STRING=4096
-        cd ${GITHUB_WORKSPACE}/build && ninja dragonfly
+        cd ${GITHUB_WORKSPACE}/build && ninja dragonfly_test
         pwd
         ls -l ..

src/server/dragonfly_test.cc

Lines changed: 0 additions & 1 deletion

@@ -564,7 +564,6 @@ TEST_F(DflyEngineTest, StickyEviction) {
 #endif

 TEST_F(DflyEngineTest, ZeroAllocationEviction) {
-  GTEST_SKIP() << "Currently being fixed, unblock CI";
   max_memory_limit = 500000; // 0.5mb
   shard_set->TEST_EnableCacheMode();

src/server/string_family.cc

Lines changed: 15 additions & 2 deletions

@@ -84,6 +84,7 @@ class SetCmd {
   uint32_t memcache_flags = 0;
   uint64_t expire_after_ms = 0; // Relative value based on now. 0 means no expiration.
   optional<StringResult>* prev_val = nullptr; // if set, previous value will be stored if found
+  optional<util::fb2::Future<bool>>* backpressure = nullptr;

   constexpr bool IsConditionalSet() const {
     return flags & SET_IF_NOTEXIST || flags & SET_IF_EXISTS;

@@ -935,8 +936,12 @@ void SetCmd::PostEdit(const SetParams& params, std::string_view key, std::string
   EngineShard* shard = op_args_.shard;

   // Currently we always try to offload, but Stash may ignore it, if disk I/O is overloaded.
-  if (auto* ts = shard->tiered_storage(); ts)
-    ts->TryStash(op_args_.db_cntx.db_index, key, pv);
+  // If we are beyond the offloading threshold, TryStash might return a backpressure future.
+  if (auto* ts = shard->tiered_storage(); ts) {
+    auto bp = ts->TryStash(op_args_.db_cntx.db_index, key, pv, true);
+    if (bp && params.backpressure)
+      *params.backpressure = std::move(*bp);
+  }

   if (explicit_journal_ && op_args_.shard->journal()) {
     RecordJournal(params, key, value);

@@ -1056,11 +1061,19 @@ void StringFamily::Set(CmdArgList args, const CommandContext& cmnd_cntx) {
   if (sparams.flags & SetCmd::SET_GET)
     sparams.prev_val = &prev;

+  optional<util::fb2::Future<bool>> backpressure;
+  sparams.backpressure = &backpressure;
+
   OpStatus result = SetGeneric(sparams, key, value, cmnd_cntx);
   if (result == OpStatus::WRONG_TYPE) {
     return builder->SendError(kWrongTypeErr);
   }

+  // If backpressure was provided, wait with reasonable limit (to avoid client deadlocking).
+  if (backpressure) {
+    std::move(backpressure)->GetFor(5ms);
+  }
+
   if (sparams.flags & SetCmd::SET_GET) {
     return GetReplies{cmnd_cntx.rb}.Send(std::move(prev));
   }
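
The bounded GetFor(5ms) wait above is the client-facing half of the throttle: the connection fiber blocks only briefly, never indefinitely. A minimal stand-alone sketch of the same wait-with-deadline pattern, using std::promise/std::future as a stand-in for util::fb2::Future (names and timing here are illustrative, not Dragonfly's API):

#include <chrono>
#include <future>
#include <iostream>
#include <thread>

int main() {
  std::promise<bool> stash_done;  // resolved when the simulated disk write finishes
  std::future<bool> backpressure = stash_done.get_future();

  std::thread io([&] { stash_done.set_value(true); });  // simulated stash completion

  // Wait with a small limit so a slow disk cannot stall the client forever,
  // analogous to GetFor(5ms) in the diff above.
  if (backpressure.wait_for(std::chrono::milliseconds(5)) == std::future_status::ready)
    std::cout << "stash finished: " << backpressure.get() << '\n';
  else
    std::cout << "timed out, reply to the client anyway\n";

  io.join();
}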

src/server/tiered_storage.cc

Lines changed: 27 additions & 5 deletions

@@ -85,6 +85,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {

   // Clear IO pending flag for entry
   void ClearIoPending(OpManager::KeyRef key) {
+    UnblockBackpressure(key, false);
     if (auto pv = Find(key); pv) {
       pv->SetStashPending(false);
       stats_.total_cancels++;

@@ -143,6 +144,7 @@
   // Find entry by key in db_slice and store external segment in place of original value.
   // Update memory stats
   void SetExternal(OpManager::KeyRef key, tiering::DiskSegment segment) {
+    UnblockBackpressure(key, true);
     if (auto* pv = Find(key); pv) {
       auto* stats = GetDbTableStats(key.first);

@@ -169,6 +171,12 @@
     SetExternal({sub_dbid, sub_key}, sub_segment);
   }

+  // If any backpressure (throttling) is active, notify that the operation finished
+  void UnblockBackpressure(OpManager::KeyRef id, bool result) {
+    if (auto node = ts_->stash_backpressure_.extract(id); !node.empty())
+      node.mapped().Resolve(result);
+  }
+
   struct {
     uint64_t total_stashes = 0, total_cancels = 0, total_fetches = 0;
     uint64_t total_defrags = 0;

@@ -309,6 +317,8 @@ error_code TieredStorage::Open(string_view base_path) {
 }

 void TieredStorage::Close() {
+  for (auto& [_, f] : stash_backpressure_)
+    f.Resolve(false);
   op_manager_->Close();
 }

@@ -350,9 +360,10 @@ template TieredStorage::TResult<size_t> TieredStorage::Modify(
     DbIndex dbid, std::string_view key, const PrimeValue& value,
     std::function<size_t(std::string*)> modf);

-bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) {
+std::optional<util::fb2::Future<bool>> TieredStorage::TryStash(DbIndex dbid, string_view key,
+                                                               PrimeValue* value, bool provide_bp) {
   if (!ShouldStash(*value))
-    return false;
+    return {};

   // This invariant should always hold because ShouldStash tests for IoPending flag.
   CHECK(!bins_->IsPending(dbid, key));

@@ -361,7 +372,7 @@ bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) {
   // with a lot of underutilized disk space.
   if (op_manager_->GetStats().pending_stash_cnt >= config_.write_depth_limit) {
     ++stats_.stash_overflow_cnt;
-    return false;
+    return {};
   }

   StringOrView raw_string = value->GetRawString();

@@ -375,15 +386,21 @@ bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) {
   } else if (auto bin = bins_->Stash(dbid, key, raw_string.view()); bin) {
     id = bin->first;
     ec = op_manager_->Stash(id, bin->second);
+  } else {
+    return {}; // Silently added to bin
   }

   if (ec) {
     LOG_IF(ERROR, ec != errc::file_too_large) << "Stash failed immediately" << ec.message();
     visit([this](auto id) { op_manager_->ClearIoPending(id); }, id);
-    return false;
+    return {};
   }

-  return true;
+  // If we are in the active offloading phase, throttle stashes by providing backpressure future
+  if (provide_bp && ShouldOffload())
+    return stash_backpressure_[{dbid, string{key}}];
+
+  return {};
 }

 void TieredStorage::Delete(DbIndex dbid, PrimeValue* value) {

@@ -405,6 +422,11 @@

 void TieredStorage::CancelStash(DbIndex dbid, std::string_view key, PrimeValue* value) {
   DCHECK(value->HasStashPending());
+
+  // If any previous write was happening, it has been cancelled
+  if (auto node = stash_backpressure_.extract(make_pair(dbid, key)); !node.empty())
+    std::move(node.mapped()).Resolve(false);
+
   if (OccupiesWholePages(value->Size())) {
     op_manager_->Delete(KeyRef(dbid, key));
   } else if (auto bin = bins_->Delete(dbid, key); bin) {
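
Note how every resolution path above (SetExternal, ClearIoPending, CancelStash, Close) extracts the map node before resolving it, so each future is resolved exactly once no matter which event fires first. A compact sketch of that registry pattern, with std::promise standing in for util::fb2::Future and std::map for EntryMap (all names illustrative):

#include <future>
#include <map>
#include <string>
#include <utility>

using Key = std::pair<unsigned, std::string>;  // (db index, key)

std::map<Key, std::promise<bool>> stash_backpressure;

// Producer side: register a waiter when a stash is scheduled past the threshold.
std::future<bool> RegisterBackpressure(unsigned dbid, std::string key) {
  return stash_backpressure[{dbid, std::move(key)}].get_future();
}

// Completion/cancellation side: extract-and-resolve. The node leaves the map
// before it is resolved, so a second event for the same key finds nothing and
// becomes a no-op.
void Unblock(const Key& id, bool result) {
  if (auto node = stash_backpressure.extract(id); !node.empty())
    node.mapped().set_value(result);  // wakes the waiter exactly once
}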

src/server/tiered_storage.h

Lines changed: 8 additions & 3 deletions

@@ -11,6 +11,7 @@
 #include "server/common.h"
 #include "server/table.h"
 #include "server/tiering/common.h"
+#include "server/tiering/entry_map.h"
 #include "server/tx_base.h"
 #include "util/fibers/future.h"

@@ -59,9 +60,10 @@ class TieredStorage {
   TResult<T> Modify(DbIndex dbid, std::string_view key, const PrimeValue& value,
                     std::function<T(std::string*)> modf);

-  // Stash value. Sets IO_PENDING flag and unsets it on error or when finished
-  // Returns true if item was scheduled for stashing.
-  bool TryStash(DbIndex dbid, std::string_view key, PrimeValue* value);
+  // Stash value. Sets IO_PENDING flag and unsets it on error or when finished.
+  // Returns optional future for backpressure if `provide_bp` is set and conditions are met.
+  std::optional<util::fb2::Future<bool>> TryStash(DbIndex dbid, std::string_view key,
+                                                  PrimeValue* value, bool provide_bp = false);

   // Delete value, must be offloaded (external type)
   void Delete(DbIndex dbid, PrimeValue* value);

@@ -105,6 +107,9 @@

   PrimeTable::Cursor offloading_cursor_{}; // where RunOffloading left off

+  // Stash operations waiting for completion to throttle
+  tiering::EntryMap<::util::fb2::Future<bool>> stash_backpressure_;
+
   std::unique_ptr<ShardOpManager> op_manager_;
   std::unique_ptr<tiering::SmallBins> bins_;

src/server/tiering/entry_map.h

Lines changed: 39 additions & 0 deletions

@@ -0,0 +1,39 @@
+// Copyright 2025, DragonflyDB authors. All rights reserved.
+// See LICENSE for licensing terms.
+//
+
+#pragma once
+
+#include <absl/container/flat_hash_map.h>
+
+#include <string>
+
+#include "server/tx_base.h"
+
+namespace dfly::tiering {
+
+namespace detail {
+struct Hasher {
+  using is_transparent = void;
+  template <typename S> size_t operator()(const std::pair<DbIndex, S>& p) const {
+    return absl::HashOf(p);
+  }
+};
+
+struct Eq {
+  using is_transparent = void;
+  template <typename S1, typename S2>
+  bool operator()(const std::pair<DbIndex, S1>& l, const std::pair<DbIndex, S2>& r) const {
+    const auto& [i1, s1] = l;
+    const auto& [i2, s2] = r;
+    return i1 == i2 && s1 == s2;
+  }
+};
+}  // namespace detail
+
+// Map of key (db index, string key) -> T with heterogeneous lookup
+template <typename T>
+using EntryMap =
+    absl::flat_hash_map<std::pair<DbIndex, std::string>, T, detail::Hasher, detail::Eq>;
+
+}  // namespace dfly::tiering
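
The is_transparent tags on Hasher and Eq are what enable heterogeneous lookup: find and erase accept any pair<DbIndex, S> whose hash agrees with the stored pair<DbIndex, std::string>, which is what lets small_bins.cc below probe with make_pair(dbid, key) where key is a string_view. A small usage sketch (assumes the header above; DbIndex is taken to be the integral database id from tx_base.h):

#include <string>
#include <string_view>
#include <utility>

#include "server/tiering/entry_map.h"

int main() {
  dfly::tiering::EntryMap<int> map;
  map[{0, std::string("key")}] = 42;

  // Probe with a string_view: absl hashes std::string and std::string_view
  // identically, so no temporary std::string is allocated for the lookup.
  std::string_view probe = "key";
  auto it = map.find(std::make_pair(dfly::DbIndex{0}, probe));
  return it != map.end() ? 0 : 1;
}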

src/server/tiering/small_bins.cc

Lines changed: 2 additions & 5 deletions

@@ -122,10 +122,7 @@ std::vector<std::pair<DbIndex, std::string>> SmallBins::ReportStashAborted(BinId
 }

 std::optional<SmallBins::BinId> SmallBins::Delete(DbIndex dbid, std::string_view key) {
-  std::pair<DbIndex, std::string> key_pair{dbid, key};
-  auto it = current_bin_.find(key_pair);
-
-  if (it != current_bin_.end()) {
+  if (auto it = current_bin_.find(make_pair(dbid, key)); it != current_bin_.end()) {
     size_t stashed_size = StashedValueSize(it->second);
     DCHECK_GE(current_bin_bytes_, stashed_size);

@@ -135,7 +132,7 @@ std::optional<SmallBins::BinId> SmallBins::Delete(DbIndex dbid, std::string_view
   }

   for (auto& [id, keys] : pending_bins_) {
-    if (keys.erase(key_pair))
+    if (keys.erase(make_pair(dbid, key)))
       return keys.empty() ? std::make_optional(id) : std::nullopt;
   }
   return std::nullopt;

src/server/tiering/small_bins.h

Lines changed: 3 additions & 4 deletions

@@ -11,6 +11,7 @@
 #include <vector>

 #include "server/tiering/disk_storage.h"
+#include "server/tiering/entry_map.h"

 namespace dfly::tiering {

@@ -84,12 +85,10 @@ class SmallBins {
   BinId last_bin_id_ = 0;

   unsigned current_bin_bytes_ = 0;
-  absl::flat_hash_map<std::pair<DbIndex, std::string>, std::string> current_bin_;
+  tiering::EntryMap<std::string> current_bin_;

   // Pending stashes, their keys and value sizes
-  absl::flat_hash_map<unsigned /* id */,
-                      absl::flat_hash_map<std::pair<DbIndex, std::string> /* key*/, DiskSegment>>
-      pending_bins_;
+  absl::flat_hash_map<unsigned /* id */, tiering::EntryMap<DiskSegment>> pending_bins_;

   // Map of bins that were stashed and should be deleted when number of entries reaches 0
   absl::flat_hash_map<size_t /*offset*/, StashInfo> stashed_bins_;

tests/dragonfly/tiering_test.py

Lines changed: 3 additions & 3 deletions

@@ -98,9 +98,9 @@ async def run(sub_ops):
         "tiered_prefix": "/tmp/tiered/backing_master",
         "maxmemory": "2.0G",
         "cache_mode": True,
-        "tiered_offload_threshold": "0.9",
+        "tiered_offload_threshold": "0.6",
         "tiered_upload_threshold": "0.2",
-        "tiered_storage_write_depth": 100,
+        "tiered_storage_write_depth": 1000,
     }
 )
 async def test_full_sync(async_client: aioredis.Redis, df_factory: DflyInstanceFactory):

@@ -109,7 +109,7 @@ async def test_full_sync(async_client: aioredis.Redis, df_factory: DflyInstanceF
         cache_mode=True,
         maxmemory="2.0G",
         tiered_prefix="/tmp/tiered/backing_replica",
-        tiered_offload_threshold="0.8",
+        tiered_offload_threshold="0.5",
         tiered_storage_write_depth=1000,
     )
     replica.start()
