Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ public ListenableFuture<Object> persistAll(@Nullable final Committer committer)
numPersistedRows += sink.getNumRowsInMemory();
bytesPersisted += sink.getBytesInMemory();

final int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size();
final int limit = sink.isWritable() ? Integer.max(0, hydrants.size() - 1) : hydrants.size();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to have a small UT for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't figure out how to accomplish it. So for full disclosure the test case I came up with is courtesy of Claude coming up with a way to force the empty list of hydrants scenario. Confirmed that without the fix it is an IAE and with the change, no exception.


// gather hydrants that have not been persisted:
for (FireHydrant hydrant : hydrants.subList(0, limit)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2546,6 +2546,65 @@ public void test_dropSegment_skipsUnlockInterval_ifOverlappingSinkIsActive() thr
}
}

/**
* Tests an edge case where a writable sink exists but has no hydrants.
* <p>
* The test scenario is somewhat unrealistic, but we are trying to prevent a regression because this issue was seen in
* the wild and resulted in failed indexing tasks due to an IAE.
* </p>
*/
Comment on lines +2549 to +2555
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the javadoc here can be omitted.
Also, since it was already reported, I think it is a valid scenario.

@Test
public void testPersistAllWithEmptySink() throws Exception
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: update the method name to reflect the expected behaviour with the suffix IsNoop or DoesNotFail, etc.

Suggested change
public void testPersistAllWithEmptySink() throws Exception
public void testPersistAllWithEmptySinkIsNoop() throws Exception

{
try (
final StreamAppenderatorTester tester =
new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
.basePersistDirectory(temporaryFolder.newFolder())
.build()) {
final Appenderator appenderator = tester.getAppenderator();
final Supplier<Committer> committerSupplier = () -> new Committer()
{
@Override
public Object getMetadata()
{
return ImmutableMap.of("eventCount", 0);
}

@Override
public void run()
{
// Do nothing
}
};

appenderator.startJob();
Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());

// Create a sink by adding a row, but use a timestamp outside the identifier's interval
// This will cause the row to not actually be added (filtered out by interval check)
// but the sink will be created
InputRow outOfBoundsRow = ir("1999", "foo", 1); // IDENTIFIERS.get(0) is for 2000/2001
try {
appenderator.add(IDENTIFIERS.get(0), outOfBoundsRow, committerSupplier);
}
catch (DruidException ignored) {
// For the purposes of this test we ignore that an exception is thrown. We got our sink with no hydrants created which is what we are testing.
}

// Verify the sink was created but is empty
List<SegmentIdWithShardSpec> segments = appenderator.getSegments();
if (!segments.isEmpty()) {
Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());

// Despite having an empty sink, persistAll should work without error
appenderator.persistAll(committerSupplier.get()).get();
Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
}

appenderator.close();
}
}

private static SegmentIdWithShardSpec si(String interval, String version, int partitionNum)
{
return new SegmentIdWithShardSpec(
Expand Down
Loading