Skip to content

Commit bd4e938

Browse files
WytheLizhangzhw8
authored andcommitted
fix(backend): batch_retry_nodes批量重试节点性能优化 #7709
# Reviewed, transaction id: 22559
1 parent dd6def8 commit bd4e938

File tree

3 files changed

+18
-11
lines changed

3 files changed

+18
-11
lines changed

dbm-ui/backend/db_services/taskflow/handlers.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,16 +75,25 @@ def revoke_pipeline(self):
7575

7676
return result
7777

78-
def retry_node(self, node_id: str):
78+
def retry_node(self, node: str):
7979
"""重试节点"""
80-
return task.retry_node(root_id=self.root_id, node_id=node_id, retry_times=1)
80+
if isinstance(node, str):
81+
flow_node = FlowNode.objects.get(root_id=self.root_id, node_id=node)
82+
else:
83+
flow_node = node
84+
return task.retry_node(root_id=self.root_id, flow_node=flow_node, retry_times=1)
8185

8286
def batch_retry_nodes(self):
8387
"""批量重试节点"""
8488
node_ids = self.get_failed_node_ids()
89+
90+
flow_nodes = FlowNode.objects.filter(node_id__in=node_ids, root_id=self.root_id).all()
91+
flow_node_dict = {node.node_id: node for node in flow_nodes}
92+
8593
for node_id in node_ids:
94+
flow_node = flow_node_dict.get(node_id)
8695
try:
87-
self.retry_node(node_id)
96+
self.retry_node(flow_node)
8897
except Exception as err:
8998
logger.error(f"{node_id} retry failed, {err}")
9099

dbm-ui/backend/db_services/taskflow/task.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,25 +49,23 @@
4949

5050

5151
@shared_task
52-
def retry_node(root_id: str, node_id: str, retry_times: int) -> Union[EngineAPIResult, Any]:
52+
def retry_node(root_id: str, flow_node: FlowNode, retry_times: int) -> Union[EngineAPIResult, Any]:
5353
"""重试flow任务节点"""
5454

5555
def send_flow_state(state, _root_id, _node_id, _version_id):
5656
post_set_state.send(
5757
sender=None,
58-
node_id=node_id,
58+
node_id=flow_node.node_id,
5959
to_state=state,
6060
version=flow_node.version_id,
6161
root_id=flow_node.root_id,
6262
parent_id=None,
6363
loop=None,
6464
)
6565

66-
flow_node = FlowNode.objects.get(root_id=root_id, node_id=node_id)
67-
6866
# 实例化一个Service实例用于捕获日志到日志平台
6967
service = BaseService()
70-
service.setup_runtime_attrs(root_pipeline_id=root_id, id=node_id, version=flow_node.version_id)
68+
service.setup_runtime_attrs(root_pipeline_id=root_id, id=flow_node.node_id, version=flow_node.version_id)
7169

7270
# 限制最大重试次数
7371
if retry_times > MAX_AUTO_RETRY_TIMES:
@@ -99,14 +97,14 @@ def send_flow_state(state, _root_id, _node_id, _version_id):
9997
if retry_times == 1:
10098
send_flow_state(StateType.RUNNING, root_id, flow_node.node_id, flow_node.version_id)
10199

102-
retry_node.apply_async((root_id, node_id, retry_times + 1), countdown=RETRY_INTERVAL)
100+
retry_node.apply_async((root_id, flow_node, retry_times + 1), countdown=RETRY_INTERVAL)
103101
return EngineAPIResult(result=False, message=_("存在执行互斥将自动进行重试..."))
104102
except (Ticket.DoesNotExist, ValueError):
105103
# 如果单据不存在,则忽略校验
106104
pass
107105

108106
# 进行重试操作
109-
result = BambooEngine(root_id=root_id).retry_node(node_id=node_id)
107+
result = BambooEngine(root_id=root_id).retry_node(node_id=flow_node.node_id)
110108
if not result.result:
111109
raise RetryNodeException(str(result.exc.args))
112110

dbm-ui/backend/db_services/taskflow/views/flow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ def retry_node(self, requests, *args, **kwargs):
143143

144144
root_id = kwargs["root_id"]
145145
validated_data = self.params_validate(self.get_serializer_class())
146-
return Response(TaskFlowHandler(root_id=root_id).retry_node(node_id=validated_data["node_id"]).result)
146+
return Response(TaskFlowHandler(root_id=root_id).retry_node(node=validated_data["node_id"]).result)
147147

148148
@common_swagger_auto_schema(
149149
operation_summary=_("批量重试"),

0 commit comments

Comments
 (0)