@@ -649,6 +649,183 @@ void DflyCmd::Load(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cn
649649 rb->SendOk ();
650650}
651651
652+ namespace {
653+
654+ struct ShardJournalChannel : journal::JournalConsumerInterface, io::Source {
655+ void ConsumeJournalChange (const journal::JournalChangeItem& item) override {
656+ // TODO don't block journal slice
657+ ec.await ([&] { return wpos < cap; });
658+ buffer[wpos++] = item.journal_item .data ;
659+ ec.notifyAll ();
660+ parent.notifyAll ();
661+ }
662+
663+ io::Result<unsigned long > ReadSome (const iovec* v, uint32_t len) override {
664+ ec.await ([&] { return rpos < wpos; });
665+
666+ auto bytes_read = 0 ;
667+ size_t chunk_size = 0 ;
668+ bool partial_read = false ;
669+ while (rpos < wpos && len > 0 ) {
670+ auto & item = buffer[rpos];
671+ chunk_size = std::min (v->iov_len , item.size ());
672+ std::copy_n (item.data (), chunk_size, static_cast <char *>(v->iov_base ));
673+
674+ bytes_read += chunk_size;
675+ ++v;
676+ --len;
677+
678+ // If we were not able to fully consume the string, erase the front of it but remain at same
679+ // index
680+ partial_read = chunk_size < item.size ();
681+ if (!partial_read) {
682+ ++rpos;
683+ } else {
684+ item.erase (0 , chunk_size);
685+ }
686+ }
687+
688+ if (rpos == wpos && wpos == cap) {
689+ auto new_size = 0 ;
690+ if (partial_read) {
691+ buffer[0 ] = std::move (buffer[rpos]);
692+ new_size = 1 ;
693+ }
694+
695+ rpos = 0 ;
696+ wpos = new_size;
697+ buffer.resize (new_size);
698+ }
699+
700+ ec.notifyAll ();
701+ return bytes_read;
702+ }
703+
704+ bool HasData () const {
705+ return rpos > 0 ;
706+ }
707+
708+ fb2::EventCount ec;
709+ size_t rpos{0 };
710+ size_t wpos{0 };
711+ size_t cap{5 };
712+ std::vector<std::string> buffer;
713+ fb2::EventCount& parent;
714+ };
715+
716+ struct Pipe final : io::Source, io::Sink {
717+ io::Result<unsigned long > ReadSome (const iovec* v, uint32_t len) override {
718+ if (done) {
719+ return 0 ;
720+ }
721+
722+ ec.await ([&] { return rpos < wpos; });
723+ auto bytes_read = 0 ;
724+
725+ while (rpos < wpos && len > 0 ) {
726+ const auto chunk_size = min (wpos - rpos, v->iov_len );
727+ std::copy_n (buffer.begin () + rpos, chunk_size, static_cast <char *>(v->iov_base ));
728+ bytes_read += chunk_size;
729+ rpos += chunk_size;
730+ ++v;
731+ --len;
732+ }
733+
734+ if (rpos == wpos && wpos == cap) {
735+ rpos = 0 ;
736+ wpos = 0 ;
737+ ec.notifyAll ();
738+ }
739+
740+ return bytes_read;
741+ }
742+
743+ io::Result<unsigned long > WriteSome (const iovec* v, uint32_t len) override {
744+ CHECK (!done);
745+ ec.await ([&] { return wpos < cap; });
746+ int bytes_written = 0 ;
747+
748+ while (wpos < cap && len > 0 ) {
749+ const auto chunk_size = std::min (cap - wpos, v->iov_len );
750+ auto p = static_cast <const char *>(v->iov_base );
751+ std::copy_n (p, chunk_size, buffer.begin () + wpos);
752+ bytes_written += chunk_size;
753+ wpos += chunk_size;
754+ ++v;
755+ --len;
756+ }
757+
758+ std::string debugging{reinterpret_cast <const char *>(buffer.data () + 5 ), wpos};
759+ VLOG (1 ) << " debugging: " << debugging;
760+ ec.notifyAll ();
761+ return bytes_written;
762+ }
763+
764+ std::array<uint8_t , 1024 > buffer;
765+ size_t rpos{0 };
766+ size_t wpos{0 };
767+ size_t cap{1024 };
768+ std::atomic_bool done{false };
769+ fb2::EventCount ec;
770+ };
771+
772+ } // namespace
773+
774+ void DflyCmd::StartValkeySync () {
775+ auto Write = [this ](auto v) {
776+ const auto buf = io::Bytes (reinterpret_cast <const unsigned char *>(v.data ()), v.size ());
777+ CHECK (!_valkey_replica->conn ->socket ()->Write (buf));
778+ };
779+
780+ CHECK (_valkey_replica.has_value ()) << " There is no valkey replica to sync with" ;
781+
782+ // Since we do not know the size of rdb up front, use the EOF protocol, send
783+ // "$EOF:<40-random-chars>\n" first, then the same 40 chars at the end
784+ std::string eof_mark (40 , ' X' );
785+ std::string eof_mark_with_prefix = absl::StrCat (" $EOF:" , eof_mark, " \n " );
786+
787+ Write (eof_mark_with_prefix);
788+
789+ for (unsigned i = 0 ; i < shard_set->size (); ++i) {
790+ Pipe p;
791+ auto cb = [&] {
792+ std::array<uint8_t , 128 > backing;
793+ const io::MutableBytes mb{backing};
794+ while (!p.done ) {
795+ if (auto n = p.Read (mb); !n.has_value () || n.value () == 0 ) {
796+ break ;
797+ }
798+ CHECK (!_valkey_replica->conn ->socket ()->Write (mb));
799+ }
800+
801+ if (auto n = p.Read (mb); n.has_value () && n.value ()) {
802+ CHECK (!_valkey_replica->conn ->socket ()->Write (mb));
803+ }
804+ };
805+ auto drain_fb = fb2::Fiber (" replica-drain-fb" , cb);
806+
807+ shard_set->Await (i, [&p, this , i] {
808+ auto shard = EngineShard::tlocal ();
809+ auto mode = i == 0 ? SaveMode::SINGLE_SHARD_WITH_SUMMARY : SaveMode::SINGLE_SHARD;
810+ RdbSaver saver{&p, mode, false , " " };
811+ if (mode == SaveMode::SINGLE_SHARD_WITH_SUMMARY) {
812+ CHECK (!saver.SaveHeader (saver.GetGlobalData (&sf_->service ())));
813+ }
814+
815+ saver.StartSnapshotInShard (false , &_valkey_replica->exec_st , shard);
816+ CHECK (!saver.WaitSnapshotInShard (shard));
817+ p.done = true ;
818+ VLOG (1 ) << " finished writing snapshot for shard " << shard->shard_id ();
819+ });
820+
821+ drain_fb.JoinIfNeeded ();
822+ }
823+
824+ Write (eof_mark);
825+
826+ // Stable sync
827+ }
828+
652829OpStatus DflyCmd::StartFullSyncInThread (FlowInfo* flow, ExecutionState* exec_st,
653830 EngineShard* shard) {
654831 DCHECK (shard);
@@ -730,6 +907,11 @@ void DflyCmd::StartStableSyncInThread(FlowInfo* flow, ExecutionState* exec_st, E
730907 };
731908}
732909
910+ void DflyCmd::CreateValkeySyncSession (facade::Connection* conn) {
911+ fb2::LockGuard lk (mu_);
912+ _valkey_replica.emplace (conn, [](const GenericError&) {});
913+ }
914+
733915auto DflyCmd::CreateSyncSession (ConnectionState* state) -> std::pair<uint32_t, unsigned> {
734916 util::fb2::LockGuard lk (mu_);
735917 unsigned sync_id = next_sync_id_++;
0 commit comments