diff --git a/connectors/apache-connector/src/test/java/org/glassfish/jersey/apache/connector/StreamingTest.java b/connectors/apache-connector/src/test/java/org/glassfish/jersey/apache/connector/StreamingTest.java index a8521d723a3..14ba1f2aa03 100644 --- a/connectors/apache-connector/src/test/java/org/glassfish/jersey/apache/connector/StreamingTest.java +++ b/connectors/apache-connector/src/test/java/org/glassfish/jersey/apache/connector/StreamingTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2022 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2024 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -130,6 +130,8 @@ private void clientCloseTest(int readTimeout) throws IOException { inputStream.close(); // trigger sending another 'A' to the stream; it should fail // (indicating that the streaming has been terminated on the server) + // But only the second flush causes the Exception + assertEquals("OK", sendTarget.request().get().readEntity(String.class)); assertEquals("NOK", sendTarget.request().get().readEntity(String.class)); assertEquals(0, counter.get()); } diff --git a/connectors/apache5-connector/src/test/java/org/glassfish/jersey/apache5/connector/StreamingTest.java b/connectors/apache5-connector/src/test/java/org/glassfish/jersey/apache5/connector/StreamingTest.java index 4a38280f63f..8f7888ae39d 100644 --- a/connectors/apache5-connector/src/test/java/org/glassfish/jersey/apache5/connector/StreamingTest.java +++ b/connectors/apache5-connector/src/test/java/org/glassfish/jersey/apache5/connector/StreamingTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2022, 2024 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -129,6 +129,8 @@ private void clientCloseTest(int readTimeout) throws IOException { inputStream.close(); // trigger sending another 'A' to the stream; it should fail // (indicating that the streaming has been terminated on the server) + // But only the second flush causes the Exception + assertEquals("OK", sendTarget.request().get().readEntity(String.class)); assertEquals("NOK", sendTarget.request().get().readEntity(String.class)); assertEquals(0, counter.get()); } diff --git a/core-common/src/main/java/org/glassfish/jersey/io/spi/FlushedCloseable.java b/core-common/src/main/java/org/glassfish/jersey/io/spi/FlushedCloseable.java index 12aa7144d87..6b9f3009d33 100644 --- a/core-common/src/main/java/org/glassfish/jersey/io/spi/FlushedCloseable.java +++ b/core-common/src/main/java/org/glassfish/jersey/io/spi/FlushedCloseable.java @@ -17,6 +17,7 @@ package org.glassfish.jersey.io.spi; import java.io.Closeable; +import java.io.FilterOutputStream; import java.io.Flushable; import java.io.IOException; import java.io.OutputStream; @@ -27,8 +28,8 @@ * That way, {@link #flush()} method is not called twice. * *
- * Usable by {@link javax.ws.rs.client.ClientRequestContext#setEntityStream(OutputStream)}. - * Usable by {@link javax.ws.rs.container.ContainerResponseContext#setEntityStream(OutputStream)}. + * Usable by {@link jakarta.ws.rs.client.ClientRequestContext#setEntityStream(OutputStream)}. + * Usable by {@link jakarta.ws.rs.container.ContainerResponseContext#setEntityStream(OutputStream)}. *
* *
@@ -52,4 +53,13 @@ public interface FlushedCloseable extends Flushable, Closeable {
* @throws IOException if an I/O error occurs
*/
public void close() throws IOException;
+
+ /**
+ * Determine if the stream {@link OutputStream#flush() flushes} on {@link OutputStream#close()}.
+ * @param stream the provided {@link OutputStream}
+ * @return {@code true} if the stream ensures to call {@link OutputStream#flush()} on {@link OutputStream#close()}.
+ */
+ public static boolean flushOnClose(OutputStream stream) {
+ return FilterOutputStream.class.isInstance(stream) || FlushedCloseable.class.isInstance(stream);
+ }
}
diff --git a/core-common/src/main/java/org/glassfish/jersey/message/internal/CommittingOutputStream.java b/core-common/src/main/java/org/glassfish/jersey/message/internal/CommittingOutputStream.java
index e9f1bc20374..f87c80da872 100644
--- a/core-common/src/main/java/org/glassfish/jersey/message/internal/CommittingOutputStream.java
+++ b/core-common/src/main/java/org/glassfish/jersey/message/internal/CommittingOutputStream.java
@@ -26,6 +26,7 @@
import org.glassfish.jersey.innate.VirtualThreadSupport;
import org.glassfish.jersey.internal.LocalizationMessages;
import org.glassfish.jersey.internal.guava.Preconditions;
+import org.glassfish.jersey.io.spi.FlushedCloseable;
/**
* A committing output stream with optional serialized entity buffering functionality
@@ -128,6 +129,12 @@ public void setStreamProvider(OutboundMessageContext.StreamProvider streamProvid
this.streamProvider = streamProvider;
}
+ /* package */ void flushOnClose() throws IOException {
+ if (!FlushedCloseable.flushOnClose(adaptedOutput)) {
+ flush();
+ }
+ }
+
/**
* Enable buffering of the serialized entity.
*
diff --git a/core-common/src/main/java/org/glassfish/jersey/message/internal/OutboundMessageContext.java b/core-common/src/main/java/org/glassfish/jersey/message/internal/OutboundMessageContext.java
index b1b7745bbd1..db496827ba5 100644
--- a/core-common/src/main/java/org/glassfish/jersey/message/internal/OutboundMessageContext.java
+++ b/core-common/src/main/java/org/glassfish/jersey/message/internal/OutboundMessageContext.java
@@ -562,8 +562,12 @@ public void close() {
if (hasEntity()) {
try {
final OutputStream es = getEntityStream();
- if (!FlushedCloseable.class.isInstance(es)) {
- es.flush();
+ if (!FlushedCloseable.flushOnClose(es)) {
+ if (CommittingOutputStream.class.isInstance(es)) {
+ ((CommittingOutputStream) es).flushOnClose();
+ } else {
+ es.flush();
+ }
}
es.close();
} catch (IOException e) {
diff --git a/core-common/src/main/java/org/glassfish/jersey/message/internal/ReaderWriter.java b/core-common/src/main/java/org/glassfish/jersey/message/internal/ReaderWriter.java
index ae8e7d86027..14a668e8350 100644
--- a/core-common/src/main/java/org/glassfish/jersey/message/internal/ReaderWriter.java
+++ b/core-common/src/main/java/org/glassfish/jersey/message/internal/ReaderWriter.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2010, 2023 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2010, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -65,7 +65,7 @@ public final class ReaderWriter {
public static final int BUFFER_SIZE = getBufferSize();
/**
- * Whether {@linkplain BUFFER_SIZE} is to be ignored in favor of JRE's own decision.
+ * Whether {@linkplain #BUFFER_SIZE} is to be ignored in favor of JRE's own decision.
*/
public static final boolean AUTOSIZE_BUFFER = getAutosizeBuffer();
@@ -269,9 +269,7 @@ private static byte[] readAllBytes(InputStream inputStream) throws IOException {
* @throws IOException in case of a write failure.
*/
public static void writeToAsString(String s, OutputStream out, MediaType type) throws IOException {
- Writer osw = new OutputStreamWriter(out, getCharset(type));
- osw.write(s);
- osw.flush();
+ out.write(s.getBytes(getCharset(type)));
}
/**
diff --git a/core-server/src/test/java/org/glassfish/jersey/server/ContainerResponseWriterNoFlushTest.java b/core-server/src/test/java/org/glassfish/jersey/server/ContainerResponseWriterNoFlushTest.java
new file mode 100644
index 00000000000..067d9e2bb5f
--- /dev/null
+++ b/core-server/src/test/java/org/glassfish/jersey/server/ContainerResponseWriterNoFlushTest.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright (c) 2024 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.server;
+
+
+import jakarta.ws.rs.GET;
+import jakarta.ws.rs.Path;
+import jakarta.ws.rs.Produces;
+import jakarta.ws.rs.core.MediaType;
+import jakarta.ws.rs.core.Response;
+import jakarta.ws.rs.core.StreamingOutput;
+import org.glassfish.jersey.internal.MapPropertiesDelegate;
+import org.glassfish.jersey.io.spi.FlushedCloseable;
+import org.glassfish.jersey.message.MessageBodyWorkers;
+import org.glassfish.jersey.server.RequestContextBuilder.TestContainerRequest;
+import org.glassfish.jersey.server.spi.ContainerResponseWriter;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.Matchers;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ContainerResponseWriterNoFlushTest {
+ private static final String RESPONSE = "RESPONSE";
+ private static AtomicInteger flushCounter = new AtomicInteger(0);
+ private static class TestResponseOutputStream extends ByteArrayOutputStream implements FlushedCloseable {
+ private boolean closed = false;
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ closed = true;
+ flush();
+ super.close();
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ flushCounter.incrementAndGet();
+ }
+ }
+
+ private static class TestContainerWriter implements ContainerResponseWriter {
+ TestResponseOutputStream outputStream;
+ private final boolean buffering;
+
+ private TestContainerWriter(boolean buffering) {
+ this.buffering = buffering;
+ }
+
+ @Override
+ public OutputStream writeResponseStatusAndHeaders(long contentLength, ContainerResponse responseContext)
+ throws ContainerException {
+ outputStream = new TestResponseOutputStream();
+// responseContext.setEntityStream(outputStream);
+ return outputStream;
+ }
+
+ @Override
+ public boolean suspend(long timeOut, TimeUnit timeUnit, TimeoutHandler timeoutHandler) {
+ return false;
+ }
+
+ @Override
+ public void setSuspendTimeout(long timeOut, TimeUnit timeUnit) throws IllegalStateException {
+ }
+
+ @Override
+ public void commit() {
+ }
+
+ @Override
+ public void failure(Throwable error) {
+ throw new RuntimeException(error);
+ }
+
+ @Override
+ public boolean enableResponseBuffering() {
+ return buffering;
+ }
+ }
+
+ @Path("/test")
+ public static class StreamResource {
+
+ @GET
+ @Path(value = "/stream")
+ @Produces(MediaType.TEXT_PLAIN)
+ public Response stream() {
+
+ StreamingOutput stream = output -> {
+ output.write(RESPONSE.getBytes(StandardCharsets.UTF_8));
+ };
+ return Response.ok(stream).build();
+ }
+ }
+
+ @Test
+ public void testWriterBuffering() {
+ TestContainerWriter writer = new TestContainerWriter(true);
+ testWriter(writer);
+ }
+
+ @Test
+ public void testWriterNoBuffering() {
+ TestContainerWriter writer = new TestContainerWriter(false);
+ testWriter(writer);
+ }
+
+ private void testWriter(TestContainerWriter writer) {
+ flushCounter.set(0);
+ RequestContextBuilder rcb = RequestContextBuilder.from("/test/stream", "GET");
+
+ TestContainerRequest request = rcb.new TestContainerRequest(
+ null, URI.create("/test/stream"), "GET", null, new MapPropertiesDelegate()) {
+ @Override
+ public void setWorkers(MessageBodyWorkers workers) {
+ if (workers != null) {
+ setWriter(writer);
+ }
+ super.setWorkers(workers);
+ }
+ };
+
+ ApplicationHandler applicationHandler = new ApplicationHandler(new ResourceConfig(StreamResource.class));
+ Future