Skip to content

Commit f7e3f55

Browse files
committed
revert shouldNotCompleteRestorationIfTasksCannotInitialize to original
1 parent f634443 commit f7e3f55

File tree

1 file changed

+26
-22
lines changed

1 file changed

+26
-22
lines changed

streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2874,36 +2874,40 @@ public void shouldAddNewActiveTasks() {
28742874

28752875
@Test
28762876
public void shouldNotCompleteRestorationIfTasksCannotInitialize() {
2877-
final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)
2878-
.withInputPartitions(taskId00Partitions)
2879-
.inState(State.CREATED)
2880-
.build();
2881-
2882-
final TasksRegistry tasks = mock(TasksRegistry.class);
2883-
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
28842877
final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
2885-
mkEntry(taskId00, taskId00Partitions)
2878+
mkEntry(taskId00, taskId00Partitions),
2879+
mkEntry(taskId01, taskId01Partitions)
28862880
);
2881+
final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
2882+
@Override
2883+
public void initializeIfNeeded() {
2884+
throw new LockException("can't lock");
2885+
}
2886+
};
2887+
final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
2888+
@Override
2889+
public void initializeIfNeeded() {
2890+
throw new TimeoutException("timed out");
2891+
}
2892+
};
28872893

2888-
when(activeTaskCreator.createTasks(any(), eq(assignment)))
2889-
.thenReturn(singletonList(task00));
2890-
taskManager.handleAssignment(assignment, emptyMap());
2894+
when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(asList(task00, task01));
28912895

2892-
verify(tasks).addPendingTasksToInit(singletonList(task00));
2896+
taskManager.handleAssignment(assignment, emptyMap());
28932897

2894-
when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00));
2895-
final LockException lockException = new LockException("can't lock");
2896-
doThrow(lockException).when(task00).initializeIfNeeded();
2897-
when(tasks.hasPendingTasksToInit()).thenReturn(true);
2898+
assertThat(task00.state(), is(Task.State.CREATED));
2899+
assertThat(task01.state(), is(Task.State.CREATED));
28982900

2899-
final boolean restorationComplete = taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
2901+
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(false));
29002902

2901-
assertFalse(restorationComplete);
2902-
verify(task00).initializeIfNeeded();
2903-
verify(tasks, times(2)).addPendingTasksToInit(
2904-
argThat(tasksToInit -> tasksToInit.contains(task00))
2903+
assertThat(task00.state(), is(Task.State.CREATED));
2904+
assertThat(task01.state(), is(Task.State.CREATED));
2905+
assertThat(
2906+
taskManager.activeTaskMap(),
2907+
Matchers.equalTo(mkMap(mkEntry(taskId00, task00), mkEntry(taskId01, task01)))
29052908
);
2906-
verify(stateUpdater, never()).add(task00);
2909+
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
2910+
verify(changeLogReader).enforceRestoreActive();
29072911
verifyNoInteractions(consumer);
29082912
}
29092913

0 commit comments

Comments
 (0)