Skip to content

Commit 836eb9b

Browse files
hamdaankhalidHamdaan KhalidCopilot
authored
Fix Garnet RMW InitialUpdater to work with Revivification (In-Chain) (#1307)
* Rmw and Reviv fix * WIP * wip * fmt * fmt * WIP * fmt * fmt * stupid indentation * fix double setting of filler breaking debug assertion * wip * cleanup * try a much cleaner way * wip * fmt * fix test * fix up setting of spanbyte lengths * clean used value length setting * is needed after all * Update libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/Revivification/RecordLengths.cs Co-authored-by: Copilot <[email protected]> * fix missing copy * uno mas comment --------- Co-authored-by: Hamdaan Khalid <[email protected]> Co-authored-by: Copilot <[email protected]>
1 parent 67bcf60 commit 836eb9b

File tree

8 files changed

+246
-46
lines changed

8 files changed

+246
-46
lines changed

libs/server/Storage/Functions/MainStore/RMWMethods.cs

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -92,23 +92,19 @@ public bool InitialUpdater(ref SpanByte key, ref RawStringInput input, ref SpanB
9292
var length = sbSrcHLL.Length;
9393
var srcHLL = sbSrcHLL.ToPointer();
9494
var dstHLL = value.ToPointer();
95-
9695
value.ShrinkSerializedLength(length);
9796
Buffer.MemoryCopy(srcHLL, dstHLL, value.Length, value.Length);
9897
break;
9998

10099
case RespCommand.SETIFGREATER:
101100
case RespCommand.SETIFMATCH:
102101
int spaceForEtag = this.functionsState.etagState.etagOffsetForVarlen;
103-
// Copy input to value
104102
var newInputValue = input.parseState.GetArgSliceByRef(0).ReadOnlySpan;
105103
var metadataSize = input.arg1 == 0 ? 0 : sizeof(long);
106104
value.ShrinkSerializedLength(newInputValue.Length + metadataSize + spaceForEtag);
107105
value.ExtraMetadata = input.arg1;
108106
newInputValue.CopyTo(value.AsSpan(spaceForEtag));
109-
110107
long clientSentEtag = input.parseState.GetLong(1);
111-
112108
if (cmd == RespCommand.SETIFMATCH)
113109
clientSentEtag++;
114110

@@ -183,11 +179,9 @@ public bool InitialUpdater(ref SpanByte key, ref RawStringInput input, ref SpanB
183179
case RespCommand.SETBIT:
184180
var bOffset = input.arg1;
185181
var bSetVal = (byte)(input.parseState.GetArgSliceByRef(1).ReadOnlySpan[0] - '0');
186-
187182
value.ShrinkSerializedLength(BitmapManager.Length(bOffset));
188-
189-
// Always return 0 at initial updater because previous value was 0
190183
BitmapManager.UpdateBitmap(value.ToPointer(), bOffset, bSetVal);
184+
// Always return 0 at initial updater because previous value was 0
191185
CopyDefaultResp(CmdStrings.RESP_RETURN_VAL_0, ref output);
192186
break;
193187

@@ -219,9 +213,7 @@ public bool InitialUpdater(ref SpanByte key, ref RawStringInput input, ref SpanB
219213
case RespCommand.APPEND:
220214
var appendValue = input.parseState.GetArgSliceByRef(0);
221215
value.ShrinkSerializedLength(appendValue.Length);
222-
// Copy value to be appended to the newly allocated value buffer
223216
appendValue.ReadOnlySpan.CopyTo(value.AsSpan());
224-
225217
CopyValueLengthToOutput(ref value, ref output, 0);
226218
break;
227219
case RespCommand.INCR:
@@ -326,15 +318,15 @@ public bool InPlaceUpdater(ref SpanByte key, ref RawStringInput input, ref SpanB
326318
// NOTE: In the below control flow if you decide to add a new command or modify a command such that it will now do an early return with TRUE, you must make sure you must reset etagState in FunctionState
327319
private bool InPlaceUpdaterWorker(ref SpanByte key, ref RawStringInput input, ref SpanByte value, ref SpanByteAndMemory output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo)
328320
{
321+
RespCommand cmd = input.header.cmd;
329322
// Expired data
330323
if (value.MetadataSize > 0 && input.header.CheckExpiry(value.ExtraMetadata))
331324
{
332-
rmwInfo.Action = RMWAction.ExpireAndResume;
325+
rmwInfo.Action = cmd is RespCommand.DELIFEXPIM ? RMWAction.ExpireAndStop : RMWAction.ExpireAndResume;
333326
recordInfo.ClearHasETag();
334327
return false;
335328
}
336329

337-
RespCommand cmd = input.header.cmd;
338330
bool hadRecordPreMutation = recordInfo.ETag;
339331
bool shouldUpdateEtag = hadRecordPreMutation;
340332
if (shouldUpdateEtag)
@@ -815,15 +807,7 @@ private bool InPlaceUpdaterWorker(ref SpanByte key, ref RawStringInput input, re
815807

816808
return false;
817809
case RespCommand.DELIFEXPIM:
818-
// Only if the key has expired, will we delete it.
819-
if (value.MetadataSize > 0 && input.header.CheckExpiry(value.ExtraMetadata))
820-
{
821-
// setting the action and returning false will tombstone this record
822-
rmwInfo.Action = RMWAction.ExpireAndStop;
823-
// reset etag state that may have been initialized earlier,
824-
EtagState.ResetState(ref functionsState.etagState);
825-
return false;
826-
}
810+
// this is the case where it isn't expired
827811
shouldUpdateEtag = false;
828812
break;
829813
default:

libs/server/Storage/Session/MainStore/AdvancedOps.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public GarnetStatus RMW_MainStore<TContext>(ref SpanByte key, ref RawStringInput
7979
if (status.IsPending)
8080
CompletePendingForSession(ref status, ref output, ref context);
8181

82-
if (status.Found || status.Record.Created)
82+
if (status.Found || status.Record.Created || status.Record.InPlaceUpdated)
8383
return GarnetStatus.OK;
8484
else
8585
return GarnetStatus.NOTFOUND;

libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/InternalRMW.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ private bool TryRevivifyInChain<TInput, TOutput, TContext, TSessionFunctionsWrap
285285
// RMW uses GetInitialRecordSize because it has only the initial Input, not a Value
286286
var (requiredSize, _, _) = hlog.GetRMWInitialRecordSize(ref key, ref input, sessionFunctions);
287287
(ok, rmwInfo.UsedValueLength) = TryReinitializeTombstonedValue<TInput, TOutput, TContext, TSessionFunctionsWrapper>(sessionFunctions,
288-
ref srcRecordInfo, ref key, ref recordValue, requiredSize, recordLengths);
288+
ref srcRecordInfo, ref key, ref recordValue, requiredSize, recordLengths, stackCtx.recSrc.PhysicalAddress);
289289
}
290290

291291
ref RevivificationStats stats = ref sessionFunctions.Ctx.RevivificationStats;
@@ -295,7 +295,8 @@ private bool TryRevivifyInChain<TInput, TOutput, TContext, TSessionFunctionsWrap
295295
MarkPage(stackCtx.recSrc.LogicalAddress, sessionFunctions.Ctx);
296296
pendingContext.recordInfo = srcRecordInfo;
297297
pendingContext.logicalAddress = stackCtx.recSrc.LogicalAddress;
298-
status = OperationStatusUtils.AdvancedOpCode(OperationStatus.SUCCESS, StatusCode.InPlaceUpdatedRecord);
298+
// We "IPU'd" because we reused a tombstone, but since the record we have reused did not logically exist, we must also bubble up that the original key was not found (logically). OperationStatus.NOTFOUND bubbles up success but also indicates that the record was not found in the database.
299+
status = OperationStatusUtils.AdvancedOpCode(OperationStatus.NOTFOUND, StatusCode.InPlaceUpdatedRecord);
299300
stats.inChainSuccesses++;
300301
return true;
301302
}

libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/InternalUpsert.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ private bool TryRevivifyInChain<TInput, TOutput, TContext, TSessionFunctionsWrap
233233
// Input is not included in record-length calculations for Upsert
234234
var (requiredSize, _, _) = hlog.GetRecordSize(ref key, ref value);
235235
(ok, upsertInfo.UsedValueLength) = TryReinitializeTombstonedValue<TInput, TOutput, TContext, TSessionFunctionsWrapper>(sessionFunctions,
236-
ref srcRecordInfo, ref key, ref recordValue, requiredSize, recordLengths);
236+
ref srcRecordInfo, ref key, ref recordValue, requiredSize, recordLengths, stackCtx.recSrc.PhysicalAddress);
237237
}
238238

239239
ref RevivificationStats stats = ref sessionFunctions.Ctx.RevivificationStats;
@@ -244,7 +244,8 @@ private bool TryRevivifyInChain<TInput, TOutput, TContext, TSessionFunctionsWrap
244244
MarkPage(stackCtx.recSrc.LogicalAddress, sessionFunctions.Ctx);
245245
pendingContext.recordInfo = srcRecordInfo;
246246
pendingContext.logicalAddress = stackCtx.recSrc.LogicalAddress;
247-
status = OperationStatusUtils.AdvancedOpCode(OperationStatus.SUCCESS, StatusCode.InPlaceUpdatedRecord);
247+
// Return NOTFOUND OperationStatus to indicate that the operation was successful but a previous record was not found.
248+
status = OperationStatusUtils.AdvancedOpCode(OperationStatus.NOTFOUND, StatusCode.InPlaceUpdatedRecord);
248249
stats.inChainSuccesses++;
249250
return true;
250251
}

libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/Revivification/RecordLengths.cs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ internal void SetTombstoneAndExtraValueLength(ref TValue recordValue, ref Record
208208

209209
[MethodImpl(MethodImplOptions.AggressiveInlining)]
210210
internal (bool ok, int usedValueLength) TryReinitializeTombstonedValue<TInput, TOutput, TContext, TSessionFunctionsWrapper>(TSessionFunctionsWrapper sessionFunctions,
211-
ref RecordInfo srcRecordInfo, ref TKey key, ref TValue recordValue, int requiredSize, (int usedValueLength, int fullValueLength, int allocatedSize) recordLengths)
211+
ref RecordInfo srcRecordInfo, ref TKey key, ref TValue recordValue, int requiredSize, (int usedValueLength, int fullValueLength, int allocatedSize) recordLengths, long physicalAddress)
212212
where TSessionFunctionsWrapper : ISessionFunctionsWrapper<TKey, TValue, TInput, TOutput, TContext, TStoreFunctions, TAllocator>
213213
{
214214
if (RevivificationManager.IsFixedLength || recordLengths.allocatedSize < requiredSize)
@@ -219,13 +219,21 @@ internal void SetTombstoneAndExtraValueLength(ref TValue recordValue, ref Record
219219
var requiredValueLength = requiredSize - valueOffset;
220220
var minValueLength = requiredValueLength < recordLengths.usedValueLength ? requiredValueLength : recordLengths.usedValueLength;
221221

222+
// clears out the minimum space possible. So let's say we are shrinking our usage from 8 bytes to 3 bytes. This will clear only the bytes 4-8.
223+
// if we are expanding this will not clear anything.
222224
ClearExtraValueSpace(ref srcRecordInfo, ref recordValue, minValueLength, recordLengths.fullValueLength);
223-
storeFunctions.DisposeRecord(ref key, ref recordValue, DisposeReason.RevivificationFreeList);
224225

225226
srcRecordInfo.ClearTombstone();
226227

227-
SetExtraValueLength(ref recordValue, ref srcRecordInfo, recordLengths.usedValueLength, recordLengths.fullValueLength);
228-
return (true, hlog.GetValueLength(ref recordValue));
228+
// for SpanByte, this will set the new length (payload + metadata).
229+
hlog.GetAndInitializeValue(physicalAddress, physicalAddress + requiredSize);
230+
// since the above sets the Length, we can use the below to get TotalSize which represents the UsedLength of a record
231+
var newUsedValueLength = hlog.GetValueLength(ref recordValue);
232+
233+
// potentially sets filler, if the used value length is going to be under the full length by more than 4 bytes.
234+
SetExtraValueLength(ref recordValue, ref srcRecordInfo, usedValueLength: newUsedValueLength, recordLengths.fullValueLength);
235+
236+
return (true, newUsedValueLength);
229237
}
230238

231239
#endregion TombstonedRecords

libs/storage/Tsavorite/cs/src/core/VarLen/SpanByte.cs

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -394,28 +394,19 @@ public void CopyTo(ref SpanByte dst, long metadata = 0)
394394
[MethodImpl(MethodImplOptions.AggressiveInlining)]
395395
public bool TrySafeCopyTo(ref SpanByte dst, int fullDestSize, long metadata = 0)
396396
{
397-
// Need to account for extra metadata if current value does not have any.
397+
// If the incoming caller wants to addMetadata and the destination does not already have metadata, the new length needs to account for it.
398398
var addMetadata = metadata > 0 && MetadataSize == 0;
399399

400400
var newTotalSize = addMetadata ? TotalSize + sizeof(long) : TotalSize;
401401
if (fullDestSize < newTotalSize)
402402
return false;
403403

404404
var newLength = addMetadata ? Length + sizeof(long) : Length;
405-
if (dst.Length < newLength)
406-
{
407-
// dst is shorter than src, but we have already verified there is enough extra value space to grow dst to store src.
408-
dst.Length = newLength;
409-
CopyTo(ref dst, metadata);
410-
}
411-
else
412-
{
413-
// dst length is equal or longer than src. We can adjust the length header on the serialized log, if we wish (here, we do).
414-
// This method will also zero out the extra space to retain log scan correctness.
415-
dst.ShrinkSerializedLength(newLength);
416-
CopyTo(ref dst, metadata);
417-
dst.Length = newLength;
418-
}
405+
dst.ShrinkSerializedLength(newLength);
406+
// Note: If dst is shorter than src we have already verified there is enough extra value space to grow dst to store src.
407+
dst.Length = newLength;
408+
CopyTo(ref dst, metadata);
409+
419410
return true;
420411
}
421412

0 commit comments

Comments
 (0)