Skip to content

Commit e8edca8

Browse files
authored
Fix Migration for Larger Payloads (#611)
* fix SpanByte serialization for large payloads during migration * use NUnit legacy methods * simplify SpanByte serialization * simplify SpanByte serialization over the wire * fix FLUSHDB and FLUSHALL for cluster * dispose memory if write fails * re-use heap buffer when rented and dispose it at the end of key batch migration
1 parent af67519 commit e8edca8

File tree

8 files changed

+155
-51
lines changed

8 files changed

+155
-51
lines changed

libs/client/ClientSession/GarnetClientSessionMigrationExtensions.cs

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -393,23 +393,14 @@ public bool TryWriteKeyValueByteArray(byte[] key, byte[] value, long expiration,
393393

394394
private bool WriteSerializedSpanByte(ref SpanByte key, ref SpanByte value)
395395
{
396-
// We include space for newline at the end, to be added before sending
397-
int totalLen = key.TotalSize + value.TotalSize + 2 + 2;
396+
var totalLen = key.TotalSize + value.TotalSize + 2 + 2;
398397
if (totalLen > (int)(end - curr))
399398
return false;
400399

401-
*(int*)curr = key.Length;
402-
curr += sizeof(int);
403-
Buffer.MemoryCopy(key.ToPointerWithMetadata(), curr, key.Length, key.Length);
404-
curr += key.Length;
405-
*curr++ = (byte)key.MetadataSize;
406-
407-
*(int*)curr = value.Length;
408-
curr += sizeof(int);
409-
Buffer.MemoryCopy(value.ToPointerWithMetadata(), curr, value.Length, value.Length);
410-
curr += value.Length;
411-
*curr++ = (byte)value.MetadataSize;
412-
400+
key.CopyTo(curr);
401+
curr += key.TotalSize;
402+
value.CopyTo(curr);
403+
curr += value.TotalSize;
413404
return true;
414405
}
415406

libs/cluster/Server/Migration/MigrateSessionKeys.cs

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
// Licensed under the MIT license.
33

44
using System;
5-
using System.Buffers;
65
using System.Runtime.CompilerServices;
76
using Garnet.server;
87
using Microsoft.Extensions.Logging;
@@ -24,6 +23,9 @@ private bool MigrateKeysFromMainStore()
2423
{
2524
var bufferSize = 1 << 10;
2625
SectorAlignedMemory buffer = new(bufferSize, 1);
26+
var bufPtr = buffer.GetValidPointer();
27+
var bufPtrEnd = bufPtr + bufferSize;
28+
var o = new SpanByteAndMemory(bufPtr, (int)(bufPtrEnd - bufPtr));
2729

2830
try
2931
{
@@ -46,59 +48,55 @@ private bool MigrateKeysFromMainStore()
4648
// 1. Header
4749
((RespInputHeader*)pcurr)->SetHeader(RespCommandAccessor.MIGRATE, 0);
4850

49-
var bufPtr = buffer.GetValidPointer();
50-
var bufPtrEnd = bufPtr + bufferSize;
51-
foreach (var mKey in _keys.GetKeys())
51+
foreach (var pair in _keys.GetKeys())
5252
{
5353
// Process only keys in MIGRATING status
54-
if (mKey.Value != KeyMigrationStatus.MIGRATING)
54+
if (pair.Value != KeyMigrationStatus.MIGRATING)
5555
continue;
5656

57-
var key = mKey.Key.SpanByte;
57+
var key = pair.Key.SpanByte;
5858

5959
// Read value for key
60-
var o = new SpanByteAndMemory(bufPtr, (int)(bufPtrEnd - bufPtr));
6160
var status = localServerSession.BasicGarnetApi.Read_MainStore(ref key, ref Unsafe.AsRef<SpanByte>(pbCmdInput), ref o);
6261

6362
// Check if found in main store
6463
if (status == GarnetStatus.NOTFOUND)
6564
{
6665
// Transition key status back to QUEUED to unblock any writers
67-
_keys.UpdateStatus(mKey.Key, KeyMigrationStatus.QUEUED);
66+
_keys.UpdateStatus(pair.Key, KeyMigrationStatus.QUEUED);
6867
continue;
6968
}
7069

71-
// Make value SpanByte
72-
SpanByte value;
73-
MemoryHandle memoryHandle = default;
70+
// Default to the SpanByte view over the pre-allocated buffer; replaced below if the value was returned in heap memory
71+
ref var value = ref o.SpanByte;
7472
if (!o.IsSpanByte)
7573
{
76-
memoryHandle = o.Memory.Memory.Pin();
77-
value = SpanByte.FromPinnedMemory(o.Memory.Memory);
74+
// Reinterpret heap memory to SpanByte
75+
value = ref SpanByte.ReinterpretWithoutLength(o.Memory.Memory.Span);
7876
}
79-
else
80-
value = o.SpanByte;
8177

78+
// Write key to network buffer if it has not expired
8279
if (!ClusterSession.Expired(ref value) && !WriteOrSendMainStoreKeyValuePair(ref key, ref value))
8380
return false;
8481

85-
if (!o.IsSpanByte)
86-
{
87-
memoryHandle.Dispose();
88-
o.Memory.Dispose();
89-
}
82+
// Reset the SpanByte to the pre-allocated buffer for the next read; keep any heap buffer, since it may be re-used
83+
o.SpanByte = new SpanByte((int)(bufPtrEnd - bufPtr), (IntPtr)bufPtr);
9084
}
9185

9286
// Flush data in client buffer
9387
if (!HandleMigrateTaskResponse(_gcs.SendAndResetMigrate()))
9488
return false;
95-
return true;
89+
90+
DeleteKeys();
9691
}
9792
finally
9893
{
99-
DeleteKeys();
94+
// If a heap buffer was allocated, dispose it here.
95+
if (o.Memory != default)
96+
o.Memory.Dispose();
10097
buffer.Dispose();
10198
}
99+
return true;
102100
}
103101

104102
/// <summary>

libs/cluster/Session/RespClusterMigrateCommands.cs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,16 +71,10 @@ private bool NetworkClusterMigrate(out bool invalidParameters)
7171
TrackImportProgress(keyCount, isMainStore: true, keyCount == 0);
7272
while (i < keyCount)
7373
{
74-
byte* keyPtr = null, valPtr = null;
75-
byte keyMetaDataSize = 0, valMetaDataSize = 0;
76-
if (!RespReadUtils.ReadSerializedSpanByte(ref keyPtr, ref keyMetaDataSize, ref valPtr,
77-
ref valMetaDataSize, ref payloadPtr, payloadEndPtr))
78-
return false;
79-
80-
ref var key = ref SpanByte.Reinterpret(keyPtr);
81-
if (keyMetaDataSize > 0) key.ExtraMetadata = *(long*)(keyPtr + 4);
82-
ref var value = ref SpanByte.Reinterpret(valPtr);
83-
if (valMetaDataSize > 0) value.ExtraMetadata = *(long*)(valPtr + 4);
74+
ref var key = ref SpanByte.Reinterpret(payloadPtr);
75+
payloadPtr += key.TotalSize;
76+
ref var value = ref SpanByte.Reinterpret(payloadPtr);
77+
payloadPtr += value.TotalSize;
8478

8579
// An error has occurred
8680
if (migrateState > 0)

libs/server/Resp/Parser/RespCommand.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ public enum RespCommand : byte
153153
BITOP_XOR,
154154
BITOP_NOT, // Note: Update OneIfWrite if adding new write commands after this
155155

156-
// Neither read nor write commands
156+
// Neither read nor write key commands
157157
ASYNC,
158158

159159
PING,

libs/server/Storage/Functions/MainStore/PrivateMethods.cs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@ void CopyRespToWithInput(ref SpanByte input, ref SpanByte value, ref SpanByteAnd
9797
break;
9898

9999
case RespCommand.MIGRATE:
100-
long expiration = value.ExtraMetadata;
101100
if (value.Length <= dst.Length)
102101
{
103102
value.CopyTo(ref dst.SpanByte);
@@ -106,9 +105,18 @@ void CopyRespToWithInput(ref SpanByte input, ref SpanByte value, ref SpanByteAnd
106105
}
107106

108107
dst.ConvertToHeap();
109-
dst.Length = value.Length;
110-
dst.Memory = functionsState.memoryPool.Rent(value.Length);
111-
value.AsReadOnlySpanWithMetadata().CopyTo(dst.Memory.Memory.Span);
108+
dst.Length = value.TotalSize;
109+
110+
if (dst.Memory == default) // Allocate new heap buffer
111+
dst.Memory = functionsState.memoryPool.Rent(dst.Length);
112+
else if (dst.Memory.Memory.Span.Length < value.TotalSize)
113+
// Allocate new heap buffer only if existing one is smaller
114+
// otherwise it is safe to re-use existing buffer
115+
{
116+
dst.Memory.Dispose();
117+
dst.Memory = functionsState.memoryPool.Rent(dst.Length);
118+
}
119+
value.CopyTo(dst.Memory.Memory.Span);
112120
break;
113121

114122
case RespCommand.GET:

libs/storage/Tsavorite/cs/src/core/VarLen/SpanByte.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,16 @@ public void CopyWithHeaderTo(ref SpanByteAndMemory dst, MemoryPool<byte> memoryP
457457
AsReadOnlySpan().CopyTo(dst.Memory.Memory.Span.Slice(sizeof(int) + MetadataSize));
458458
}
459459

460+
/// <summary>
461+
/// Copy serialized version to the specified span (pins the span and delegates to the pointer overload)
462+
/// </summary>
463+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
464+
public void CopyTo(Span<byte> buffer)
465+
{
466+
fixed (byte* ptr = buffer)
467+
CopyTo(ptr);
468+
}
469+
460470
/// <summary>
461471
/// Copy serialized version to specified memory location
462472
/// </summary>

test/Garnet.test.cluster/ClusterMigrateTests.cs

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1666,5 +1666,105 @@ public void ClusterMigrateDataSlotsRange()
16661666
ClassicAssert.AreEqual(Encoding.ASCII.GetString(key), resp);
16671667
}
16681668
}
1669+
1670+
[Test, Order(16)]
1671+
[Category("CLUSTER")]
1672+
public void ClusterMigrateLargePayload([Values] bool expiration, [Values] bool largePayload)
1673+
{
1674+
var r = new Random(674386);
1675+
var key = new byte[12];
1676+
ClusterTestUtils.RandomBytes(ref r, ref key);
1677+
var value = new byte[largePayload ? 6789 : 123];
1678+
ClusterTestUtils.RandomBytes(ref r, ref value);
1679+
1680+
List<(byte[], byte[])> data = [(key, value)];
1681+
ClusterMigrateExpirationWithVaryingPayload(expiration, data);
1682+
}
1683+
1684+
[Test, Order(16)]
1685+
[Category("CLUSTER")]
1686+
public void ClusterMigrateIncreasingPayload([Values] bool expiration, [Values] bool largeSameSize)
1687+
{
1688+
var r = new Random(674386);
1689+
List<(byte[], byte[])> data = [];
1690+
var valueSize = largeSameSize ? 1234 : 12;
1691+
1692+
for (var i = 0; i < 8; i++)
1693+
{
1694+
var key = new byte[12];
1695+
ClusterTestUtils.RandomBytes(ref r, ref key);
1696+
var value = new byte[valueSize];
1697+
ClusterTestUtils.RandomBytes(ref r, ref value);
1698+
1699+
data.Add((key, value));
1700+
if (!largeSameSize) valueSize += valueSize * 2;
1701+
}
1702+
1703+
ClusterMigrateExpirationWithVaryingPayload(expiration, data);
1704+
}
1705+
1706+
private void ClusterMigrateExpirationWithVaryingPayload(bool expiration, List<(byte[], byte[])> data)
1707+
{
1708+
var Shards = 2;
1709+
context.CreateInstances(Shards, useTLS: UseTLS);
1710+
context.CreateConnection(useTLS: UseTLS);
1711+
1712+
var srcNodeIndex = 0;
1713+
var dstNodeIndex = 1;
1714+
ClassicAssert.AreEqual("OK", context.clusterTestUtils.AddDelSlotsRange(srcNodeIndex, [(0, 16383)], addslot: true, logger: context.logger));
1715+
1716+
context.clusterTestUtils.SetConfigEpoch(srcNodeIndex, srcNodeIndex + 1, logger: context.logger);
1717+
context.clusterTestUtils.SetConfigEpoch(dstNodeIndex, dstNodeIndex + 2, logger: context.logger);
1718+
context.clusterTestUtils.Meet(srcNodeIndex, dstNodeIndex, logger: context.logger);
1719+
context.clusterTestUtils.WaitUntilNodeIsKnown(dstNodeIndex, srcNodeIndex, logger: context.logger);
1720+
var config1 = context.clusterTestUtils.ClusterNodes(srcNodeIndex, logger: context.logger);
1721+
var config2 = context.clusterTestUtils.ClusterNodes(dstNodeIndex, logger: context.logger);
1722+
ClassicAssert.AreEqual(config1.GetBySlot(0).NodeId, config2.GetBySlot(0).NodeId);
1723+
1724+
var db = context.clusterTestUtils.GetDatabase();
1725+
1726+
foreach (var pair in data)
1727+
ClassicAssert.IsTrue(db.StringSet(pair.Item1, pair.Item2));
1728+
1729+
foreach (var pair in data)
1730+
{
1731+
var returnedValue = (string)db.StringGet(pair.Item1);
1732+
ClassicAssert.AreEqual(pair.Item2, returnedValue);
1733+
}
1734+
1735+
if (expiration)
1736+
{
1737+
foreach (var pair in data)
1738+
{
1739+
ClassicAssert.IsNull(db.KeyTimeToLive(pair.Item1), "set key should not have an existing expiry");
1740+
ClassicAssert.AreEqual(true, db.KeyExpire(pair.Item1, TimeSpan.FromSeconds(10000)));
1741+
ClassicAssert.IsNotNull(db.KeyTimeToLive(pair.Item1));
1742+
}
1743+
}
1744+
1745+
var sourceEndPoint = context.clusterTestUtils.GetEndPoint(srcNodeIndex);
1746+
var targetEndPoint = context.clusterTestUtils.GetEndPoint(dstNodeIndex);
1747+
context.clusterTestUtils.MigrateSlots(
1748+
sourceEndPoint,
1749+
targetEndPoint,
1750+
[0, 16383],
1751+
range: true,
1752+
logger: context.logger);
1753+
1754+
context.clusterTestUtils.WaitForMigrationCleanup(srcNodeIndex, logger: context.logger);
1755+
1756+
foreach (var pair in data)
1757+
{
1758+
var returnedValue = (string)db.StringGet(pair.Item1);
1759+
ClassicAssert.AreEqual(pair.Item2, returnedValue, "returned value mismatch after migration");
1760+
}
1761+
1762+
1763+
if (expiration)
1764+
{
1765+
foreach (var pair in data)
1766+
ClassicAssert.IsNotNull(db.KeyTimeToLive(pair.Item1), "key does not have expiry after migration");
1767+
}
1768+
}
16691769
}
16701770
}

test/Garnet.test.cluster/ClusterTestUtils.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -709,6 +709,9 @@ public void InitRandom(int seed)
709709
}
710710

711711
public void RandomBytes(ref byte[] data, int startOffset = -1, int endOffset = -1)
712+
=> RandomBytes(ref r, ref data, startOffset, endOffset);
713+
714+
public static void RandomBytes(ref Random r, ref byte[] data, int startOffset = -1, int endOffset = -1)
712715
{
713716
startOffset = startOffset == -1 ? 0 : startOffset;
714717
endOffset = endOffset == -1 ? data.Length : endOffset;

0 commit comments

Comments
 (0)