Skip to content

Commit f8a0b7a

Browse files
chrisguidryclaude
andauthored
Fix task event enrichment for distributed task runners (#19178)
Co-authored-by: Claude <[email protected]>
1 parent e0b598d commit f8a0b7a

File tree

2 files changed

+69
-10
lines changed

2 files changed

+69
-10
lines changed

src/prefect/engine.py

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2011,9 +2011,11 @@ async def orchestrate_task_run(
20112011
)
20122012

20132013
# Emit an event to capture that the task run was in the `PENDING` state.
2014-
last_event = emit_task_run_state_change_event(
2015-
task_run=task_run, initial_state=None, validated_state=task_run.state
2016-
)
2014+
with task_run_context:
2015+
last_event = emit_task_run_state_change_event(
2016+
task_run=task_run, initial_state=None, validated_state=task_run.state
2017+
)
2018+
20172019
last_state = (
20182020
Pending()
20192021
if flow_run_context and flow_run_context.autonomous_task_run
@@ -2107,13 +2109,14 @@ async def tick():
21072109
break
21082110

21092111
# Emit an event to capture the result of proposing a `RUNNING` state.
2110-
last_event = emit_task_run_state_change_event(
2111-
task_run=task_run,
2112-
initial_state=last_state,
2113-
validated_state=state,
2114-
follows=last_event,
2115-
)
2116-
last_state = state
2112+
with task_run_context:
2113+
last_event = emit_task_run_state_change_event(
2114+
task_run=task_run,
2115+
initial_state=last_state,
2116+
validated_state=state,
2117+
follows=last_event,
2118+
)
2119+
last_state = state
21172120

21182121
# flag to ensure we only update the task run name once
21192122
run_name_set = False

tests/events/client/instrumentation/test_task_run_state_change_events.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,62 @@ def happy_path():
147147
last_state = task_run_state
148148

149149

150+
async def test_task_events_include_flow_run_and_flow_in_related_resources(
151+
asserting_events_worker: EventsWorker,
152+
reset_worker_events: None,
153+
prefect_client: PrefectClient,
154+
):
155+
"""
156+
Test that all task-run events include flow-run and flow in their related resources.
157+
158+
This validates the fix for the bug where Pending/Running events were missing
159+
related resources when FlowRunContext was not available, because they were
160+
emitted before TaskRunContext was entered.
161+
"""
162+
163+
@task
164+
def simple_task():
165+
return 42
166+
167+
@flow
168+
def simple_flow():
169+
return simple_task(return_state=True)
170+
171+
flow_state: State[State[int]] = simple_flow(return_state=True)
172+
await flow_state.result()
173+
174+
flow_run_id = flow_state.state_details.flow_run_id
175+
176+
await asserting_events_worker.drain()
177+
assert isinstance(asserting_events_worker._client, AssertingEventsClient)
178+
179+
events = [
180+
e
181+
for e in asserting_events_worker._client.events
182+
if e.event.startswith("prefect.task-run.")
183+
]
184+
assert len(events) == 3
185+
186+
pending, running, completed = events
187+
188+
# All three events should have flow-run in related resources
189+
assert "flow-run" in pending.resource_in_role
190+
assert pending.resource_in_role["flow-run"].id == f"prefect.flow-run.{flow_run_id}"
191+
192+
assert "flow-run" in running.resource_in_role
193+
assert running.resource_in_role["flow-run"].id == f"prefect.flow-run.{flow_run_id}"
194+
195+
assert "flow-run" in completed.resource_in_role
196+
assert (
197+
completed.resource_in_role["flow-run"].id == f"prefect.flow-run.{flow_run_id}"
198+
)
199+
200+
# All three events should have flow in related resources
201+
assert "flow" in pending.resource_in_role
202+
assert "flow" in running.resource_in_role
203+
assert "flow" in completed.resource_in_role
204+
205+
150206
async def test_background_task_state_changes(
151207
asserting_events_worker: EventsWorker,
152208
reset_worker_events,

0 commit comments

Comments
 (0)