@@ -154,6 +154,7 @@ ABSL_DECLARE_FLAG(string, tls_ca_cert_file);
154154ABSL_DECLARE_FLAG (string, tls_ca_cert_dir);
155155ABSL_DECLARE_FLAG (int , replica_priority);
156156ABSL_DECLARE_FLAG (double , rss_oom_deny_ratio);
157+ ABSL_DECLARE_FLAG (bool , experimental_replicaof_v2);
157158
158159bool AbslParseFlag (std::string_view in, ReplicaOfFlag* flag, std::string* err) {
159160#define RETURN_ON_ERROR (cond, m ) \
@@ -3588,6 +3589,11 @@ void ServerFamily::StopAllClusterReplicas() {
35883589}
35893590
35903591void ServerFamily::ReplicaOf (CmdArgList args, const CommandContext& cmd_cntx) {
3592+ const bool use_replica_of_v2 = absl::GetFlag (FLAGS_experimental_replicaof_v2);
3593+ if (use_replica_of_v2) {
3594+ ReplicaOfInternalV2 (args, cmd_cntx.tx , cmd_cntx.rb , ActionOnConnectionFail::kReturnOnError );
3595+ return ;
3596+ }
35913597 ReplicaOfInternal (args, cmd_cntx.tx , cmd_cntx.rb , ActionOnConnectionFail::kReturnOnError );
35923598}
35933599
@@ -3601,9 +3607,109 @@ void ServerFamily::Replicate(string_view host, string_view port) {
36013607 CmdArgList args_list = absl::MakeSpan (args_vec);
36023608 io::NullSink sink;
36033609 facade::RedisReplyBuilder rb (&sink);
3610+ const bool use_replica_of_v2 = absl::GetFlag (FLAGS_experimental_replicaof_v2);
3611+ if (use_replica_of_v2) {
3612+ ReplicaOfInternalV2 (args_list, nullptr , &rb, ActionOnConnectionFail::kContinueReplication );
3613+ return ;
3614+ }
36043615 ReplicaOfInternal (args_list, nullptr , &rb, ActionOnConnectionFail::kContinueReplication );
36053616}
36063617
3618+ void ServerFamily::ReplicaOfNoOne (SinkReplyBuilder* builder) {
3619+ util::fb2::LockGuard lk (replicaof_mu_);
3620+ ServerState* ss = ServerState::tlocal ();
3621+
3622+ if (!ss->is_master ) {
3623+ CHECK (replica_);
3624+
3625+ // flip flag before clearing replica_
3626+ SetMasterFlagOnAllThreads (true );
3627+
3628+ last_master_data_ = replica_->Stop ();
3629+ replica_.reset ();
3630+ StopAllClusterReplicas ();
3631+ }
3632+
3633+ // May not switch to ACTIVE if the process is, for example, shutting down at the same time.
3634+ service_.SwitchState (GlobalState::LOADING, GlobalState::ACTIVE);
3635+
3636+ return builder->SendOk ();
3637+ }
3638+
3639+ void ServerFamily::ReplicaOfInternalV2 (CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
3640+ ActionOnConnectionFail on_error)
3641+ ABSL_LOCKS_EXCLUDED(replicaof_mu_) {
3642+ auto replicaof_args = ReplicaOfArgs::FromCmdArgs (args, builder);
3643+ if (!replicaof_args.has_value ()) {
3644+ return ;
3645+ }
3646+
3647+ LOG (INFO) << " Initiate replication with: " << *replicaof_args;
3648+ // This is a "weak" check. For example, if the node is already a replica,
3649+ // it could be the case that one of the flows disconnects. The MainReplicationFiber
3650+ // will then loop and if it can't partial sync it will enter LOADING state because of
3651+ // full sync. Note that the fiber is not aware of the replicaof_mu_ so even
3652+ // if that mutex is locked below before any state check we can't really enforce
3653+ // that the old replication fiber won't try to full sync and update the state to LOADING.
3654+ // What is more here is that we always call `replica->Stop()`. So even if we end up in the
3655+ // scenario described, the semantics are well defined. First, cancel the old replica and
3656+ // move on with the new one. Cancelation will be slower and ReplicaOf() will
3657+ // induce higher latency -- but that's ok because it's an highly improbable flow with
3658+ // well defined semantics.
3659+ ServerState* ss = ServerState::tlocal ();
3660+
3661+ if (ss->is_master && ss->gstate () == GlobalState::LOADING) {
3662+ builder->SendError (kLoadingErr );
3663+ return ;
3664+ }
3665+
3666+ // replicaof no one
3667+ if (replicaof_args->IsReplicaOfNoOne ()) {
3668+ return ReplicaOfNoOne (builder);
3669+ }
3670+
3671+ auto new_replica = make_shared<Replica>(replicaof_args->host , replicaof_args->port , &service_,
3672+ master_replid (), replicaof_args->slot_range );
3673+ GenericError ec;
3674+ switch (on_error) {
3675+ case ActionOnConnectionFail::kReturnOnError :
3676+ ec = new_replica->Start ();
3677+ break ;
3678+ case ActionOnConnectionFail::kContinueReplication :
3679+ new_replica->EnableReplication ();
3680+ break ;
3681+ };
3682+
3683+ if (ec || new_replica->IsContextCancelled ()) {
3684+ return builder->SendError (ec ? ec.Format () : " replication cancelled" );
3685+ }
3686+
3687+ // Critical section.
3688+ // 1. Stop the old replica_ if it exists
3689+ // 2. Update all the pointers to the new replica and update master flag
3690+ // 3. Start the main replication fiber
3691+ // 4. Send OK
3692+ util::fb2::LockGuard lk (replicaof_mu_);
3693+ std::optional<Replica::LastMasterSyncData> last_master_data;
3694+ if (replica_)
3695+ last_master_data = replica_->Stop ();
3696+
3697+ StopAllClusterReplicas ();
3698+
3699+ if (ServerState::tlocal ()->gstate () == GlobalState::TAKEN_OVER)
3700+ service_.SwitchState (GlobalState::TAKEN_OVER, GlobalState::LOADING);
3701+
3702+ // TODO Update thread locals. That way INFO never blocks
3703+ replica_ = new_replica;
3704+ SetMasterFlagOnAllThreads (false );
3705+
3706+ if (on_error == ActionOnConnectionFail::kReturnOnError ) {
3707+ replica_->StartMainReplicationFiber (last_master_data);
3708+ }
3709+
3710+ builder->SendOk ();
3711+ }
3712+
36073713// REPLTAKEOVER <seconds> [SAVE]
36083714// SAVE is used only by tests.
36093715void ServerFamily::ReplTakeOver (CmdArgList args, const CommandContext& cmd_cntx) {
0 commit comments