Skip to content

Commit 5fef132

Browse files
committed
Add flags for setting save format for SBF filters
Added a new flag `rdb_sbf_chunked` which determines the save format of SBFs. Also, separate functions for loading SBFs (`ReadSBF`, `ReadSBF2`, sharing `ReadSBFImpl`) were added. Signed-off-by: Eric <[email protected]>
1 parent 4505644 commit 5fef132

File tree

6 files changed: 50 additions (+50) and 22 deletions (-22).

src/server/rdb_extensions.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,12 @@ constexpr uint8_t RDB_TYPE_JSON = 30;
1313
constexpr uint8_t RDB_TYPE_HASH_WITH_EXPIRY = 31;
1414
constexpr uint8_t RDB_TYPE_SET_WITH_EXPIRY = 32;
1515
constexpr uint8_t RDB_TYPE_SBF = 33;
16+
constexpr uint8_t RDB_TYPE_SBF2 = 34;
1617

1718
constexpr bool rdbIsObjectTypeDF(uint8_t type) {
1819
return __rdbIsObjectType(type) || (type == RDB_TYPE_JSON) ||
1920
(type == RDB_TYPE_HASH_WITH_EXPIRY) || (type == RDB_TYPE_SET_WITH_EXPIRY) ||
20-
(type == RDB_TYPE_SBF);
21+
(type == RDB_TYPE_SBF) || (type == RDB_TYPE_SBF2);
2122
}
2223

2324
// Opcodes: Range 200-240 is used by DF extensions.

src/server/rdb_load.cc

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ extern "C" {
5454
ABSL_DECLARE_FLAG(int32_t, list_max_listpack_size);
5555
ABSL_DECLARE_FLAG(int32_t, list_compress_depth);
5656
ABSL_DECLARE_FLAG(uint32_t, dbnum);
57+
ABSL_DECLARE_FLAG(bool, rdb_sbf_chunked);
5758
ABSL_FLAG(bool, rdb_load_dry_run, false, "Dry run RDB load without applying changes");
5859
ABSL_FLAG(bool, rdb_ignore_expiry, false, "Ignore Key Expiry when loding from RDB snapshot");
5960

@@ -188,7 +189,7 @@ string ModuleTypeName(uint64_t module_id) {
188189
bool RdbTypeAllowedEmpty(int type) {
189190
return type == RDB_TYPE_STRING || type == RDB_TYPE_JSON || type == RDB_TYPE_SBF ||
190191
type == RDB_TYPE_STREAM_LISTPACKS || type == RDB_TYPE_SET_WITH_EXPIRY ||
191-
type == RDB_TYPE_HASH_WITH_EXPIRY;
192+
type == RDB_TYPE_HASH_WITH_EXPIRY || type == RDB_TYPE_SBF2;
192193
}
193194

194195
DbSlice& GetCurrentDbSlice() {
@@ -1316,6 +1317,9 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
13161317
case RDB_TYPE_SBF:
13171318
iores = ReadSBF();
13181319
break;
1320+
case RDB_TYPE_SBF2:
1321+
iores = ReadSBF2();
1322+
break;
13191323
default:
13201324
LOG(ERROR) << "Unsupported rdb type " << rdbtype;
13211325

@@ -1851,7 +1855,7 @@ auto RdbLoaderBase::ReadRedisJson() -> io::Result<OpaqueObj> {
18511855
return OpaqueObj{std::move(dest), RDB_TYPE_JSON};
18521856
}
18531857

1854-
auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> {
1858+
auto RdbLoaderBase::ReadSBFImpl(bool chunking) -> io::Result<OpaqueObj> {
18551859
RdbSBF res;
18561860
uint64_t options;
18571861
SET_OR_UNEXPECT(LoadLen(nullptr), options);
@@ -1875,20 +1879,24 @@ auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> {
18751879
string filter_data;
18761880
SET_OR_UNEXPECT(LoadLen(nullptr), hash_cnt);
18771881

1878-
unsigned total_size = 0;
1879-
SET_OR_UNEXPECT(LoadLen(nullptr), total_size);
1882+
if (absl::GetFlag(FLAGS_rdb_sbf_chunked)) {
1883+
unsigned total_size = 0;
1884+
SET_OR_UNEXPECT(LoadLen(nullptr), total_size);
18801885

1881-
filter_data.resize(total_size);
1882-
size_t offset = 0;
1883-
while (offset < total_size) {
1884-
unsigned chunk_size = 0;
1885-
SET_OR_UNEXPECT(LoadLen(nullptr), chunk_size);
1886-
error_code ec = FetchBuf(chunk_size, filter_data.data() + offset);
1887-
if (ec) {
1888-
return make_unexpected(ec);
1889-
}
1886+
filter_data.resize(total_size);
1887+
size_t offset = 0;
1888+
while (offset < total_size) {
1889+
unsigned chunk_size = 0;
1890+
SET_OR_UNEXPECT(LoadLen(nullptr), chunk_size);
1891+
error_code ec = FetchBuf(chunk_size, filter_data.data() + offset);
1892+
if (ec) {
1893+
return make_unexpected(ec);
1894+
}
18901895

1891-
offset += chunk_size;
1896+
offset += chunk_size;
1897+
}
1898+
} else {
1899+
SET_OR_UNEXPECT(FetchGenericString(), filter_data);
18921900
}
18931901

18941902
size_t bit_len = filter_data.size() * 8;
@@ -1900,6 +1908,15 @@ auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> {
19001908
return OpaqueObj{std::move(res), RDB_TYPE_SBF};
19011909
}
19021910

1911+
1912+
auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> {
1913+
return ReadSBFImpl(false);
1914+
}
1915+
1916+
auto RdbLoaderBase::ReadSBF2() -> io::Result<OpaqueObj> {
1917+
return ReadSBFImpl(true);
1918+
}
1919+
19031920
template <typename T> io::Result<T> RdbLoaderBase::FetchInt() {
19041921
auto ec = EnsureRead(sizeof(T));
19051922
if (ec)

src/server/rdb_load.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,9 @@ class RdbLoaderBase {
169169
::io::Result<OpaqueObj> ReadListQuicklist(int rdbtype);
170170
::io::Result<OpaqueObj> ReadStreams(int rdbtype);
171171
::io::Result<OpaqueObj> ReadRedisJson();
172+
::io::Result<OpaqueObj> ReadSBFImpl(bool chunking);
172173
::io::Result<OpaqueObj> ReadSBF();
174+
::io::Result<OpaqueObj> ReadSBF2();
173175

174176
std::error_code SkipModuleData();
175177
std::error_code HandleCompressedBlob(int op_type);

src/server/rdb_save.cc

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ ABSL_FLAG(dfly::CompressionMode, compression_mode, dfly::CompressionMode::MULTI_
4949
ABSL_RETIRED_FLAG(bool, stream_rdb_encode_v2, true,
5050
"Retired. Uses format, compatible with redis 7.2 and Dragonfly v1.26+");
5151

52+
// Flip this value to 'true' in March 2026.
53+
ABSL_FLAG(bool, rdb_sbf_chunked, false,
54+
"Enable new save format for saving SBFs in chunks.");
55+
5256
namespace dfly {
5357

5458
using namespace std;
@@ -623,13 +627,15 @@ std::error_code RdbSerializer::SaveSBFObject(const PrimeValue& pv) {
623627
RETURN_ON_ERR(SaveLen(sbf->hashfunc_cnt(i)));
624628

625629
string_view blob = sbf->data(i);
626-
size_t num_chunks = (blob.size() + kFilterChunkSize - 1) / kFilterChunkSize;
627-
RETURN_ON_ERR(SaveLen(blob.size()));
630+
if (absl::GetFlag(FLAGS_rdb_sbf_chunked)) {
631+
RETURN_ON_ERR(SaveLen(blob.size()));
628632

629-
for (size_t chunk_idx = 0; chunk_idx < num_chunks; ++chunk_idx) {
630-
size_t offset = chunk_idx * kFilterChunkSize;
631-
size_t chunk_len = std::min(kFilterChunkSize, blob.size() - offset);
632-
RETURN_ON_ERR(SaveString(blob.substr(offset, chunk_len)));
633+
for (size_t offset = 0; offset< blob.size(); offset += kFilterChunkSize) {
634+
size_t chunk_len = std::min(kFilterChunkSize, blob.size() - offset);
635+
RETURN_ON_ERR(SaveString(blob.substr(offset, chunk_len)));
636+
}
637+
} else {
638+
RETURN_ON_ERR(SaveString(blob));
633639
}
634640

635641
FlushState flush_state = FlushState::kFlushMidEntry;

src/server/rdb_save.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ extern "C" {
2020
#include "server/journal/types.h"
2121
#include "server/table.h"
2222

23-
constexpr size_t kFilterChunkSize = 1ULL << 26;
2423

2524
typedef struct rax rax;
2625
typedef struct streamCG streamCG;
@@ -211,6 +210,7 @@ class SerializerBase {
211210
io::IoBuf mem_buf_;
212211
std::unique_ptr<detail::CompressorImpl> compressor_impl_;
213212

213+
static constexpr size_t kFilterChunkSize = 1ULL << 26;
214214
static constexpr size_t kMinStrSizeToCompress = 256;
215215
static constexpr size_t kMaxStrSizeToCompress = 1 * 1024 * 1024;
216216
static constexpr double kMinCompressionReductionPrecentage = 0.95;

src/server/rdb_test.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ ABSL_DECLARE_FLAG(int32, list_max_listpack_size);
3535
ABSL_DECLARE_FLAG(dfly::CompressionMode, compression_mode);
3636
ABSL_DECLARE_FLAG(bool, rdb_ignore_expiry);
3737
ABSL_DECLARE_FLAG(uint32_t, num_shards);
38+
ABSL_DECLARE_FLAG(bool, rdb_sbf_chunked);
3839

3940
namespace dfly {
4041

@@ -671,6 +672,7 @@ TEST_F(RdbTest, SBF) {
671672
}
672673

673674
TEST_F(RdbTest, SBFLargeFilterChunking) {
675+
absl::SetFlag(&FLAGS_rdb_sbf_chunked, true);
674676
max_memory_limit = 200000000;
675677

676678
// Using this set of parameters for the BF.RESERVE command resulted in a

0 commit comments

Comments
 (0)