
Commit 203aa2b

Merge branch 'main' into chhwang/logger-fix
2 parents: 7ebc71d + ddf84a6

16 files changed: +315, -98 lines


docs/dsl_quick_start.md

Lines changed: 176 additions & 0 deletions
# DSL Quick Start

The MSCCL++ DSL (Domain Specific Language) provides a high-level Python API for defining custom collective communication algorithms. This guide will help you get started with writing and testing your own communication patterns.

## Installation

You can follow the same steps as in the [Quick Start](quickstart).

After finishing the installation in the quick start section, you can run the following command to install some default algorithms from the DSL:

```bash
python3 -m mscclpp --install
```

## Your First Algorithm: AllGather

Let's walk through a simple AllGather algorithm to understand the DSL basics. This example demonstrates the key concepts without diving into all the advanced features.

### Complete Example

```python
from mscclpp.language import *

def simple_allgather(name):
    """
    A simple AllGather implementation using the MSCCL++ DSL.

    This example demonstrates a 2-GPU AllGather where each GPU sends
    its data to all other GPUs, so all GPUs end up with everyone's data.

    Args:
        name: Algorithm name for identification
    """
    num_gpus = 2
    chunk_factor = 1  # Split data into num_gpus chunks

    # Define the collective operation
    collective = AllGather(num_gpus, chunk_factor, inplace=True)

    # Create the program context
    with CollectiveProgram(
        name,
        collective,
        num_gpus,
        protocol="Simple",  # Use Simple protocol (vs "LL" for low-latency)
        min_message_size=0,
        max_message_size=2**30  # 1GB
    ):
        # Loop over each source GPU rank
        for src_rank in range(num_gpus):
            # Create a Rank object for the source GPU
            rank = Rank(src_rank)
            # Get the output buffer where the data is stored
            src_buffer = rank.get_output_buffer()
            # Take a slice corresponding to this rank's data
            src_chunk = src_buffer[src_rank:src_rank + 1]

            # Loop over each destination GPU rank
            for dst_rank in range(num_gpus):
                # Skip sending from a rank to itself
                if src_rank != dst_rank:
                    # Create a Rank object for the destination GPU
                    dst_rank_obj = Rank(dst_rank)
                    # Get the destination buffer where data will be sent
                    dst_buffer = dst_rank_obj.get_output_buffer()
                    # Take a slice where the data will be placed
                    dst_chunk = dst_buffer[src_rank:src_rank + 1]

                    # Define a channel from src_rank → dst_rank
                    channel = MemoryChannel(dst_rank, src_rank)

                    # Step 1: Source signals it is ready to send data
                    channel.signal(tb=0, relaxed=True)

                    # Step 2: Wait for destination to be ready
                    channel.wait(tb=0, data_sync=SyncType.after, relaxed=True)

                    # Step 3: Source rank sends data to destination rank
                    channel.put(dst_chunk, src_chunk, tb=0)

                    # Step 4: Signal that put operation is complete
                    channel.signal(tb=0, data_sync=SyncType.before)

                    # Step 5: Wait for acknowledgment
                    channel.wait(tb=0, data_sync=SyncType.after)

        print(JSON())

simple_allgather("simple_allgather_2gpus")
```

### Key Concepts Explained

**1. Collective Definition**
```python
collective = AllGather(num_gpus, chunk_factor=1, inplace=True)
```
- Defines which collective operation to implement (AllGather in this case)
- `chunk_factor` determines the data chunking strategy
- `inplace=True` means input and output use the same buffer. For AllGather, the input buffer is a slice of the output buffer. For example, on rank 0, the input buffer is the first half of the output buffer, and on rank 1, the input buffer is the second half of the output buffer.

**2. Program Context**
```python
with CollectiveProgram(name, collective, num_gpus, ...):
```
- Sets up the execution environment
- Configures the protocol, threading, and message size range

**3. Ranks and Buffers**
```python
rank = Rank(src_rank)
src_buffer = rank.get_output_buffer()
src_chunk = src_buffer[src_rank:src_rank + 1]
```
- `Rank` represents a GPU in the collective
- Buffers hold the data being communicated
- Chunks are slices of buffers representing data portions

**4. Channels**
```python
channel = MemoryChannel(dst_rank, src_rank)
```
- Establishes communication paths between GPUs
- `MemoryChannel` is for intra-node communication (fast, direct memory access)
- Created for each source-destination pair
- You can also use `PortChannel` for inter-node communication

**5. Synchronization and Data Transfer**
```python
channel.signal(tb=0, relaxed=True)
channel.wait(tb=0, data_sync=SyncType.after, relaxed=True)
channel.put(dst_chunk, src_chunk, tb=0)
```
- `signal()`: Notify the remote GPU of state changes
- `wait()`: Wait for the remote GPU to reach a certain state
- `put()`: Write data from local to remote GPU memory
- `tb=0` assigns the operation to thread block 0
- `relaxed=True` uses relaxed memory ordering for performance

For more advanced topics such as fine-grained synchronization, scratch buffers, and pipelining, refer to the [full DSL documentation](py_api).

## Testing Your Algorithm

Once you've written your algorithm, run it and redirect the printed JSON to a file:

```bash
python3 path/to/simple_allgather.py > /path/to/simple_allgather.json
```

Then use `executor_test.py` to validate correctness and measure performance:

```bash
# Test with 2 GPUs on a single node
mpirun --allow-run-as-root -np 2 python3 python/test/executor_test.py \
  -path /path/to/simple_allgather.json \
  --size 1M \
  --in_place
```

## Next Steps

Now that you understand the basics:

1. **Explore Examples**: Check `python/mscclpp/language/tests/` for more algorithm examples
2. **Optimize**: Experiment with different chunk strategies, pipelining, and synchronization patterns
3. **Advanced Features**: Learn about scratch buffers, thread block groups, and packet-based communication

For detailed API documentation and advanced features, refer to:
- [Programming Guide](programming_guide)
- [Tutorials](tutorials)

## Troubleshooting

**Import Error**: If you see `ModuleNotFoundError: No module named 'mscclpp'`, ensure you've installed the package with `pip install .`
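
As a quick check (a minimal sketch, not part of the guide above), you can confirm which environment the package landed in by importing the modules used throughout this guide:

```python
# Minimal sanity check: run with the same python3 you used for installation.
import mscclpp           # core Python bindings
import mscclpp.language  # DSL frontend used by the examples in this guide

print("mscclpp imported from:", mscclpp.__file__)
```
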
For more help, please file an issue on the [GitHub repository](https://github.com/microsoft/mscclpp/issues).

docs/index.rst

Lines changed: 2 additions & 0 deletions
@@ -10,6 +10,7 @@ You can find the followings from this documentation.
 
 - **Overview:** An overview of MSCCL++ and its features. :doc:`🔗 <overview>`
 - **Quick Start:** A guide to build, install, and run MSCCL++. :doc:`🔗 <quickstart>`
+- **DSL Quick Start:** A guide to get started with the MSCCL++ DSL for defining custom algorithms. :doc:`🔗 <dsl_quick_start>`
 - **Tutorials:** A step-by-step guide for GPU communication using MSCCL++. :doc:`🔗 <tutorials>`
 - **Programming Guide:** Advanced topics and best practices for using MSCCL++. :doc:`🔗 <programming_guide>`
 - **C++ API Reference:** Detailed documentation of the MSCCL++ C++ API. :doc:`🔗 <cpp_api>`
@@ -21,6 +22,7 @@ You can find the followings from this documentation.
 
    overview
    quickstart
+   dsl_quick_start
    tutorials
    programming_guide
    cpp_api

include/mscclpp/gpu_utils.hpp

Lines changed: 17 additions & 0 deletions
@@ -41,10 +41,26 @@ namespace mscclpp {
 struct AvoidCudaGraphCaptureGuard {
   AvoidCudaGraphCaptureGuard();
   ~AvoidCudaGraphCaptureGuard();
+  AvoidCudaGraphCaptureGuard(const AvoidCudaGraphCaptureGuard&) = delete;
+  AvoidCudaGraphCaptureGuard& operator=(const AvoidCudaGraphCaptureGuard&) = delete;
+  AvoidCudaGraphCaptureGuard(AvoidCudaGraphCaptureGuard&&) = delete;
+  AvoidCudaGraphCaptureGuard& operator=(AvoidCudaGraphCaptureGuard&&) = delete;
   cudaStreamCaptureMode mode_;
   bool active_;
 };
 
+/// A RAII guard that will set the current device on construction and restore the previous device on destruction.
+struct CudaDeviceGuard {
+  CudaDeviceGuard(int deviceId);
+  ~CudaDeviceGuard();
+  CudaDeviceGuard(const CudaDeviceGuard&) = delete;
+  CudaDeviceGuard& operator=(const CudaDeviceGuard&) = delete;
+  CudaDeviceGuard(CudaDeviceGuard&&) = delete;
+  CudaDeviceGuard& operator=(CudaDeviceGuard&&) = delete;
+  int deviceId_;
+  int origDeviceId_;
+};
+
 /// A RAII wrapper around cudaStream_t that will call cudaStreamDestroy on destruction.
 struct CudaStreamWithFlags {
   /// Constructor without flags. This will not create any stream. set() can be called later to create a stream with
@@ -128,6 +144,7 @@ std::shared_ptr<GpuStreamPool> gpuStreamPool();
 namespace detail {
 
 void setReadWriteMemoryAccess(void* base, size_t size);
+int gpuIdFromAddress(void* ptr);
 
 void* gpuCalloc(size_t bytes);
 void* gpuCallocHost(size_t bytes, unsigned int flags);

python/csrc/core_py.cpp

Lines changed: 2 additions & 1 deletion
@@ -216,6 +216,7 @@ void register_core(nb::module_& m) {
 
   def_shared_future<RegisteredMemory>(m, "RegisteredMemory");
   def_shared_future<Connection>(m, "Connection");
+  def_shared_future<Semaphore>(m, "Semaphore");
 
   nb::class_<Communicator>(m, "Communicator")
       .def(nb::init<std::shared_ptr<Bootstrap>, std::shared_ptr<Context>>(), nb::arg("bootstrap"),
@@ -242,7 +243,7 @@ void register_core(nb::module_& m) {
            nb::arg("remote_rank"), nb::arg("tag"), nb::arg("local_config"))
      .def("send_memory_on_setup", &Communicator::sendMemory, nb::arg("memory"), nb::arg("remote_rank"), nb::arg("tag"))
      .def("recv_memory_on_setup", &Communicator::recvMemory, nb::arg("remote_rank"), nb::arg("tag"))
-      .def("build_semaphore", &Communicator::buildSemaphore, nb::arg("local_flag"), nb::arg("remote_rank"),
+      .def("build_semaphore", &Communicator::buildSemaphore, nb::arg("connection"), nb::arg("remote_rank"),
           nb::arg("tag") = 0)
      .def("remote_rank_of", &Communicator::remoteRankOf)
      .def("tag_of", &Communicator::tagOf)

python/csrc/memory_channel_py.cpp

Lines changed: 14 additions & 8 deletions
@@ -26,14 +26,20 @@ void register_memory_channel(nb::module_& m) {
 
   nb::class_<MemoryChannel>(m, "MemoryChannel")
       .def(nb::init<>())
-      .def("__init__",
-           [](MemoryChannel* memoryChannel, std::shared_ptr<MemoryDevice2DeviceSemaphore> semaphore,
-              RegisteredMemory dst, RegisteredMemory src) { new (memoryChannel) MemoryChannel(semaphore, dst, src); })
-      .def("__init__",
-           [](MemoryChannel* memoryChannel, std::shared_ptr<MemoryDevice2DeviceSemaphore> semaphore,
-              RegisteredMemory dst, RegisteredMemory src, uintptr_t packet_buffer) {
-             new (memoryChannel) MemoryChannel(semaphore, dst, src, reinterpret_cast<void*>(packet_buffer));
-           })
+      .def(
+          "__init__",
+          [](MemoryChannel* memoryChannel, std::shared_ptr<MemoryDevice2DeviceSemaphore> semaphore,
+             RegisteredMemory dst, RegisteredMemory src, uintptr_t packet_buffer) {
+            new (memoryChannel) MemoryChannel(semaphore, dst, src, reinterpret_cast<void*>(packet_buffer));
+          },
+          nb::arg("semaphore"), nb::arg("dst"), nb::arg("src"), nb::arg("packet_buffer") = 0)
+      .def(
+          "__init__",
+          [](MemoryChannel* memoryChannel, const Semaphore& semaphore, RegisteredMemory dst, RegisteredMemory src,
+             uintptr_t packet_buffer = 0) {
+            new (memoryChannel) MemoryChannel(semaphore, dst, src, reinterpret_cast<void*>(packet_buffer));
+          },
+          nb::arg("semaphore"), nb::arg("dst"), nb::arg("src"), nb::arg("packet_buffer") = 0)
       .def("device_handle", &MemoryChannel::deviceHandle);
 
   nb::class_<MemoryChannel::DeviceHandle>(m, "MemoryChannelDeviceHandle")
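
The rewritten bindings above give `MemoryChannel` two Python constructors: the existing one taking a `MemoryDevice2DeviceSemaphore` and a new one taking the generic `Semaphore`, both with `packet_buffer` defaulting to 0. A rough sketch of how either overload is called (variable names are placeholders, not from this commit):

```python
# Sketch only: sem_d2d is a MemoryDevice2DeviceSemaphore, sem is a generic Semaphore,
# dst_mem/src_mem are RegisteredMemory handles for the remote and local buffers.
ch_legacy = MemoryChannel(sem_d2d, dst_mem, src_mem)  # packet_buffer defaults to 0
ch_new = MemoryChannel(sem, dst_mem, src_mem)         # new overload taking Semaphore
handle = ch_new.device_handle()                       # device-side handle, unchanged
```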

python/mscclpp/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -47,6 +47,7 @@
     connect_nvls_collective,
     EndpointConfig,
     Fifo,
+    Semaphore,
     Host2DeviceSemaphore,
     Host2HostSemaphore,
     numa,
@@ -79,6 +80,7 @@
     "connect_nvls_collective",
     "EndpointConfig",
     "Fifo",
+    "Semaphore",
     "Host2DeviceSemaphore",
     "Host2HostSemaphore",
     "numa",

python/mscclpp/comm.py

Lines changed: 10 additions & 13 deletions
@@ -10,6 +10,7 @@
     Connection,
     connect_nvls_collective,
     EndpointConfig,
+    Semaphore,
     Host2DeviceSemaphore,
     Host2HostSemaphore,
     ProxyService,
@@ -133,18 +134,14 @@ def _register_memory_with_connections(
             all_registered_memories[rank] = future_memories[rank].get()
         return all_registered_memories
 
-    def make_semaphore(
-        self,
-        connections: dict[int, Connection],
-        semaphore_type: Type[Host2HostSemaphore] | Type[Host2DeviceSemaphore] | Type[MemoryDevice2DeviceSemaphore],
-    ) -> dict[int, Host2HostSemaphore]:
-        semaphores = {}
+    def make_semaphores(self, connections: dict[int, Connection]) -> dict[int, Semaphore]:
+        future_semaphores = {}
         for rank in connections:
-            semaphores[rank] = semaphore_type(self.communicator, connections[rank])
-        return semaphores
+            future_semaphores[rank] = self.communicator.build_semaphore(connections[rank], rank)
+        return {rank: future.get() for rank, future in future_semaphores.items()}
 
     def make_memory_channels(self, tensor: cp.ndarray, connections: dict[int, Connection]) -> dict[int, MemoryChannel]:
-        semaphores = self.make_semaphore(connections, MemoryDevice2DeviceSemaphore)
+        semaphores = self.make_semaphores(connections)
         registered_memories = self.register_tensor_with_connections(tensor, connections)
         channels = {}
         for rank in connections:
@@ -159,7 +156,7 @@ def make_memory_channels_with_scratch(
         registeredScratchBuffer: RegisteredMemory,
         connections: dict[int, Connection],
     ) -> dict[int, MemoryChannel]:
-        semaphores = self.make_semaphore(connections, MemoryDevice2DeviceSemaphore)
+        semaphores = self.make_semaphores(connections)
         registered_memories = self._register_memory_with_connections(registeredScratchBuffer, connections)
         channels = {}
         tensor_data_ptr = tensor.data_ptr() if is_torch_tensor(tensor) else tensor.data.ptr
@@ -177,7 +174,7 @@
     def make_port_channels(
         self, proxy_service: ProxyService, tensor: cp.ndarray, connections: dict[int, Connection]
     ) -> dict[int, PortChannel]:
-        semaphores = self.make_semaphore(connections, Host2DeviceSemaphore)
+        semaphores = self.make_semaphores(connections)
         registered_memories = self.register_tensor_with_connections(tensor, connections)
         memory_ids = {}
         semaphore_ids = {}
@@ -210,7 +207,7 @@ def make_port_channels_with_scratch(
         )
         local_reg_memory = self.communicator.register_memory(data_ptr, tensor_size, transport_flags)
 
-        semaphores = self.make_semaphore(connections, Host2DeviceSemaphore)
+        semaphores = self.make_semaphores(connections)
         registered_memories = self._register_memory_with_connections(registeredScratchBuffer, connections)
         memory_ids = {}
         semaphore_ids = {}
@@ -229,7 +226,7 @@ def register_semaphore_with_proxy(
     def register_semaphore_with_proxy(
         self, proxy_service: ProxyService, connections: dict[int, Connection]
     ) -> dict[int, PortChannel]:
-        semaphores = self.make_semaphore(connections, Host2DeviceSemaphore)
+        semaphores = self.make_semaphores(connections)
         semaphore_ids = {}
         for rank in semaphores:
             semaphore_ids[rank] = proxy_service.add_semaphore(semaphores[rank])
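
Net effect of the `comm.py` change above: the type-parameterized `make_semaphore` is replaced by `make_semaphores`, which builds generic `Semaphore` objects through `Communicator.build_semaphore`. A hedged before/after sketch for callers (assuming `group` is an existing `CommGroup` and `connections` is the dict it created):

```python
# Old API (removed above):
#   semaphores = group.make_semaphore(connections, MemoryDevice2DeviceSemaphore)
# New API:
semaphores = group.make_semaphores(connections)  # dict[int, Semaphore], one per remote rank
# Callers that still need the device-to-device type can wrap the result,
# as mscclpp_op.py does in the next file:
d2d_semaphores = {rank: MemoryDevice2DeviceSemaphore(sem) for rank, sem in semaphores.items()}
```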

python/mscclpp_benchmark/mscclpp_op.py

Lines changed: 4 additions & 1 deletion
@@ -453,7 +453,10 @@ def __init__(
         )
 
         # create a memory_channel for each remote neighbor
-        self.semaphores = group.make_semaphore(self.nvlink_connections, MemoryDevice2DeviceSemaphore)
+        self.semaphores = {
+            rank: MemoryDevice2DeviceSemaphore(sema)
+            for rank, sema in group.make_semaphores(self.nvlink_connections).items()
+        }
         file_dir = os.path.dirname(os.path.abspath(__file__))
         self.kernel = KernelBuilder(
             file="allreduce.cu",
