|
| 1 | +--- |
| 2 | +sidebar: |
| 3 | + order: 5 |
| 4 | +title: Concurrent execution of workflows |
| 5 | +--- |
| 6 | + |
| 7 | +In addition to looping, branching, and streaming, workflows can run steps concurrently. This is useful when you have multiple steps that can be run independently of each other and they have time-consuming operations that they `await`, allowing other steps to run in parallel. |
| 8 | + |
| 9 | +## Emitting multiple events |
| 10 | + |
| 11 | +To emit multiple events to trigger multiple steps, you can use `ctx.send_event()`: |
| 12 | + |
| 13 | +```python |
| 14 | +import asyncio |
| 15 | +from workflows import Workflow, Context, step |
| 16 | +from workflows.events import Event, StartEvent, StopEvent |
| 17 | + |
| 18 | +class StepTwoEvent(Event): |
| 19 | + query: str |
| 20 | + |
| 21 | +class ParallelFlow(Workflow): |
| 22 | + @step |
| 23 | + async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent | None: |
| 24 | + ctx.send_event(StepTwoEvent(query="Query 1")) |
| 25 | + ctx.send_event(StepTwoEvent(query="Query 2")) |
| 26 | + ctx.send_event(StepTwoEvent(query="Query 3")) |
| 27 | + |
| 28 | + @step(num_workers=4) |
| 29 | + async def step_two(self, ev: StepTwoEvent) -> StopEvent: |
| 30 | + print("Running slow query ", ev.query) |
| 31 | + await asyncio.sleep(random.randint(0, 5)) |
| 32 | + |
| 33 | + return StopEvent(result=ev.query) |
| 34 | +``` |
| 35 | + |
| 36 | +In this example, our `start` step emits 3 `StepTwoEvent`s. The `step_two` step is decorated with `num_workers=4`, which tells the workflow to run up to 4 instances of this step concurrently (this is the default). |
| 37 | + |
| 38 | +## Collecting events |
| 39 | + |
| 40 | +If you execute the previous example, you'll note that the workflow stops after whichever query is first to complete. Sometimes that's useful, but other times you'll want to wait for all your slow operations to complete before moving on to another step. You can do this using `collect_events`: |
| 41 | + |
| 42 | +```python |
| 43 | +import asyncio |
| 44 | +from workflows import Workflow, Context, step |
| 45 | +from workflows.events import Event, StartEvent, StopEvent |
| 46 | + |
| 47 | +class StepTwoEvent(Event): |
| 48 | + query: str |
| 49 | + |
| 50 | +class StepThreeEvent(Event): |
| 51 | + result: str |
| 52 | + |
| 53 | +class ConcurrentFlow(Workflow): |
| 54 | + @step |
| 55 | + async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent | None: |
| 56 | + ctx.send_event(StepTwoEvent(query="Query 1")) |
| 57 | + ctx.send_event(StepTwoEvent(query="Query 2")) |
| 58 | + ctx.send_event(StepTwoEvent(query="Query 3")) |
| 59 | + |
| 60 | + @step(num_workers=4) |
| 61 | + async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StepThreeEvent: |
| 62 | + print("Running query ", ev.query) |
| 63 | + await asyncio.sleep(random.randint(1, 5)) |
| 64 | + return StepThreeEvent(result=ev.query) |
| 65 | + |
| 66 | + @step |
| 67 | + async def step_three( |
| 68 | + self, ctx: Context, ev: StepThreeEvent |
| 69 | + ) -> StopEvent | None: |
| 70 | + # wait until we receive 3 events |
| 71 | + result = ctx.collect_events(ev, [StepThreeEvent] * 3) |
| 72 | + if result is None: |
| 73 | + return None |
| 74 | + |
| 75 | + # do something with all 3 results together |
| 76 | + print(result) |
| 77 | + return StopEvent(result="Done") |
| 78 | +``` |
| 79 | + |
| 80 | +The `collect_events` method lives on the `Context` and takes the event that triggered the step and an array of event types to wait for. In this case, we are awaiting 3 events of the same `StepThreeEvent` type. |
| 81 | + |
| 82 | +The `step_three` step is fired every time a `StepThreeEvent` is received, but `collect_events` will return `None` until all 3 events have been received. At that point, the step will continue and you can do something with all 3 results together. |
| 83 | + |
| 84 | +The `result` returned from `collect_events` is an array of the events that were collected, in the order that they were received. |
| 85 | + |
| 86 | +## Multiple event types |
| 87 | + |
| 88 | +Of course, you do not need to wait for the same type of event. You can wait for any combination of events you like, such as in this example: |
| 89 | + |
| 90 | +```python |
| 91 | +import asyncio |
| 92 | +from workflows import Workflow, Context, step |
| 93 | +from workflows.events import Event, StartEvent, StopEvent |
| 94 | + |
| 95 | +class StepAEvent(Event): |
| 96 | + query: str |
| 97 | + |
| 98 | +class StepBEvent(Event): |
| 99 | + query: str |
| 100 | + |
| 101 | +class StepCEvent(Event): |
| 102 | + query: str |
| 103 | + |
| 104 | +class StepACompleteEvent(Event): |
| 105 | + result: str |
| 106 | + |
| 107 | +class StepBCompleteEvent(Event): |
| 108 | + result: str |
| 109 | + |
| 110 | +class StepCCompleteEvent(Event): |
| 111 | + result: str |
| 112 | + |
| 113 | + |
| 114 | +class ConcurrentFlow(Workflow): |
| 115 | + @step |
| 116 | + async def start( |
| 117 | + self, ctx: Context, ev: StartEvent |
| 118 | + ) -> StepAEvent | StepBEvent | StepCEvent | None: |
| 119 | + ctx.send_event(StepAEvent(query="Query 1")) |
| 120 | + ctx.send_event(StepBEvent(query="Query 2")) |
| 121 | + ctx.send_event(StepCEvent(query="Query 3")) |
| 122 | + |
| 123 | + @step |
| 124 | + async def step_a(self, ctx: Context, ev: StepAEvent) -> StepACompleteEvent: |
| 125 | + print("Doing something A-ish") |
| 126 | + return StepACompleteEvent(result=ev.query) |
| 127 | + |
| 128 | + @step |
| 129 | + async def step_b(self, ctx: Context, ev: StepBEvent) -> StepBCompleteEvent: |
| 130 | + print("Doing something B-ish") |
| 131 | + return StepBCompleteEvent(result=ev.query) |
| 132 | + |
| 133 | + @step |
| 134 | + async def step_c(self, ctx: Context, ev: StepCEvent) -> StepCCompleteEvent: |
| 135 | + print("Doing something C-ish") |
| 136 | + return StepCCompleteEvent(result=ev.query) |
| 137 | + |
| 138 | + @step |
| 139 | + async def step_three( |
| 140 | + self, |
| 141 | + ctx: Context, |
| 142 | + ev: StepACompleteEvent | StepBCompleteEvent | StepCCompleteEvent, |
| 143 | + ) -> StopEvent: |
| 144 | + print("Received event ", ev.result) |
| 145 | + |
| 146 | + # wait until we receive 3 events |
| 147 | + if ( |
| 148 | + ctx.collect_events( |
| 149 | + ev, |
| 150 | + [StepCCompleteEvent, StepACompleteEvent, StepBCompleteEvent], |
| 151 | + ) |
| 152 | + is None |
| 153 | + ): |
| 154 | + return None |
| 155 | + |
| 156 | + # do something with all 3 results together |
| 157 | + return StopEvent(result="Done") |
| 158 | +``` |
| 159 | + |
| 160 | +There are several changes we've made to handle multiple event types: |
| 161 | + |
| 162 | +- `start` is now declared as emitting 3 different event types |
| 163 | +- `step_three` is now declared as accepting 3 different event types |
| 164 | +- `collect_events` now takes an array of the event types to wait for |
| 165 | + |
| 166 | +Note that the order of the event types in the array passed to `collect_events` is important. The events will be returned in the order they are passed to `collect_events`, regardless of when they were received. |
| 167 | + |
| 168 | +The visualization of this workflow is quite pleasing: |
| 169 | + |
| 170 | + |
0 commit comments