Skip to content

Commit f457146

Browse files
authored
Merge branch 'dev' into fix-constants-invalid
2 parents f8312c0 + 4d7f927 commit f457146

File tree

26 files changed

+259
-854
lines changed

26 files changed

+259
-854
lines changed

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,6 @@ private TaskDefinitionLog updateTask(User loginUser, long projectCode, long task
399399
taskDefinitionToUpdate.setUserId(taskDefinition.getUserId());
400400
taskDefinitionToUpdate.setVersion(++version);
401401
taskDefinitionToUpdate.setTaskType(taskDefinitionToUpdate.getTaskType().toUpperCase());
402-
taskDefinitionToUpdate.setResourceIds(processService.getResourceIds(taskDefinitionToUpdate));
403402
taskDefinitionToUpdate.setUpdateTime(now);
404403
int update = taskDefinitionMapper.updateById(taskDefinitionToUpdate);
405404
taskDefinitionToUpdate.setOperator(loginUser.getId());

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraph.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,12 +182,12 @@ public interface IWorkflowExecutionGraph {
182182
/**
183183
* Whether there exist the TaskExecutionRunnable chain in the graph is finish with paused.
184184
*/
185-
boolean isExistPauseTaskExecutionRunnableChain();
185+
boolean isExistPausedTaskExecutionRunnableChain();
186186

187187
/**
188188
* Whether there exist the TaskExecutionRunnable chain in the graph is finish with kill.
189189
*/
190-
boolean isExistKillTaskExecutionRunnableChain();
190+
boolean isExistKilledTaskExecutionRunnableChain();
191191

192192
/**
193193
* Check whether the given task is the end of the task chain.

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,8 @@ public boolean isAllTaskExecutionRunnableChainSuccess() {
204204
return false;
205205
}
206206
return !isExistFailureTaskExecutionRunnableChain()
207-
&& !isExistPauseTaskExecutionRunnableChain()
208-
&& !isExistKillTaskExecutionRunnableChain();
207+
&& !isExistPausedTaskExecutionRunnableChain()
208+
&& !isExistKilledTaskExecutionRunnableChain();
209209
}
210210

211211
@Override
@@ -214,12 +214,12 @@ public boolean isExistFailureTaskExecutionRunnableChain() {
214214
}
215215

216216
@Override
217-
public boolean isExistPauseTaskExecutionRunnableChain() {
217+
public boolean isExistPausedTaskExecutionRunnableChain() {
218218
return CollectionUtils.isNotEmpty(pausedTaskChains);
219219
}
220220

221221
@Override
222-
public boolean isExistKillTaskExecutionRunnableChain() {
222+
public boolean isExistKilledTaskExecutionRunnableChain() {
223223
return CollectionUtils.isNotEmpty(killedTaskChains);
224224
}
225225

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowReadyPauseStateAction.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,13 +120,13 @@ protected void emitWorkflowFinishedEventIfApplicable(final IWorkflowExecutionRun
120120
}
121121

122122
final WorkflowEventBus workflowEventBus = workflowExecutionRunnable.getWorkflowEventBus();
123-
if (workflowExecutionGraph.isExistFailureTaskExecutionRunnableChain()) {
124-
workflowEventBus.publish(WorkflowFailedLifecycleEvent.of(workflowExecutionRunnable));
123+
if (workflowExecutionGraph.isExistPausedTaskExecutionRunnableChain()) {
124+
workflowEventBus.publish(WorkflowPausedLifecycleEvent.of(workflowExecutionRunnable));
125125
return;
126126
}
127127

128-
if (workflowExecutionGraph.isExistPauseTaskExecutionRunnableChain()) {
129-
workflowEventBus.publish(WorkflowPausedLifecycleEvent.of(workflowExecutionRunnable));
128+
if (workflowExecutionGraph.isExistFailureTaskExecutionRunnableChain()) {
129+
workflowEventBus.publish(WorkflowFailedLifecycleEvent.of(workflowExecutionRunnable));
130130
return;
131131
}
132132

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowReadyStopStateAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ protected void emitWorkflowFinishedEventIfApplicable(final IWorkflowExecutionRun
120120
}
121121

122122
final WorkflowEventBus workflowEventBus = workflowExecutionRunnable.getWorkflowEventBus();
123-
if (workflowExecutionGraph.isExistKillTaskExecutionRunnableChain()) {
123+
if (workflowExecutionGraph.isExistKilledTaskExecutionRunnableChain()) {
124124
workflowEventBus.publish(WorkflowStoppedLifecycleEvent.of(workflowExecutionRunnable));
125125
return;
126126
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,13 @@ public void onStopEvent(final IWorkflowExecutionRunnable workflowExecutionRunnab
8484
public void onStoppedEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
8585
final WorkflowStoppedLifecycleEvent workflowStoppedEvent) {
8686
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
87-
logWarningIfCannotDoAction(workflowExecutionRunnable, workflowStoppedEvent);
87+
// [Fix-17354]
88+
if (!workflowExecutionRunnable.getWorkflowExecutionGraph().isExistKilledTaskExecutionRunnableChain()) {
89+
throw new IllegalStateException(
90+
"The workflow: " + workflowExecutionRunnable.getName()
91+
+ " does not exist tasks chain which is killed");
92+
}
93+
super.workflowFinish(workflowExecutionRunnable, WorkflowExecutionStatus.STOP);
8894
}
8995

9096
@Override
@@ -142,13 +148,23 @@ protected void emitWorkflowFinishedEventIfApplicable(IWorkflowExecutionRunnable
142148
return;
143149
}
144150

151+
// [Fix-17354]
152+
// If there exist tasks which has set timeout failed, then will publish a kill event to kill the task.
153+
// So there might exist task which is killed, and the workflow instance state is running.
154+
// This is a special case, the workflow instance can transform from running to stop state.
155+
// Is there better way to handle this case?
156+
if (workflowExecutionGraph.isExistKilledTaskExecutionRunnableChain()) {
157+
workflowEventBus.publish(WorkflowStoppedLifecycleEvent.of(workflowExecutionRunnable));
158+
return;
159+
}
160+
145161
if (workflowExecutionGraph.isAllTaskExecutionRunnableChainSuccess()) {
146162
workflowEventBus.publish(WorkflowSucceedLifecycleEvent.of(workflowExecutionRunnable));
147163
return;
148164
}
149165

150166
throw new IllegalStateException("The workflow: " + workflowExecutionRunnable.getName() +
151167
" state is " + workflowExecutionRunnable.getState()
152-
+ " can only finish with success/failed but exist task which state is not success and failure");
168+
+ " can only finish with task success/failed/killed but exist task which state is not successfailure、killed");
153169
}
154170
}

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowTestCaseContext.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,14 @@ public WorkflowDefinition getOneWorkflow() {
6161
}
6262
return workflows.get(0);
6363
}
64+
65+
public WorkflowDefinition getWorkflow(String name) {
66+
if (CollectionUtils.isEmpty(workflows)) {
67+
throw new IllegalStateException("workflows is empty");
68+
}
69+
return workflows.stream()
70+
.filter(workflow -> workflow.getName().equals(name))
71+
.findFirst()
72+
.orElseThrow(() -> new IllegalStateException("Workflow with name " + name + " not found"));
73+
}
6474
}

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,4 +1004,35 @@ public void testStartWorkflow_usingWorkflowBuiltInParam() {
10041004
});
10051005
masterContainer.assertAllResourceReleased();
10061006
}
1007+
1008+
@Test
1009+
@DisplayName("Test start a workflow which contains a dep task with timeout kill strategy")
1010+
public void testStartWorkflow_withTimeoutKillTask() {
1011+
final String yaml = "/it/start/workflow_with_timeout_kill_task.yaml";
1012+
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
1013+
final WorkflowDefinition workflow = context.getWorkflow("workflow_with_timeout_kill_task");
1014+
1015+
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
1016+
.workflowDefinition(workflow)
1017+
.runWorkflowCommandParam(new RunWorkflowCommandParam())
1018+
.build();
1019+
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
1020+
1021+
await()
1022+
.atMost(Duration.ofSeconds(90))
1023+
.untilAsserted(() -> {
1024+
Assertions
1025+
.assertThat(repository.queryWorkflowInstance(workflow))
1026+
.satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
1027+
.isEqualTo(WorkflowExecutionStatus.STOP));
1028+
Assertions
1029+
.assertThat(repository.queryTaskInstance(workflow))
1030+
.hasSize(1)
1031+
.satisfiesExactly(taskInstance -> {
1032+
assertThat(taskInstance.getName()).isEqualTo("dep_task_with_timeout_killed");
1033+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.KILL);
1034+
});
1035+
});
1036+
masterContainer.assertAllResourceReleased();
1037+
}
10071038
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
project:
19+
name: MasterIntegrationTest
20+
code: 1
21+
description: This is a fake project
22+
userId: 1
23+
userName: admin
24+
createTime: 2024-08-12 00:00:00
25+
updateTime: 2021-08-12 00:00:00
26+
27+
workflows:
28+
- name: workflow_with_one_fake_task_success
29+
code: 1
30+
version: 1
31+
projectCode: 1
32+
description: This is a fake workflow with single task
33+
releaseState: ONLINE
34+
createTime: 2024-08-12 00:00:00
35+
updateTime: 2021-08-12 00:00:00
36+
userId: 1
37+
executionType: PARALLEL
38+
- name: workflow_with_timeout_kill_task
39+
code: 2
40+
version: 1
41+
projectCode: 1
42+
description: This is a fake workflow with single timeout task
43+
releaseState: ONLINE
44+
createTime: 2024-08-12 00:00:00
45+
updateTime: 2021-08-12 00:00:00
46+
userId: 1
47+
executionType: PARALLEL
48+
49+
tasks:
50+
- name: B
51+
code: 1
52+
version: 1
53+
projectCode: 1
54+
userId: 1
55+
taskType: LogicFakeTask
56+
taskParams: '{"localParams":[],"shellScript":"if [ \"${system.project.name}\" = \"MasterIntegrationTest\" ]; then\n exit 0 \nelse\n exit 1\nfi","resourceList":[]}'
57+
workerGroup: default
58+
createTime: 2024-08-12 00:00:00
59+
updateTime: 2021-08-12 00:00:00
60+
taskExecuteType: BATCH
61+
- name: dep_task_with_timeout_killed
62+
code: 2
63+
version: 1
64+
projectCode: 1
65+
userId: 1
66+
timeoutFlag: 'OPEN'
67+
timeoutNotifyStrategy: 'FAILED'
68+
timeout: 1
69+
taskType: DEPENDENT
70+
taskParams: '{"localParams":[],"resourceList":[],"dependence":{"checkInterval":10,"failurePolicy":"DEPENDENT_FAILURE_FAILURE","relation":"AND","dependTaskList":[{"relation":"AND","dependItemList":[{"dependentType":"DEPENDENT_ON_WORKFLOW","projectCode":1,"definitionCode":1,"depTaskCode":0,"cycle":"day","dateValue":"last1Days"}]}]}}'
71+
workerGroup: default
72+
createTime: 2024-08-12 00:00:00
73+
updateTime: 2021-08-12 00:00:00
74+
taskExecuteType: BATCH
75+
76+
taskRelations:
77+
- projectCode: 1
78+
workflowDefinitionCode: 1
79+
workflowDefinitionVersion: 1
80+
preTaskCode: 0
81+
preTaskVersion: 0
82+
postTaskCode: 1
83+
postTaskVersion: 1
84+
createTime: 2024-08-12 00:00:00
85+
updateTime: 2024-08-12 00:00:00
86+
- projectCode: 1
87+
workflowDefinitionCode: 2
88+
workflowDefinitionVersion: 1
89+
preTaskCode: 0
90+
preTaskVersion: 0
91+
postTaskCode: 2
92+
postTaskVersion: 1
93+
createTime: 2024-08-12 00:00:00
94+
updateTime: 2024-08-12 00:00:00
95+

dolphinscheduler-master/src/test/resources/logback.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
<file>${taskInstanceLogFullPath}</file>
4444
<encoder>
4545
<pattern>
46-
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} - %message%n
46+
%date{yyyy-MM-dd HH:mm:ss.SSS} %-5level - %message%n
4747
</pattern>
4848
<charset>UTF-8</charset>
4949
</encoder>

0 commit comments

Comments
 (0)