Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ Current package versions:

## Unreleased

- Support `XREADGROUP CLAIM` ([#2972 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2972))

## 2.9.32

- Fix `SSUBSCRIBE` routing during slot migrations ([#2969 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2969))
Expand Down
25 changes: 25 additions & 0 deletions src/StackExchange.Redis/APITypes/StreamEntry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,19 @@ public StreamEntry(RedisValue id, NameValueEntry[] values)
{
Id = id;
Values = values;
IdleTime = null;
DeliveryCount = 0;
}

/// <summary>
/// Creates an stream entry.
/// </summary>
public StreamEntry(RedisValue id, NameValueEntry[] values, TimeSpan? idleTime, int deliveryCount)
{
Id = id;
Values = values;
IdleTime = idleTime;
DeliveryCount = deliveryCount;
}

/// <summary>
Expand Down Expand Up @@ -51,6 +64,18 @@ public RedisValue this[RedisValue fieldName]
}
}

/// <summary>
/// Delivery count - the number of times this entry has been delivered: 0 for new messages that haven't been delivered before,
/// 1+ for claimed messages (previously unacknowledged entries).
/// </summary>
public int DeliveryCount { get; }

/// <summary>
/// Idle time in milliseconds - the number of milliseconds elapsed since this entry was last delivered to a consumer.
/// </summary>
/// <remarks>This member is populated when using <c>XREADGROUP</c> with <c>CLAIM</c>.</remarks>
public TimeSpan? IdleTime { get; }

/// <summary>
/// Indicates that the Redis Stream Entry is null.
/// </summary>
Expand Down
20 changes: 19 additions & 1 deletion src/StackExchange.Redis/Interfaces/IDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3004,7 +3004,25 @@ IEnumerable<SortedSetEntry> SortedSetScan(
/// <para>Equivalent of calling <c>XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2</c>.</para>
/// <para><seealso href="https://redis.io/commands/xreadgroup"/></para>
/// </remarks>
RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None);
RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, bool noAck, CommandFlags flags);

/// <summary>
/// Read from multiple streams into the given consumer group.
/// The consumer group with the given <paramref name="groupName"/> will need to have been created for each stream prior to calling this method.
/// </summary>
/// <param name="streamPositions">Array of streams and the positions from which to begin reading for each stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName">The name of the consumer.</param>
/// <param name="countPerStream">The maximum number of messages to return from each stream.</param>
/// <param name="noAck">When true, the message will not be added to the pending message list.</param>
/// <param name="claimMinIdleTime">Auto-claim messages that have been idle for at least this long.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>A value of <see cref="RedisStream"/> for each stream.</returns>
/// <remarks>
/// <para>Equivalent of calling <c>XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2</c>.</para>
/// <para><seealso href="https://redis.io/commands/xreadgroup"/></para>
/// </remarks>
RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None);

/// <summary>
/// Trim the stream to a specified maximum length.
Expand Down
5 changes: 4 additions & 1 deletion src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,10 @@ IAsyncEnumerable<SortedSetEntry> SortedSetScanAsync(
Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, CommandFlags flags);

/// <inheritdoc cref="IDatabase.StreamReadGroup(StreamPosition[], RedisValue, RedisValue, int?, bool, CommandFlags)"/>
Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None);
Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, bool noAck, CommandFlags flags);

/// <inheritdoc cref="IDatabase.StreamReadGroup(StreamPosition[], RedisValue, RedisValue, int?, bool, TimeSpan?, CommandFlags)"/>
Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None);

/// <inheritdoc cref="IDatabase.StreamTrim(RedisKey, int, bool, CommandFlags)"/>
Task<long> StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags);
Expand Down
3 changes: 3 additions & 0 deletions src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,9 @@ public Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions
public Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None) =>
Inner.StreamReadGroupAsync(streamPositions, groupName, consumerName, countPerStream, noAck, flags);

public Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None) =>
Inner.StreamReadGroupAsync(streamPositions, groupName, consumerName, countPerStream, noAck, claimMinIdleTime, flags);

public Task<long> StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags) =>
Inner.StreamTrimAsync(ToInner(key), maxLength, useApproximateMaxLength, flags);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,9 @@ public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValu
public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, CommandFlags flags = CommandFlags.None) =>
Inner.StreamReadGroup(streamPositions, groupName, consumerName, countPerStream, noAck, flags);

public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream = null, bool noAck = false, TimeSpan? claimMinIdleTime = null, CommandFlags flags = CommandFlags.None) =>
Inner.StreamReadGroup(streamPositions, groupName, consumerName, countPerStream, noAck, claimMinIdleTime, flags);

public long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags) =>
Inner.StreamTrim(ToInner(key), maxLength, useApproximateMaxLength, flags);

Expand Down
9 changes: 7 additions & 2 deletions src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,6 @@ StackExchange.Redis.IDatabase.StreamRead(StackExchange.Redis.RedisKey key, Stack
StackExchange.Redis.IDatabase.StreamRead(StackExchange.Redis.StreamPosition[]! streamPositions, int? countPerStream = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisStream[]!
StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? position = null, int? count = null, bool noAck = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]!
StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? position, int? count, StackExchange.Redis.CommandFlags flags) -> StackExchange.Redis.StreamEntry[]!
StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream = null, bool noAck = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisStream[]!
StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream, StackExchange.Redis.CommandFlags flags) -> StackExchange.Redis.RedisStream[]!
StackExchange.Redis.IDatabase.StreamTrim(StackExchange.Redis.RedisKey key, int maxLength, bool useApproximateMaxLength, StackExchange.Redis.CommandFlags flags) -> long
StackExchange.Redis.IDatabase.StreamTrim(StackExchange.Redis.RedisKey key, long maxLength, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode mode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long
Expand Down Expand Up @@ -991,7 +990,6 @@ StackExchange.Redis.IDatabaseAsync.StreamReadAsync(StackExchange.Redis.RedisKey
StackExchange.Redis.IDatabaseAsync.StreamReadAsync(StackExchange.Redis.StreamPosition[]! streamPositions, int? countPerStream = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.RedisStream[]!>!
StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? position = null, int? count = null, bool noAck = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamEntry[]!>!
StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? position, int? count, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamEntry[]!>!
StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream = null, bool noAck = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.RedisStream[]!>!
StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task<StackExchange.Redis.RedisStream[]!>!
StackExchange.Redis.IDatabaseAsync.StreamTrimAsync(StackExchange.Redis.RedisKey key, int maxLength, bool useApproximateMaxLength, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task<long>!
StackExchange.Redis.IDatabaseAsync.StreamTrimAsync(StackExchange.Redis.RedisKey key, long maxLength, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode mode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<long>!
Expand Down Expand Up @@ -2052,3 +2050,10 @@ StackExchange.Redis.IServer.ExecuteAsync(int? database, string! command, System.
[SER001]static StackExchange.Redis.VectorSetSimilaritySearchRequest.ByMember(StackExchange.Redis.RedisValue member) -> StackExchange.Redis.VectorSetSimilaritySearchRequest!
[SER001]static StackExchange.Redis.VectorSetSimilaritySearchRequest.ByVector(System.ReadOnlyMemory<float> vector) -> StackExchange.Redis.VectorSetSimilaritySearchRequest!
StackExchange.Redis.RedisChannel.WithKeyRouting() -> StackExchange.Redis.RedisChannel
StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream = null, bool noAck = false, System.TimeSpan? claimMinIdleTime = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisStream[]!
StackExchange.Redis.IDatabase.StreamReadGroup(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream, bool noAck, StackExchange.Redis.CommandFlags flags) -> StackExchange.Redis.RedisStream[]!
StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream = null, bool noAck = false, System.TimeSpan? claimMinIdleTime = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.RedisStream[]!>!
StackExchange.Redis.IDatabaseAsync.StreamReadGroupAsync(StackExchange.Redis.StreamPosition[]! streamPositions, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue consumerName, int? countPerStream, bool noAck, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task<StackExchange.Redis.RedisStream[]!>!
StackExchange.Redis.StreamEntry.DeliveryCount.get -> int
StackExchange.Redis.StreamEntry.IdleTime.get -> System.TimeSpan?
StackExchange.Redis.StreamEntry.StreamEntry(StackExchange.Redis.RedisValue id, StackExchange.Redis.NameValueEntry[]! values, System.TimeSpan? idleTime, int deliveryCount) -> void
Loading
Loading