Skip to content

Commit 2de91c7

Browse files
Simplify schema for send event to handler in WorkflowServer (#129)
1 parent 85d3985 commit 2de91c7

File tree

2 files changed

+50
-2
lines changed

2 files changed

+50
-2
lines changed

src/workflows/server/server.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -845,6 +845,8 @@ async def _post_event(self, request: Request) -> JSONResponse:
845845
event:
846846
type: string
847847
description: Serialized event in JSON format.
848+
examples:
849+
{"type": "event_name", "data": {"key": "value"}}
848850
step:
849851
type: string
850852
description: Optional target step name. If not provided, event is sent to all steps.
@@ -876,6 +878,10 @@ async def _post_event(self, request: Request) -> JSONResponse:
876878
raise HTTPException(detail="Handler not found", status_code=404)
877879

878880
handler = wrapper.run_handler
881+
events_by_title = {
882+
e.__name__: e for e in self._workflows[wrapper.workflow_name].events
883+
}
884+
879885
# Check if workflow is still running
880886
if handler.done():
881887
raise HTTPException(detail="Workflow already completed", status_code=409)
@@ -897,7 +903,22 @@ async def _post_event(self, request: Request) -> JSONResponse:
897903
# Deserialize the event
898904
serializer = JsonSerializer()
899905
try:
900-
event = serializer.deserialize(event_str)
906+
event_data = serializer.deserialize(event_str)
907+
if (
908+
isinstance(event_data, dict)
909+
and "type" in event_data
910+
and "data" in event_data
911+
):
912+
event_title = event_data["type"]
913+
event = events_by_title[event_title].model_validate(
914+
event_data["data"]
915+
)
916+
elif isinstance(event_data, Event):
917+
event = event_data
918+
else:
919+
raise ValueError(
920+
"Invalid event data. Should be a dictionary of {'type': 'event_type', 'data': {...}} or a serialized event"
921+
)
901922
except Exception as e:
902923
raise HTTPException(
903924
detail=f"Failed to deserialize event: {e}", status_code=400

tests/server/test_server_endpoints.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -711,6 +711,31 @@ async def test_post_event_to_running_workflow(client: AsyncClient) -> None:
711711
assert result["result"] == "received: Hello from test"
712712

713713

714+
@pytest.mark.asyncio
715+
async def test_post_event_simple_schema_to_running_workflow(
716+
client: AsyncClient,
717+
) -> None:
718+
# Start an interactive workflow
719+
response = await client.post("/workflows/interactive/run-nowait", json={})
720+
assert response.status_code == 200
721+
handler_id = response.json()["handler_id"]
722+
723+
# Wait a bit for workflow to start
724+
await asyncio.sleep(0.1)
725+
726+
# Send the event
727+
event_str = '{"type": "ExternalEvent", "data": {"response": "Hello from test"}}'
728+
response = await client.post(f"/events/{handler_id}", json={"event": event_str})
729+
assert response.status_code == 200
730+
assert response.json() == {"status": "sent"}
731+
732+
result = await wait_for_passing(
733+
lambda: validate_result_response(handler_id, client)
734+
)
735+
736+
assert result["result"] == "received: Hello from test"
737+
738+
714739
@pytest.mark.asyncio
715740
async def test_get_workflow_result_returns_202_when_pending(
716741
client: AsyncClient,
@@ -787,7 +812,9 @@ async def test_post_event_context_not_available(
787812
client: AsyncClient, server: WorkflowServer
788813
) -> None:
789814
# Dumb test for code coverage. Inject a dummy handler with no context to trigger 500 path
790-
wrapper = SimpleNamespace(run_handler=SimpleNamespace(done=lambda: False, ctx=None))
815+
wrapper = SimpleNamespace(
816+
run_handler=SimpleNamespace(done=lambda: False, ctx=None), workflow_name="test"
817+
)
791818

792819
handler_id = "noctx-1"
793820
server._handlers[handler_id] = wrapper # type: ignore[assignment]

0 commit comments

Comments
 (0)