
Implementing MPI communication protocol for Dask #48


Description

@aamirshafi

I am currently experimenting with dask-mpi to replace the point-to-point communication between the scheduler and workers, and among workers, with MPI. The goal is to exploit high-performance interconnects. I am aware of the ucx communication protocol that is already implemented; the MPI approach would be an alternative to it. I have also seen discussions about dask starting MPI jobs (#25), but this post is obviously about doing the opposite.

When dask jobs are started with dask-mpi, COMM_WORLD has already been initialized in core.py. After that, process 0 runs the scheduler code, process 1 runs the client code, and processes with rank >= 2 run worker code. I can also see that the communication code is nicely abstracted out in the dask/distributed/comm/ folder (inside dask.distributed). There are three implementations already available: tcp.py, ucx.py, and inproc.py. All three implement the abstract class Comm. So should I be looking at producing an mpi.py as the first step, somehow passing the COMM_WORLD object to mpi.py and using this communicator for communication? My initial goal is to leave connection establishment as is and only replace data communication with MPI's point-to-point communication. There must also be some code that multiplexes between these communication protocols based on the user's parameters; I would appreciate being pointed to that code.
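
To make this concrete, here is a rough standalone sketch of how the read/write path of a hypothetical MPIComm could map onto mpi4py point-to-point calls. The class name, tag choice, and rank-based addressing are just my assumptions; a real mpi.py would subclass the actual Comm class and also implement close/abort/address handling, which I still need to study in tcp.py.

```python
# Rough sketch only: not the real distributed.comm.Comm API.  It shows how a
# hypothetical MPIComm could map read/write onto mpi4py point-to-point calls.
import asyncio
from mpi4py import MPI


class MPIComm:
    """Hypothetical comm object: one per (local rank, peer rank) pair."""

    def __init__(self, peer_rank, tag=0, comm=MPI.COMM_WORLD):
        self.comm = comm            # e.g. the COMM_WORLD set up by dask-mpi
        self.peer_rank = peer_rank
        self.tag = tag              # arbitrary tag, illustration only

    async def write(self, msg):
        # comm.send pickles arbitrary Python objects; run the blocking call in
        # a thread so the asyncio event loop used by distributed is not blocked.
        await asyncio.to_thread(
            self.comm.send, msg, dest=self.peer_rank, tag=self.tag
        )

    async def read(self):
        # Blocking matched receive, again pushed off the event loop.
        return await asyncio.to_thread(
            self.comm.recv, source=self.peer_rank, tag=self.tag
        )
```

My guess is that an mpi:// address scheme would then be registered alongside tcp://, ucx://, and inproc:// in whatever code does the protocol selection, which is exactly the multiplexing code I am asking about.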

Also, once this is implemented, do we need to do anything special for communication between GPU devices? As per my understanding, GPU devices will be able to communicate as long as the underlying MPI library is CUDA-aware, meaning that it supports direct communication between GPU devices. In other words, I am asking whether dask-cuda has its own communication mechanism or whether it also relies on distributed for its communication.
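
For reference, this is the CUDA-aware path I have in mind: as far as I understand, with a CUDA-aware MPI build, recent mpi4py versions let the buffer-based Send/Recv take GPU arrays (e.g. CuPy) directly, so device buffers move without a host round trip. The ranks and array size below are arbitrary.

```python
# Assumes a CUDA-aware MPI build and a recent mpi4py; run with 2 ranks,
# e.g. `mpirun -n 2 python gpu_sketch.py`.
from mpi4py import MPI
import cupy as cp

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    buf = cp.arange(1_000_000, dtype=cp.float32)
    comm.Send(buf, dest=1, tag=11)    # device buffer handed straight to MPI
elif rank == 1:
    buf = cp.empty(1_000_000, dtype=cp.float32)
    comm.Recv(buf, source=0, tag=11)  # lands directly in GPU memory
```

If this works on the target cluster's MPI, then I assume an mpi.py built on the same calls would get GPU-to-GPU transfers for free; whether dask-cuda layers anything else on top is what I am asking.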

One last clarification, please: is there explicit communication between dask workers? Typically, in loosely coupled systems, workers are only aware of the scheduler, but going through the code I got the impression that dask workers also communicate with one another. The architecture diagram does not show connections between workers, hence the confusion. And if there is communication between workers, would the mpi.py envisaged above enable worker<-->worker communication, scheduler<-->worker communication, or both?
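
To make that question concrete: with a single COMM_WORLD, the same point-to-point calls cover both directions, so nothing in MPI itself distinguishes scheduler<-->worker from worker<-->worker traffic. The ranks below follow the dask-mpi layout I described above (0 = scheduler, 1 = client, >= 2 workers); the tags and payloads are made up.

```python
# Run with e.g. `mpirun -n 4 python paths_sketch.py`.
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:                     # scheduler -> worker
    comm.send({"op": "compute"}, dest=2, tag=1)
elif rank == 2:                   # worker receives from the scheduler ...
    task = comm.recv(source=0, tag=1)
    # ... and, if workers really do talk to each other, could pass
    # intermediate data to a peer worker with the very same mechanism:
    comm.send({"data": 42}, dest=3, tag=2)
elif rank == 3:
    peer_data = comm.recv(source=2, tag=2)
```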
