Skip to content

Commit e93c53e

Browse files
committed
fix(tiering): Basic stash backpressure
1 parent e36f974 commit e93c53e

File tree

3 files changed

+49
-9
lines changed

3 files changed

+49
-9
lines changed

src/server/string_family.cc

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ class SetCmd {
8484
uint32_t memcache_flags = 0;
8585
uint64_t expire_after_ms = 0; // Relative value based on now. 0 means no expiration.
8686
optional<StringResult>* prev_val = nullptr; // if set, previous value will be stored if found
87+
optional<util::fb2::Future<bool>>* backpressure = nullptr;
8788

8889
constexpr bool IsConditionalSet() const {
8990
return flags & SET_IF_NOTEXIST || flags & SET_IF_EXISTS;
@@ -935,8 +936,11 @@ void SetCmd::PostEdit(const SetParams& params, std::string_view key, std::string
935936
EngineShard* shard = op_args_.shard;
936937

937938
// Currently we always try to offload, but Stash may ignore it, if disk I/O is overloaded.
938-
if (auto* ts = shard->tiered_storage(); ts)
939-
ts->TryStash(op_args_.db_cntx.db_index, key, pv);
939+
if (auto* ts = shard->tiered_storage(); ts) {
940+
auto bp = ts->TryStash(op_args_.db_cntx.db_index, key, pv);
941+
if (bp && params.backpressure)
942+
*params.backpressure = std::move(*bp);
943+
}
940944

941945
if (explicit_journal_ && op_args_.shard->journal()) {
942946
RecordJournal(params, key, value);
@@ -1056,11 +1060,17 @@ void StringFamily::Set(CmdArgList args, const CommandContext& cmnd_cntx) {
10561060
if (sparams.flags & SetCmd::SET_GET)
10571061
sparams.prev_val = &prev;
10581062

1063+
optional<util::fb2::Future<bool>> backpressure;
1064+
sparams.backpressure = &backpressure;
1065+
10591066
OpStatus result = SetGeneric(sparams, key, value, cmnd_cntx);
10601067
if (result == OpStatus::WRONG_TYPE) {
10611068
return builder->SendError(kWrongTypeErr);
10621069
}
10631070

1071+
if (backpressure)
1072+
std::move(backpressure)->GetFor(100ms);
1073+
10641074
if (sparams.flags & SetCmd::SET_GET) {
10651075
return GetReplies{cmnd_cntx.rb}.Send(std::move(prev));
10661076
}

src/server/tiered_storage.cc

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
8585

8686
// Clear IO pending flag for entry
8787
void ClearIoPending(OpManager::KeyRef key) {
88+
FlagBackpressure(key, false);
8889
if (auto pv = Find(key); pv) {
8990
pv->SetStashPending(false);
9091
stats_.total_cancels++;
@@ -143,6 +144,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
143144
// Find entry by key in db_slice and store external segment in place of original value.
144145
// Update memory stats
145146
void SetExternal(OpManager::KeyRef key, tiering::DiskSegment segment) {
147+
FlagBackpressure(key, true);
146148
if (auto* pv = Find(key); pv) {
147149
auto* stats = GetDbTableStats(key.first);
148150

@@ -169,6 +171,14 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
169171
SetExternal({sub_dbid, sub_key}, sub_segment);
170172
}
171173

174+
void FlagBackpressure(OpManager::KeyRef id, bool result) {
175+
const auto& [dbid, key] = id;
176+
if (auto it = ts_->backpressure_.find({dbid, string{key}}); it != ts_->backpressure_.end()) {
177+
it->second.Resolve(result);
178+
ts_->backpressure_.erase(it);
179+
}
180+
}
181+
172182
struct {
173183
uint64_t total_stashes = 0, total_cancels = 0, total_fetches = 0;
174184
uint64_t total_defrags = 0;
@@ -309,6 +319,8 @@ error_code TieredStorage::Open(string_view base_path) {
309319
}
310320

311321
void TieredStorage::Close() {
322+
for (auto& [_, f] : backpressure_)
323+
f.Resolve(false);
312324
op_manager_->Close();
313325
}
314326

@@ -350,9 +362,10 @@ template TieredStorage::TResult<size_t> TieredStorage::Modify(
350362
DbIndex dbid, std::string_view key, const PrimeValue& value,
351363
std::function<size_t(std::string*)> modf);
352364

353-
bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) {
365+
std::optional<util::fb2::Future<bool>> TieredStorage::TryStash(DbIndex dbid, string_view key,
366+
PrimeValue* value) {
354367
if (!ShouldStash(*value))
355-
return false;
368+
return {};
356369

357370
// This invariant should always hold because ShouldStash tests for IoPending flag.
358371
CHECK(!bins_->IsPending(dbid, key));
@@ -361,7 +374,7 @@ bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) {
361374
// with a lot of underutilized disk space.
362375
if (op_manager_->GetStats().pending_stash_cnt >= config_.write_depth_limit) {
363376
++stats_.stash_overflow_cnt;
364-
return false;
377+
return {};
365378
}
366379

367380
StringOrView raw_string = value->GetRawString();
@@ -375,15 +388,24 @@ bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) {
375388
} else if (auto bin = bins_->Stash(dbid, key, raw_string.view()); bin) {
376389
id = bin->first;
377390
ec = op_manager_->Stash(id, bin->second);
391+
} else {
392+
return {}; // Silently added to bin
378393
}
379394

380395
if (ec) {
381396
LOG_IF(ERROR, ec != errc::file_too_large) << "Stash failed immediately" << ec.message();
382397
visit([this](auto id) { op_manager_->ClearIoPending(id); }, id);
383-
return false;
398+
return {};
399+
}
400+
401+
// Throttle stashes over the offload boundary
402+
if (ShouldOffload()) {
403+
util::fb2::Future<bool> fut;
404+
backpressure_[{dbid, string{key}}] = fut;
405+
return fut;
384406
}
385407

386-
return true;
408+
return {};
387409
}
388410

389411
void TieredStorage::Delete(DbIndex dbid, PrimeValue* value) {
@@ -405,6 +427,11 @@ void TieredStorage::Delete(DbIndex dbid, PrimeValue* value) {
405427

406428
void TieredStorage::CancelStash(DbIndex dbid, std::string_view key, PrimeValue* value) {
407429
DCHECK(value->HasStashPending());
430+
431+
// If any previous write was happening, it has been cancelled
432+
if (auto node = backpressure_.extract({dbid, string{key}}); !node.empty())
433+
std::move(node.mapped()).Resolve(false);
434+
408435
if (OccupiesWholePages(value->Size())) {
409436
op_manager_->Delete(KeyRef(dbid, key));
410437
} else if (auto bin = bins_->Delete(dbid, key); bin) {

src/server/tiered_storage.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,9 @@ class TieredStorage {
6060
std::function<T(std::string*)> modf);
6161

6262
// Stash value. Sets IO_PENDING flag and unsets it on error or when finished
63-
// Returns true if item was scheduled for stashing.
64-
bool TryStash(DbIndex dbid, std::string_view key, PrimeValue* value);
63+
// Returns opional backpressure.
64+
std::optional<util::fb2::Future<bool>> TryStash(DbIndex dbid, std::string_view key,
65+
PrimeValue* value);
6566

6667
// Delete value, must be offloaded (external type)
6768
void Delete(DbIndex dbid, PrimeValue* value);
@@ -105,6 +106,8 @@ class TieredStorage {
105106

106107
PrimeTable::Cursor offloading_cursor_{}; // where RunOffloading left off
107108

109+
absl::flat_hash_map<std::pair<DbIndex, std::string>, ::util::fb2::Future<bool>> backpressure_;
110+
108111
std::unique_ptr<ShardOpManager> op_manager_;
109112
std::unique_ptr<tiering::SmallBins> bins_;
110113

0 commit comments

Comments
 (0)