2323
2424from prefect ._internal .compatibility .deprecated import PrefectDeprecationWarning
2525from prefect ._internal .concurrency .api import Call , from_async
26- from prefect ._internal .context import _EnvironmentRunContext
2726from prefect ._internal .installation import install_packages
2827from prefect ._internal .integrations import KNOWN_EXTRAS_FOR_PACKAGES
29- from prefect .events .utilities import emit_event
28+ from prefect .events .clients import get_events_client
29+ from prefect .events .schemas .events import Event , RelatedResource , Resource
3030from prefect .logging .loggers import get_logger
3131from prefect .settings import PREFECT_DEBUG_MODE
3232from prefect .utilities .importtools import import_object
@@ -147,6 +147,8 @@ async def run_steps(
147147 upstream_outputs : dict [str , Any ] | None = None ,
148148 print_function : Any = print ,
149149 deployment : Any | None = None ,
150+ flow_run : Any | None = None ,
151+ logger : Any | None = None ,
150152) -> dict [str , Any ]:
151153 upstream_outputs = deepcopy (upstream_outputs ) if upstream_outputs else {}
152154 serialized_steps : list [dict [str , Any ]] = []
@@ -162,7 +164,13 @@ async def run_steps(
162164 # Templating (which resolves blocks, variables, and env vars) happens inside
163165 # run_step(), so by serializing here we prevent secrets from leaking in events.
164166 serialized_steps .append (
165- _serialize_step_for_event (len (serialized_steps ), fqn , inputs )
167+ {
168+ "index" : len (serialized_steps ),
169+ "qualified_name" : fqn ,
170+ "step_name" : step_name ,
171+ "id" : inputs .get ("id" ),
172+ "inputs" : inputs , # Keep all inputs including reserved keywords like 'requires'
173+ }
166174 )
167175 try :
168176 # catch warnings to ensure deprecation warnings are printed
@@ -203,13 +211,21 @@ async def run_steps(
203211 upstream_outputs [inputs .get ("id" )] = step_output
204212 upstream_outputs .update (step_output )
205213 except Exception as exc :
206- _emit_pull_steps_event (
214+ await _emit_pull_steps_event (
207215 serialized_steps ,
208216 failed_step = serialized_steps [- 1 ] if serialized_steps else None ,
209217 deployment = deployment ,
218+ flow_run = flow_run ,
219+ logger = logger ,
210220 )
211221 raise StepExecutionError (f"Encountered error while running { fqn } " ) from exc
212- _emit_pull_steps_event (serialized_steps , failed_step = None , deployment = deployment )
222+ await _emit_pull_steps_event (
223+ serialized_steps ,
224+ failed_step = None ,
225+ deployment = deployment ,
226+ flow_run = flow_run ,
227+ logger = logger ,
228+ )
213229 return upstream_outputs
214230
215231
@@ -218,49 +234,30 @@ def _get_step_fully_qualified_name_and_inputs(step: dict) -> tuple[str, dict]:
218234 return step .popitem ()
219235
220236
221- def _serialize_step_for_event (
222- index : int , qualified_name : str , inputs : dict [str , Any ]
223- ) -> dict [str , Any ]:
224- """
225- Serialize a step's metadata for the pull-steps.executed event payload.
226-
227- IMPORTANT: This function must be called with PRE-TEMPLATED inputs to avoid
228- exposing secrets in event payloads. Template strings like "{{ prefect.blocks.secret.name }}"
229- are preserved, while templating (which resolves these to actual secret values)
230- happens later in run_step().
231-
232- Args:
233- index: The step's index in the execution sequence
234- qualified_name: The fully qualified name of the step function
235- inputs: The raw, pre-templated inputs for the step
236-
237- Returns:
238- A dictionary containing step metadata safe for event emission
239- """
240- sanitized_inputs = {
241- key : value for key , value in inputs .items () if key not in RESERVED_KEYWORDS
242- }
243- return {
244- "index" : index ,
245- "qualified_name" : qualified_name ,
246- "step_name" : qualified_name .split ("." )[- 1 ],
247- "id" : inputs .get ("id" ),
248- "inputs" : sanitized_inputs ,
249- }
250-
251-
252- def _emit_pull_steps_event (
237+ async def _emit_pull_steps_event (
253238 serialized_steps : list [dict [str , Any ]],
254239 * ,
255240 failed_step : dict [str , Any ] | None ,
256241 deployment : Any | None = None ,
242+ flow_run : Any | None = None ,
243+ logger : Any | None = None ,
257244) -> None :
258245 if not serialized_steps :
259246 return
260247
261- # Use _EnvironmentRunContext to access flow run ID from environment
262- ctx = _EnvironmentRunContext .from_environment ()
263- if not ctx or not ctx .flow_run_id :
248+ # If no flow_run provided, try to get flow_run_id from environment
249+ # (for backward compatibility with calls that don't pass flow_run)
250+ flow_run_id = None
251+ if flow_run :
252+ flow_run_id = flow_run .id
253+ else :
254+ from prefect ._internal .context import _EnvironmentRunContext
255+
256+ ctx = _EnvironmentRunContext .from_environment ()
257+ if ctx and ctx .flow_run_id :
258+ flow_run_id = ctx .flow_run_id
259+
260+ if not flow_run_id :
264261 return
265262
266263 payload = {
@@ -269,31 +266,42 @@ def _emit_pull_steps_event(
269266 }
270267 if failed_step is not None :
271268 payload ["failed_step" ] = failed_step
272- resource = {
273- "prefect.resource.id" : f"prefect.flow-run.{ ctx .flow_run_id } " ,
274- }
275269
276270 # Build related resources
277- related : list [dict [ str , str ] ] = []
271+ related : list [RelatedResource ] = []
278272 if deployment :
279273 # Add deployment as a related resource
280274 related .append (
281- {
282- "prefect.resource.id" : f"prefect.deployment.{ deployment .id } " ,
283- "prefect.resource.role" : "deployment" ,
284- }
275+ RelatedResource (
276+ {
277+ "prefect.resource.id" : f"prefect.deployment.{ deployment .id } " ,
278+ "prefect.resource.role" : "deployment" ,
279+ }
280+ )
285281 )
286282
287283 try :
288- emit_event (
289- event = "prefect.flow-run.pull-steps.executed" ,
290- resource = resource ,
291- related = related if related else None ,
292- payload = payload ,
293- )
284+ # Use events client directly with checkpoint_every=1 to avoid buffering issues
285+ # (similar to how Runner handles heartbeat events)
286+ async with get_events_client (checkpoint_every = 1 ) as events_client :
287+ await events_client .emit (
288+ Event (
289+ event = "prefect.flow-run.pull-steps.executed" ,
290+ resource = Resource (
291+ {
292+ "prefect.resource.id" : f"prefect.flow-run.{ flow_run_id } " ,
293+ }
294+ ),
295+ related = related ,
296+ payload = payload ,
297+ )
298+ )
294299 except Exception :
295- # Use context-aware logger for better log association
296- logger = ctx .get_logger (__name__ )
297- logger .warning (
298- "Failed to emit pull-steps event for flow run %s" , ctx .flow_run_id
299- )
300+ if logger :
301+ logger .warning (
302+ "Failed to emit pull-steps event for flow run %s" , flow_run_id
303+ )
304+ else :
305+ get_logger (__name__ ).warning (
306+ "Failed to emit pull-steps event for flow run %s" , flow_run_id
307+ )
0 commit comments