@@ -649,6 +649,228 @@ void DflyCmd::Load(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cn
   rb->SendOk();
 }
 
+namespace {
+
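+// Per-shard journal consumer for the stable-sync phase: it runs on the shard
+// thread, buffers the serialized journal entries, and wakes the stable-sync
+// reader fiber through the shared EventCount. rpos/wpos are plain indices into
+// `buffer`; the shard thread appends and the reader consumes, and the buffer is
+// recycled once the reader has drained it.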
+struct ShardJournalChannel : journal::JournalConsumerInterface {
+  explicit ShardJournalChannel(fb2::EventCount& e, journal::Journal* journal)
+      : ec{e}, journal_{journal} {
+    CHECK(journal);
+    journal_cb_id = journal_->RegisterOnChange(this);
+  }
+
+  void Stop() {
+    journal_->UnregisterOnChange(journal_cb_id);
+  }
+
+  void ConsumeJournalChange(const journal::JournalChangeItem& item) override {
+    if (rpos == wpos) {
+      rpos = 0;
+      wpos = 0;
+      buffer = {};
+    }
+
+    buffer.emplace_back(item.journal_item.data);
+    wpos++;
+
+    ec.notifyAll();
+  }
+
+  void ThrottleIfNeeded() override {
+  }
+
+  std::vector<std::string> Read() {
+    CHECK_LT(rpos, wpos) << "Invalid read attempt";
+
+    auto i = rpos;
+    std::vector<std::string> result;
+    while (i < wpos) {
+      result.emplace_back(std::move(buffer[i++]));
+    }
+    rpos = i;
+    return result;
+  }
+
+  bool HasData() const {
+    return rpos < wpos;
+  }
+
+  fb2::EventCount& ec;
+  size_t rpos{0};
+  size_t wpos{0};
+  std::vector<std::string> buffer;
+  uint32_t journal_cb_id;
+  journal::Journal* journal_;
+};
+
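+// In-memory byte pipe connecting a producer fiber to a consumer fiber: RdbSaver
+// writes through the io::Sink side while a drain fiber reads through the
+// io::Source side and forwards the bytes to the replica socket. A sketch of the
+// intended usage, mirroring StartValkeySync below:
+//
+//   Pipe p;
+//   auto drain = fb2::Fiber("drain", [&] { /* p.Read(...) until it returns 0 */ });
+//   RdbSaver saver{&p, mode, false, ""};  // the saver writes the snapshot into p
+//   ...                                   // snapshot the shard
+//   p.done = true;                        // writer signals completion
+//   drain.JoinIfNeeded();
+//
+// The fixed 1 KiB buffer is reset once the reader drains a completely filled
+// buffer; EventCount parks either side when it cannot make progress.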
+struct Pipe final : io::Source, io::Sink {
+  io::Result<unsigned long> ReadSome(const iovec* v, uint32_t len) override {
+    // Wake up when data arrives or when the writer signals it is done. Only
+    // report EOF once the writer finished *and* the buffer is fully drained,
+    // otherwise the tail of the snapshot would be lost.
+    ec.await([&] { return rpos < wpos || done; });
+    if (done && rpos == wpos) {
+      return 0;
+    }
+
+    size_t bytes_read = 0;
+    while (rpos < wpos && len > 0) {
+      const auto chunk_size = std::min(wpos - rpos, v->iov_len);
+      std::copy_n(buffer.begin() + rpos, chunk_size, static_cast<char*>(v->iov_base));
+      bytes_read += chunk_size;
+      rpos += chunk_size;
+      ++v;
+      --len;
+    }
+
+    // A completely filled buffer was fully drained; reset it so the writer can proceed.
+    if (rpos == wpos && wpos == cap) {
+      rpos = 0;
+      wpos = 0;
+      ec.notifyAll();
+    }
+
+    return bytes_read;
+  }
+
+  io::Result<unsigned long> WriteSome(const iovec* v, uint32_t len) override {
+    CHECK(!done);
+    ec.await([&] { return wpos < cap; });
+    size_t bytes_written = 0;
+
+    while (wpos < cap && len > 0) {
+      const auto chunk_size = std::min(cap - wpos, v->iov_len);
+      auto p = static_cast<const char*>(v->iov_base);
+      std::copy_n(p, chunk_size, buffer.begin() + wpos);
+      bytes_written += chunk_size;
+      wpos += chunk_size;
+      ++v;
+      --len;
+    }
+
+    ec.notifyAll();
+    return bytes_written;
+  }
+
+  std::array<uint8_t, 1024> buffer;
+  size_t rpos{0};
+  size_t wpos{0};
+  size_t cap{1024};
+  std::atomic_bool done{false};
+  fb2::EventCount ec;
+};
758+
759+ } // namespace
760+
+void DflyCmd::StartValkeySync() {
+  auto Write = [this](auto v) {
+    const auto buf = io::Bytes(reinterpret_cast<const unsigned char*>(v.data()), v.size());
+    CHECK(!_valkey_replica->conn->socket()->Write(buf));
+  };
+
+  CHECK(_valkey_replica.has_value()) << "There is no valkey replica to sync with";
+
+  // Since we do not know the size of the RDB payload up front, use the EOF
+  // protocol: send "$EOF:<40-random-chars>\n" first, then the same 40 chars
+  // at the end.
+  std::string eof_mark(40, 'X');
+  std::string eof_mark_with_prefix = absl::StrCat("$EOF:", eof_mark, "\n");
+
+  Write(eof_mark_with_prefix);
+
+  for (unsigned i = 0; i < shard_set->size(); ++i) {
+    Pipe p;
+    auto cb = [&] {
+      std::array<uint8_t, 128> backing;
+      const io::MutableBytes mb{backing};
+      // Keep draining until the pipe reports EOF: writer done and buffer empty.
+      while (true) {
+        auto n = p.Read(mb);
+        if (!n.has_value() || n.value() == 0) {
+          break;
+        }
+        // Forward only the bytes actually read, not the whole backing array.
+        CHECK(!_valkey_replica->conn->socket()->Write(mb.first(n.value())));
+      }
+    };
+    auto drain_fb = fb2::Fiber("replica-drain-fb", cb);
+
+    shard_set->Await(i, [&p, this, i] {
+      auto shard = EngineShard::tlocal();
+      auto mode = i == 0 ? SaveMode::SINGLE_SHARD_WITH_SUMMARY : SaveMode::SINGLE_SHARD;
+      RdbSaver saver{&p, mode, false, ""};
+      if (mode == SaveMode::SINGLE_SHARD_WITH_SUMMARY) {
+        CHECK(!saver.SaveHeader(saver.GetGlobalData(&sf_->service())));
+      }
+
+      saver.StartSnapshotInShard(false, &_valkey_replica->exec_st, shard);
+      CHECK(!saver.WaitSnapshotInShard(shard));
+      p.done = true;
+      // Wake the drain fiber in case it is parked waiting for more data.
+      p.ec.notifyAll();
+      VLOG(1) << "finished writing snapshot for shard " << shard->shard_id();
+    });
+
+    drain_fb.JoinIfNeeded();
+  }
+
+  Write(eof_mark);
+
+  // Stable sync
+  VLOG(1) << "Entering stable sync..";
+
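+  // Start the journal on every shard thread and attach one channel per shard.
+  // All channels share a single EventCount, so the loop below can block until
+  // any shard has produced journal data.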
+  std::vector<std::unique_ptr<ShardJournalChannel>> channels(shard_set->size());
+  fb2::EventCount ec;
+  JournalReader reader{nullptr, 0};
+
+  auto cb = [&channels, &ec, this](EngineShard* shard) {
+    auto& channel = channels[shard->shard_id()];
+    sf_->journal()->StartInThread();
+    channel.reset(new ShardJournalChannel(ec, sf_->journal()));
+    VLOG(1) << "Set channel for shard " << shard->shard_id();
+  };
+  shard_set->RunBlockingInParallel(cb);
+
+  RedisReplyBuilder rb{_valkey_replica->conn->socket()};
+  DbIndex current_dbid = std::numeric_limits<DbIndex>::max();
+
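+  // Forever: wait until at least one shard has buffered journal data, parse the
+  // raw bytes back into journal entries, and forward each entry to the replica,
+  // injecting a SELECT whenever an entry targets a different database.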
+  while (true) {
+    ec.await([&channels] {
+      return std::any_of(channels.begin(), channels.end(),
+                         [](const auto& channel) { return channel->HasData(); });
+    });
+    for (const auto& channel : channels) {
+      if (channel->HasData()) {
+        auto data = channel->Read();
+        // The entries are separate strings; concatenate them so the journal
+        // reader sees one contiguous buffer.
+        std::string joined;
+        for (const auto& s : data) {
+          joined.append(s);
+        }
+        auto span = io::Bytes(reinterpret_cast<const uint8_t*>(joined.data()), joined.size());
+        auto src = io::BytesSource{span};
+        reader.SetSource(&src);
+        while (true) {
+          auto entry = reader.ReadEntry();
+          if (!entry.has_value()) {
+            // We read all the commands in the buffer.
+            CHECK_EQ(entry.error().value(), EIO);
+            break;
+          }
+
+          auto& parsed = entry.value();
+          if (parsed.dbid != current_dbid) {
+            VLOG(1) << "Database changed from " << current_dbid << " to " << parsed.dbid;
+            std::string parsed_dbid = std::to_string(parsed.dbid);
+            std::vector<std::string_view> select_cmd = {"SELECT", parsed_dbid};
+
+            VLOG(1) << "sending command: SELECT " << parsed_dbid;
+            rb.SendBulkStrArr(select_cmd);
+            current_dbid = parsed.dbid;
+          }
+
+          VLOG(1) << "sending command: " << parsed.ToString() << " of size " << parsed.cmd.cmd_len;
+
+          // valkey expects commands propagated as a bulk-string array
+          rb.SendBulkStrArr(parsed.cmd.cmd_args);
+        }
+      }
+    }
+  }
+}
+
 OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, ExecutionState* exec_st,
                                         EngineShard* shard) {
   DCHECK(shard);
@@ -730,6 +952,12 @@ void DflyCmd::StartStableSyncInThread(FlowInfo* flow, ExecutionState* exec_st, E
   };
 }
 
+void DflyCmd::CreateValkeySyncSession(facade::Connection* conn) {
+  // Take the lock before checking, so that concurrent session creation cannot
+  // race past the single-replica check.
+  fb2::LockGuard lk(mu_);
+  CHECK(!_valkey_replica.has_value());
+  _valkey_replica.emplace(conn, [](const GenericError&) {});
+}
+
 auto DflyCmd::CreateSyncSession(ConnectionState* state) -> std::pair<uint32_t, unsigned> {
   util::fb2::LockGuard lk(mu_);
   unsigned sync_id = next_sync_id_++;