Skip to content
48 changes: 41 additions & 7 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2482,6 +2482,8 @@ error_code RdbLoader::HandleAux() {
/* Just ignored. */
} else if (auxkey == "search-index") {
LoadSearchIndexDefFromAux(std::move(auxval));
} else if (auxkey == "search-synonyms") {
LoadSearchSynonymsFromAux(std::move(auxval));
} else if (auxkey == "shard-count") {
uint32_t shard_count;
if (absl::SimpleAtoi(auxval, &shard_count)) {
Expand Down Expand Up @@ -2802,7 +2804,10 @@ void RdbLoader::LoadScriptFromAux(string&& body) {
}
}

void RdbLoader::LoadSearchIndexDefFromAux(string&& def) {
namespace {

void LoadSearchCommandFromAux(Service* service, string&& def, string_view command_name,
string_view error_context) {
facade::CapturingReplyBuilder crb{};
ConnectionContext cntx{nullptr, nullptr};
cntx.is_replicating = true;
Expand All @@ -2821,24 +2826,47 @@ void RdbLoader::LoadSearchIndexDefFromAux(string&& def) {
auto res = parser.Parse(buffer, &consumed, &resp_vec);

if (res != facade::RedisParser::Result::OK) {
LOG(ERROR) << "Bad index definition: " << def;
LOG(ERROR) << "Bad " << error_context << ": " << def;
return;
}

// Prepend FT.CREATE to index definiton
CmdArgVec arg_vec;
facade::RespExpr::VecToArgList(resp_vec, &arg_vec);
string ft_create = "FT.CREATE";
arg_vec.insert(arg_vec.begin(), MutableSlice{ft_create.data(), ft_create.size()});

service_->DispatchCommand(absl::MakeSpan(arg_vec), &crb, &cntx);
// Prepend command name (FT.CREATE or FT.SYNUPDATE)
string cmd_str{command_name};
arg_vec.insert(arg_vec.begin(), MutableSlice{cmd_str.data(), cmd_str.size()});

service->DispatchCommand(absl::MakeSpan(arg_vec), &crb, &cntx);

auto response = crb.Take();
if (auto err = facade::CapturingReplyBuilder::TryExtractError(response); err) {
LOG(ERROR) << "Bad index definition: " << def << " " << err->first;
LOG(ERROR) << "Bad " << error_context << ": " << def << " " << err->first;
}
}

} // namespace

// Static storage for synonym commands collected from all RdbLoader instances
std::vector<std::string> RdbLoader::pending_synonym_cmds_;

std::vector<std::string> RdbLoader::TakePendingSynonymCommands() {
std::vector<std::string> result;
result.swap(pending_synonym_cmds_);
return result;
}

void RdbLoader::LoadSearchIndexDefFromAux(string&& def) {
// FT.CREATE command - execute immediately during RDB load
LoadSearchCommandFromAux(service_, std::move(def), "FT.CREATE", "index definition");
}

void RdbLoader::LoadSearchSynonymsFromAux(string&& def) {
// FT.SYNUPDATE command - defer execution until after RebuildAllIndices
// Add to shared static vector (may be called from multiple RdbLoader instances)
pending_synonym_cmds_.push_back(std::move(def));
}

void RdbLoader::PerformPostLoad(Service* service) {
const CommandId* cmd = service->FindCmd("FT.CREATE");
if (cmd == nullptr) // On MacOS we don't include search so FT.CREATE won't exist.
Expand All @@ -2849,6 +2877,12 @@ void RdbLoader::PerformPostLoad(Service* service) {
es->search_indices()->RebuildAllIndices(
OpArgs{es, nullptr, DbContext{&namespaces->GetDefaultNamespace(), 0, GetCurrentTimeMs()}});
});

// Now execute all pending synonym commands after indices are rebuilt
std::vector<std::string> synonym_cmds = TakePendingSynonymCommands();
for (auto& syn_cmd : synonym_cmds) {
LoadSearchCommandFromAux(service, std::move(syn_cmd), "FT.SYNUPDATE", "synonym definition");
}
}

} // namespace dfly
7 changes: 7 additions & 0 deletions src/server/rdb_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,14 @@ class RdbLoader : protected RdbLoaderBase {
// issues an FT.CREATE call, but does not start indexing
void LoadSearchIndexDefFromAux(std::string&& value);

// Load synonyms from RESP string and issue FT.SYNUPDATE call
void LoadSearchSynonymsFromAux(std::string&& value);

// Get pending synonym commands collected from all RdbLoader instances
static std::vector<std::string> TakePendingSynonymCommands();

Service* service_;
static std::vector<std::string> pending_synonym_cmds_;
std::string snapshot_id_;
bool override_existing_keys_ = false;
bool load_unowned_slots_ = false;
Expand Down
26 changes: 23 additions & 3 deletions src/server/rdb_save.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1352,7 +1352,7 @@ error_code RdbSaver::Impl::FlushSerializer() {
}

RdbSaver::GlobalData RdbSaver::GetGlobalData(const Service* service) {
StringVec script_bodies, search_indices;
StringVec script_bodies, search_indices, search_synonyms;

{
auto scripts = service->script_mgr()->GetAll();
Expand All @@ -1367,9 +1367,23 @@ RdbSaver::GlobalData RdbSaver::GetGlobalData(const Service* service) {
if (shard->shard_id() == 0) {
auto* indices = shard->search_indices();
for (const auto& index_name : indices->GetIndexNames()) {
auto index_info = indices->GetIndex(index_name)->GetInfo();
auto* index = indices->GetIndex(index_name);
auto index_info = index->GetInfo();

// Save index definition
search_indices.emplace_back(
absl::StrCat(index_name, " ", index_info.BuildRestoreCommand()));

// Save synonym groups to separate vector
const auto& synonym_groups = index->GetSynonyms().GetGroups();
for (const auto& [group_id, terms] : synonym_groups) {
if (!terms.empty()) {
// Format: "index_name group_id term1 term2 term3"
std::string syn_cmd =
absl::StrCat(index_name, " ", group_id, " ", absl::StrJoin(terms, " "));
search_synonyms.emplace_back(std::move(syn_cmd));
}
}
}
}
#endif
Expand All @@ -1386,7 +1400,7 @@ RdbSaver::GlobalData RdbSaver::GetGlobalData(const Service* service) {
});

return RdbSaver::GlobalData{std::move(script_bodies), std::move(search_indices),
table_mem.load(memory_order_relaxed)};
std::move(search_synonyms), table_mem.load(memory_order_relaxed)};
}

void RdbSaver::Impl::FillFreqMap(RdbTypeFreqMap* dest) const {
Expand Down Expand Up @@ -1526,6 +1540,12 @@ error_code RdbSaver::SaveAux(const GlobalData& glob_state) {
DCHECK(save_mode_ != SaveMode::SINGLE_SHARD || glob_state.search_indices.empty());
for (const string& s : glob_state.search_indices)
RETURN_ON_ERR(impl_->SaveAuxFieldStrStr("search-index", s));

// Save synonyms in separate aux fields
DCHECK(save_mode_ != SaveMode::SINGLE_SHARD || glob_state.search_synonyms.empty());
for (const string& s : glob_state.search_synonyms)
RETURN_ON_ERR(impl_->SaveAuxFieldStrStr("search-synonyms", s));

if (save_mode_ == SaveMode::SINGLE_SHARD_WITH_SUMMARY || save_mode_ == SaveMode::SUMMARY) {
// We save the shard id in the summary file, so that we can restore it later.
RETURN_ON_ERR(SaveAuxFieldStrInt("shard-count", shard_set->size()));
Expand Down
7 changes: 4 additions & 3 deletions src/server/rdb_save.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ class RdbSaver {
public:
// Global data which doesn't belong to shards and is serialized in header
struct GlobalData {
const StringVec lua_scripts; // bodies of lua scripts
const StringVec search_indices; // ft.create commands to re-create search indices
size_t table_used_memory = 0; // total memory used by all tables in all shards
const StringVec lua_scripts; // bodies of lua scripts
const StringVec search_indices; // ft.create commands to re-create search indices
const StringVec search_synonyms; // ft.synupdate commands to restore synonyms
size_t table_used_memory = 0; // total memory used by all tables in all shards
};

// single_shard - true means that we run RdbSaver on a single shard and we do not use
Expand Down
26 changes: 26 additions & 0 deletions tests/dragonfly/search_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,3 +586,29 @@ def make_car(producer, description, speed):

for index in client.execute_command("FT._LIST"):
client.ft(index.decode()).dropindex()


@dfly_args({"proactor_threads": 4, "dbfilename": "synonym-persistence"})
async def test_synonym_persistence(df_server):
"""Test that synonyms are persisted across server restarts"""
client = aioredis.Redis(port=df_server.port)

# Create index and add documents
idx = client.ft("idx")
await idx.create_index([TextField("txt")], definition=IndexDefinition(prefix=["d:"]))
await client.hset("d:1", mapping={"txt": "car"})
await client.hset("d:2", mapping={"txt": "automobile"})

# Add synonyms and verify they work
await client.execute_command("FT.SYNUPDATE", "idx", "grp", "car", "automobile")
assert (await idx.search(Query("car"))).total == 2

# Restart server
df_server.stop()
df_server.start()
client = aioredis.Redis(port=df_server.port)
await wait_available_async(client)
idx = client.ft("idx")

# Verify synonyms still work after restart
assert (await idx.search(Query("car"))).total == 2
Loading