diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java index 578b1f32da2b..75066b4d84a2 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java @@ -653,7 +653,7 @@ public ListenableFuture persistAll(@Nullable final Committer committer) numPersistedRows += sink.getNumRowsInMemory(); bytesPersisted += sink.getBytesInMemory(); - final int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size(); + final int limit = sink.isWritable() ? Integer.max(0, hydrants.size() - 1) : hydrants.size(); // gather hydrants that have not been persisted: for (FireHydrant hydrant : hydrants.subList(0, limit)) { diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java index 22074f4f78f6..7f43f6d43172 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java @@ -2546,6 +2546,65 @@ public void test_dropSegment_skipsUnlockInterval_ifOverlappingSinkIsActive() thr } } + /** + * Tests an edge case where a writable sink exists but has no hydrants. + *

+ * The test scenario is somewhat unrealistic, but we are trying to prevent a regression because this issue was seen in + * the wild and resulted in failed indexing tasks due to an IAE. + *

+ */ + @Test + public void testPersistAllWithEmptySink() throws Exception + { + try ( + final StreamAppenderatorTester tester = + new StreamAppenderatorTester.Builder().maxRowsInMemory(100) + .basePersistDirectory(temporaryFolder.newFolder()) + .build()) { + final Appenderator appenderator = tester.getAppenderator(); + final Supplier committerSupplier = () -> new Committer() + { + @Override + public Object getMetadata() + { + return ImmutableMap.of("eventCount", 0); + } + + @Override + public void run() + { + // Do nothing + } + }; + + appenderator.startJob(); + Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory()); + + // Create a sink by adding a row, but use a timestamp outside the identifier's interval + // This will cause the row to not actually be added (filtered out by interval check) + // but the sink will be created + InputRow outOfBoundsRow = ir("1999", "foo", 1); // IDENTIFIERS.get(0) is for 2000/2001 + try { + appenderator.add(IDENTIFIERS.get(0), outOfBoundsRow, committerSupplier); + } + catch (DruidException ignored) { + // For the purposes of this test we ignore that an exception is thrown. We got our sink with no hydrants created which is what we are testing. + } + + // Verify the sink was created but is empty + List segments = appenderator.getSegments(); + if (!segments.isEmpty()) { + Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory()); + + // Despite having an empty sink, persistAll should work without error + appenderator.persistAll(committerSupplier.get()).get(); + Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory()); + } + + appenderator.close(); + } + } + private static SegmentIdWithShardSpec si(String interval, String version, int partitionNum) { return new SegmentIdWithShardSpec(