Skip to content

Commit a87d0cd

Browse files
desertaxleclaude
andauthored
Fix run_deployment execution graph display when called from tasks (#19361)
Co-authored-by: Claude <[email protected]>
1 parent 6127fa8 commit a87d0cd

File tree

2 files changed

+63
-1
lines changed

2 files changed

+63
-1
lines changed

src/prefect/deployments/flow_runs.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import prefect
99
from prefect._result_records import ResultRecordMetadata
10-
from prefect.client.schemas import FlowRun
10+
from prefect.client.schemas import FlowRun, TaskRunResult
1111
from prefect.client.utilities import inject_client
1212
from prefect.context import FlowRunContext, TaskRunContext
1313
from prefect.logging import get_logger
@@ -136,6 +136,21 @@ async def run_deployment(
136136
k: await collect_task_run_inputs(v) for k, v in parameters.items()
137137
}
138138

139+
# Track parent task if this is being called from within a task
140+
# This enables the execution graph to properly display the deployment
141+
# flow run as nested under the calling task
142+
if task_run_ctx:
143+
# The task run is only considered a parent if it is in the same
144+
# flow run (otherwise the child is in a subflow, so the subflow
145+
# serves as the parent) or if there is no flow run
146+
if not flow_run_ctx or (
147+
task_run_ctx.task_run.flow_run_id
148+
== getattr(flow_run_ctx.flow_run, "id", None)
149+
):
150+
task_inputs["__parents__"] = [
151+
TaskRunResult(id=task_run_ctx.task_run.id)
152+
]
153+
139154
if deployment_id:
140155
flow = await client.read_flow(deployment.flow_id)
141156
deployment_name = f"{flow.name}/{deployment.name}"

tests/deployment/test_flow_runs.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,53 @@ async def foo():
445445
]
446446
}
447447

448+
async def test_tracks_parent_task_when_called_from_task(
449+
self, test_deployment, use_hosted_api_server, prefect_client
450+
):
451+
"""
452+
Test that when run_deployment is called from within a task,
453+
the wrapper task run tracks the calling task as a parent
454+
in task_inputs["__parents__"].
455+
456+
This is important for the execution graph to correctly display
457+
the deployment flow run as nested under the calling task.
458+
459+
Regression test for https://github.com/PrefectHQ/prefect/issues/19359
460+
"""
461+
deployment = test_deployment
462+
463+
@task
464+
async def trigger_deployment():
465+
return await run_deployment(
466+
f"foo/{deployment.name}",
467+
timeout=0,
468+
poll_interval=0,
469+
)
470+
471+
@flow
472+
async def parent_flow():
473+
return await trigger_deployment(return_state=True)
474+
475+
parent_state = await parent_flow(return_state=True)
476+
calling_task_state = await parent_state.result()
477+
child_flow_run = await calling_task_state.result()
478+
479+
# The wrapper task run should exist and be linked to the child flow run
480+
assert child_flow_run.parent_task_run_id is not None
481+
wrapper_task_run = await prefect_client.read_task_run(
482+
child_flow_run.parent_task_run_id
483+
)
484+
485+
# The wrapper task run should have the calling task as its parent
486+
# in the __parents__ field, which enables proper execution graph display
487+
assert "__parents__" in wrapper_task_run.task_inputs
488+
assert wrapper_task_run.task_inputs["__parents__"] == [
489+
TaskRunResult(
490+
input_type="task_run",
491+
id=calling_task_state.state_details.task_run_id,
492+
)
493+
]
494+
448495
async def test_propagates_otel_trace_to_deployment_flow_run(
449496
self,
450497
test_deployment: DeploymentResponse,

0 commit comments

Comments
 (0)