
Commit d2d6944

Authored by Xizt, Tejas Kulkarni, and vazois
Resetting replication recovery when cluster reset is issued (#1319)
* Resetting replication recovery when cluster reset is issued
* Validating using cluster endpoint during reset
* Missed flag change
* Using reset cancellation token
* Merge fix
* Added other recovery statuses during reset recovery
* Using storeWrapper.serverOptions.ReplicaAttachTimeout for replica diskless sync
* Ran dotnet formatter
* Added reset during replication tests
* Ran dotnet format
* Removed if debug
* if debug encompasses using statements
* Version bump

Co-authored-by: Tejas Kulkarni <[email protected]>
Co-authored-by: Vasileios Zois <[email protected]>
1 parent bf2618a commit d2d6944

File tree: 7 files changed, +214 -5 lines

Version.props

Lines changed: 1 addition & 1 deletion

@@ -1,6 +1,6 @@
 <Project>
   <!-- VersionPrefix property for builds and packages -->
   <PropertyGroup>
-    <VersionPrefix>1.0.83</VersionPrefix>
+    <VersionPrefix>1.0.84</VersionPrefix>
   </PropertyGroup>
 </Project>

libs/cluster/Server/ClusterManagerWorkerState.cs

Lines changed: 7 additions & 2 deletions

@@ -100,6 +100,10 @@ public ReadOnlySpan<byte> TryReset(bool soft, int expirySeconds = 60)
             try
             {
                 SuspendConfigMerge();
+
+                // Reset recovery operations before proceeding with reset
+                clusterProvider.replicationManager.ResetRecovery();
+
                 var resp = CmdStrings.RESP_OK;
                 while (true)
                 {
@@ -113,8 +117,9 @@ public ReadOnlySpan<byte> TryReset(bool soft, int expirySeconds = 60)
                     this.clusterConnectionStore.CloseAll();

                     var newNodeId = soft ? current.LocalNodeId : Generator.CreateHexId();
-                    var address = current.LocalNodeIp;
-                    var port = current.LocalNodePort;
+                    var endpoint = clusterProvider.storeWrapper.GetClusterEndpoint();
+                    var address = endpoint.Address.ToString();
+                    var port = endpoint.Port;

                     var configEpoch = soft ? current.LocalNodeConfigEpoch : 0;
                     var expiry = DateTimeOffset.UtcNow.Ticks + TimeSpan.FromSeconds(expirySeconds).Ticks;
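
The point of this hunk is ordering and endpoint resolution: any in-flight recovery is cancelled before the node identity, address, and config epoch are rebuilt, and the address/port are now taken from the endpoint the cluster actually listens on rather than cached node fields. A minimal sketch of that shape, using placeholder types rather than Garnet's ClusterManager:

// Minimal sketch (placeholder types, not Garnet's): recovery is cancelled first,
// then node identity is re-derived from the live cluster endpoint.
using System.Net;
using System.Threading;

class ResetOrderingSketch
{
    readonly CancellationTokenSource resetHandler = new();

    // Stands in for ReplicationManager.ResetRecovery(): wake any attach task that is
    // blocked so it can unwind before the reset rewrites the local config.
    void ResetRecovery() => resetHandler.Cancel();

    public (string address, int port) TryReset(IPEndPoint clusterEndpoint)
    {
        ResetRecovery();                                   // 1. cancel in-flight recovery first
        var address = clusterEndpoint.Address.ToString();  // 2. derive identity from the endpoint
        var port = clusterEndpoint.Port;                   //    the node actually listens on
        return (address, port);                            // 3. caller rebuilds the config with these
    }
}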

libs/cluster/Server/Replication/ReplicaOps/ReplicaDisklessSync.cs

Lines changed: 14 additions & 1 deletion

@@ -4,9 +4,11 @@
 using System;
 using System.Net;
 using System.Text;
+using System.Threading;
 using System.Threading.Tasks;
 using Garnet.client;
 using Garnet.cluster.Server.Replication;
+using Garnet.common;
 using Microsoft.Extensions.Logging;

 namespace Garnet.cluster
@@ -58,6 +60,7 @@ async Task<string> TryBeginReplicaSync(bool downgradeLock)
             var disklessSync = clusterProvider.serverOptions.ReplicaDisklessSync;
             var disableObjects = clusterProvider.serverOptions.DisableObjects;
             GarnetClientSession gcs = null;
+            resetHandler ??= new CancellationTokenSource();
             try
             {
                 if (!clusterProvider.serverOptions.EnableFastCommit)
@@ -119,7 +122,12 @@ async Task<string> TryBeginReplicaSync(bool downgradeLock)
                     currentReplicationOffset: ReplicationOffset,
                     checkpointEntry: checkpointEntry);

-                var resp = await gcs.ExecuteAttachSync(syncMetadata.ToByteArray()).ConfigureAwait(false);
+                using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(ctsRepManager.Token, resetHandler.Token);
+
+                // Exception injection point for testing cluster reset during diskless replication
+                await ExceptionInjectionHelper.WaitOnSet(ExceptionInjectionType.Replication_InProgress_During_Diskless_Replica_Attach_Sync).WaitAsync(storeWrapper.serverOptions.ReplicaAttachTimeout, linkedCts.Token).ConfigureAwait(false);
+
+                var resp = await gcs.ExecuteAttachSync(syncMetadata.ToByteArray()).WaitAsync(storeWrapper.serverOptions.ReplicaAttachTimeout, linkedCts.Token).ConfigureAwait(false);
             }
             catch (Exception ex)
             {
@@ -144,6 +152,11 @@ async Task<string> TryBeginReplicaSync(bool downgradeLock)
                 }
                 gcs?.Dispose();
                 recvCheckpointHandler?.Dispose();
+                if (!resetHandler.TryReset())
+                {
+                    resetHandler.Dispose();
+                    resetHandler = new CancellationTokenSource();
+                }
             }
             return null;
         }
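
The attach is now bounded three ways: by the replication manager's lifetime token, by the new per-reset token, and by ReplicaAttachTimeout via Task.WaitAsync. A self-contained sketch of that cancellation shape, where AttachAsync stands in for ExecuteAttachSync and the 30-second timeout is only an example value:

// Sketch of the linked-cancellation pattern used above; AttachAsync is a stand-in
// for the real ExecuteAttachSync call, not Garnet code.
using System;
using System.Threading;
using System.Threading.Tasks;

class LinkedCancellationSketch
{
    // An attach that never completes on its own, so cancellation/timeout decide the outcome.
    static async Task<string> AttachAsync()
    {
        await Task.Delay(Timeout.Infinite);
        return "+OK";
    }

    static async Task Main()
    {
        using var managerCts = new CancellationTokenSource(); // lives as long as the replication manager
        using var resetCts = new CancellationTokenSource();   // cancelled when CLUSTER RESET is issued

        // Either source cancels the linked token; the attach also gives up after the timeout.
        using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(managerCts.Token, resetCts.Token);

        resetCts.CancelAfter(TimeSpan.FromMilliseconds(100));  // simulate a reset arriving mid-attach
        try
        {
            var resp = await AttachAsync()
                .WaitAsync(TimeSpan.FromSeconds(30), linkedCts.Token)  // mirrors ReplicaAttachTimeout
                .ConfigureAwait(false);
            Console.WriteLine(resp);
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("attach cancelled by reset");            // expected path in this sketch
        }
        catch (TimeoutException)
        {
            Console.WriteLine("attach timed out");
        }
    }
}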

libs/cluster/Server/Replication/ReplicaOps/ReplicaReceiveCheckpoint.cs

Lines changed: 13 additions & 1 deletion

@@ -6,9 +6,11 @@
 using System.IO;
 using System.Net;
 using System.Text;
+using System.Threading;
 using System.Threading.Tasks;
 using Garnet.client;
 using Garnet.cluster.Server.Replication;
+using Garnet.common;
 using Garnet.server;
 using Microsoft.Extensions.Logging;
 using Tsavorite.core;
@@ -72,6 +74,7 @@ async Task<string> ReplicaSyncAttachTask(bool downgradeLock)
         {
             Debug.Assert(IsRecovering);
             GarnetClientSession gcs = null;
+            resetHandler ??= new CancellationTokenSource();
             try
             {
                 // Immediately try to connect to a primary, so we FAIL
@@ -139,12 +142,16 @@ async Task<string> ReplicaSyncAttachTask(bool downgradeLock)
                 // 4. Replica responds with aofStartAddress sync
                 // 5. Primary will initiate aof sync task
                 // 6. Primary releases checkpoint
+                using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(ctsRepManager.Token, resetHandler.Token);
+
+                // Exception injection point for testing cluster reset during disk-based replication
+                await ExceptionInjectionHelper.WaitOnSet(ExceptionInjectionType.Replication_InProgress_During_DiskBased_Replica_Attach_Sync).WaitAsync(storeWrapper.serverOptions.ReplicaAttachTimeout, linkedCts.Token).ConfigureAwait(false);
                 var resp = await gcs.ExecuteReplicaSync(
                     nodeId,
                     PrimaryReplId,
                     cEntry.ToByteArray(),
                     storeWrapper.appendOnlyFile.BeginAddress,
-                    storeWrapper.appendOnlyFile.TailAddress).WaitAsync(storeWrapper.serverOptions.ReplicaAttachTimeout, ctsRepManager.Token).ConfigureAwait(false);
+                    storeWrapper.appendOnlyFile.TailAddress).WaitAsync(storeWrapper.serverOptions.ReplicaAttachTimeout, linkedCts.Token).ConfigureAwait(false);
             }
             catch (Exception ex)
             {
@@ -167,6 +174,11 @@ async Task<string> ReplicaSyncAttachTask(bool downgradeLock)
                 }
                 recvCheckpointHandler?.Dispose();
                 gcs?.Dispose();
+                if (!resetHandler.TryReset())
+                {
+                    resetHandler.Dispose();
+                    resetHandler = new CancellationTokenSource();
+                }
             }
             return null;
         }
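
Both attach paths finish with the same cleanup: if the per-reset token fired during the attach, the source is spent and is replaced; otherwise CancellationTokenSource.TryReset rearms it for the next attempt. A small sketch of that reuse-or-replace pattern, assuming (as the code above does) that only one attach runs at a time:

// Sketch of the reuse-or-replace pattern for the per-reset CancellationTokenSource.
// TryReset (available since .NET 6) succeeds only if no cancellation was ever requested.
using System;
using System.Threading;

class ResetHandlerReuseSketch
{
    CancellationTokenSource resetHandler = new();

    // Runs in the finally of an attach attempt: keep the source if it was never fired,
    // otherwise replace it so the next attach starts with an armed token.
    public void FinishAttach()
    {
        if (!resetHandler.TryReset())
        {
            resetHandler.Dispose();
            resetHandler = new CancellationTokenSource();
        }
    }

    static void Main()
    {
        var s = new ResetHandlerReuseSketch();
        s.resetHandler.Cancel();          // simulate CLUSTER RESET arriving mid-attach
        s.FinishAttach();
        Console.WriteLine(s.resetHandler.IsCancellationRequested); // False: a fresh source was installed
    }
}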

libs/cluster/Server/Replication/ReplicationManager.cs

Lines changed: 17 additions & 0 deletions

@@ -25,6 +25,7 @@ internal sealed partial class ReplicationManager : IDisposable
         readonly CheckpointStore checkpointStore;
         readonly ReplicationSyncManager replicationSyncManager;
         readonly CancellationTokenSource ctsRepManager = new();
+        CancellationTokenSource resetHandler = new();

         readonly int pageSizeBits;

@@ -454,6 +455,20 @@ public void EndRecovery(RecoveryStatus nextRecoveryStatus, bool downgradeLock)
             }
         }

+        public void ResetRecovery()
+        {
+            switch (currentRecoveryStatus)
+            {
+                case RecoveryStatus.ClusterReplicate:
+                case RecoveryStatus.ClusterFailover:
+                case RecoveryStatus.ReplicaOfNoOne:
+                case RecoveryStatus.CheckpointRecoveredAtReplica:
+                case RecoveryStatus.InitializeRecover:
+                    resetHandler.Cancel();
+                    break;
+            }
+        }
+
         public void Dispose()
         {
             _disposed = true;
@@ -470,6 +485,8 @@ public void Dispose()
             replicaReplayTaskCts.Dispose();
             ctsRepManager.Cancel();
             ctsRepManager.Dispose();
+            resetHandler.Cancel();
+            resetHandler.Dispose();
             aofTaskStore.Dispose();
             aofProcessor?.Dispose();
             networkPool?.Dispose();
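
ResetRecovery cancels the handler only when the node is actually inside one of the listed recovery phases; an idle node leaves the token armed so a later attach can still be interrupted by a future reset. A condensed sketch of that guard with a placeholder status enum (the case labels mirror the hunk above; the surrounding class is not ReplicationManager):

// Condensed sketch of the ResetRecovery guard; the enum is a placeholder and the real
// RecoveryStatus in Garnet may contain additional members.
using System.Threading;

enum RecoveryStatus
{
    NoRecovery,
    ClusterReplicate,
    ClusterFailover,
    ReplicaOfNoOne,
    CheckpointRecoveredAtReplica,
    InitializeRecover
}

class ResetRecoverySketch
{
    readonly CancellationTokenSource resetHandler = new();
    RecoveryStatus currentRecoveryStatus = RecoveryStatus.ClusterReplicate;

    // Cancel only while a recovery phase is in flight; otherwise keep the token armed.
    public void ResetRecovery()
    {
        switch (currentRecoveryStatus)
        {
            case RecoveryStatus.ClusterReplicate:
            case RecoveryStatus.ClusterFailover:
            case RecoveryStatus.ReplicaOfNoOne:
            case RecoveryStatus.CheckpointRecoveredAtReplica:
            case RecoveryStatus.InitializeRecover:
                resetHandler.Cancel();
                break;
        }
    }
}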

libs/common/ExceptionInjectionType.cs

Lines changed: 8 additions & 0 deletions

@@ -53,5 +53,13 @@ public enum ExceptionInjectionType
         /// Delay response on receive checkpoint to trigger timeout
         /// </summary>
         Replication_Timeout_On_Receive_Checkpoint,
+        /// <summary>
+        /// Replication InProgress during disk-based replica attach sync operation
+        /// </summary>
+        Replication_InProgress_During_DiskBased_Replica_Attach_Sync,
+        /// <summary>
+        /// Replication InProgress during diskless replica attach sync operation
+        /// </summary>
+        Replication_InProgress_During_Diskless_Replica_Attach_Sync,
     }
 }
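
These two members feed the debug-only exception-injection mechanism: tests enable a named injection point and the attach paths await it, which is how the tests below hold a replica inside the attach long enough to issue CLUSTER RESET HARD. A generic sketch of such a gate follows; it is not Garnet's ExceptionInjectionHelper, whose implementation is not part of this diff:

// Generic sketch of a debug-only injection gate: tests flip a named switch and the code
// under test blocks at WaitOnSet until the switch is cleared or cancellation fires.
#if DEBUG
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

static class InjectionGateSketch
{
    static readonly ConcurrentDictionary<string, bool> enabled = new();

    public static void Enable(string point) => enabled[point] = true;
    public static void Disable(string point) => enabled[point] = false;

    // Blocks while the named injection point is enabled; callers wrap this in
    // WaitAsync with a timeout and cancellation token, as the attach paths above do.
    public static async Task WaitOnSet(string point, CancellationToken token = default)
    {
        while (enabled.TryGetValue(point, out var on) && on)
            await Task.Delay(10, token);
    }
}
#endif
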
ClusterResetDuringReplicationTests (new test file)

Lines changed: 154 additions & 0 deletions

@@ -0,0 +1,154 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

#if DEBUG
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Garnet.common;
using Microsoft.Extensions.Logging;
using NUnit.Framework;
using NUnit.Framework.Legacy;

namespace Garnet.test.cluster.ReplicationTests
{
    /// <summary>
    /// These tests simulate scenarios where a replica gets stuck during replication attach and verify that
    /// CLUSTER RESET HARD can properly cancel ongoing operations and allow the replica to be reused.
    /// </summary>
    [NonParallelizable]
    public class ClusterResetDuringReplicationTests
    {
        ClusterTestContext context;

        readonly int createInstanceTimeout = (int)System.TimeSpan.FromSeconds(30).TotalSeconds;
        const int testTimeout = 120_000;

        readonly Dictionary<string, LogLevel> monitorTests = [];

        [SetUp]
        public void Setup()
        {
            context = new ClusterTestContext();
            context.Setup(monitorTests, testTimeoutSeconds: testTimeout);
        }

        [TearDown]
        public void TearDown()
        {
            context?.TearDown();
        }

        /// <summary>
        /// Test CLUSTER RESET HARD functionality during diskless replication attach.
        /// This test simulates a scenario where a replica gets stuck while attaching to a primary
        /// and verifies that CLUSTER RESET HARD can properly cancel the operation and reset the node.
        /// </summary>
        [Test, Order(1), CancelAfter(testTimeout)]
        [Category("REPLICATION")]
        public async Task ClusterResetHardDuringDisklessReplicationAttach(CancellationToken cancellationToken)
        {
            var primaryIndex = 0;
            var replicaIndex = 1;
            var nodes_count = 2;

            // Create instances with diskless sync enabled
            context.CreateInstances(nodes_count, disableObjects: false, enableAOF: true, enableDisklessSync: true, timeout: createInstanceTimeout);
            context.CreateConnection();

            // Setup primary
            _ = context.clusterTestUtils.AddDelSlotsRange(primaryIndex, [(0, 16383)], addslot: true, logger: context.logger);
            context.clusterTestUtils.SetConfigEpoch(primaryIndex, primaryIndex + 1, logger: context.logger);
            context.clusterTestUtils.SetConfigEpoch(replicaIndex, replicaIndex + 1, logger: context.logger);
            context.clusterTestUtils.Meet(primaryIndex, replicaIndex, logger: context.logger);

            // Ensure nodes know each other
            context.clusterTestUtils.WaitUntilNodeIsKnown(primaryIndex, replicaIndex, logger: context.logger);

            try
            {
                ExceptionInjectionHelper.EnableException(ExceptionInjectionType.Replication_InProgress_During_Diskless_Replica_Attach_Sync);

                var resp = context.clusterTestUtils.ClusterReplicate(replicaNodeIndex: replicaIndex, primaryNodeIndex: primaryIndex, failEx: false, async: true, logger: context.logger);

                await Task.Delay(1000, cancellationToken);

                // Verify that the replica is in a replicating state
                var replicationInfo = context.clusterTestUtils.GetReplicationInfo(replicaIndex, [ReplicationInfoItem.RECOVER_STATUS], logger: context.logger);
                ClassicAssert.AreEqual("ClusterReplicate", replicationInfo[0].Item2);

                // Issuing CLUSTER RESET HARD while replication is ongoing/stuck.
                var resetResp = context.clusterTestUtils.ClusterReset(replicaIndex, soft: false, expiry: 60, logger: context.logger);
                ClassicAssert.AreEqual("OK", resetResp);

                // Verify that the node is no longer in recovery state
                replicationInfo = context.clusterTestUtils.GetReplicationInfo(replicaIndex, [ReplicationInfoItem.RECOVER_STATUS], logger: context.logger);
                ClassicAssert.AreEqual("NoRecovery", replicationInfo[0].Item2);

                // Verify the node role is back to PRIMARY (default after reset)
                var role = context.clusterTestUtils.RoleCommand(replicaIndex, logger: context.logger);
                ClassicAssert.AreEqual("master", role.Value);
            }
            finally
            {
                ExceptionInjectionHelper.DisableException(ExceptionInjectionType.Replication_InProgress_During_Diskless_Replica_Attach_Sync);
            }
        }

        /// <summary>
        /// Test CLUSTER RESET HARD functionality during disk-based replication attach.
        /// This test simulates a scenario where a replica gets stuck while attaching to a primary
        /// and verifies that CLUSTER RESET HARD can properly cancel the operation and reset the node.
        /// </summary>
        [Test, Order(2), CancelAfter(testTimeout)]
        [Category("REPLICATION")]
        public async Task ClusterResetHardDuringDiskBasedReplicationAttach(CancellationToken cancellationToken)
        {
            var primaryIndex = 0;
            var replicaIndex = 1;
            var nodes_count = 2;

            // Create instances with diskless sync disabled
            context.CreateInstances(nodes_count, disableObjects: false, enableAOF: true, enableDisklessSync: false, timeout: createInstanceTimeout);
            context.CreateConnection();

            // Setup primary
            _ = context.clusterTestUtils.AddDelSlotsRange(primaryIndex, [(0, 16383)], addslot: true, logger: context.logger);
            context.clusterTestUtils.SetConfigEpoch(primaryIndex, primaryIndex + 1, logger: context.logger);
            context.clusterTestUtils.SetConfigEpoch(replicaIndex, replicaIndex + 1, logger: context.logger);
            context.clusterTestUtils.Meet(primaryIndex, replicaIndex, logger: context.logger);

            context.clusterTestUtils.WaitUntilNodeIsKnown(primaryIndex, replicaIndex, logger: context.logger);

            try
            {
                ExceptionInjectionHelper.EnableException(ExceptionInjectionType.Replication_InProgress_During_DiskBased_Replica_Attach_Sync);

                var resp = context.clusterTestUtils.ClusterReplicate(replicaNodeIndex: replicaIndex, primaryNodeIndex: primaryIndex, failEx: false, async: true, logger: context.logger);

                await Task.Delay(1000, cancellationToken);

                // Verify that the replica is in a replicating state
                var replicationInfo = context.clusterTestUtils.GetReplicationInfo(replicaIndex, [ReplicationInfoItem.RECOVER_STATUS], logger: context.logger);
                ClassicAssert.AreEqual("ClusterReplicate", replicationInfo[0].Item2);

                // Issuing CLUSTER RESET HARD while replication is ongoing/stuck.
                var resetResp = context.clusterTestUtils.ClusterReset(replicaIndex, soft: false, expiry: 60, logger: context.logger);
                ClassicAssert.AreEqual("OK", resetResp);

                // Verify that the node is no longer in recovery state
                replicationInfo = context.clusterTestUtils.GetReplicationInfo(replicaIndex, [ReplicationInfoItem.RECOVER_STATUS], logger: context.logger);
                ClassicAssert.AreEqual("NoRecovery", replicationInfo[0].Item2);

                // Verify the node role is back to PRIMARY (default after reset)
                var role = context.clusterTestUtils.RoleCommand(replicaIndex, logger: context.logger);
                ClassicAssert.AreEqual("master", role.Value);
            }
            finally
            {
                ExceptionInjectionHelper.DisableException(ExceptionInjectionType.Replication_InProgress_During_DiskBased_Replica_Attach_Sync);
            }
        }
    }
}
#endif
