-
-
Notifications
You must be signed in to change notification settings - Fork 6
Description
As discussed in an off-topic discussion in issue #32, djapify and async_djapify can be implemented to work with streams, with minor tweaks to ResponseParser, AsyncResponseParser, SyncDjapifyDecorator and AsyncDjapifyDecorator. It can be as easy as having the view function simply yield results, making it a generator, and return None when it is complete.
Do note that, while StreamingHttpResponse supports synchronous operations, performing long-running streams will lock up the process, and most WSGI implementations will timeout and cut off the stream after 30 seconds.
Here's the updated parse_data function from ResponseParse, along with additional functions _parse_data as a helper function to eliminate repeated code, is_sse to determine if the input schema calls for a Server Sent Events output format and format_sse_response to format the output if so.
class ResponseParser(BaseParser):
# Untouched functions omitted for clarity.
def _parse_data(self, data: Any) -> Dict[str, Any]:
"""Helper function to parse and validate response data."""
if isinstance(data, BaseModel): # Direct return if Pydantic model
return data.model_dump(mode="json", by_alias=True)
model = self._create_model()
validated = model.model_validate({JSON_OUTPUT_PARSE_NAME: data}, context={**self._context, "input_data": self.input_data})
return validated.model_dump(mode="json", by_alias=True)[JSON_OUTPUT_PARSE_NAME]
def parse_data(self) -> Dict[str, Any] | Generator:
"""Parse and validate response data."""
if not inspect.isgenerator(self.data):
# Standard HTTP response
return self._parse_data(self.data)
else:
# Streaming HTTP response: wrap the generator and pass it along
def wrapper():
for partial_data in self.data:
# Note that this parsing needs to be done here since the generator doesn't run in the decorator until it's passed on to StreamingHttpResponse
partial_data_content = partial_data if not isinstance(partial_data, tuple) else partial_data[1]
yield self._parse_data(partial_data_content)
return wrapper()
def is_sse(self, response: Dict[str, Any]) -> bool:
"""Examine parsed schema to determine if Server Sent Events format is desired in place of Readable Stream format."""
# Check for disallowed keys
allowed_keys = {"data", "id", "event", "retry"}
if any(key not in allowed_keys for key in response):
return False
# Check if "data" is a dictionary
if "data" not in response or not isinstance(response["data"], dict):
return False
# Check if "id" is a string, an integer, or None
if "id" in response and not (isinstance(response["id"], (str, int)) or response["id"] is None):
return False
# Check if "event" is a string or None
if "event" in response and not (isinstance(response["event"], str) or response["event"] is None):
return False
# Check if "retry" is an integer or None
if "retry" in response and not (isinstance(response["retry"], int) or response["retry"] is None):
return False
return True
def format_sse_response(self, response: Dict[str, Any]) -> str:
"""If Server Sent Events format is desired, process the data into the appropriate format."""
response_string = ""
if "id" in response and response["id"] is not None:
response_string += f"id: {response["id"]}\n"
if "event" in response and response["event"] is not None:
response_string += f"event: {response["event"]}\n"
if "retry" in response and response["retry"] is not None:
response_string += f"retry: {response["retry"]}\n"
return response_string + f"data: {json.dumps(response["data"], cls=DjangoJSONEncoder)}\n\n"AsyncResponseParser would then need to have parse_data updated to account for asynchronous views:
class AsyncResponseParser(ResponseParser):
async def parse_data(self) -> Dict[str, Any] | AsyncGenerator:
"""Async version of response data parsing."""
if not inspect.isasyncgen(self.data):
# Standard HTTP response
return await sync_to_async(self._parse_data)(self.data)
else:
# Streaming HTTP response: wrap the generator and pass it along
_iterator = self.data # Save a local copy in case it is re-assigned
async def awrapper():
async for partial_data in _iterator:
# Note that this parsing needs to be done here since the generator doesn't run in the decorator until it's passed on to StreamingHttpResponse
partial_data_content = partial_data if not isinstance(partial_data, tuple) else partial_data[1]
yield await sync_to_async(self._parse_data)(partial_data_content)
return awrapper()With the updated parsers, the decorators can now work with the parsed data:
class SyncDjapifyDecorator(BaseDjapifyDecorator):
def __call__(self, view_func: WrappedViewT = None):
if view_func is None:
return lambda v: self.__call__(v)
@wraps(view_func)
def wrapped_view(request: HttpRequest, *args, **kwargs):
self._prepare(view_func)
if msg := self.check_access(request, view_func, *args, **kwargs):
return msg
try:
if not inspect.isgeneratorfunction(view_func):
# Standard HTTP response
response = HttpResponse(content_type="application/json")
else:
# Streaming HTTP response
response = StreamingHttpResponse(content_type="application/stream+json")
# Use sync request parser
req_p = RequestParser(request, view_func, kwargs)
data = req_p.parse_data()
if view_func.djapy_resp_param:
data[view_func.djapy_resp_param.name] = response
content = view_func(request, *args, **data)
# Use sync response parser
res_p = ResponseParser(
request=request,
status=200 if not isinstance(content, tuple) else content[0],
data=content if not isinstance(content, tuple) else content[1],
schemas=view_func.schema,
input_data=data,
)
result = res_p.parse_data()
if not inspect.isgenerator(result):
# Standard HTTP response
if isinstance(content, tuple):
response.status_code = content[0]
response.content = json.dumps(result, cls=DjangoJSONEncoder)
else:
# Streaming HTTP response
if isinstance(content, tuple):
response.status_code = content[0]
# Wrap the generator and pass it along
def wrapper():
for partial_result in result:
if res_p.is_sse(partial_result):
# Server Sent Events response
response.content_type = "text/event-stream"
yield res_p.format_sse_response(partial_result)
else:
# Readable Stream response
yield json.dumps(partial_result, cls=DjangoJSONEncoder)
response.streaming_content = wrapper()
return response
except Exception as exc:
return self.handle_error(request, exc)
self._set_common_attributes(wrapped_view, view_func)
return wrapped_view
class AsyncDjapifyDecorator(BaseDjapifyDecorator):
def __call__(self, view_func: WrappedViewT = None):
if view_func is None:
return lambda v: self.__call__(v)
if not (asyncio.iscoroutinefunction(view_func) or inspect.isasyncgenfunction):
raise ValueError(f"View function {view_func.__name__} must be async")
@wraps(view_func)
async def wrapped_view(request: HttpRequest, *args, **kwargs):
self._prepare(view_func)
if msg := await sync_to_async(self.check_access)(request, view_func, *args, **kwargs):
return msg
try:
if not inspect.isasyncgenfunction(view_func):
# Standard HTTP response
response = HttpResponse(content_type="application/json")
else:
# Streaming HTTP response
response = StreamingHttpResponse(content_type="application/stream+json")
# Use async request parser
parser = AsyncRequestParser(request, view_func, kwargs)
data = await parser.parse_data()
if view_func.djapy_resp_param:
data[view_func.djapy_resp_param.name] = response
if not inspect.isasyncgenfunction(view_func):
# Standard HTTP response: obtain the function's response asynchronously
content = await view_func(request, *args, **data)
else:
# Streaming HTTP response: obtain the generator synchronously
content = view_func(request, *args, **data)
# Use async response parser
parser = AsyncResponseParser(
request=request,
status=200 if not isinstance(content, tuple) else content[0],
data=content if not isinstance(content, tuple) else content[1],
schemas=view_func.schema,
input_data=data,
)
result = await parser.parse_data()
if not inspect.isasyncgen(result):
# Standard HTTP response
if isinstance(content, tuple):
response.status_code = content[0]
response.content = json.dumps(result, cls=DjangoJSONEncoder)
else:
# Streaming HTTP response
if isinstance(content, tuple):
response.status_code = content[0]
# Wrap the generator and pass it along
async def awrapper():
async for partial_result in result:
if await sync_to_async(parser.is_sse)(partial_result):
# Server Sent Events response
response.content_type = "text/event-stream"
yield parser.format_sse_response(partial_result)
else:
# Readable Stream response
yield json.dumps(partial_result, cls=DjangoJSONEncoder)
response.streaming_content = awrapper()
return response
except Exception as exc:
return await sync_to_async(self.handle_error)(request, exc)
self._set_common_attributes(wrapped_view, view_func)
return wrapped_viewNow, the user can implement their views as follows:
class IteratorSchema(Schema):
test_data: str
@async_djapify
async def receive_updates_async(request) -> {200: IteratorSchema}:
for i in range(10):
yield 200, IteratorSchema(test_data=f"test string {i}.")
await asyncio.sleep(1)
return
@djapify
def receive_updates_sync(request) -> {200: IteratorSchema}:
for i in range(10):
yield 200, IteratorSchema(test_data=f"test string {i}.")
time.sleep(1)
return
class ServerSentEvents(Schema):
# If the user wants to implement EventSource with the Server Sent Events formatting, note that only the data field
# is required, and no additional fields can be included.
id: str | int | None
event: str | None
data: IteratorSchema
retry: int | None
@async_djapify
async def receive_updates_sse_async(request) -> {200: ServerSentEvents}:
for i in range(10):
yield 200, ServerSentEvents(
data=IteratorSchema(test_data=f"test string {i}."),
id=i,
event=f"Event #{i}",
retry=None,
)
await asyncio.sleep(1)
return
@djapify
def receive_updates_sse_sync(request) -> {200: ServerSentEvents}:
for i in range(10):
yield 200, ServerSentEvents(
data=IteratorSchema(test_data=f"test string {i}."),
id=i,
event=f"Event #{i}",
retry=None,
)
time.sleep(1)
returnI've tested this code with swagger, and everything works, except that it only outputs the yielded data once return has been called, and it doesn't know how to process the data so it outputs it in raw format. This is a limitation with swagger, and there has been an issue opened since 2018 that has since been ignored.
I haven't tested this with yielding ORM Model or Queryset objects yet, but I suspect it would work since the underlying data parsing logic hasn't changed.
I'm also not familiar with the implication of data[view_func.djapy_resp_param.name] = response and if it matters that the content_type is changed afterward for Server Sent Events.