diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Compression.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Compression.java index d9e7757547f5..fc715d013c0e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Compression.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Compression.java @@ -35,11 +35,11 @@ import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream; import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream; -import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; import org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream; import org.apache.commons.compress.compressors.snappy.SnappyCompressorOutputStream; import org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream; import org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream; +import org.checkerframework.checker.nullness.qual.Nullable; /** Various compression types for reading/writing files. */ @SuppressWarnings("ImmutableEnumChecker") @@ -91,7 +91,7 @@ public ReadableByteChannel readDecompressed(ReadableByteChannel channel) throws byte zero = 0x00; int header = Ints.fromBytes(zero, zero, headerBytes[1], headerBytes[0]); if (header == GZIPInputStream.GZIP_MAGIC) { - return Channels.newChannel(new GzipCompressorInputStream(stream, true)); + return Channels.newChannel(new ConcatenatedGzipInputStream(stream)); } } return Channels.newChannel(stream); @@ -286,6 +286,65 @@ public abstract ReadableByteChannel readDecompressed(ReadableByteChannel channel public abstract WritableByteChannel writeCompressed(WritableByteChannel channel) throws IOException; + /** + * A thread-safe input stream for reading concatenated GZIP streams. This is a replacement for + * Apache Commons Compress's GzipCompressorInputStream(stream, true), which is not thread-safe. + */ + private static class ConcatenatedGzipInputStream extends InputStream { + private final InputStream underlying; + private @Nullable GZIPInputStream currentMember; + + ConcatenatedGzipInputStream(InputStream in) throws IOException { + this.underlying = in; + this.currentMember = new GZIPInputStream(in); + } + + @Override + public int read() throws IOException { + if (currentMember == null) { + return -1; + } + int result = currentMember.read(); + if (result == -1) { + try { + currentMember = new GZIPInputStream(underlying); + return currentMember.read(); + } catch (IOException e) { + currentMember = null; + return -1; + } + } + return result; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (currentMember == null) { + return -1; + } + int result = currentMember.read(b, off, len); + if (result == -1) { + try { + currentMember = new GZIPInputStream(underlying); + return currentMember.read(b, off, len); + } catch (IOException e) { + currentMember = null; + return -1; + } + } + return result; + } + + @Override + public void close() throws IOException { + if (currentMember != null) { + currentMember.close(); + currentMember = null; + } + underlying.close(); + } + } + /** Concatenates all {@link ZipInputStream}s contained within the zip file. */ private static class FullZipInputStream extends InputStream { private ZipInputStream zipInputStream;