Skip to content

Commit 9ea26f0

Browse files
authored
Some fixes to database checkpointing (#1370)
* wip * wip * wip * fix * Fixed comments
1 parent 3cabaae commit 9ea26f0

File tree

7 files changed

+174
-59
lines changed

7 files changed

+174
-59
lines changed

libs/server/Databases/IDatabaseManager.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public interface IDatabaseManager : IDisposable
9797
public void RecoverCheckpoint(bool replicaRecover = false, bool recoverMainStoreFromToken = false, bool recoverObjectStoreFromToken = false, CheckpointMetadata metadata = null);
9898

9999
/// <summary>
100-
/// Take checkpoint of all active databases
100+
/// Take checkpoint of all active databases if checkpointing is not in progress
101101
/// </summary>
102102
/// <param name="background">True if method can return before checkpoint is taken</param>
103103
/// <param name="logger">Logger</param>
@@ -106,7 +106,7 @@ public interface IDatabaseManager : IDisposable
106106
public bool TakeCheckpoint(bool background, ILogger logger = null, CancellationToken token = default);
107107

108108
/// <summary>
109-
/// Take checkpoint of specified database ID
109+
/// Take checkpoint of specified database ID if checkpointing is not in progress
110110
/// </summary>
111111
/// <param name="background">True if method can return before checkpoint is taken</param>
112112
/// <param name="dbId">ID of database to checkpoint</param>

libs/server/Databases/MultiDatabaseManager.cs

Lines changed: 68 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ internal class MultiDatabaseManager : DatabaseManagerBase
3636
// The swap-db operation should take a write lock and any operation that should be swap-db-safe should take a read lock.
3737
SingleWriterMultiReaderLock databasesContentLock;
3838

39+
// Lock for synchronizing checkpointing of all active DBs (if more than one)
40+
SingleWriterMultiReaderLock multiDbCheckpointingLock;
41+
3942
// Reusable task array for tracking checkpointing of multiple DBs
4043
// Used by recurring checkpointing task if multiple DBs exist
4144
Task[] checkpointTasks;
@@ -152,20 +155,40 @@ public override bool TakeCheckpoint(bool background, ILogger logger = null, Canc
152155
var lockAcquired = TryGetDatabasesContentReadLock(token);
153156
if (!lockAcquired) return false;
154157

155-
try
158+
var checkpointTask = Task.Run(async () =>
156159
{
157-
var activeDbIdsMapSize = activeDbIds.ActualSize;
158-
var activeDbIdsMapSnapshot = activeDbIds.Map;
159-
Array.Copy(activeDbIdsMapSnapshot, dbIdsToCheckpoint, activeDbIdsMapSize);
160+
var checkpointLockTaken = false;
160161

161-
TakeDatabasesCheckpointAsync(activeDbIdsMapSize, logger: logger, token: token).GetAwaiter().GetResult();
162-
}
163-
finally
164-
{
165-
databasesContentLock.ReadUnlock();
166-
}
162+
try
163+
{
164+
var activeDbIdsMapSize = activeDbIds.ActualSize;
167165

168-
return true;
166+
if (activeDbIdsMapSize > 1)
167+
{
168+
if (!multiDbCheckpointingLock.TryWriteLock())
169+
return false;
170+
171+
checkpointLockTaken = true;
172+
}
173+
174+
var activeDbIdsMapSnapshot = activeDbIds.Map;
175+
Array.Copy(activeDbIdsMapSnapshot, dbIdsToCheckpoint, activeDbIdsMapSize);
176+
177+
return await TakeDatabasesCheckpointAsync(activeDbIdsMapSize, logger: logger, token: token);
178+
}
179+
finally
180+
{
181+
if (checkpointLockTaken)
182+
multiDbCheckpointingLock.WriteUnlock();
183+
184+
databasesContentLock.ReadUnlock();
185+
}
186+
}, token).GetAwaiter();
187+
188+
if (background)
189+
return true;
190+
191+
return checkpointTask.GetResult();
169192
}
170193

171194
/// <inheritdoc/>
@@ -175,7 +198,8 @@ public override bool TakeCheckpoint(bool background, int dbId, ILogger logger =
175198
var databasesMapSnapshot = databases.Map;
176199
Debug.Assert(dbId < databasesMapSize && databasesMapSnapshot[dbId] != null);
177200

178-
if (!TryPauseCheckpointsContinuousAsync(dbId, token).GetAwaiter().GetResult())
201+
// Check if checkpoint already in progress
202+
if (!TryPauseCheckpoints(dbId))
179203
return false;
180204

181205
var checkpointTask = TakeCheckpointAsync(databasesMapSnapshot[dbId], logger: logger, token: token).ContinueWith(
@@ -212,12 +236,12 @@ public override async Task TakeOnDemandCheckpointAsync(DateTimeOffset entryTime,
212236

213237
var db = databasesMapSnapshot[dbId];
214238

215-
// Take lock to ensure no other task will be taking a checkpoint
239+
// Check if checkpoint already in progress
216240
var checkpointsPaused = TryPauseCheckpoints(dbId);
217241

218242
try
219243
{
220-
// If an external task has taken a checkpoint beyond the provided entryTime return
244+
// If another checkpoint is in progress or a checkpoint was taken beyond the provided entryTime - return
221245
if (!checkpointsPaused || db.LastSaveTime > entryTime)
222246
return;
223247

@@ -241,10 +265,21 @@ public override async Task TaskCheckpointBasedOnAofSizeLimitAsync(long aofSizeLi
241265
var lockAcquired = TryGetDatabasesContentReadLock(token);
242266
if (!lockAcquired) return;
243267

268+
var activeDbIdsMapSize = activeDbIds.ActualSize;
269+
270+
var checkpointLockTaken = false;
271+
244272
try
245273
{
274+
if (activeDbIdsMapSize > 1)
275+
{
276+
if (!multiDbCheckpointingLock.TryWriteLock())
277+
return;
278+
279+
checkpointLockTaken = true;
280+
}
281+
246282
var databasesMapSnapshot = databases.Map;
247-
var activeDbIdsMapSize = activeDbIds.ActualSize;
248283
var activeDbIdsMapSnapshot = activeDbIds.Map;
249284

250285
var dbIdsIdx = 0;
@@ -270,6 +305,9 @@ public override async Task TaskCheckpointBasedOnAofSizeLimitAsync(long aofSizeLi
270305
}
271306
finally
272307
{
308+
if (checkpointLockTaken)
309+
multiDbCheckpointingLock.WriteUnlock();
310+
273311
databasesContentLock.ReadUnlock();
274312
}
275313
}
@@ -948,7 +986,7 @@ private void CopyDatabases(IDatabaseManager src, bool enableAof)
948986
/// <param name="logger">Logger</param>
949987
/// <param name="token">Cancellation token</param>
950988
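/// <returns>False if the databases content read lock could not be acquired; databases whose checkpoint is already in progress are skipped rather than reported</returns>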
951-
private async Task TakeDatabasesCheckpointAsync(int dbIdsCount, ILogger logger = null,
989+
private async Task<bool> TakeDatabasesCheckpointAsync(int dbIdsCount, ILogger logger = null,
952990
CancellationToken token = default)
953991
{
954992
Debug.Assert(checkpointTasks != null);
@@ -958,40 +996,32 @@ private async Task TakeDatabasesCheckpointAsync(int dbIdsCount, ILogger logger =
958996
checkpointTasks[i] = Task.CompletedTask;
959997

960998
var lockAcquired = TryGetDatabasesContentReadLock(token);
961-
if (!lockAcquired) return;
999+
if (!lockAcquired) return false;
9621000

9631001
try
9641002
{
9651003
var databaseMapSnapshot = databases.Map;
9661004

967-
var currIdx = 0;
968-
while (currIdx < dbIdsCount)
1005+
for (var currIdx = 0; currIdx < dbIdsCount; currIdx++)
9691006
{
9701007
var dbId = dbIdsToCheckpoint[currIdx];
9711008

972-
// Prevent parallel checkpoint
973-
if (!await TryPauseCheckpointsContinuousAsync(dbId, token))
1009+
// If a checkpoint is already in progress for this database, skip it
1010+
if (!TryPauseCheckpoints(dbId))
9741011
continue;
9751012

9761013
checkpointTasks[currIdx] = TakeCheckpointAsync(databaseMapSnapshot[dbId], logger: logger, token: token).ContinueWith(
9771014
t =>
9781015
{
979-
try
980-
{
981-
if (t.IsCompletedSuccessfully)
982-
{
983-
var storeTailAddress = t.Result.Item1;
984-
var objectStoreTailAddress = t.Result.Item2;
985-
UpdateLastSaveData(dbId, storeTailAddress, objectStoreTailAddress);
986-
}
987-
}
988-
finally
989-
{
990-
ResumeCheckpoints(dbId);
991-
}
992-
}, TaskContinuationOptions.ExecuteSynchronously);
1016+
ResumeCheckpoints(dbId);
9931017

994-
currIdx++;
1018+
if (!t.IsCompletedSuccessfully)
1019+
return;
1020+
1021+
var storeTailAddress = t.Result.Item1;
1022+
var objectStoreTailAddress = t.Result.Item2;
1023+
UpdateLastSaveData(dbId, storeTailAddress, objectStoreTailAddress);
1024+
}, TaskContinuationOptions.ExecuteSynchronously);
9951025
}
9961026

9971027
await Task.WhenAll(checkpointTasks);
@@ -1004,6 +1034,8 @@ private async Task TakeDatabasesCheckpointAsync(int dbIdsCount, ILogger logger =
10041034
{
10051035
databasesContentLock.ReadUnlock();
10061036
}
1037+
1038+
return true;
10071039
}
10081040

10091041
private void UpdateLastSaveData(int dbId, long? storeTailAddress, long? objectStoreTailAddress)
@@ -1022,24 +1054,6 @@ private void UpdateLastSaveData(int dbId, long? storeTailAddress, long? objectSt
10221054
}
10231055
}
10241056

1025-
private async Task<bool> TryPauseCheckpointsContinuousAsync(int dbId, CancellationToken token = default)
1026-
{
1027-
var databasesMapSize = databases.ActualSize;
1028-
var databasesMapSnapshot = databases.Map;
1029-
Debug.Assert(dbId < databasesMapSize && databasesMapSnapshot[dbId] != null);
1030-
1031-
var db = databasesMapSnapshot[dbId];
1032-
var checkpointsPaused = TryPauseCheckpoints(db);
1033-
1034-
while (!checkpointsPaused && !token.IsCancellationRequested && !Disposed)
1035-
{
1036-
await Task.Yield();
1037-
checkpointsPaused = TryPauseCheckpoints(db);
1038-
}
1039-
1040-
return checkpointsPaused;
1041-
}
1042-
10431057
public override void Dispose()
10441058
{
10451059
if (Disposed) return;

libs/server/Databases/SingleDatabaseManager.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ public override void ResumeCheckpoints(int dbId)
127127
/// <inheritdoc/>
128128
public override bool TakeCheckpoint(bool background, ILogger logger = null, CancellationToken token = default)
129129
{
130-
if (!TryPauseCheckpointsContinuousAsync(defaultDatabase.Id, token: token).GetAwaiter().GetResult())
130+
// Check if checkpoint already in progress
131+
if (!TryPauseCheckpoints(defaultDatabase.Id))
131132
return false;
132133

133134
var checkpointTask = TakeCheckpointAsync(defaultDatabase, logger: logger, token: token).ContinueWith(

libs/server/Resp/AdminCommands.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -880,7 +880,7 @@ private bool NetworkSAVE()
880880

881881
if (!storeWrapper.TakeCheckpoint(false, dbId: dbId, logger: logger))
882882
{
883-
while (!RespWriteUtils.TryWriteError("ERR checkpoint already in progress"u8, ref dcurr, dend))
883+
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_CHECKPOINT_ALREADY_IN_PROGRESS, ref dcurr, dend))
884884
SendAndReset();
885885
}
886886
else
@@ -1003,7 +1003,7 @@ private bool NetworkBGSAVE()
10031003
}
10041004
else
10051005
{
1006-
while (!RespWriteUtils.TryWriteError("ERR checkpoint already in progress"u8, ref dcurr, dend))
1006+
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_CHECKPOINT_ALREADY_IN_PROGRESS, ref dcurr, dend))
10071007
SendAndReset();
10081008
}
10091009

libs/server/Resp/CmdStrings.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ static partial class CmdStrings
303303
public static ReadOnlySpan<byte> RESP_ERR_FLUSHALL_READONLY_REPLICA => "ERR You can't write against a read only replica."u8;
304304
public static ReadOnlySpan<byte> RESP_ERR_ZSET_MEMBER => "ERR could not decode requested zset member"u8;
305305
public static ReadOnlySpan<byte> RESP_ERR_EXPDELSCAN_INVALID => "ERR Cannot execute EXPDELSCAN with background expired key deletion scan enabled"u8;
306+
public static ReadOnlySpan<byte> RESP_ERR_CHECKPOINT_ALREADY_IN_PROGRESS => "ERR checkpoint already in progress"u8;
306307

307308
/// <summary>
308309
/// Response string templates

test/Garnet.test/MultiDatabaseTests.cs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1278,6 +1278,71 @@ public void MultiDatabaseAofRecoverObjectTest()
12781278
}
12791279
}
12801280

1281+
[Test]
1282+
public void MultiDatabaseSaveInProgressTest()
1283+
{
1284+
using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
1285+
{
1286+
var db1 = redis.GetDatabase(0);
1287+
var db2 = redis.GetDatabase(1);
1288+
1289+
// Check no saves present
1290+
var lastsave = (int)db1.Execute("LASTSAVE");
1291+
var lastsave1 = (int)db1.Execute("LASTSAVE", "0");
1292+
var lastsave2 = (int)db2.Execute("LASTSAVE", "1");
1293+
ClassicAssert.AreEqual(0, lastsave);
1294+
ClassicAssert.AreEqual(0, lastsave1);
1295+
ClassicAssert.AreEqual(0, lastsave2);
1296+
1297+
// Issue background save to DB 0
1298+
var res1 = db1.Execute("BGSAVE", "0");
1299+
ClassicAssert.AreEqual("Background saving started", res1.ToString());
1300+
1301+
// Issue background save to DB 1 while DB 0 save is in progress - legal
1302+
var res2 = db1.Execute("BGSAVE", "1");
1303+
ClassicAssert.AreEqual("Background saving started", res2.ToString());
1304+
1305+
// Issue general background save while DB 0 save is in progress - legal
1306+
var res = db1.Execute("BGSAVE");
1307+
ClassicAssert.AreEqual("Background saving started", res.ToString());
1308+
1309+
// Wait for saves to complete
1310+
do
1311+
{
1312+
Thread.Sleep(10);
1313+
lastsave = (int)db1.Execute("LASTSAVE");
1314+
lastsave1 = (int)db1.Execute("LASTSAVE", "0");
1315+
lastsave2 = (int)db2.Execute("LASTSAVE", "1");
1316+
}
1317+
while (lastsave == 0 || lastsave1 == 0 || lastsave2 == 0);
1318+
1319+
// Add some data
1320+
for (var i = 0; i < 1024; i++)
1321+
{
1322+
db1.StringSet($"k{i}", new string('x', 256));
1323+
db2.StringSet($"k{i}", new string('x', 256));
1324+
db1.ListLeftPush($"k{i}o", new string('x', 256));
1325+
db2.ListLeftPush($"k{i}o", new string('x', 256));
1326+
}
1327+
1328+
// Issue general background save
1329+
res = db1.Execute("BGSAVE");
1330+
ClassicAssert.AreEqual("Background saving started", res.ToString());
1331+
1332+
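// Issue background save to DB 0 while general save is in progress - illegal (NOTE(review): the string passed to Assert.Throws below is NUnit's assertion-failure message, not a check on the server's error text)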
1333+
Assert.Throws<RedisServerException>(() => db1.Execute("BGSAVE", "0"),
1334+
Encoding.ASCII.GetString(CmdStrings.RESP_ERR_CHECKPOINT_ALREADY_IN_PROGRESS));
1335+
1336+
// Wait for save to complete
1337+
do
1338+
{
1339+
Thread.Sleep(10);
1340+
lastsave = (int)db1.Execute("LASTSAVE");
1341+
}
1342+
while (lastsave == 0);
1343+
}
1344+
}
1345+
12811346
[Test]
12821347
[TestCase(false)]
12831348
[TestCase(true)]

test/Garnet.test/RespAdminCommandsTests.cs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,40 @@ public void SeSaveRecoverObjectTest()
211211
}
212212
}
213213

214+
[Test]
215+
[TestCase(SaveType.BackgroundSave)]
216+
#pragma warning disable CS0618 // Type or member is obsolete
217+
[TestCase(SaveType.ForegroundSave)]
218+
#pragma warning restore CS0618 // Type or member is obsolete
219+
public void SeSaveInProgressTest(SaveType saveType)
220+
{
221+
using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig(allowAdmin: true));
222+
var server = redis.GetServer(TestUtils.EndPoint);
223+
var db = redis.GetDatabase(0);
224+
225+
var lastSave = server.LastSave();
226+
227+
// Check no saves present
228+
ClassicAssert.AreEqual(DateTimeOffset.FromUnixTimeSeconds(0).Ticks, lastSave.Ticks);
229+
230+
// Add some data
231+
for (var i = 0; i < 1024; i++)
232+
{
233+
db.StringSet($"k{i}", new string('x', 256));
234+
db.ListLeftPush($"k{i}o", new string('x', 256));
235+
}
236+
237+
// Issue background save
238+
server.Save(SaveType.BackgroundSave);
239+
240+
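// Issue another save while one is in progress - expected to be rejected (NOTE(review): the string passed to Assert.Throws below is NUnit's assertion-failure message, not a check on the server's error text)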
241+
Assert.Throws<RedisServerException>(() => server.Save(saveType),
242+
Encoding.ASCII.GetString(CmdStrings.RESP_ERR_CHECKPOINT_ALREADY_IN_PROGRESS));
243+
244+
// Wait for save to complete
245+
while (server.LastSave() == lastSave) Thread.Sleep(10);
246+
}
247+
214248
[Test]
215249
public void SeSaveRecoverCustomObjectTest()
216250
{

0 commit comments

Comments
 (0)