diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 0156d101ed8db..7db9c957fe0d1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -61,6 +61,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -2911,49 +2913,50 @@ public void initializeIfNeeded() { @Test public void shouldNotCompleteRestorationIfTaskCannotCompleteRestoration() { - final Map> assignment = mkMap( - mkEntry(taskId00, taskId00Partitions) - ); - final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { - @Override - public void completeRestoration(final java.util.function.Consumer> offsetResetter) { - throw new TimeoutException("timeout!"); - } - }; - - when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(singletonList(task00)); + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.RESTORING) + .build(); - taskManager.handleAssignment(assignment, emptyMap()); + final TasksRegistry tasks = mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); - assertThat(task00.state(), is(Task.State.CREATED)); + when(stateUpdater.restoresActiveTasks()).thenReturn(true); + when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(Set.of(task00)); + final TimeoutException timeoutException = new TimeoutException("timeout!"); + doThrow(timeoutException).when(task00).completeRestoration(any()); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(false)); + final boolean restorationComplete = taskManager.checkStateUpdater(time.milliseconds(), noOpResetter); - assertThat(task00.state(), is(Task.State.RESTORING)); - assertThat( - taskManager.activeTaskMap(), - Matchers.equalTo(mkMap(mkEntry(taskId00, task00))) - ); - assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); - verify(changeLogReader).enforceRestoreActive(); + assertFalse(restorationComplete); + verify(task00).completeRestoration(any()); + verify(stateUpdater).add(task00); + verify(tasks, never()).addTask(task00); verifyNoInteractions(consumer); } @Test public void shouldSuspendActiveTasksDuringRevocation() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.RUNNING) + .build(); + + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.allTasks()).thenReturn(Set.of(task00)); + final Map offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); - task00.setCommittableOffsetsAndMetadata(offsets); - when(consumer.assignment()).thenReturn(assignment); - when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00)); + when(task00.commitNeeded()).thenReturn(true); + when(task00.prepareCommit(true)).thenReturn(offsets); - taskManager.handleAssignment(taskId00Assignment, emptyMap()); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); - assertThat(task00.state(), is(Task.State.RUNNING)); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); taskManager.handleRevocation(taskId00Partitions); - assertThat(task00.state(), is(Task.State.SUSPENDED)); + + verify(task00).prepareCommit(true); + verify(task00).postCommit(true); + verify(task00).suspend(); } @SuppressWarnings("removal") @@ -3041,218 +3044,233 @@ public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo @Test public void shouldCommitAllNeededTasksOnHandleRevocation() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + // revoked task that needs commit + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.RUNNING) + .build(); final Map offsets00 = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); - task00.setCommittableOffsetsAndMetadata(offsets00); - task00.setCommitNeeded(); + when(task00.commitNeeded()).thenReturn(true); + when(task00.prepareCommit(true)).thenReturn(offsets00); - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); + // non revoked task that needs commit + final StreamTask task01 = statefulTask(taskId01, taskId01ChangelogPartitions) + .withInputPartitions(taskId01Partitions) + .inState(State.RUNNING) + .build(); final Map offsets01 = singletonMap(t1p1, new OffsetAndMetadata(1L, null)); - task01.setCommittableOffsetsAndMetadata(offsets01); - task01.setCommitNeeded(); + when(task01.commitNeeded()).thenReturn(true); + when(task01.prepareCommit(true)).thenReturn(offsets01); - final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager); - final Map offsets02 = singletonMap(t1p2, new OffsetAndMetadata(2L, null)); - task02.setCommittableOffsetsAndMetadata(offsets02); + // non revoked task that does NOT need commit + final StreamTask task02 = statefulTask(taskId02, taskId02ChangelogPartitions) + .withInputPartitions(taskId02Partitions) + .inState(State.RUNNING) + .build(); + when(task02.commitNeeded()).thenReturn(false); - final StateMachineTask task10 = new StateMachineTask(taskId10, taskId10Partitions, false, stateManager); + // standby task (not be affected by revocation) + final StandbyTask task03 = standbyTask(taskId03, taskId03ChangelogPartitions) + .withInputPartitions(taskId03Partitions) + .inState(State.RUNNING) + .build(); final Map expectedCommittedOffsets = new HashMap<>(); expectedCommittedOffsets.putAll(offsets00); expectedCommittedOffsets.putAll(offsets01); - final Map> assignmentActive = mkMap( - mkEntry(taskId00, taskId00Partitions), - mkEntry(taskId01, taskId01Partitions), - mkEntry(taskId02, taskId02Partitions) - ); - - final Map> assignmentStandby = mkMap( - mkEntry(taskId10, taskId10Partitions) - ); - when(consumer.assignment()).thenReturn(assignment); - - when(activeTaskCreator.createTasks(any(), eq(assignmentActive))) - .thenReturn(asList(task00, task01, task02)); - when(standbyTaskCreator.createTasks(assignmentStandby)) - .thenReturn(singletonList(task10)); + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02, task03)); - taskManager.handleAssignment(assignmentActive, assignmentStandby); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); - assertThat(task00.state(), is(Task.State.RUNNING)); - assertThat(task01.state(), is(Task.State.RUNNING)); - assertThat(task02.state(), is(Task.State.RUNNING)); - assertThat(task10.state(), is(Task.State.RUNNING)); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); taskManager.handleRevocation(taskId00Partitions); - assertThat(task00.commitNeeded, is(false)); - assertThat(task00.commitPrepared, is(true)); - assertThat(task01.commitNeeded, is(false)); - assertThat(task01.commitPrepared, is(true)); - assertThat(task02.commitPrepared, is(false)); - assertThat(task10.commitPrepared, is(false)); + // both tasks needing commit had prepareCommit called + verify(task00).prepareCommit(true); + verify(task01).prepareCommit(true); + verify(task02, never()).prepareCommit(anyBoolean()); + verify(task03, never()).prepareCommit(anyBoolean()); verify(consumer).commitSync(expectedCommittedOffsets); - } - - @Test - public void shouldNotCommitIfNoRevokedTasksNeedCommitting() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); - task01.setCommitNeeded(); - - final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager); - - final Map> assignmentActive = mkMap( - mkEntry(taskId00, taskId00Partitions), - mkEntry(taskId01, taskId01Partitions), - mkEntry(taskId02, taskId02Partitions) - ); - - when(consumer.assignment()).thenReturn(assignment); - - when(activeTaskCreator.createTasks(any(), eq(assignmentActive))) - .thenReturn(asList(task00, task01, task02)); - - taskManager.handleAssignment(assignmentActive, Collections.emptyMap()); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); - assertThat(task00.state(), is(Task.State.RUNNING)); - assertThat(task01.state(), is(Task.State.RUNNING)); - assertThat(task02.state(), is(Task.State.RUNNING)); + // revoked task suspended + verify(task00).suspend(); + verify(task00).postCommit(true); - taskManager.handleRevocation(taskId00Partitions); + // non-revoked task with commit was also post-committed (but not suspended) + verify(task01).postCommit(false); + verify(task01, never()).suspend(); - assertThat(task00.commitPrepared, is(false)); - assertThat(task01.commitPrepared, is(false)); - assertThat(task02.commitPrepared, is(false)); + // task02 and task03 should not be affected + verify(task02, never()).postCommit(anyBoolean()); + verify(task02, never()).suspend(); + verify(task03, never()).postCommit(anyBoolean()); + verify(task03, never()).suspend(); } - @Test - public void shouldNotCommitIfNoRevokedTasksNeedCommittingWithEOSv2() { - final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false); - - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); - - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); - task01.setCommitNeeded(); + @ParameterizedTest + @EnumSource(ProcessingMode.class) + public void shouldNotCommitIfNoRevokedTasksNeedCommitting(final ProcessingMode processingMode) { + // task00 being revoked, no commit needed + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.RUNNING) + .build(); - final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager); + // task01 NOT being revoked, commit needed + final StreamTask task01 = statefulTask(taskId01, taskId01ChangelogPartitions) + .withInputPartitions(taskId01Partitions) + .inState(State.RUNNING) + .build(); - final Map> assignmentActive = mkMap( - mkEntry(taskId00, taskId00Partitions), - mkEntry(taskId01, taskId01Partitions), - mkEntry(taskId02, taskId02Partitions) - ); + // task02 NOT being revoked, no commit needed + final StreamTask task02 = statefulTask(taskId02, taskId02ChangelogPartitions) + .withInputPartitions(taskId02Partitions) + .inState(State.RUNNING) + .build(); - when(consumer.assignment()).thenReturn(assignment); + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02)); - when(activeTaskCreator.createTasks(any(), eq(assignmentActive))) - .thenReturn(asList(task00, task01, task02)); + when(task00.commitNeeded()).thenReturn(false); + when(task01.commitNeeded()).thenReturn(true); // only task01 needs commit + when(task02.commitNeeded()).thenReturn(false); - taskManager.handleAssignment(assignmentActive, Collections.emptyMap()); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); - assertThat(task00.state(), is(Task.State.RUNNING)); - assertThat(task01.state(), is(Task.State.RUNNING)); - assertThat(task02.state(), is(Task.State.RUNNING)); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(processingMode, tasks); taskManager.handleRevocation(taskId00Partitions); - assertThat(task00.commitPrepared, is(false)); - assertThat(task01.commitPrepared, is(false)); - assertThat(task02.commitPrepared, is(false)); + verify(task00, never()).prepareCommit(anyBoolean()); + verify(task01, never()).prepareCommit(anyBoolean()); + verify(task02, never()).prepareCommit(anyBoolean()); + + verify(task00).suspend(); + verify(task01, never()).suspend(); + verify(task02, never()).suspend(); } @Test public void shouldNotCommitOnHandleAssignmentIfNoTaskClosed() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); - final Map offsets00 = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); - task00.setCommittableOffsetsAndMetadata(offsets00); - task00.setCommitNeeded(); - - final StateMachineTask task10 = new StateMachineTask(taskId10, taskId10Partitions, false, stateManager); + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId00Partitions).build(); + final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId01Partitions).build(); - final Map> assignmentActive = singletonMap(taskId00, taskId00Partitions); - final Map> assignmentStandby = singletonMap(taskId10, taskId10Partitions); + final TasksRegistry tasks = mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); - when(consumer.assignment()).thenReturn(assignment); + when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00)); + when(stateUpdater.tasks()).thenReturn(Set.of(task01)); - when(activeTaskCreator.createTasks(any(), eq(assignmentActive))).thenReturn(singleton(task00)); - when(standbyTaskCreator.createTasks(assignmentStandby)).thenReturn(singletonList(task10)); + final Map> assignmentActive = singletonMap(taskId00, taskId00Partitions); + final Map> assignmentStandby = singletonMap(taskId01, taskId01Partitions); taskManager.handleAssignment(assignmentActive, assignmentStandby); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); - assertThat(task00.state(), is(Task.State.RUNNING)); - assertThat(task10.state(), is(Task.State.RUNNING)); - taskManager.handleAssignment(assignmentActive, assignmentStandby); + // active task stays in task manager + verify(tasks, never()).removeTask(task00); + verify(task00, never()).prepareCommit(anyBoolean()); + verify(task00, never()).postCommit(anyBoolean()); + + // standby task not removed from state updater + verify(stateUpdater, never()).remove(task01.id()); + verify(task01, never()).prepareCommit(anyBoolean()); + verify(task01, never()).postCommit(anyBoolean()); - assertThat(task00.commitNeeded, is(true)); - assertThat(task10.commitPrepared, is(false)); + verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap()); + verify(standbyTaskCreator).createTasks(Collections.emptyMap()); } @Test public void shouldNotCommitOnHandleAssignmentIfOnlyStandbyTaskClosed() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); - final Map offsets00 = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); - task00.setCommittableOffsetsAndMetadata(offsets00); - task00.setCommitNeeded(); - - final StateMachineTask task10 = new StateMachineTask(taskId10, taskId10Partitions, false, stateManager); + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId00Partitions).build(); + final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId01Partitions).build(); - final Map> assignmentActive = singletonMap(taskId00, taskId00Partitions); - final Map> assignmentStandby = singletonMap(taskId10, taskId10Partitions); + final TasksRegistry tasks = mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); - when(consumer.assignment()).thenReturn(assignment); + when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00)); + when(stateUpdater.tasks()).thenReturn(Set.of(task01)); - when(activeTaskCreator.createTasks(any(), eq(assignmentActive))).thenReturn(singleton(task00)); - when(standbyTaskCreator.createTasks(assignmentStandby)).thenReturn(singletonList(task10)); + // mock to remove standby task from state updater + final CompletableFuture future = new CompletableFuture<>(); + when(stateUpdater.remove(task01.id())).thenReturn(future); + future.complete(new StateUpdater.RemovedTaskResult(task01)); - taskManager.handleAssignment(assignmentActive, assignmentStandby); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); - assertThat(task00.state(), is(Task.State.RUNNING)); - assertThat(task10.state(), is(Task.State.RUNNING)); + final Map> assignmentActive = singletonMap(taskId00, taskId00Partitions); taskManager.handleAssignment(assignmentActive, Collections.emptyMap()); - assertThat(task00.commitNeeded, is(true)); + verify(task00, never()).prepareCommit(anyBoolean()); + verify(task00, never()).postCommit(anyBoolean()); + + verify(stateUpdater).remove(task01.id()); + verify(task01).suspend(); + verify(task01).closeClean(); + + verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap()); + verify(standbyTaskCreator).createTasks(Collections.emptyMap()); } @Test public void shouldNotCommitCreatedTasksOnRevocationOrClosure() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.CREATED) + .withInputPartitions(taskId00Partitions) + .build(); - when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00)); + final TasksRegistry tasks = mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + + when(activeTaskCreator.createTasks(consumer, taskId00Assignment)) + .thenReturn(singletonList(task00)); taskManager.handleAssignment(taskId00Assignment, emptyMap()); - assertThat(task00.state(), is(Task.State.CREATED)); + verify(tasks).addPendingTasksToInit(singletonList(task00)); + // when handle revocation is called, the tasks in pendingTasksToInit are NOT affected + // by revocation. They remain in the pending queue untouched taskManager.handleRevocation(taskId00Partitions); - assertThat(task00.state(), is(Task.State.SUSPENDED)); + // tasks in pendingTasksToInit are not managed by handleRevocation + verify(task00, never()).suspend(); + verify(task00, never()).prepareCommit(anyBoolean()); + + when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00)); + + // this calls handleTasksPendingInitialization() + // which drains pendingTasksToInit and closes those tasks taskManager.handleAssignment(emptyMap(), emptyMap()); - assertThat(task00.state(), is(Task.State.CLOSED)); + + // close clean without ever being committed + verify(task00).closeClean(); + verify(task00, never()).prepareCommit(anyBoolean()); } @Test public void shouldPassUpIfExceptionDuringSuspend() { - final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { - @Override - public void suspend() { - super.suspend(); - throw new RuntimeException("KABOOM!"); - } - }; + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.RUNNING) + .build(); - when(consumer.assignment()).thenReturn(assignment); - when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00)); - taskManager.handleAssignment(taskId00Assignment, emptyMap()); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); - assertThat(task00.state(), is(Task.State.RUNNING)); + doThrow(new RuntimeException("KABOOM!")).when(task00).suspend(); + + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.allTasks()).thenReturn(Set.of(task00)); + + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); assertThrows(RuntimeException.class, () -> taskManager.handleRevocation(taskId00Partitions)); - assertThat(task00.state(), is(Task.State.SUSPENDED)); + + verify(task00).suspend(); } @Test @@ -4805,9 +4823,26 @@ public void shouldConvertStandbyTaskToActiveTask() { @Test public void shouldListNotPausedTasks() { - handleAssignment(taskId00Assignment, taskId01Assignment, emptyMap()); + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.RUNNING) + .build(); + + final StreamTask task01 = statefulTask(taskId01, taskId01ChangelogPartitions) + .withInputPartitions(taskId01Partitions) + .inState(State.RUNNING) + .build(); + + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.allTasks()).thenReturn(Set.of(task00, task01)); + + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + + when(stateUpdater.tasks()).thenReturn(Collections.emptySet()); assertEquals(2, taskManager.notPausedTasks().size()); + assertTrue(taskManager.notPausedTasks().containsKey(taskId00)); + assertTrue(taskManager.notPausedTasks().containsKey(taskId01)); topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY);