Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions src/main/java/io/lettuce/core/StreamMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ public class StreamMessage<K, V> {

private final Map<K, V> body;

private final Long msSinceLastDelivery;

private final Long redeliveryCount;

/**
* Create a new {@link StreamMessage}.
*
Expand All @@ -29,6 +33,26 @@ public StreamMessage(K stream, String id, Map<K, V> body) {
this.stream = stream;
this.id = id;
this.body = body;
this.msSinceLastDelivery = null;
this.redeliveryCount = null;
}

/**
* Create a new {@link StreamMessage}.
*
* @param stream the stream.
* @param id the message id.
* @param msSinceLastDelivery the milliseconds since last delivery when CLAIM was used.
* @param redeliveryCount the number of prior deliveries when CLAIM was used.
* @param body map containing the message body.
*/
public StreamMessage(K stream, String id, Map<K, V> body, long msSinceLastDelivery, long redeliveryCount) {

this.stream = stream;
this.id = id;
this.body = body;
this.msSinceLastDelivery = msSinceLastDelivery;
this.redeliveryCount = redeliveryCount;
}

public K getStream() {
Expand All @@ -46,6 +70,37 @@ public Map<K, V> getBody() {
return body;
}

/**
* @return the milliseconds since the last delivery of this message when CLAIM was used. Default: 0. ul>
* <li>{@code null} when not applicable</li>
* <li>{@code 0} means not claimed from the pending entries list (PEL)</li>
* <li>{@code > 0} means claimed from the PEL</li>
* </ul>
* @since 7.1
*/
public Long getMsSinceLastDelivery() {
return msSinceLastDelivery;
}

/**
* /**
*
* @return the number of prior deliveries of this message when CLAIM was used:
* <ul>
* <li>{@code null} when not applicable</li>
* <li>{@code 0} means not claimed from the pending entries list (PEL)</li>
* <li>{@code > 0} means claimed from the PEL</li>
* </ul>
* @since 7.1
*/
public Long getRedeliveryCount() {
return redeliveryCount;
}

public boolean isClaimed() {
return redeliveryCount != null && redeliveryCount > 0;
}

@Override
public boolean equals(Object o) {
if (this == o)
Expand Down
44 changes: 44 additions & 0 deletions src/main/java/io/lettuce/core/XReadArgs.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public class XReadArgs implements CompositeArgument {

private boolean noack;

private Long claimMinIdleTime;

/**
* Builder entry points for {@link XReadArgs}.
*/
Expand Down Expand Up @@ -90,6 +92,21 @@ public static XReadArgs noack(boolean noack) {
return new XReadArgs().noack(noack);
}

/**
* Create a new {@link XReadArgs} and set CLAIM min-idle-time (milliseconds). Only valid for XREADGROUP.
Copy link

Copilot AI Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing JavaDoc parameters and return value documentation. The method should document:

  • @param milliseconds the minimum idle time in milliseconds
  • @return new {@link XReadArgs} with CLAIM set
  • Consider adding @see XReadArgs#claim(long) for consistency with other Builder methods
Suggested change
* Create a new {@link XReadArgs} and set CLAIM min-idle-time (milliseconds). Only valid for XREADGROUP.
* Create a new {@link XReadArgs} and set CLAIM min-idle-time (milliseconds). Only valid for XREADGROUP.
*
* @param milliseconds the minimum idle time in milliseconds
* @return new {@link XReadArgs} with CLAIM set
* @see XReadArgs#claim(long)

Copilot uses AI. Check for mistakes.
*/
public static XReadArgs claim(long milliseconds) {
return new XReadArgs().claim(milliseconds);
}

/**
* Create a new {@link XReadArgs} and set CLAIM min-idle-time. Only valid for XREADGROUP.
*/
Copy link

Copilot AI Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing JavaDoc parameters and return value documentation. The method should document:

  • @param timeout the minimum idle time as a Duration
  • @return new {@link XReadArgs} with CLAIM set
  • Consider adding @see XReadArgs#claim(Duration) for consistency with other Builder methods
Suggested change
*/
*/
/**
* Create a new {@link XReadArgs} and set CLAIM min-idle-time. Only valid for XREADGROUP.
*
* @param timeout the minimum idle time as a {@link Duration}
* @return new {@link XReadArgs} with CLAIM set
* @see XReadArgs#claim(Duration)
*/

Copilot uses AI. Check for mistakes.
public static XReadArgs claim(Duration timeout) {
LettuceAssert.notNull(timeout, "Claim timeout must not be null");
return claim(timeout.toMillis());
}

}

/**
Expand Down Expand Up @@ -141,6 +158,29 @@ public XReadArgs noack(boolean noack) {
return this;
}

/**
* Claim idle pending messages first with a minimum idle time (milliseconds). Only valid for XREADGROUP.
*
Copy link

Copilot AI Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing JavaDoc parameters and return value documentation. The method should document:

  • @param milliseconds the minimum idle time in milliseconds
  • @return {@code this} XReadArgs instance for method chaining
Suggested change
*
*
* @param milliseconds the minimum idle time in milliseconds
* @return {@code this} XReadArgs instance for method chaining

Copilot uses AI. Check for mistakes.
* @since 7.0
*/
public XReadArgs claim(long milliseconds) {

this.claimMinIdleTime = milliseconds;
return this;
}

/**
* Claim idle pending messages first with a minimum idle time. Only valid for XREADGROUP.
*
Copy link

Copilot AI Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing JavaDoc parameters and return value documentation. The method should document:

  • @param timeout the minimum idle time as a Duration
  • @return {@code this} XReadArgs instance for method chaining
Suggested change
*
*
* @param timeout the minimum idle time as a Duration
* @return {@code this} XReadArgs instance for method chaining

Copilot uses AI. Check for mistakes.
* @since 7.0
*/
public XReadArgs claim(Duration timeout) {

LettuceAssert.notNull(timeout, "Claim timeout must not be null");

return claim(timeout.toMillis());
}

public <K, V> void build(CommandArgs<K, V> args) {

if (block != null) {
Expand All @@ -154,6 +194,10 @@ public <K, V> void build(CommandArgs<K, V> args) {
if (noack) {
args.add(CommandKeyword.NOACK);
}

if (claimMinIdleTime != null) {
args.add("CLAIM").add(claimMinIdleTime);
}
}

/**
Expand Down
34 changes: 32 additions & 2 deletions src/main/java/io/lettuce/core/output/StreamReadOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import io.lettuce.core.StreamMessage;
import io.lettuce.core.codec.RedisCodec;

import io.lettuce.core.internal.LettuceAssert;

/**
Expand All @@ -31,6 +32,10 @@ public class StreamReadOutput<K, V> extends CommandOutput<K, V, List<StreamMessa

private Map<K, V> body;

private Long msSinceLastDelivery;

private Long redeliveryCount;

private boolean bodyReceived = false;

public StreamReadOutput(RedisCodec<K, V> codec) {
Expand Down Expand Up @@ -75,6 +80,23 @@ public void set(ByteBuffer bytes) {
key = null;
}

@Override
public void set(long integer) {

// Extra integers appear only for claimed entries (XREADGROUP with CLAIM)
if (id != null && bodyReceived) {
if (msSinceLastDelivery == null) {
msSinceLastDelivery = integer;
return;
}
if (redeliveryCount == null) {
redeliveryCount = integer;
return;
}
}
super.set(integer);
}

@Override
public void multi(int count) {

Expand All @@ -91,12 +113,20 @@ public void multi(int count) {
@Override
public void complete(int depth) {

if (depth == 3 && bodyReceived) {
subscriber.onNext(output, new StreamMessage<>(stream, id, body == null ? Collections.emptyMap() : body));
// Emit the message when the entry array (id/body[/extras]) completes.
if (depth == 2 && bodyReceived) {
Map<K, V> map = body == null ? Collections.emptyMap() : body;
if (msSinceLastDelivery != null && redeliveryCount != null) {
subscriber.onNext(output, new StreamMessage<>(stream, id, map, msSinceLastDelivery, redeliveryCount));
} else {
subscriber.onNext(output, new StreamMessage<>(stream, id, map));
}
bodyReceived = false;
key = null;
body = null;
id = null;
msSinceLastDelivery = null;
redeliveryCount = null;
}

// RESP2/RESP3 compat
Expand Down
Loading
Loading