@@ -959,17 +959,27 @@ void PartialSort(absl::Span<SerializedSearchDoc*> docs, size_t limit, SortOrder
959959 partial_sort (docs.begin (), docs.begin () + min (limit, docs.size ()), docs.end (), cb);
960960}
961961
962- void SearchReply (const SearchParams& params,
962+ bool SearchReply (const SearchParams& params,
963963 std::optional<search::KnnScoreSortOption> knn_sort_option,
964964 absl::Span<SearchResult> results, SinkReplyBuilder* builder) {
965+ // Count total number of hits
965966 size_t total_hits = 0 ;
967+ for (auto & shard_results : results)
968+ total_hits += shard_results.total_hits ;
969+
970+ // Arrange documents in a stride
966971 absl::InlinedVector<SerializedSearchDoc*, 5 > docs;
967972 docs.reserve (results.size ());
968- for (auto & shard_results : results) {
969- total_hits += shard_results.total_hits ;
970- for (auto & doc : shard_results.docs ) {
971- docs.push_back (&doc);
973+ for (size_t i = 0 ;; ++i) {
974+ bool added = false ;
975+ for (auto & shard_results : results) {
976+ if (i < shard_results.docs .size ()) {
977+ added = true ;
978+ docs.push_back (&shard_results.docs [i]);
979+ }
972980 }
981+ if (!added)
982+ break ;
973983 }
974984
975985 // Reorder and cut KNN results before applying SORT and LIMIT
@@ -995,6 +1005,14 @@ void SearchReply(const SearchParams& params,
9951005 PartialSort (absl::MakeSpan (docs), end, params.sort_option ->order ,
9961006 &SerializedSearchDoc::sort_score);
9971007
1008+ // Check if we havent' chosen too few documents due to cutoffs
1009+ size_t left = docs.size () - params.limit_offset ;
1010+ size_t expected = std::min (limit, total_hits - params.limit_offset );
1011+ if (left < expected)
1012+ return false ;
1013+
1014+ // TODO: Check sort correctness
1015+
9981016 const bool reply_with_ids_only = params.IdsOnly ();
9991017 auto * rb = static_cast <RedisReplyBuilder*>(builder);
10001018 RedisReplyBuilder::ArrayScope scope{rb, reply_with_ids_only ? (limit + 1 ) : (limit * 2 + 1 )};
@@ -1011,6 +1029,7 @@ void SearchReply(const SearchParams& params,
10111029
10121030 SendSerializedDoc (*docs[i], builder);
10131031 }
1032+ return true ;
10141033}
10151034
10161035// Warms up the query parser to avoid first-call slowness
@@ -1279,23 +1298,37 @@ void SearchFamily::FtSearch(CmdArgList args, const CommandContext& cmd_cntx) {
12791298 atomic<bool > index_not_found{false };
12801299 vector<SearchResult> docs (shard_set->size ());
12811300
1282- cmd_cntx.tx ->ScheduleSingleHop ([&](Transaction* t, EngineShard* es) {
1283- if (auto * index = es->search_indices ()->GetIndex (index_name); index)
1284- docs[es->shard_id ()] = index->Search (t->GetOpArgs (es), *params, &search_algo);
1301+ bool succeeded = true ;
1302+ do {
1303+ if (succeeded)
1304+ params->limit_serialization =
1305+ params->limit_offset +
1306+ std::min (params->limit_total , 2 * params->limit_total / shard_set->size ());
12851307 else
1286- index_not_found.store (true , memory_order_relaxed);
1287- return OpStatus::OK;
1288- });
1308+ params->limit_serialization = params->limit_total + params->limit_offset ;
12891309
1290- if (index_not_found.load ())
1291- return builder->SendError (string{index_name} + " : no such index" );
1310+ cmd_cntx.tx ->ScheduleSingleHop ([&](Transaction* t, EngineShard* es) {
1311+ if (auto * index = es->search_indices ()->GetIndex (index_name); index)
1312+ docs[es->shard_id ()] = index->Search (t->GetOpArgs (es), *params, &search_algo);
1313+ else
1314+ index_not_found.store (true , memory_order_relaxed);
1315+ return OpStatus::OK;
1316+ });
12921317
1293- for (const auto & res : docs) {
1294- if (res.error )
1295- return builder->SendError (*res.error );
1296- }
1318+ if (index_not_found.load ())
1319+ return builder->SendError (string{index_name} + " : no such index" );
1320+
1321+ for (const auto & res : docs) {
1322+ if (res.error )
1323+ return builder->SendError (*res.error );
1324+ }
12971325
1298- SearchReply (*params, search_algo.GetKnnScoreSortOption (), absl::MakeSpan (docs), builder);
1326+ bool did_succeed =
1327+ SearchReply (*params, search_algo.GetKnnScoreSortOption (), absl::MakeSpan (docs), builder);
1328+ CHECK (did_succeed);
1329+ DCHECK (succeeded || did_succeed);
1330+ succeeded = did_succeed;
1331+ } while (!succeeded);
12991332}
13001333
13011334void SearchFamily::FtProfile (CmdArgList args, const CommandContext& cmd_cntx) {
@@ -1331,6 +1364,7 @@ void SearchFamily::FtProfile(CmdArgList args, const CommandContext& cmd_cntx) {
13311364 std::vector<SearchResult> search_results (shards_count);
13321365 std::vector<absl::Duration> profile_results (shards_count);
13331366
1367+ params->limit_serialization = params->limit_offset + params->limit_total ;
13341368 cmd_cntx.tx ->ScheduleSingleHop ([&](Transaction* t, EngineShard* es) {
13351369 auto * index = es->search_indices ()->GetIndex (index_name);
13361370 if (!index) {
0 commit comments