-
Notifications
You must be signed in to change notification settings - Fork 70
Implement handling for STDOUT StreamType in sandbox exec #3717
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
@cursor review |
|
@cursor review |
| self._task: Optional[asyncio.Task[None]] = None | ||
| self._file_descriptor = params.file_descriptor | ||
| # Kick off a background task that reads from the underlying text stream and prints to stdout. | ||
| self._start_printing_task(params, by_line) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be great if we can avoid kicking off this task in the constructor and instead have some kind of delegation method we use from whatever constructs this in an async context to make the "asyncness" of this function explicit, e.g. have an a async def _init_stream() method on all StreamReader/StreamWriter objects that's called by their constructor to kick off any potential background tasks.
This helps with both testing and making sure refactors don't accidentally push this construction calling into some non-async scope where create_task wouldn't work
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, yeah I remember you commenting on this when you were working on the other issue. I'll give this a try
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, it looks like this would unfortunately be pretty invasive. We instantiate StreamReaders pretty much only from sync functions, e.g. Sandbox._hydrate_metadata (here) and in ContainerProcess*.__init__, e.g. here. So we'd have to do a lot of rearranging to make this workable.
Let me know if you think it's still worth doing. But I think we really need to redesign a lot of the Sandbox API and do proper resource management via context managers and the like at some point anyway...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see what you mean... Yeah the similar thing I ran across in Italy was ContainerProcess also doing something similar - starting asyncio Tasks from the constructor. And _hydrate_metadata() should also not be doing anything with event loops, that's risky 😬
I think we should clean this up sooner rather than later, without necessarily adding the contextmanager support (that should probably be optional anyway to not break backwards compatibility or similarities with Popen), but I'm ok with it being a followup on this since it's not directly related.
| ) -> None: | ||
| self._reader: Optional[_TextStreamReaderThroughCommandRouter[str]] = None | ||
| self._task: Optional[asyncio.Task[None]] = None | ||
| self._file_descriptor = params.file_descriptor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we maybe just add self._params = params and then reference that from the rest of the implementation code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had that at first but changed it because I generally like to only keep data that's actually needed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the full params are "needed" if you count them being used to pass into _TextStreamReaderThroughCommandRouter in the _start_printing_task - esp. if we would defer that task creation to some followup step that's not part of the contstrcutor (then it wouldn't take this as an argument but have to find it as some instance var)
| self._start_printing_task(params, by_line) | ||
|
|
||
| @property | ||
| def file_descriptor(self) -> int: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would be a "api_pb2.FileDescriptor.ValueType" right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, the _StreamReader class has always had int as the return type here though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see... so many mini cleanups. Maybe it makes sense to add it as a quick driveby improvement to all the _StreamReaders?
|
|
||
| def _start_printing_task(self, params: _StreamReaderThroughCommandRouterParams, by_line: bool) -> None: | ||
| async def _run(): | ||
| self._reader = _TextStreamReaderThroughCommandRouter[str](params, by_line) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a case for printing non-str (binary) data to stdout sometimes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah good question, I wasn't sure, what do you think? I sort of assumed people wouldn't want to do that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could see some potential use cases for it - e.g if you want to run commands that output terminal control sequences which aren't necessarily text-encodable. I guess we could have different underlying _reader implementations here depending on text=True | False ?
Btw, why is _TextStreamReaderThroughCommandRouter and _BytesStreamReader... - wouldn't they always use str as the generic type var for the first and bytes for the second? 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think maybe a word is missing there, are you asking why they're generic? If that's the question, then there's no good reason, I should make them concrete
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yeah, missed a word 🤦 Generic yes, they shouldn't need to be generic
| self._task = asyncio.create_task(_run()) | ||
|
|
||
| async def read(self) -> T: | ||
| raise InvalidError("Logs can only be retrieved using the PIPE stream type.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit, maybe use "Output can only be read..." rather than the "Logs can only be retrieved" - command output isn't necessarily seen as "logs"? Maybe that's just me though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No I agree with you, this was copied from an existing error message but I prefer output so I'll change it
| async def aclose(self): | ||
| if self._task is not None: | ||
| self._task.cancel() | ||
| with contextlib.suppress(Exception): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this is to suppress the expected asyncio.CancelledError specifically, can we use that type instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is me not examining LLM generated code closely enough, I'll see if I can figure out the intent and scope it more specifically
| if stream_type == StreamType.STDOUT: | ||
| if not text: | ||
| raise ValueError("StreamType.STDOUT is only supported when text=True") | ||
| self._impl = _StdoutPrintingStreamReaderThroughCommandRouter(params, by_line) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that I think about it (considering the bytes vs str reading etc.) - could we implement the _StdoutPrintingStreamReaderThroughCommandRouter as accepting another streamreader as the underlying buffer - passing the _TextStreamReaderThroughCommandRouter or _BytesStreamReaderThroughCommandRouter created below into the constructor of _StdoutPrintingStreamReaderThroughCommandRouter ? Then we don't have to duplicate the logic of determining the underlying stream reader by looking at params etc.?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah that's better
Describe your changes
Checklists
Compatibility checklist
Check these boxes or delete any item (or this section) if not relevant for this PR.
Note on protobuf: protobuf message changes in one place may have impact to
multiple entities (client, server, worker, database). See points above.
Release checklist
If you intend for this commit to trigger a full release to PyPI, please ensure that the following steps have been taken:
modal_version/__init__.py) has been updated with the next logical versionChangelog
Note
Enable StreamType.STDOUT for sandbox exec via the command router by streaming to the client and printing immediately to local stdout (text-only), with tests covering behavior.
modal/io_streams.py):STDOUTreader: Add_StdoutPrintingStreamReaderThroughCommandRouterthat consumes text output and prints to local stdout immediately; disallowsread()/iteration; cleans up background task onaclose().StreamReadernow instantiates the new reader whenstream_type == StreamType.STDOUT(requirestext=True); otherwise uses existing text/bytes readers.modal/sandbox.py):stdout=STDOUTto routerPIPEstreaming so client-side reader can print locally; preserves existing configs forPIPE/DEVNULL/STDERR->STDOUT.test/sandbox_test.py):stdout=STDOUTprints immediately (includingbufsize=1), reading fromstdoutraisesInvalidError, and router backend requirestext=True.Written by Cursor Bugbot for commit 2f66dec. This will update automatically on new commits. Configure here.