
Implement StreamingHttpResponse for Server Sent Events (EventSource) or ReadableStream #36

@philrdubois


As discussed in an off-topic thread in issue #32, djapify and async_djapify can be extended to support streaming responses with minor changes to ResponseParser, AsyncResponseParser, SyncDjapifyDecorator and AsyncDjapifyDecorator. It can be as simple as having the view function yield its results (making it a generator) and return None when it is done.
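A stdlib-only sketch of the idea, assuming a hypothetical view that yields `(status, payload)` tuples. Here `json.dumps` stands in for the schema validation that ResponseParser would perform. The point it shows is that calling a generator function runs none of its body, which is why the per-item parsing has to happen inside a wrapper generator rather than directly in the decorator.

```python
import json
from typing import Any, Iterator, List

calls: List[str] = []  # records when the view body actually runs


def receive_updates() -> Iterator[Any]:
    """Hypothetical streaming view: yields (status, payload) tuples."""
    calls.append("started")
    for i in range(3):
        yield 200, {"test_data": f"test string {i}."}
    calls.append("finished")


def wrap(stream: Iterator[Any]) -> Iterator[str]:
    """Unwrap (status, payload) tuples and serialize each item only when it is pulled."""
    for item in stream:
        payload = item[1] if isinstance(item, tuple) else item
        yield json.dumps(payload) + "\n"


stream = wrap(receive_updates())
print(calls)  # prints [] because creating the generators runs no view code
chunks = list(stream)
print(calls)  # prints ['started', 'finished']
```

In the real proposal the wrapper is handed to StreamingHttpResponse, so the view body runs while Django iterates the response.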

Note that while StreamingHttpResponse supports synchronous iterators, a long-running synchronous stream occupies a worker for its entire duration, and many WSGI deployments will time out and cut off the stream; Gunicorn's sync workers, for example, use a default timeout of 30 seconds.
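The difference between the two modes can be illustrated without Django. In this stdlib-only sketch (not djapy code), two consumers share one asyncio event loop: an async generator that awaits between items lets the other consumer make progress, while a plain generator runs each consumer to completion before the next one starts. Under WSGI the blocking happens at the worker level rather than in an event loop, but the principle that a blocking stream holds on to its worker is the same.

```python
import asyncio
from typing import AsyncIterator, Iterator, List


async def async_stream(name: str, n: int = 3) -> AsyncIterator[str]:
    for i in range(n):
        yield f"{name}{i}"
        await asyncio.sleep(0)  # hand control back to the event loop


def sync_stream(name: str, n: int = 3) -> Iterator[str]:
    for i in range(n):
        yield f"{name}{i}"  # nothing here lets other tasks run


async def consume_async(name: str, log: List[str]) -> None:
    async for item in async_stream(name):
        log.append(item)


async def consume_sync(name: str, log: List[str]) -> None:
    for item in sync_stream(name):
        log.append(item)


async def run(consumer) -> List[str]:
    log: List[str] = []
    # Two "clients" streaming at the same time on one event loop
    await asyncio.gather(consumer("A", log), consumer("B", log))
    return log


async_log = asyncio.run(run(consume_async))
sync_log = asyncio.run(run(consume_sync))
print(async_log)  # ['A0', 'B0', 'A1', 'B1', 'A2', 'B2']
print(sync_log)   # ['A0', 'A1', 'A2', 'B0', 'B1', 'B2']
```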

Here is the updated parse_data method of ResponseParser, along with three new methods: _parse_data, a helper that removes repeated code; is_sse, which determines whether the output schema calls for the Server Sent Events format; and format_sse_response, which formats the output accordingly.

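import inspect
import json
from typing import Any, Dict, Generator

from django.core.serializers.json import DjangoJSONEncoder
from pydantic import BaseModel

# BaseParser and JSON_OUTPUT_PARSE_NAME are defined elsewhere in djapy.
class ResponseParser(BaseParser):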
   # Untouched functions omitted for clarity.
   def _parse_data(self, data: Any) -> Dict[str, Any]:
      """Helper function to parse and validate response data."""
      if isinstance(data, BaseModel):  # Direct return if Pydantic model
         return data.model_dump(mode="json", by_alias=True)

      model = self._create_model()
      validated = model.model_validate({JSON_OUTPUT_PARSE_NAME: data}, context={**self._context, "input_data": self.input_data})
      return validated.model_dump(mode="json", by_alias=True)[JSON_OUTPUT_PARSE_NAME]

   def parse_data(self) -> Dict[str, Any] | Generator:
      """Parse and validate response data."""
      if not inspect.isgenerator(self.data):
         # Standard HTTP response
         return self._parse_data(self.data)

      else:
         # Streaming HTTP response: wrap the generator and pass it along
         def wrapper():
            for partial_data in self.data:
               # Note that this parsing needs to be done here since the generator doesn't run in the decorator until it's passed on to StreamingHttpResponse
               partial_data_content = partial_data if not isinstance(partial_data, tuple) else partial_data[1]

               yield self._parse_data(partial_data_content)

         return wrapper()

   def is_sse(self, response: Any) -> bool:
      """Examine the parsed response to determine whether the Server Sent Events format is desired in place of the Readable Stream format."""
      # Only a dict can map onto SSE fields; this also avoids a TypeError when items are lists of dicts
      if not isinstance(response, dict):
         return False
      # Check for disallowed keys
      allowed_keys = {"data", "id", "event", "retry"}
      if any(key not in allowed_keys for key in response):
         return False
      # Check if "data" is a dictionary
      if "data" not in response or not isinstance(response["data"], dict):
         return False
      # Check if "id" is a string, an integer, or None
      if "id" in response and not (isinstance(response["id"], (str, int)) or response["id"] is None):
         return False
      # Check if "event" is a string or None
      if "event" in response and not (isinstance(response["event"], str) or response["event"] is None):
         return False
      # Check if "retry" is an integer or None
      if "retry" in response and not (isinstance(response["retry"], int) or response["retry"] is None):
         return False

      return True

   def format_sse_response(self, response: Dict[str, Any]) -> str:
      """If Server Sent Events format is desired, process the data into the appropriate format."""
      response_string = ""

      if "id" in response and response["id"] is not None:
         response_string += f"id: {response['id']}\n"
      if "event" in response and response["event"] is not None:
         response_string += f"event: {response['event']}\n"
      if "retry" in response and response["retry"] is not None:
         response_string += f"retry: {response['retry']}\n"

      return response_string + f"data: {json.dumps(response['data'], cls=DjangoJSONEncoder)}\n\n"
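To see the wire format these helpers produce, here is a condensed standalone version of is_sse and format_sse_response as plain functions. It uses the stdlib `json` encoder instead of `DjangoJSONEncoder` so it runs without Django. Each field goes on its own `name: value` line and a blank line ends the event, as the Server Sent Events format requires.

```python
import json
from typing import Any, Dict

ALLOWED_KEYS = {"data", "id", "event", "retry"}
FIELD_TYPES = {"id": (str, int), "event": (str,), "retry": (int,)}


def is_sse(response: Any) -> bool:
    """Return True if a parsed item only uses SSE fields and carries a dict payload."""
    if not isinstance(response, dict) or not isinstance(response.get("data"), dict):
        return False
    if any(key not in ALLOWED_KEYS for key in response):
        return False
    # Optional fields may be absent or None; otherwise they must have the right type
    return all(
        response.get(key) is None or isinstance(response[key], types)
        for key, types in FIELD_TYPES.items()
    )


def format_sse_response(response: Dict[str, Any]) -> str:
    """Render one SSE event: optional id/event/retry lines, then data, then a blank line."""
    lines = [f"{key}: {response[key]}" for key in ("id", "event", "retry") if response.get(key) is not None]
    lines.append(f"data: {json.dumps(response['data'])}")
    return "\n".join(lines) + "\n\n"


item = {"id": 0, "event": "Event #0", "data": {"test_data": "test string 0."}, "retry": None}
print(is_sse(item))  # True
print(format_sse_response(item), end="")
```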

AsyncResponseParser would then need to have parse_data updated to account for asynchronous views:

import inspect
from typing import Any, AsyncGenerator, Dict

from asgiref.sync import sync_to_async

class AsyncResponseParser(ResponseParser):
   async def parse_data(self) -> Dict[str, Any] | AsyncGenerator:
      """Async version of response data parsing."""
      if not inspect.isasyncgen(self.data):
         # Standard HTTP response
         return await sync_to_async(self._parse_data)(self.data)

      else:
         # Streaming HTTP response: wrap the generator and pass it along
         _iterator = self.data  # Save a local copy in case it is re-assigned

         async def awrapper():
            async for partial_data in _iterator:
               # Note that this parsing needs to be done here since the generator doesn't run in the decorator until it's passed on to StreamingHttpResponse
               partial_data_content = partial_data if not isinstance(partial_data, tuple) else partial_data[1]

               yield await sync_to_async(self._parse_data)(partial_data_content)

         return awrapper()

With the updated parsers, the decorators can now work with the parsed data:

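import asyncio
import inspect
import json
from functools import wraps

from asgiref.sync import sync_to_async
from django.core.serializers.json import DjangoJSONEncoder
from django.http import HttpRequest, HttpResponse, StreamingHttpResponse

# BaseDjapifyDecorator, WrappedViewT and the request/response parsers come from djapy.
class SyncDjapifyDecorator(BaseDjapifyDecorator):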
   def __call__(self, view_func: WrappedViewT = None):
      if view_func is None:
         return lambda v: self.__call__(v)

      @wraps(view_func)
      def wrapped_view(request: HttpRequest, *args, **kwargs):
         self._prepare(view_func)

         if msg := self.check_access(request, view_func, *args, **kwargs):
            return msg

         try:
            if not inspect.isgeneratorfunction(view_func):
               # Standard HTTP response
               response = HttpResponse(content_type="application/json")
            else:
               # Streaming HTTP response
               response = StreamingHttpResponse(content_type="application/stream+json")

            # Use sync request parser
            req_p = RequestParser(request, view_func, kwargs)
            data = req_p.parse_data()

            if view_func.djapy_resp_param:
               data[view_func.djapy_resp_param.name] = response

            content = view_func(request, *args, **data)

            # Use sync response parser
            res_p = ResponseParser(
               request=request,
               status=200 if not isinstance(content, tuple) else content[0],
               data=content if not isinstance(content, tuple) else content[1],
               schemas=view_func.schema,
               input_data=data,
            )

            result = res_p.parse_data()

            if not inspect.isgenerator(result):
               # Standard HTTP response
               if isinstance(content, tuple):
                  response.status_code = content[0]
               response.content = json.dumps(result, cls=DjangoJSONEncoder)
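            else:
               # Streaming HTTP response
               if isinstance(content, tuple):
                  response.status_code = content[0]

               # Django sends the headers before it iterates streaming_content, so setting
               # the content type inside the generator would come too late. Pull the first
               # item now to choose the header (this also lets handle_error see errors
               # raised while producing it).
               empty = object()
               first = next(result, empty)
               if first is not empty and res_p.is_sse(first):
                  response["Content-Type"] = "text/event-stream"

               def encode(partial_result):
                  if res_p.is_sse(partial_result):
                     # Server Sent Events response
                     return res_p.format_sse_response(partial_result)
                  # Readable Stream response, newline-delimited so the client can split items
                  return json.dumps(partial_result, cls=DjangoJSONEncoder) + "\n"

               # Wrap the generator and pass it along, replaying the first item
               def wrapper():
                  if first is not empty:
                     yield encode(first)
                  for partial_result in result:
                     yield encode(partial_result)

               response.streaming_content = wrapper()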
            return response

         except Exception as exc:
            return self.handle_error(request, exc)

      self._set_common_attributes(wrapped_view, view_func)
      return wrapped_view


class AsyncDjapifyDecorator(BaseDjapifyDecorator):
   def __call__(self, view_func: WrappedViewT = None):
      if view_func is None:
         return lambda v: self.__call__(v)

      if not (asyncio.iscoroutinefunction(view_func) or inspect.isasyncgenfunction(view_func)):
         raise ValueError(f"View function {view_func.__name__} must be async")

      @wraps(view_func)
      async def wrapped_view(request: HttpRequest, *args, **kwargs):
         self._prepare(view_func)

         if msg := await sync_to_async(self.check_access)(request, view_func, *args, **kwargs):
            return msg

         try:
            if not inspect.isasyncgenfunction(view_func):
               # Standard HTTP response
               response = HttpResponse(content_type="application/json")
            else:
               # Streaming HTTP response
               response = StreamingHttpResponse(content_type="application/stream+json")

            # Use async request parser
            parser = AsyncRequestParser(request, view_func, kwargs)
            data = await parser.parse_data()

            if view_func.djapy_resp_param:
               data[view_func.djapy_resp_param.name] = response

            if not inspect.isasyncgenfunction(view_func):
               # Standard HTTP response: obtain the function's response asynchronously
               content = await view_func(request, *args, **data)
            else:
               # Streaming HTTP response: obtain the generator synchronously
               content = view_func(request, *args, **data)

            # Use async response parser
            parser = AsyncResponseParser(
               request=request,
               status=200 if not isinstance(content, tuple) else content[0],
               data=content if not isinstance(content, tuple) else content[1],
               schemas=view_func.schema,
               input_data=data,
            )

            result = await parser.parse_data()

            if not inspect.isasyncgen(result):
               # Standard HTTP response
               if isinstance(content, tuple):
                  response.status_code = content[0]
               response.content = json.dumps(result, cls=DjangoJSONEncoder)
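            else:
               # Streaming HTTP response
               if isinstance(content, tuple):
                  response.status_code = content[0]

               # Django sends the headers before it iterates streaming_content, so setting
               # the content type inside the generator would come too late. Pull the first
               # item now to choose the header (this also lets handle_error see errors
               # raised while producing it).
               empty = object()
               first = await anext(result, empty)
               if first is not empty and parser.is_sse(first):
                  response["Content-Type"] = "text/event-stream"

               # is_sse and format_sse_response are cheap pure functions, so they are called directly
               def encode(partial_result):
                  if parser.is_sse(partial_result):
                     # Server Sent Events response
                     return parser.format_sse_response(partial_result)
                  # Readable Stream response, newline-delimited so the client can split items
                  return json.dumps(partial_result, cls=DjangoJSONEncoder) + "\n"

               # Wrap the generator and pass it along, replaying the first item
               async def awrapper():
                  if first is not empty:
                     yield encode(first)
                  async for partial_result in result:
                     yield encode(partial_result)

               response.streaming_content = awrapper()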
            return response

         except Exception as exc:
            return await sync_to_async(self.handle_error)(request, exc)

      self._set_common_attributes(wrapped_view, view_func)
      return wrapped_view

Now, the user can implement their views as follows:

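import asyncio
import time

# Schema, djapify and async_djapify are assumed to be imported from djapy.
class IteratorSchema(Schema):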
   test_data: str


@async_djapify
async def receive_updates_async(request) -> {200: IteratorSchema}:
   for i in range(10):
      yield 200, IteratorSchema(test_data=f"test string {i}.")
      await asyncio.sleep(1)
   return


@djapify
def receive_updates_sync(request) -> {200: IteratorSchema}:
   for i in range(10):
      yield 200, IteratorSchema(test_data=f"test string {i}.")
      time.sleep(1)
   return


class ServerSentEvents(Schema):
   # To use EventSource with the Server Sent Events format, note that only the data field
   # is required (the others default to None), and no additional fields may be included.
   id: str | int | None = None
   event: str | None = None
   data: IteratorSchema
   retry: int | None = None


@async_djapify
async def receive_updates_sse_async(request) -> {200: ServerSentEvents}:
   for i in range(10):
      yield 200, ServerSentEvents(
         data=IteratorSchema(test_data=f"test string {i}."),
         id=i,
         event=f"Event #{i}",
         retry=None,
      )
      await asyncio.sleep(1)
   return


@djapify
def receive_updates_sse_sync(request) -> {200: ServerSentEvents}:
   for i in range(10):
      yield 200, ServerSentEvents(
         data=IteratorSchema(test_data=f"test string {i}."),
         id=i,
         event=f"Event #{i}",
         retry=None,
      )
      time.sleep(1)
   return

I've tested this code with Swagger UI, and everything works, except that Swagger only displays the yielded data once the view returns, and because it doesn't know how to process the stream, it shows the output in raw form. This is a limitation of Swagger; an issue about it has been open since 2018 without being addressed.
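Since Swagger can't show the stream incrementally, one way to check the SSE output is to parse the event stream directly. The sketch below is a simplified, stdlib-only parser (it ignores the spec's last-event-ID tracking and reconnection handling). It accepts any iterable of text lines, so it could be fed the lines of a streamed HTTP response, or tested against a string as shown here. From a terminal, `curl -N` against the view's URL also prints chunks as they arrive.

```python
import json
from typing import Dict, Iterable, Iterator, List


def parse_sse(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Yield one dict per event from Server Sent Events text (simplified)."""
    event: Dict[str, str] = {}
    data: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:
            # A blank line dispatches the event, if it carried any data
            if data:
                event["data"] = "\n".join(data)
                yield event
            event, data = {}, []
            continue
        if line.startswith(":"):
            continue  # comment line
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space after the colon is dropped
        if field == "data":
            data.append(value)
        elif field in ("id", "event", "retry"):
            event[field] = value


stream = (
    'id: 0\nevent: Event #0\ndata: {"test_data": "test string 0."}\n\n'
    'id: 1\nevent: Event #1\ndata: {"test_data": "test string 1."}\n\n'
)
events = list(parse_sse(stream.splitlines(keepends=True)))
print(events[1]["event"], json.loads(events[1]["data"]))
```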

I haven't tested this with yielding ORM model instances or QuerySets yet, but I suspect it would work, since the underlying data-parsing logic hasn't changed.

I'm also not familiar with the implications of data[view_func.djapy_resp_param.name] = response, or whether it matters that the content type is changed afterward for Server Sent Events.

Labels: enhancement (New feature or request), help wanted (Extra attention is needed)
