Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions gradle/spring-module.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ tasks.register('javadocJar', Jar) {
from javadoc
}

nullability {
nullAwayVersion = "0.12.15"
}

publishing {
publications {
mavenJava(MavenPublication) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ Mono<T> decodeToMono(Publisher<DataBuffer> inputStream, ResolvableType elementTy
default @Nullable T decode(DataBuffer buffer, ResolvableType targetType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) throws DecodingException {

CompletableFuture<T> future = decodeToMono(Mono.just(buffer), targetType, mimeType, hints).toFuture();
CompletableFuture<@Nullable T> future = decodeToMono(Mono.just(buffer), targetType, mimeType, hints).toFuture();
Assert.state(future.isDone(), "DataBuffer decoding should have completed");

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler

private static final byte[] EMPTY_PAYLOAD = new byte[0];

private static final CompletableFuture<Void> EMPTY_TASK = CompletableFuture.completedFuture(null);
private static final CompletableFuture<@Nullable Void> EMPTY_TASK = CompletableFuture.completedFuture(null);

private static final StompHeaderAccessor HEART_BEAT_ACCESSOR;

Expand Down Expand Up @@ -851,7 +851,7 @@ public void afterConnectionClosed() {
* @return a future to wait for the result
*/
@SuppressWarnings("unchecked")
public CompletableFuture<Void> forward(final Message<?> message, final StompHeaderAccessor accessor) {
public CompletableFuture<@Nullable Void> forward(final Message<?> message, final StompHeaderAccessor accessor) {
TcpConnection<byte[]> conn = this.tcpConnection;

if (!this.isStompConnected || conn == null) {
Expand Down Expand Up @@ -887,7 +887,7 @@ else if (logger.isTraceEnabled()) {
logger.trace("Forwarding " + accessor.getDetailedLogMessage(message.getPayload()));
}

CompletableFuture<Void> future = conn.sendAsync((Message<byte[]>) messageToSend);
CompletableFuture<@Nullable Void> future = conn.sendAsync((Message<byte[]>) messageToSend);
future.whenComplete((unused, throwable) -> {
if (throwable == null) {
if (accessor.getCommand() == StompCommand.DISCONNECT) {
Expand Down Expand Up @@ -1067,9 +1067,9 @@ public void afterConnectionClosed() {
}

@Override
public CompletableFuture<Void> forward(Message<?> message, StompHeaderAccessor accessor) {
public CompletableFuture<@Nullable Void> forward(Message<?> message, StompHeaderAccessor accessor) {
try {
CompletableFuture<Void> future = super.forward(message, accessor);
CompletableFuture<@Nullable Void> future = super.forward(message, accessor);
if (message.getHeaders().get(SimpMessageHeaderAccessor.IGNORE_ERROR) == null) {
future.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.io.Closeable;
import java.util.concurrent.CompletableFuture;

import org.jspecify.annotations.Nullable;

import org.springframework.messaging.Message;

/**
Expand All @@ -37,7 +39,7 @@ public interface TcpConnection<P> extends Closeable {
* message was successfully sent
* @since 6.0
*/
CompletableFuture<Void> sendAsync(Message<P> message);
CompletableFuture<@Nullable Void> sendAsync(Message<P> message);

/**
* Register a task to invoke after a period of read inactivity.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

import java.util.concurrent.CompletableFuture;

import org.jspecify.annotations.Nullable;


/**
* A contract for establishing TCP connections.
*
Expand All @@ -34,7 +37,7 @@ public interface TcpOperations<P> {
* connection is successfully established
* @since 6.0
*/
CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> connectionHandler);
CompletableFuture<@Nullable Void> connectAsync(TcpConnectionHandler<P> connectionHandler);

/**
* Open a new connection and a strategy for reconnecting if the connection fails.
Expand All @@ -44,14 +47,14 @@ public interface TcpOperations<P> {
* initial connection is successfully established
* @since 6.0
*/
CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy reconnectStrategy);
CompletableFuture<@Nullable Void> connectAsync(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy reconnectStrategy);

/**
* Shut down and close any open connections.
* @return a CompletableFuture that can be used to determine when and if the
* connection is successfully closed
* @since 6.0
*/
CompletableFuture<Void> shutdownAsync();
CompletableFuture<@Nullable Void> shutdownAsync();

}
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public Log getLogger() {


@Override
public CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> handler) {
public CompletableFuture<@Nullable Void> connectAsync(TcpConnectionHandler<P> handler) {
Assert.notNull(handler, "TcpConnectionHandler is required");

if (this.stopping) {
Expand Down Expand Up @@ -200,7 +200,7 @@ protected TcpClient extendTcpClient(TcpClient tcpClient, TcpConnectionHandler<P>
}

@Override
public CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
public CompletableFuture<@Nullable Void> connectAsync(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
Assert.notNull(handler, "TcpConnectionHandler is required");
Assert.notNull(strategy, "ReconnectStrategy is required");

Expand All @@ -209,7 +209,7 @@ public CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> handler, Rec
}

// Report first connect to the ListenableFuture
CompletableFuture<Void> connectFuture = new CompletableFuture<>();
CompletableFuture<@Nullable Void> connectFuture = new CompletableFuture<>();

extendTcpClient(this.tcpClient, handler)
.handle(new ReactorNettyHandler(handler))
Expand All @@ -228,7 +228,7 @@ public CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> handler, Rec
return connectFuture;
}

private CompletableFuture<Void> handleShuttingDownConnectFailure(TcpConnectionHandler<P> handler) {
private CompletableFuture<@Nullable Void> handleShuttingDownConnectFailure(TcpConnectionHandler<P> handler) {
IllegalStateException ex = new IllegalStateException("Shutting down.");
handler.afterConnectFailure(ex);
return Mono.<Void>error(ex).toFuture();
Expand All @@ -240,7 +240,7 @@ private Publisher<? extends Long> reconnect(Integer attempt, ReconnectStrategy r
}

@Override
public CompletableFuture<Void> shutdownAsync() {
public CompletableFuture<@Nullable Void> shutdownAsync() {
if (this.stopping) {
return CompletableFuture.completedFuture(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.CompletableFuture;

import io.netty.buffer.ByteBuf;
import org.jspecify.annotations.Nullable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.netty.NettyInbound;
Expand Down Expand Up @@ -56,7 +57,7 @@ public ReactorNettyTcpConnection(NettyInbound inbound, NettyOutbound outbound,


@Override
public CompletableFuture<Void> sendAsync(Message<P> message) {
public CompletableFuture<@Nullable Void> sendAsync(Message<P> message) {
ByteBuf byteBuf = this.outbound.alloc().buffer();
this.codec.encode(message, byteBuf);
return this.outbound.send(Mono.just(byteBuf))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public ParameterNameDiscoverer getParameterNameDiscoverer() {
public @Nullable HandlerResult invokeForHandlerResult(ServerWebExchange exchange,
BindingContext bindingContext, Object... providedArgs) {

CompletableFuture<HandlerResult> future =
CompletableFuture<@Nullable HandlerResult> future =
this.delegate.invoke(exchange, bindingContext, providedArgs).toFuture();

if (!future.isDone()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,9 +387,9 @@ public boolean supportsPartialMessages() {
// TcpConnection implementation

@Override
public CompletableFuture<Void> sendAsync(Message<byte[]> message) {
public CompletableFuture<@Nullable Void> sendAsync(Message<byte[]> message) {
updateLastWriteTime();
CompletableFuture<Void> future = new CompletableFuture<>();
CompletableFuture<@Nullable Void> future = new CompletableFuture<>();
try {
WebSocketSession session = this.session;
Assert.state(session != null, "No WebSocketSession available");
Expand Down