Skip to content

Commit 4f1283f

Browse files
committed
fixes
Signed-off-by: Vladislav Oleshko <[email protected]>
1 parent e93c53e commit 4f1283f

File tree

6 files changed

+62
-30
lines changed

6 files changed

+62
-30
lines changed

src/server/string_family.cc

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -936,8 +936,9 @@ void SetCmd::PostEdit(const SetParams& params, std::string_view key, std::string
936936
EngineShard* shard = op_args_.shard;
937937

938938
// Currently we always try to offload, but Stash may ignore it, if disk I/O is overloaded.
939+
// If we are beyound the offloading threshold, TryStash might return a backpressure future.
939940
if (auto* ts = shard->tiered_storage(); ts) {
940-
auto bp = ts->TryStash(op_args_.db_cntx.db_index, key, pv);
941+
auto bp = ts->TryStash(op_args_.db_cntx.db_index, key, pv, true);
941942
if (bp && params.backpressure)
942943
*params.backpressure = std::move(*bp);
943944
}
@@ -1068,8 +1069,10 @@ void StringFamily::Set(CmdArgList args, const CommandContext& cmnd_cntx) {
10681069
return builder->SendError(kWrongTypeErr);
10691070
}
10701071

1071-
if (backpressure)
1072-
std::move(backpressure)->GetFor(100ms);
1072+
// If backpressure was provided, wait with reasonable limit (to avoid client deadlocking).
1073+
if (backpressure) {
1074+
std::move(backpressure)->GetFor(10ms);
1075+
}
10731076

10741077
if (sparams.flags & SetCmd::SET_GET) {
10751078
return GetReplies{cmnd_cntx.rb}.Send(std::move(prev));

src/server/tiered_storage.cc

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -172,11 +172,8 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
172172
}
173173

174174
void FlagBackpressure(OpManager::KeyRef id, bool result) {
175-
const auto& [dbid, key] = id;
176-
if (auto it = ts_->backpressure_.find({dbid, string{key}}); it != ts_->backpressure_.end()) {
177-
it->second.Resolve(result);
178-
ts_->backpressure_.erase(it);
179-
}
175+
if (auto node = ts_->stash_backpressure_.extract(id); !node.empty())
176+
node.mapped().Resolve(result);
180177
}
181178

182179
struct {
@@ -319,7 +316,7 @@ error_code TieredStorage::Open(string_view base_path) {
319316
}
320317

321318
void TieredStorage::Close() {
322-
for (auto& [_, f] : backpressure_)
319+
for (auto& [_, f] : stash_backpressure_)
323320
f.Resolve(false);
324321
op_manager_->Close();
325322
}
@@ -363,7 +360,7 @@ template TieredStorage::TResult<size_t> TieredStorage::Modify(
363360
std::function<size_t(std::string*)> modf);
364361

365362
std::optional<util::fb2::Future<bool>> TieredStorage::TryStash(DbIndex dbid, string_view key,
366-
PrimeValue* value) {
363+
PrimeValue* value, bool provide_bp) {
367364
if (!ShouldStash(*value))
368365
return {};
369366

@@ -398,12 +395,9 @@ std::optional<util::fb2::Future<bool>> TieredStorage::TryStash(DbIndex dbid, str
398395
return {};
399396
}
400397

401-
// Throttle stashes over the offload boundary
402-
if (ShouldOffload()) {
403-
util::fb2::Future<bool> fut;
404-
backpressure_[{dbid, string{key}}] = fut;
405-
return fut;
406-
}
398+
// If we are in the active offloading phase, throttle stashes by providing backpressure future
399+
if (provide_bp && ShouldOffload())
400+
return stash_backpressure_[{dbid, string{key}}];
407401

408402
return {};
409403
}
@@ -429,7 +423,7 @@ void TieredStorage::CancelStash(DbIndex dbid, std::string_view key, PrimeValue*
429423
DCHECK(value->HasStashPending());
430424

431425
// If any previous write was happening, it has been cancelled
432-
if (auto node = backpressure_.extract({dbid, string{key}}); !node.empty())
426+
if (auto node = stash_backpressure_.extract(make_pair(dbid, key)); !node.empty())
433427
std::move(node.mapped()).Resolve(false);
434428

435429
if (OccupiesWholePages(value->Size())) {

src/server/tiered_storage.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include "server/common.h"
1212
#include "server/table.h"
1313
#include "server/tiering/common.h"
14+
#include "server/tiering/entry_map.h"
1415
#include "server/tx_base.h"
1516
#include "util/fibers/future.h"
1617

@@ -59,10 +60,10 @@ class TieredStorage {
5960
TResult<T> Modify(DbIndex dbid, std::string_view key, const PrimeValue& value,
6061
std::function<T(std::string*)> modf);
6162

62-
// Stash value. Sets IO_PENDING flag and unsets it on error or when finished
63-
// Returns opional backpressure.
63+
// Stash value. Sets IO_PENDING flag and unsets it on error or when finished.
64+
// Returns optional future for backpressure if `provide_bp` is set and conditons are met.
6465
std::optional<util::fb2::Future<bool>> TryStash(DbIndex dbid, std::string_view key,
65-
PrimeValue* value);
66+
PrimeValue* value, bool provide_bp = false);
6667

6768
// Delete value, must be offloaded (external type)
6869
void Delete(DbIndex dbid, PrimeValue* value);
@@ -106,7 +107,8 @@ class TieredStorage {
106107

107108
PrimeTable::Cursor offloading_cursor_{}; // where RunOffloading left off
108109

109-
absl::flat_hash_map<std::pair<DbIndex, std::string>, ::util::fb2::Future<bool>> backpressure_;
110+
// Stash operations waiting for completion to throttle
111+
tiering::EntryMap<::util::fb2::Future<bool>> stash_backpressure_;
110112

111113
std::unique_ptr<ShardOpManager> op_manager_;
112114
std::unique_ptr<tiering::SmallBins> bins_;

src/server/tiering/entry_map.h

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Copyright 2025, DragonflyDB authors. All rights reserved.
2+
// See LICENSE for licensing terms.
3+
//
4+
5+
#pragma once
6+
7+
#include <absl/container/flat_hash_map.h>
8+
9+
#include <string>
10+
11+
#include "server/tx_base.h"
12+
13+
namespace dfly::tiering {
14+
15+
namespace detail {
16+
struct Hasher {
17+
using is_transparent = void;
18+
template <typename S> size_t operator()(const std::pair<DbIndex, S>& p) const {
19+
return absl::Hash(p)();
20+
}
21+
};
22+
23+
struct Eq {
24+
using is_transparent = void;
25+
template <typename S1, typename S2>
26+
bool operator()(const std::pair<DbIndex, S1>& l, const std::pair<DbIndex, S2>& r) const {
27+
return l == r;
28+
}
29+
};
30+
} // namespace detail
31+
32+
// Map of key (db index, string key) -> T with heterogeneous lookup
33+
template <typename T>
34+
using EntryMap =
35+
absl::flat_hash_map<std::pair<DbIndex, std::string>, T, detail::Hasher, detail::Eq>;
36+
37+
} // namespace dfly::tiering

src/server/tiering/small_bins.cc

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,7 @@ std::vector<std::pair<DbIndex, std::string>> SmallBins::ReportStashAborted(BinId
122122
}
123123

124124
std::optional<SmallBins::BinId> SmallBins::Delete(DbIndex dbid, std::string_view key) {
125-
std::pair<DbIndex, std::string> key_pair{dbid, key};
126-
auto it = current_bin_.find(key_pair);
127-
128-
if (it != current_bin_.end()) {
125+
if (auto it = current_bin_.find(make_pair(dbid, key)); it != current_bin_.end()) {
129126
size_t stashed_size = StashedValueSize(it->second);
130127
DCHECK_GE(current_bin_bytes_, stashed_size);
131128

@@ -135,7 +132,7 @@ std::optional<SmallBins::BinId> SmallBins::Delete(DbIndex dbid, std::string_view
135132
}
136133

137134
for (auto& [id, keys] : pending_bins_) {
138-
if (keys.erase(key_pair))
135+
if (keys.erase(make_pair(dbid, key)))
139136
return keys.empty() ? std::make_optional(id) : std::nullopt;
140137
}
141138
return std::nullopt;

src/server/tiering/small_bins.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <vector>
1212

1313
#include "server/tiering/disk_storage.h"
14+
#include "server/tiering/entry_map.h"
1415

1516
namespace dfly::tiering {
1617

@@ -84,12 +85,10 @@ class SmallBins {
8485
BinId last_bin_id_ = 0;
8586

8687
unsigned current_bin_bytes_ = 0;
87-
absl::flat_hash_map<std::pair<DbIndex, std::string>, std::string> current_bin_;
88+
tiering::EntryMap<std::string> current_bin_;
8889

8990
// Pending stashes, their keys and value sizes
90-
absl::flat_hash_map<unsigned /* id */,
91-
absl::flat_hash_map<std::pair<DbIndex, std::string> /* key*/, DiskSegment>>
92-
pending_bins_;
91+
absl::flat_hash_map<unsigned /* id */, tiering::EntryMap<DiskSegment>> pending_bins_;
9392

9493
// Map of bins that were stashed and should be deleted when number of entries reaches 0
9594
absl::flat_hash_map<size_t /*offset*/, StashInfo> stashed_bins_;

0 commit comments

Comments
 (0)