Skip to content

Commit 9004467

Browse files
zzstoatzzclaude
andcommitted
address review feedback: use EventsClient directly, pass params, inline serialization
This commit addresses PR review comments #1-4 and implements the requested change from comment #5: 1. Keep reserved keywords (like 'requires') in serialized inputs ✅ 2. Use `get_events_client(checkpoint_every=1)` directly instead of `emit_event` to avoid buffering issues ✅ 3. Pass `flow_run` and `logger` as parameters instead of using context ✅ 4. Inline step serialization logic directly in `run_steps` ✅ 5. Emit **one event per step** instead of one aggregate event ✅ Changes: - Removed `_EnvironmentRunContext` class and tests (over-engineered) - Read `flow_run_id` directly from `os.getenv("PREFECT__FLOW_RUN_ID")` - Changed from single aggregate event to individual events per step: - `prefect.flow-run.pull-step.executed` for each successful step - `prefect.flow-run.pull-step.failed` when a step fails - Updated all tests to expect one event per step - Event payload now contains individual step data (index, qualified_name, step_name, id, inputs) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent 4f2e4ee commit 9004467

File tree

4 files changed

+133
-364
lines changed

4 files changed

+133
-364
lines changed

src/prefect/_internal/context.py

Lines changed: 0 additions & 108 deletions
This file was deleted.

src/prefect/deployments/steps/core.py

Lines changed: 38 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -151,27 +151,27 @@ async def run_steps(
151151
logger: Any | None = None,
152152
) -> dict[str, Any]:
153153
upstream_outputs = deepcopy(upstream_outputs) if upstream_outputs else {}
154-
serialized_steps: list[dict[str, Any]] = []
154+
step_index = 0
155155
for step in steps:
156156
if not step:
157157
continue
158158
fqn, inputs = _get_step_fully_qualified_name_and_inputs(step)
159159
step_name = fqn.split(".")[-1]
160160
print_function(f" > Running {step_name} step...")
161+
161162
# SECURITY: Serialize inputs BEFORE running the step (and thus before templating).
162163
# This ensures that the event payload contains template strings like
163164
# "{{ prefect.blocks.secret.api-key }}" rather than resolved secret values.
164165
# Templating (which resolves blocks, variables, and env vars) happens inside
165166
# run_step(), so by serializing here we prevent secrets from leaking in events.
166-
serialized_steps.append(
167-
{
168-
"index": len(serialized_steps),
169-
"qualified_name": fqn,
170-
"step_name": step_name,
171-
"id": inputs.get("id"),
172-
"inputs": inputs, # Keep all inputs including reserved keywords like 'requires'
173-
}
174-
)
167+
serialized_step = {
168+
"index": step_index,
169+
"qualified_name": fqn,
170+
"step_name": step_name,
171+
"id": inputs.get("id"),
172+
"inputs": inputs, # Keep all inputs including reserved keywords like 'requires'
173+
}
174+
175175
try:
176176
# catch warnings to ensure deprecation warnings are printed
177177
with warnings.catch_warnings(record=True) as w:
@@ -210,22 +210,27 @@ async def run_steps(
210210
if inputs.get("id"):
211211
upstream_outputs[inputs.get("id")] = step_output
212212
upstream_outputs.update(step_output)
213+
214+
# Emit success event for this step
215+
await _emit_pull_step_event(
216+
serialized_step,
217+
event_type="prefect.flow-run.pull-step.executed",
218+
deployment=deployment,
219+
flow_run=flow_run,
220+
logger=logger,
221+
)
213222
except Exception as exc:
214-
await _emit_pull_steps_event(
215-
serialized_steps,
216-
failed_step=serialized_steps[-1] if serialized_steps else None,
223+
# Emit failure event for this step
224+
await _emit_pull_step_event(
225+
serialized_step,
226+
event_type="prefect.flow-run.pull-step.failed",
217227
deployment=deployment,
218228
flow_run=flow_run,
219229
logger=logger,
220230
)
221231
raise StepExecutionError(f"Encountered error while running {fqn}") from exc
222-
await _emit_pull_steps_event(
223-
serialized_steps,
224-
failed_step=None,
225-
deployment=deployment,
226-
flow_run=flow_run,
227-
logger=logger,
228-
)
232+
233+
step_index += 1
229234
return upstream_outputs
230235

231236

@@ -234,43 +239,32 @@ def _get_step_fully_qualified_name_and_inputs(step: dict) -> tuple[str, dict]:
234239
return step.popitem()
235240

236241

237-
async def _emit_pull_steps_event(
238-
serialized_steps: list[dict[str, Any]],
242+
async def _emit_pull_step_event(
243+
serialized_step: dict[str, Any],
239244
*,
240-
failed_step: dict[str, Any] | None,
245+
event_type: str,
241246
deployment: Any | None = None,
242247
flow_run: Any | None = None,
243248
logger: Any | None = None,
244249
) -> None:
245-
if not serialized_steps:
246-
return
247-
248-
# If no flow_run provided, try to get flow_run_id from environment
249-
# (for backward compatibility with calls that don't pass flow_run)
250+
# Get flow_run_id from flow_run param or environment
250251
flow_run_id = None
251252
if flow_run:
252253
flow_run_id = flow_run.id
253254
else:
254-
from prefect._internal.context import _EnvironmentRunContext
255+
# Read directly from environment variable
256+
flow_run_id_str = os.getenv("PREFECT__FLOW_RUN_ID")
257+
if flow_run_id_str:
258+
from uuid import UUID
255259

256-
ctx = _EnvironmentRunContext.from_environment()
257-
if ctx and ctx.flow_run_id:
258-
flow_run_id = ctx.flow_run_id
260+
flow_run_id = UUID(flow_run_id_str)
259261

260262
if not flow_run_id:
261263
return
262264

263-
payload = {
264-
"count": len(serialized_steps),
265-
"steps": serialized_steps,
266-
}
267-
if failed_step is not None:
268-
payload["failed_step"] = failed_step
269-
270265
# Build related resources
271266
related: list[RelatedResource] = []
272267
if deployment:
273-
# Add deployment as a related resource
274268
related.append(
275269
RelatedResource(
276270
{
@@ -285,22 +279,22 @@ async def _emit_pull_steps_event(
285279
async with get_events_client(checkpoint_every=1) as events_client:
286280
await events_client.emit(
287281
Event(
288-
event="prefect.flow-run.pull-steps.executed",
282+
event=event_type,
289283
resource=Resource(
290284
{
291285
"prefect.resource.id": f"prefect.flow-run.{flow_run_id}",
292286
}
293287
),
294288
related=related,
295-
payload=payload,
289+
payload=serialized_step,
296290
)
297291
)
298292
except Exception:
299293
if logger:
300294
logger.warning(
301-
"Failed to emit pull-steps event for flow run %s", flow_run_id
295+
"Failed to emit pull-step event for flow run %s", flow_run_id
302296
)
303297
else:
304298
get_logger(__name__).warning(
305-
"Failed to emit pull-steps event for flow run %s", flow_run_id
299+
"Failed to emit pull-step event for flow run %s", flow_run_id
306300
)

0 commit comments

Comments
 (0)