Skip to content

Commit f15bfa1

Browse files
zhaotaiclaude
andauthored
feat: Add POST /events/{handler_id} endpoint and /handlers endpoint (#89)
* feat: Add POST /events/{handler_id} endpoint and /tasks endpoint - Add GET /tasks endpoint to list running workflow tasks - Add POST /events/{handler_id} endpoint to send events to running workflows - Enable external event injection into workflow contexts - Add InteractiveWorkflow test fixture for testing event handling - Add comprehensive tests for both endpoints This allows developers to: 1. Monitor running workflows via the /tasks endpoint 2. Send events to running workflows for interactive use cases 3. Build event-driven integrations with external systems 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]> * fix lint * refactor: Update /tasks endpoint to /handlers with enhanced status tracking - Renamed /tasks endpoint to /handlers for better naming consistency - Enhanced handler status tracking (running, completed, failed) - Added result and error fields to handler response - Fixed safe access to handler.result() and handler.exception() methods - Added comprehensive tests for all handler states The /handlers endpoint now provides complete visibility into workflow execution state. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]> * update comments --------- Co-authored-by: Claude <[email protected]>
1 parent 901714a commit f15bfa1

File tree

5 files changed

+414
-4
lines changed

5 files changed

+414
-4
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ dev = [
1616

1717
[project]
1818
name = "llama-index-workflows"
19-
version = "2.0.1"
19+
version = "2.1.0"
2020
description = "An event-driven, async-first, step-based way to control the execution flow of AI applications like Agents."
2121
readme = "README.md"
2222
license = "MIT"

src/workflows/server/server.py

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,21 @@ def __init__(self, middleware: list[Middleware] | None = None):
6969
self._stream_events,
7070
methods=["GET"],
7171
),
72+
Route(
73+
"/events/{handler_id}",
74+
self._post_event,
75+
methods=["POST"],
76+
),
7277
Route(
7378
"/health",
7479
self._health_check,
7580
methods=["GET"],
7681
),
82+
Route(
83+
"/handlers",
84+
self._get_handlers,
85+
methods=["GET"],
86+
),
7787
]
7888

7989
self.app = Starlette(routes=self._routes, middleware=self._middleware)
@@ -389,6 +399,156 @@ async def event_stream(handler: WorkflowHandler) -> AsyncGenerator[str, None]:
389399

390400
return StreamingResponse(event_stream(handler), media_type=media_type)
391401

402+
async def _get_handlers(self, request: Request) -> JSONResponse:
403+
"""
404+
---
405+
summary: Get handlers
406+
description: Returns all workflow handlers.
407+
responses:
408+
200:
409+
description: List of handlers
410+
content:
411+
application/json:
412+
schema:
413+
type: object
414+
properties:
415+
handlers:
416+
type: array
417+
items:
418+
type: object
419+
properties:
420+
handler_id:
421+
type: string
422+
result:
423+
type: object
424+
error:
425+
type: object
426+
status:
427+
type: string
428+
enum: [running, completed, failed]
429+
required: [handlers]
430+
"""
431+
handlers = []
432+
for handler_id in self._handlers.keys():
433+
handler = self._handlers[handler_id]
434+
status = "running"
435+
result = None
436+
error = None
437+
438+
if handler.done():
439+
try:
440+
result = handler.result()
441+
status = "completed"
442+
except Exception as e:
443+
error = str(e)
444+
status = "failed"
445+
446+
handler_json = {
447+
"handler_id": handler_id,
448+
"status": status,
449+
"result": result,
450+
"error": error,
451+
}
452+
handlers.append(handler_json)
453+
454+
return JSONResponse({"handlers": handlers})
455+
456+
async def _post_event(self, request: Request) -> JSONResponse:
457+
"""
458+
---
459+
summary: Send event to workflow
460+
description: Sends an event to a running workflow's context.
461+
parameters:
462+
- in: path
463+
name: handler_id
464+
required: true
465+
schema:
466+
type: string
467+
description: Workflow handler identifier.
468+
requestBody:
469+
required: true
470+
content:
471+
application/json:
472+
schema:
473+
type: object
474+
properties:
475+
event:
476+
type: string
477+
description: Serialized event in JSON format.
478+
step:
479+
type: string
480+
description: Optional target step name. If not provided, event is sent to all steps.
481+
required: [event]
482+
responses:
483+
200:
484+
description: Event sent successfully
485+
content:
486+
application/json:
487+
schema:
488+
type: object
489+
properties:
490+
status:
491+
type: string
492+
enum: [sent]
493+
required: [status]
494+
400:
495+
description: Invalid event data
496+
404:
497+
description: Handler not found
498+
409:
499+
description: Workflow already completed
500+
"""
501+
handler_id = request.path_params["handler_id"]
502+
503+
# Check if handler exists
504+
handler = self._handlers.get(handler_id)
505+
if handler is None:
506+
raise HTTPException(detail="Handler not found", status_code=404)
507+
508+
# Check if workflow is still running
509+
if handler.done():
510+
raise HTTPException(detail="Workflow already completed", status_code=409)
511+
512+
# Get the context
513+
ctx = handler.ctx
514+
if ctx is None:
515+
raise HTTPException(detail="Context not available", status_code=500)
516+
517+
# Parse request body
518+
try:
519+
body = await request.json()
520+
event_str = body.get("event")
521+
step = body.get("step")
522+
523+
if not event_str:
524+
raise HTTPException(detail="Event data is required", status_code=400)
525+
526+
# Deserialize the event
527+
serializer = JsonSerializer()
528+
try:
529+
event = serializer.deserialize(event_str)
530+
except Exception as e:
531+
raise HTTPException(
532+
detail=f"Failed to deserialize event: {e}", status_code=400
533+
)
534+
535+
# Send the event to the context
536+
try:
537+
ctx.send_event(event, step=step)
538+
except Exception as e:
539+
raise HTTPException(
540+
detail=f"Failed to send event: {e}", status_code=400
541+
)
542+
543+
return JSONResponse({"status": "sent"})
544+
545+
except HTTPException:
546+
raise
547+
except Exception as e:
548+
raise HTTPException(
549+
detail=f"Error processing request: {e}", status_code=500
550+
)
551+
392552
#
393553
# Private methods
394554
#

tests/server/conftest.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,18 @@ async def stream_data(self, ctx: Context, ev: StartEvent) -> StopEvent:
4242
return StopEvent(result=f"completed_{count}_events")
4343

4444

45+
class ExternalEvent(Event):
46+
message: str
47+
48+
49+
class InteractiveWorkflow(Workflow):
50+
@step
51+
async def start(self, ctx: Context, ev: StartEvent) -> StopEvent:
52+
# Wait for an external event
53+
external_event = await ctx.wait_for_event(ExternalEvent)
54+
return StopEvent(result=f"received: {external_event.message}")
55+
56+
4557
@pytest.fixture
4658
def simple_test_workflow() -> Workflow:
4759
return SimpleTestWorkflow()
@@ -55,3 +67,8 @@ def error_workflow() -> Workflow:
5567
@pytest.fixture
5668
def streaming_workflow() -> Workflow:
5769
return StreamingWorkflow()
70+
71+
72+
@pytest.fixture
73+
def interactive_workflow() -> Workflow:
74+
return InteractiveWorkflow()

0 commit comments

Comments
 (0)