Skip to content

Commit af67519

Browse files
TalZaccaibadrishc
andauthored
Improvements in command handling #4 (#549)
* partial checkin * nit * nit * idea for initial spike for making input a struct type does not work * support ZADD and ZREM using safe struct wrappers for input * nit * wip * wip * Fixing non-determinism + refactoring HashGet * dotnet format * Fixed some API calls * dotnet format * wip * small fix * Removing unused method * wip - refactoring ProcessAdminCommands * Undoing changes to RandomUtils * Continued refactoring of AdminCommands * Added "TryGetInt" and "TryGetLong" to parse state api * dotnet format * wip * format * wip * wip * wip * cleanup * wip * wip * format * wip * Fixing build * cont * bugfix * Fixing build * wip * wip * bugfix * Few small fixes * format * bugfix * Added range validation to SessionParseState read methods * wip * merging from latest main * format * wip * wip * wip * wip * wip * wip * wip * wip * clean up * Fixed object store PERSIST + added missing test * wip - adding parseState to ObjectInput & implementing RMW AOF recovery, using parseState in ZADD implementation * Cleaned up ObjectInput and added XML comments * wip * wip * wip * remove unnecessary serialization during upsert * Addressing comments * wip * wip * wip * wip * wip * wip * bugfix * wip * wip * format * format * Using ObjectInput in custom object commands * wip * some cleanup * safe parsing for cluster commands * format * Fixing build errors * Test fix * Fixing merge * Fixing broken test * some small fixes --------- Co-authored-by: Badrish Chandramouli <[email protected]>
1 parent 23c4e3e commit af67519

File tree

73 files changed

+2639
-3080
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

73 files changed

+2639
-3080
lines changed

libs/cluster/CmdStrings.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ static class CmdStrings
4949
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_WORKERS_NOT_INITIALIZED => "ERR workers not initialized"u8;
5050
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CONFIG_EPOCH_NOT_SET => "ERR Node config epoch was not set due to invalid epoch specified"u8;
5151
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_NOT_IN_IMPORTING_STATE => "ERR Node not in IMPORTING state"u8;
52+
public static ReadOnlySpan<byte> RESP_ERR_INVALID_SLOT => "ERR Invalid or out of range slot"u8;
53+
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER => "ERR value is not an integer or out of range."u8;
54+
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_VALUE_IS_NOT_BOOLEAN => "ERR value is not a boolean."u8;
5255

5356
/// <summary>
5457
/// Generic error response strings for <c>MIGRATE</c> command
@@ -73,5 +76,7 @@ static class CmdStrings
7376
/// Response string templates
7477
/// </summary>
7578
public const string GenericErrWrongNumArgs = "ERR wrong number of arguments for '{0}' command";
79+
80+
public const string GenericErrInvalidPort = "ERR Invalid TCP base port specified: {0}";
7681
}
7782
}

libs/cluster/Session/ClusterCommands.cs

Lines changed: 67 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ private List<byte[]> GetKeysInSlot(int slot, int keyCount)
5353
/// <summary>
5454
/// Try to parse slots
5555
/// </summary>
56+
/// <param name="startIdx"></param>
57+
/// <param name="slots"></param>
5658
/// <param name="errorMessage">
5759
/// The ASCII encoded error message if there one of the following conditions is true
5860
/// <list type="bullet">
@@ -61,63 +63,62 @@ private List<byte[]> GetKeysInSlot(int slot, int keyCount)
6163
/// </list>
6264
/// otherwise <see langword="default" />
6365
/// </param>
66+
/// <param name="range"></param>
6467
/// <returns>A boolean indicating that there was error in parsing of the arguments.</returns>
6568
/// <remarks>
6669
/// The error handling is little special for this method because we need to drain all arguments even in the case of error.
6770
/// <para/>
6871
/// The <paramref name="errorMessage"/> will only have a generic error message set in the event of duplicate or out of range slot.
6972
/// The method will still return <see langword="true" /> in case of such error.
7073
/// </remarks>
71-
private bool TryParseSlots(int count, ref byte* ptr, out HashSet<int> slots, out ReadOnlySpan<byte> errorMessage, bool range)
74+
private bool TryParseSlots(int startIdx, out HashSet<int> slots, out ReadOnlySpan<byte> errorMessage, bool range)
7275
{
7376
slots = [];
7477
errorMessage = default;
75-
var duplicate = false;
76-
var outOfRange = false;
77-
var invalidRange = false;
78-
int slotStart;
79-
int slotEnd;
8078

81-
while (count > 0)
79+
var currTokenIdx = startIdx;
80+
while (currTokenIdx < parseState.Count)
8281
{
82+
int slotStart;
83+
int slotEnd;
8384
if (range)
8485
{
85-
if (!RespReadUtils.ReadIntWithLengthHeader(out slotStart, ref ptr, recvBufferPtr + bytesRead))
86-
return false;
87-
if (!RespReadUtils.ReadIntWithLengthHeader(out slotEnd, ref ptr, recvBufferPtr + bytesRead))
86+
if (!parseState.TryGetInt(currTokenIdx++, out slotStart) ||
87+
!parseState.TryGetInt(currTokenIdx++, out slotEnd))
88+
{
89+
errorMessage = CmdStrings.RESP_ERR_INVALID_SLOT;
8890
return false;
89-
count -= 2;
91+
}
9092
}
9193
else
9294
{
93-
if (!RespReadUtils.ReadIntWithLengthHeader(out slotStart, ref ptr, recvBufferPtr + bytesRead))
95+
if (!parseState.TryGetInt(currTokenIdx++, out slotStart))
96+
{
97+
errorMessage = CmdStrings.RESP_ERR_INVALID_SLOT;
9498
return false;
95-
count--;
99+
}
100+
96101
slotEnd = slotStart;
97102
}
98103

99-
if (duplicate || outOfRange || invalidRange)
100-
continue;
101-
102104
if (slotStart > slotEnd)
103105
{
104106
errorMessage = Encoding.ASCII.GetBytes($"ERR Invalid range {slotStart} > {slotEnd}!");
105-
invalidRange = true;
106-
continue;
107+
return false;
107108
}
108109

109110
if (ClusterConfig.OutOfRange(slotStart) || ClusterConfig.OutOfRange(slotEnd))
110111
{
111112
errorMessage = CmdStrings.RESP_ERR_GENERIC_SLOT_OUT_OFF_RANGE;
112-
outOfRange = true;
113+
return false;
113114
}
114115

115-
for (var slot = slotStart; slot <= slotEnd && !duplicate; slot++)
116+
for (var slot = slotStart; slot <= slotEnd; slot++)
116117
{
117118
if (!slots.Add(slot))
118119
{
119120
errorMessage = Encoding.ASCII.GetBytes($"ERR Slot {slot} specified multiple times");
120-
duplicate = true;
121+
return false;
121122
}
122123
}
123124
}
@@ -129,81 +130,54 @@ private bool TryParseSlots(int count, ref byte* ptr, out HashSet<int> slots, out
129130
/// Handle cluster subcommands.
130131
/// </summary>
131132
/// <param name="command">Subcommand to execute.</param>
132-
/// <param name="count">Number of parameters in teh command buffer</param>
133+
/// <param name="invalidParameters">True if number of parameters is invalid</param>
133134
/// <returns>True if command is fully processed, false if more processing is needed.</returns>
134-
private bool ProcessClusterCommands(RespCommand command, int count)
135+
private void ProcessClusterCommands(RespCommand command, out bool invalidParameters)
135136
{
136-
bool result;
137-
bool invalidParameters;
138-
139-
result =
140-
command switch
141-
{
142-
RespCommand.CLUSTER_ADDSLOTS => NetworkClusterAddSlots(count, out invalidParameters),
143-
RespCommand.CLUSTER_ADDSLOTSRANGE => NetworkClusterAddSlotsRange(count, out invalidParameters),
144-
RespCommand.CLUSTER_AOFSYNC => NetworkClusterAOFSync(count, out invalidParameters),
145-
RespCommand.CLUSTER_APPENDLOG => NetworkClusterAppendLog(count, out invalidParameters),
146-
RespCommand.CLUSTER_BANLIST => NetworkClusterBanList(count, out invalidParameters),
147-
RespCommand.CLUSTER_BEGIN_REPLICA_RECOVER => NetworkClusterBeginReplicaRecover(count, out invalidParameters),
148-
RespCommand.CLUSTER_BUMPEPOCH => NetworkClusterBumpEpoch(count, out invalidParameters),
149-
RespCommand.CLUSTER_COUNTKEYSINSLOT => NetworkClusterCountKeysInSlot(count, out invalidParameters),
150-
RespCommand.CLUSTER_DELKEYSINSLOT => NetworkClusterDelKeysInSlot(count, out invalidParameters),
151-
RespCommand.CLUSTER_DELKEYSINSLOTRANGE => NetworkClusterDelKeysInSlotRange(count, out invalidParameters),
152-
RespCommand.CLUSTER_DELSLOTS => NetworkClusterDelSlots(count, out invalidParameters),
153-
RespCommand.CLUSTER_DELSLOTSRANGE => NetworkClusterDelSlotsRange(count, out invalidParameters),
154-
RespCommand.CLUSTER_ENDPOINT => NetworkClusterEndpoint(count, out invalidParameters),
155-
RespCommand.CLUSTER_FAILOVER => NetworkClusterFailover(count, out invalidParameters),
156-
RespCommand.CLUSTER_FAILREPLICATIONOFFSET => NetworkClusterFailReplicationOffset(count, out invalidParameters),
157-
RespCommand.CLUSTER_FAILSTOPWRITES => NetworkClusterFailStopWrites(count, out invalidParameters),
158-
RespCommand.CLUSTER_FORGET => NetworkClusterForget(count, out invalidParameters),
159-
RespCommand.CLUSTER_GOSSIP => NetworkClusterGossip(count, out invalidParameters),
160-
RespCommand.CLUSTER_GETKEYSINSLOT => NetworkClusterGetKeysInSlot(count, out invalidParameters),
161-
RespCommand.CLUSTER_HELP => NetworkClusterHelp(count, out invalidParameters),
162-
RespCommand.CLUSTER_INFO => NetworkClusterInfo(count, out invalidParameters),
163-
RespCommand.CLUSTER_INITIATE_REPLICA_SYNC => NetworkClusterInitiateReplicaSync(count, out invalidParameters),
164-
RespCommand.CLUSTER_KEYSLOT => NetworkClusterKeySlot(count, out invalidParameters),
165-
RespCommand.CLUSTER_MEET => NetworkClusterMeet(count, out invalidParameters),
166-
RespCommand.CLUSTER_MIGRATE => NetworkClusterMigrate(count, out invalidParameters),
167-
RespCommand.CLUSTER_MTASKS => NetworkClusterMTasks(count, out invalidParameters),
168-
RespCommand.CLUSTER_MYID => NetworkClusterMyId(count, out invalidParameters),
169-
RespCommand.CLUSTER_MYPARENTID => NetworkClusterMyParentId(count, out invalidParameters),
170-
RespCommand.CLUSTER_NODES => NetworkClusterNodes(count, out invalidParameters),
171-
RespCommand.CLUSTER_REPLICAS => NetworkClusterReplicas(count, out invalidParameters),
172-
RespCommand.CLUSTER_REPLICATE => NetworkClusterReplicate(count, out invalidParameters),
173-
RespCommand.CLUSTER_RESET => NetworkClusterReset(count, out invalidParameters),
174-
RespCommand.CLUSTER_SEND_CKPT_FILE_SEGMENT => NetworkClusterSendCheckpointFileSegment(count, out invalidParameters),
175-
RespCommand.CLUSTER_SEND_CKPT_METADATA => NetworkClusterSendCheckpointMetadata(count, out invalidParameters),
176-
RespCommand.CLUSTER_SETCONFIGEPOCH => NetworkClusterSetConfigEpoch(count, out invalidParameters),
177-
RespCommand.CLUSTER_SETSLOT => NetworkClusterSetSlot(count, out invalidParameters),
178-
RespCommand.CLUSTER_SETSLOTSRANGE => NetworkClusterSetSlotsRange(count, out invalidParameters),
179-
RespCommand.CLUSTER_SHARDS => NetworkClusterShards(count, out invalidParameters),
180-
RespCommand.CLUSTER_SLOTS => NetworkClusterSlots(count, out invalidParameters),
181-
RespCommand.CLUSTER_SLOTSTATE => NetworkClusterSlotState(count, out invalidParameters),
182-
_ => throw new Exception($"Unexpected cluster subcommand: {command}")
183-
};
184-
185-
if (invalidParameters)
137+
_ = command switch
186138
{
187-
if (!DrainCommands(count))
188-
return false;
189-
190-
// Have to lookup the RESP name now that we're in the failure case
191-
string subCommand;
192-
if (RespCommandsInfo.TryGetRespCommandInfo(command, out var info))
193-
{
194-
subCommand = info.Name.ToLowerInvariant();
195-
}
196-
else
197-
{
198-
subCommand = "unknown";
199-
}
200-
201-
var errorMsg = string.Format(CmdStrings.GenericErrWrongNumArgs, subCommand);
202-
while (!RespWriteUtils.WriteError(errorMsg, ref dcurr, dend))
203-
SendAndReset();
204-
}
205-
206-
return result;
139+
RespCommand.CLUSTER_ADDSLOTS => NetworkClusterAddSlots(out invalidParameters),
140+
RespCommand.CLUSTER_ADDSLOTSRANGE => NetworkClusterAddSlotsRange(out invalidParameters),
141+
RespCommand.CLUSTER_AOFSYNC => NetworkClusterAOFSync(out invalidParameters),
142+
RespCommand.CLUSTER_APPENDLOG => NetworkClusterAppendLog(out invalidParameters),
143+
RespCommand.CLUSTER_BANLIST => NetworkClusterBanList(out invalidParameters),
144+
RespCommand.CLUSTER_BEGIN_REPLICA_RECOVER => NetworkClusterBeginReplicaRecover(out invalidParameters),
145+
RespCommand.CLUSTER_BUMPEPOCH => NetworkClusterBumpEpoch(out invalidParameters),
146+
RespCommand.CLUSTER_COUNTKEYSINSLOT => NetworkClusterCountKeysInSlot(out invalidParameters),
147+
RespCommand.CLUSTER_DELKEYSINSLOT => NetworkClusterDelKeysInSlot(out invalidParameters),
148+
RespCommand.CLUSTER_DELKEYSINSLOTRANGE => NetworkClusterDelKeysInSlotRange(out invalidParameters),
149+
RespCommand.CLUSTER_DELSLOTS => NetworkClusterDelSlots(out invalidParameters),
150+
RespCommand.CLUSTER_DELSLOTSRANGE => NetworkClusterDelSlotsRange(out invalidParameters),
151+
RespCommand.CLUSTER_ENDPOINT => NetworkClusterEndpoint(out invalidParameters),
152+
RespCommand.CLUSTER_FAILOVER => NetworkClusterFailover(out invalidParameters),
153+
RespCommand.CLUSTER_FAILREPLICATIONOFFSET => NetworkClusterFailReplicationOffset(out invalidParameters),
154+
RespCommand.CLUSTER_FAILSTOPWRITES => NetworkClusterFailStopWrites(out invalidParameters),
155+
RespCommand.CLUSTER_FORGET => NetworkClusterForget(out invalidParameters),
156+
RespCommand.CLUSTER_GOSSIP => NetworkClusterGossip(out invalidParameters),
157+
RespCommand.CLUSTER_GETKEYSINSLOT => NetworkClusterGetKeysInSlot(out invalidParameters),
158+
RespCommand.CLUSTER_HELP => NetworkClusterHelp(out invalidParameters),
159+
RespCommand.CLUSTER_INFO => NetworkClusterInfo(out invalidParameters),
160+
RespCommand.CLUSTER_INITIATE_REPLICA_SYNC => NetworkClusterInitiateReplicaSync(out invalidParameters),
161+
RespCommand.CLUSTER_KEYSLOT => NetworkClusterKeySlot(out invalidParameters),
162+
RespCommand.CLUSTER_MEET => NetworkClusterMeet(out invalidParameters),
163+
RespCommand.CLUSTER_MIGRATE => NetworkClusterMigrate(out invalidParameters),
164+
RespCommand.CLUSTER_MTASKS => NetworkClusterMTasks(out invalidParameters),
165+
RespCommand.CLUSTER_MYID => NetworkClusterMyId(out invalidParameters),
166+
RespCommand.CLUSTER_MYPARENTID => NetworkClusterMyParentId(out invalidParameters),
167+
RespCommand.CLUSTER_NODES => NetworkClusterNodes(out invalidParameters),
168+
RespCommand.CLUSTER_REPLICAS => NetworkClusterReplicas(out invalidParameters),
169+
RespCommand.CLUSTER_REPLICATE => NetworkClusterReplicate(out invalidParameters),
170+
RespCommand.CLUSTER_RESET => NetworkClusterReset(out invalidParameters),
171+
RespCommand.CLUSTER_SEND_CKPT_FILE_SEGMENT => NetworkClusterSendCheckpointFileSegment(out invalidParameters),
172+
RespCommand.CLUSTER_SEND_CKPT_METADATA => NetworkClusterSendCheckpointMetadata(out invalidParameters),
173+
RespCommand.CLUSTER_SETCONFIGEPOCH => NetworkClusterSetConfigEpoch(out invalidParameters),
174+
RespCommand.CLUSTER_SETSLOT => NetworkClusterSetSlot(out invalidParameters),
175+
RespCommand.CLUSTER_SETSLOTSRANGE => NetworkClusterSetSlotsRange(out invalidParameters),
176+
RespCommand.CLUSTER_SHARDS => NetworkClusterShards(out invalidParameters),
177+
RespCommand.CLUSTER_SLOTS => NetworkClusterSlots(out invalidParameters),
178+
RespCommand.CLUSTER_SLOTSTATE => NetworkClusterSlotState(out invalidParameters),
179+
_ => throw new Exception($"Unexpected cluster subcommand: {command}")
180+
};
207181
}
208182
}
209183
}

0 commit comments

Comments
 (0)