@@ -649,6 +649,119 @@ void DflyCmd::Load(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cn
649649 rb->SendOk ();
650650}
651651
652+ namespace {
653+
654+ struct Pipe final : io::Source, io::Sink {
655+ io::Result<unsigned long > ReadSome (const iovec* v, uint32_t len) override {
656+ if (done) {
657+ return 0 ;
658+ }
659+
660+ ec.await ([&] { return rpos < wpos; });
661+ auto bytes_read = 0 ;
662+
663+ while (rpos < wpos && len > 0 ) {
664+ const auto chunk_size = min (wpos - rpos, v->iov_len );
665+ std::copy_n (buffer.begin () + rpos, chunk_size, static_cast <char *>(v->iov_base ));
666+ bytes_read += chunk_size;
667+ rpos += chunk_size;
668+ ++v;
669+ --len;
670+ }
671+
672+ if (rpos == wpos && wpos == cap) {
673+ rpos = 0 ;
674+ wpos = 0 ;
675+ ec.notifyAll ();
676+ }
677+
678+ return bytes_read;
679+ }
680+
681+ io::Result<unsigned long > WriteSome (const iovec* v, uint32_t len) override {
682+ CHECK (!done);
683+ ec.await ([&] { return wpos < cap; });
684+ int bytes_written = 0 ;
685+
686+ while (wpos < cap && len > 0 ) {
687+ const auto chunk_size = std::min (cap - wpos, v->iov_len );
688+ auto p = static_cast <const char *>(v->iov_base );
689+ std::copy_n (p, chunk_size, buffer.begin () + wpos);
690+ bytes_written += chunk_size;
691+ wpos += chunk_size;
692+ ++v;
693+ --len;
694+ }
695+
696+ std::string debugging{reinterpret_cast <const char *>(buffer.data () + 5 ), wpos};
697+ VLOG (1 ) << " debugging: " << debugging;
698+ ec.notifyAll ();
699+ return bytes_written;
700+ }
701+
702+ std::array<uint8_t , 1024 > buffer;
703+ size_t rpos{0 };
704+ size_t wpos{0 };
705+ size_t cap{1024 };
706+ std::atomic_bool done{false };
707+ fb2::EventCount ec;
708+ };
709+
710+ } // namespace
711+
712+ void DflyCmd::StartValkeySync () {
713+ auto Write = [this ](auto v) {
714+ const auto buf = io::Bytes (reinterpret_cast <const unsigned char *>(v.data ()), v.size ());
715+ CHECK (!_valkey_replica->conn ->socket ()->Write (buf));
716+ };
717+
718+ CHECK (_valkey_replica.has_value ()) << " There is no valkey replica to sync with" ;
719+
720+ // Since we do not know the size of rdb up front, use the EOF protocol, send
721+ // "$EOF:<40-random-chars>\n" first, then the same 40 chars at the end
722+ std::string eof_mark (40 , ' X' );
723+ std::string eof_mark_with_prefix = absl::StrCat (" $EOF:" , eof_mark, " \n " );
724+
725+ Write (eof_mark_with_prefix);
726+
727+ for (unsigned i = 0 ; i < shard_set->size (); ++i) {
728+ Pipe p;
729+ auto cb = [&] {
730+ std::array<uint8_t , 128 > backing;
731+ const io::MutableBytes mb{backing};
732+ while (!p.done ) {
733+ if (auto n = p.Read (mb); !n.has_value () || n.value () == 0 ) {
734+ break ;
735+ }
736+ CHECK (!_valkey_replica->conn ->socket ()->Write (mb));
737+ }
738+
739+ if (auto n = p.Read (mb); n.has_value () && n.value ()) {
740+ CHECK (!_valkey_replica->conn ->socket ()->Write (mb));
741+ }
742+ };
743+ auto drain_fb = fb2::Fiber (" replica-drain-fb" , cb);
744+
745+ shard_set->Await (i, [&p, this , i] {
746+ auto shard = EngineShard::tlocal ();
747+ auto mode = i == 0 ? SaveMode::SINGLE_SHARD_WITH_SUMMARY : SaveMode::SINGLE_SHARD;
748+ RdbSaver saver{&p, mode, false , " " };
749+ if (mode == SaveMode::SINGLE_SHARD_WITH_SUMMARY) {
750+ CHECK (!saver.SaveHeader (saver.GetGlobalData (&sf_->service ())));
751+ }
752+
753+ saver.StartSnapshotInShard (false , &_valkey_replica->exec_st , shard);
754+ CHECK (!saver.WaitSnapshotInShard (shard));
755+ p.done = true ;
756+ VLOG (1 ) << " finished writing snapshot for shard " << shard->shard_id ();
757+ });
758+
759+ drain_fb.JoinIfNeeded ();
760+ }
761+
762+ Write (eof_mark);
763+ }
764+
652765OpStatus DflyCmd::StartFullSyncInThread (FlowInfo* flow, ExecutionState* exec_st,
653766 EngineShard* shard) {
654767 DCHECK (shard);
@@ -730,6 +843,11 @@ void DflyCmd::StartStableSyncInThread(FlowInfo* flow, ExecutionState* exec_st, E
730843 };
731844}
732845
846+ void DflyCmd::CreateValkeySyncSession (facade::Connection* conn) {
847+ fb2::LockGuard lk (mu_);
848+ _valkey_replica.emplace (conn, [](const GenericError&) {});
849+ }
850+
733851auto DflyCmd::CreateSyncSession (ConnectionState* state) -> std::pair<uint32_t, unsigned> {
734852 util::fb2::LockGuard lk (mu_);
735853 unsigned sync_id = next_sync_id_++;
0 commit comments