@@ -649,6 +649,242 @@ void DflyCmd::Load(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cn
649649 rb->SendOk ();
650650}
651651
652+ namespace {
653+
654+ struct ShardJournalChannel : journal::JournalConsumerInterface {
655+ explicit ShardJournalChannel (fb2::EventCount& e, journal::Journal* journal)
656+ : ec{e}, journal_{journal} {
657+ CHECK (journal);
658+ journal_cb_id = journal_->RegisterOnChange (this );
659+ }
660+
661+ void Stop () {
662+ journal_->UnregisterOnChange (journal_cb_id);
663+ }
664+
665+ void ConsumeJournalChange (const journal::JournalChangeItem& item) override {
666+ if (rpos == wpos) {
667+ rpos = 0 ;
668+ wpos = 0 ;
669+ buffer = {};
670+ }
671+
672+ buffer.emplace_back (item.journal_item .data );
673+ wpos++;
674+
675+ ec.notifyAll ();
676+ }
677+
678+ void ThrottleIfNeeded () override {
679+ }
680+
681+ std::vector<std::string> Read () {
682+ CHECK_LT (rpos, wpos) << " Invalid read attempt" ;
683+
684+ auto i = rpos;
685+ std::vector<std::string> result;
686+ while (i < wpos) {
687+ result.emplace_back (std::move (buffer[i++]));
688+ }
689+ rpos = i;
690+ return result;
691+ }
692+
693+ bool HasData () const {
694+ return rpos < wpos;
695+ }
696+
697+ fb2::EventCount& ec;
698+ size_t rpos{0 };
699+ size_t wpos{0 };
700+ std::vector<std::string> buffer;
701+ uint32_t journal_cb_id;
702+ journal::Journal* journal_;
703+ };
704+
705+ struct Pipe final : io::Source, io::Sink {
706+ io::Result<unsigned long > ReadSome (const iovec* v, uint32_t len) override {
707+ if (done) {
708+ return 0 ;
709+ }
710+
711+ ec.await ([&] { return rpos < wpos || done; });
712+ if (done && rpos == wpos) {
713+ return 0 ;
714+ }
715+
716+ auto bytes_read = 0 ;
717+
718+ while (rpos < wpos && len > 0 ) {
719+ const auto chunk_size = min (wpos - rpos, v->iov_len );
720+ std::copy_n (buffer.begin () + rpos, chunk_size, static_cast <char *>(v->iov_base ));
721+ bytes_read += chunk_size;
722+ rpos += chunk_size;
723+ ++v;
724+ --len;
725+ }
726+
727+ if (rpos == wpos && wpos == cap) {
728+ rpos = 0 ;
729+ wpos = 0 ;
730+ ec.notifyAll ();
731+ }
732+
733+ return bytes_read;
734+ }
735+
736+ io::Result<unsigned long > WriteSome (const iovec* v, uint32_t len) override {
737+ CHECK (!done);
738+ ec.await ([&] { return wpos < cap || done; });
739+ if (done && wpos == cap) {
740+ return 0 ;
741+ }
742+
743+ int bytes_written = 0 ;
744+
745+ while (wpos < cap && len > 0 ) {
746+ const auto chunk_size = std::min (cap - wpos, v->iov_len );
747+ auto p = static_cast <const char *>(v->iov_base );
748+ std::copy_n (p, chunk_size, buffer.begin () + wpos);
749+ bytes_written += chunk_size;
750+ wpos += chunk_size;
751+ ++v;
752+ --len;
753+ }
754+
755+ ec.notifyAll ();
756+ return bytes_written;
757+ }
758+
759+ void Stop () {
760+ done = true ;
761+ ec.notifyAll ();
762+ }
763+
764+ std::array<uint8_t , 1024 > buffer;
765+ size_t rpos{0 };
766+ size_t wpos{0 };
767+ size_t cap{1024 };
768+ std::atomic_bool done{false };
769+ fb2::EventCount ec;
770+ };
771+
772+ } // namespace
773+
774+ void DflyCmd::StartValkeySync () {
775+ auto Write = [this ](auto v) {
776+ const auto buf = io::Bytes (reinterpret_cast <const unsigned char *>(v.data ()), v.size ());
777+ CHECK (!_valkey_replica->conn ->socket ()->Write (buf));
778+ };
779+
780+ CHECK (_valkey_replica.has_value ()) << " There is no valkey replica to sync with" ;
781+
782+ // Since we do not know the size of rdb up front, use the EOF protocol, send
783+ // "$EOF:<40-random-chars>\n" first, then the same 40 chars at the end
784+ std::string eof_mark (40 , ' X' );
785+ std::string eof_mark_with_prefix = absl::StrCat (" $EOF:" , eof_mark, " \n " );
786+
787+ Write (eof_mark_with_prefix);
788+
789+ for (unsigned i = 0 ; i < shard_set->size (); ++i) {
790+ Pipe p;
791+ auto cb = [&] {
792+ std::array<uint8_t , 128 > backing;
793+ const io::MutableBytes mb{backing};
794+ while (!p.done ) {
795+ auto n = p.Read (mb);
796+ if (!n.has_value () || n.value () == 0 ) {
797+ break ;
798+ }
799+ CHECK (!_valkey_replica->conn ->socket ()->Write (mb.subspan (0 , n.value ())));
800+ }
801+
802+ if (auto n = p.Read (mb); n.has_value () && n.value ()) {
803+ CHECK (!_valkey_replica->conn ->socket ()->Write (mb.subspan (0 , n.value ())));
804+ }
805+ };
806+ auto drain_fb = fb2::Fiber (" replica-drain-fb" , cb);
807+
808+ shard_set->Await (i, [&p, this , i] {
809+ auto shard = EngineShard::tlocal ();
810+ RdbSaver saver{&p, SaveMode::SINGLE_SHARD, false , " " };
811+ if (i == 0 ) {
812+ CHECK (!saver.SaveHeader (saver.GetGlobalData (&sf_->service ())));
813+ }
814+
815+ saver.StartSnapshotInShard (false , &_valkey_replica->exec_st , shard);
816+ bool skip_epilog = i < shard_set->size () - 1 ;
817+ CHECK (!saver.WaitSnapshotInShard (shard, skip_epilog));
818+ p.Stop ();
819+ VLOG (1 ) << " finished writing snapshot for shard " << shard->shard_id ();
820+ });
821+
822+ drain_fb.JoinIfNeeded ();
823+ }
824+
825+ Write (eof_mark);
826+
827+ // Stable sync
828+ VLOG (1 ) << " Entering stable sync.." ;
829+
830+ std::vector<std::unique_ptr<ShardJournalChannel>> channels (shard_set->size ());
831+ fb2::EventCount ec;
832+ JournalReader reader{nullptr , 0 };
833+
834+ auto cb = [&channels, &ec, this ](EngineShard* shard) {
835+ auto & channel = channels[shard->shard_id ()];
836+ sf_->journal ()->StartInThread ();
837+ channel.reset (new ShardJournalChannel (ec, sf_->journal ()));
838+ VLOG (1 ) << " Set channel for shard " << shard->shard_id ();
839+ };
840+ shard_set->RunBlockingInParallel (cb);
841+
842+ RedisReplyBuilder rb{_valkey_replica->conn ->socket ()};
843+ DbIndex current_dbid = std::numeric_limits<DbIndex>::max ();
844+
845+ while (true ) {
846+ ec.await ([&channels] {
847+ return std::any_of (channels.begin (), channels.end (),
848+ [](const auto & channel) { return channel->HasData (); });
849+ });
850+ for (const auto & channel : channels) {
851+ if (channel->HasData ()) {
852+ auto data = channel->Read ();
853+ auto total_size =
854+ std::accumulate (data.begin (), data.end (), 0 ,
855+ [](auto currsum, const auto & str) { return currsum + str.size (); });
856+ auto span = io::Bytes (reinterpret_cast <uint8_t *>(data.begin ()->data ()), total_size);
857+ auto src = io::BytesSource{span};
858+ reader.SetSource (&src);
859+ while (true ) {
860+ auto entry = reader.ReadEntry ();
861+ if (!entry.has_value ()) {
862+ // We read all the commands in the buffer
863+ CHECK_EQ (entry.error ().value (), EIO);
864+ break ;
865+ }
866+
867+ auto & parsed = entry.value ();
868+ if (parsed.dbid != current_dbid) {
869+ VLOG (1 ) << " Database changed from " << current_dbid << " to " << parsed.dbid ;
870+ std::string parsed_dbid = std::to_string (parsed.dbid );
871+ std::vector<std::string_view> select_cmd = {" SELECT" , parsed_dbid};
872+
873+ VLOG (1 ) << " sending command: " << select_cmd;
874+ rb.SendBulkStrArr (select_cmd);
875+ current_dbid = parsed.dbid ;
876+ }
877+
878+ VLOG (1 ) << " sending command: " << parsed.ToString () << " of size " << parsed.cmd .cmd_len ;
879+
880+ // valkey expects commands propagated as bulk array
881+ rb.SendBulkStrArr (parsed.cmd .cmd_args );
882+ }
883+ }
884+ }
885+ }
886+ }
887+
652888OpStatus DflyCmd::StartFullSyncInThread (FlowInfo* flow, ExecutionState* exec_st,
653889 EngineShard* shard) {
654890 DCHECK (shard);
@@ -730,6 +966,12 @@ void DflyCmd::StartStableSyncInThread(FlowInfo* flow, ExecutionState* exec_st, E
730966 };
731967}
732968
969+ void DflyCmd::CreateValkeySyncSession (facade::Connection* conn) {
970+ CHECK (!_valkey_replica.has_value ());
971+ fb2::LockGuard lk (mu_);
972+ _valkey_replica.emplace (conn, [](const GenericError&) {});
973+ }
974+
733975auto DflyCmd::CreateSyncSession (ConnectionState* state) -> std::pair<uint32_t, unsigned> {
734976 util::fb2::LockGuard lk (mu_);
735977 unsigned sync_id = next_sync_id_++;
0 commit comments