[SPARK-54217] Synchronize PythonRunner's MonitorThread kill decision #52915
What changes were proposed in this pull request?
This diff addresses the synchronization issue described in SPARK-54217 by making PythonRunner's kill codepath respect the existing releasedOrClosed AtomicBoolean, which is currently consulted only on the "released" codepath, not the "closed" one. In doing so, we avoid erroneously destroying a still-healthy Python worker; in the current state it is destroyed and a new one is created.
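As a rough sketch of the idea (not the actual PythonRunner code; the class and method names below are made up for illustration, and only releasedOrClosed mirrors the real flag), the release path and the kill path can both compete for the same AtomicBoolean so that exactly one of them acts on the worker:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the real Python worker handle; name is illustrative.
final class FakeWorker {
  def destroy(): Unit = println("worker destroyed")
}

// Toy model: both the release path and the kill path race on the same
// AtomicBoolean, and only the side that wins compareAndSet touches the worker.
final class KillDecisionSketch(worker: FakeWorker) {
  private val releasedOrClosed = new AtomicBoolean(false)

  // Main task thread: the worker finished its task, hand it back for reuse.
  def releaseForReuse(): Unit = {
    if (releasedOrClosed.compareAndSet(false, true)) {
      println("worker returned to the idle queue")
    }
  }

  // MonitorThread: the executor asked for the task to be killed.
  // With the guard, an already-released worker is left alone.
  def killIfNotReleased(): Unit = {
    if (releasedOrClosed.compareAndSet(false, true)) {
      worker.destroy()
    } else {
      println("worker already released; skipping kill")
    }
  }
}

object KillDecisionSketch {
  def main(args: Array[String]): Unit = {
    val runner = new KillDecisionSketch(new FakeWorker)
    runner.releaseForReuse()    // task completed first...
    runner.killIfNotReleased()  // ...so the late kill request becomes a no-op
  }
}
```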
Jira ticket description follows...
PythonWorkerFactory in daemon mode will allow for worker reuse, where possible, as long as the worker successfully completed its last-assigned task (via releasePythonWorker). The worker will be released into the idle queue to be picked up by the next createPythonWorker call.
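As a rough mental model of this reuse (a toy, single-threaded sketch, not PythonWorkerFactory's actual code; the names below are made up), the factory keeps an idle queue that release and create operate on:

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Illustrative stand-in for a daemon-mode Python worker.
final case class Worker(id: Int)

// Simplified model of daemon-mode reuse: released workers go into an idle
// queue and the next request is served from that queue when possible.
final class WorkerPoolSketch {
  private val idleWorkers = new ConcurrentLinkedQueue[Worker]()
  private var nextId = 0

  def createWorker(): Worker = Option(idleWorkers.poll()) match {
    case Some(idle) => idle    // reuse an idle worker
    case None =>
      nextId += 1
      Worker(nextId)           // otherwise start a fresh one
  }

  def releaseWorker(worker: Worker): Unit = idleWorkers.offer(worker)
}

object WorkerPoolSketch {
  def main(args: Array[String]): Unit = {
    val pool = new WorkerPoolSketch
    val first = pool.createWorker()    // Worker(1): freshly created
    pool.releaseWorker(first)          // task succeeded, worker goes idle
    val second = pool.createWorker()   // Worker(1) again: pulled from the idle queue
    println(s"reused the same worker: ${first == second}")
  }
}
```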
However, there is a race condition that can result in a released worker in the PythonWorkerFactory idle queue getting killed, i.e. PythonRunner lacks synchronization between:

- the main task thread's decision to release the worker for reuse (when the worker finishes its task), and
- MonitorThread's decision to kill the associated Python worker (when requested by the executor, e.g. speculative execution where another attempt succeeds).

So, the following sequence of events is possible:

1. A task backed by a daemon-mode Python worker is running in PythonRunner.
2. The executor requests that the task be killed (e.g. another speculative attempt succeeded).
3. Meanwhile, the Python worker finishes and sends END_OF_STREAM to signal back to PythonRunner's main task thread that it is done.
4. PythonRunner's main task thread receives END_OF_STREAM and releases the worker for reuse.
5. PythonRunner's MonitorThread acts on the kill request and kills the already-relinquished PythonWorker.

So the next task that pulls this Python worker from the idle pool will have a dead Python worker.
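To make the failure mode concrete, here is a toy, single-threaded replay of that interleaving (not Spark code; the names are made up), showing how the next borrower ends up with a dead worker:

```scala
import scala.collection.mutable

// Illustrative worker handle with an aliveness flag.
final case class Worker(id: Int) {
  var alive: Boolean = true
}

object RaceSketch {
  def main(args: Array[String]): Unit = {
    val idleQueue = mutable.Queue.empty[Worker]
    val worker = Worker(1)

    // Steps 3-4: the worker finished, so the main task thread hands it back
    // to the idle queue for reuse.
    idleQueue.enqueue(worker)

    // Step 5: without a shared flag, the monitor side still acts on the kill
    // request and destroys the already-relinquished worker.
    worker.alive = false

    // Next task: pulls the worker from the idle queue and finds it dead.
    val reused = idleQueue.dequeue()
    println(s"pulled Worker(${reused.id}) from idle queue, alive = ${reused.alive}")
  }
}
```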
Why are the changes needed?
In the latest Spark release, this change is NOT critical; however, it avoids unnecessarily killing a still-healthy Python worker, which then forces a new one to be created.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
It is not possible (or very hard) to unit test this change.
However, I've created a minimal repro of the issue. Without this change, the run logs the error below; with this change, the error goes away:

PythonWorker(java.nio.channels.SocketChannel[closed]) process from idle queue is dead, discarding.

Was this patch authored or co-authored using generative AI tooling?
No.