Merged
Commits
133 commits
a91d273
WIP
Binyang2014 Aug 18, 2025
2fb4d74
WIP
Binyang2014 Aug 18, 2025
cc96414
WIP
Binyang2014 Aug 19, 2025
d561a5c
WIP
Binyang2014 Aug 19, 2025
0dfc13a
WIP
Binyang2014 Aug 19, 2025
11b3bdb
WIP
Binyang2014 Aug 19, 2025
a9e98ec
WIP
Binyang2014 Aug 20, 2025
f89ffe4
WIP
Binyang2014 Aug 21, 2025
82211f6
WIP
Binyang2014 Aug 21, 2025
978a182
WIP
Binyang2014 Aug 21, 2025
b89697d
WIP
Binyang2014 Aug 22, 2025
0f7ce20
Fix correctness
Binyang2014 Aug 22, 2025
e620110
WIP
Binyang2014 Aug 22, 2025
9df8ca3
clean code
Binyang2014 Aug 22, 2025
c922620
all works
Binyang2014 Aug 23, 2025
b5a8793
WIP
Binyang2014 Aug 23, 2025
f1a905f
Merge branch 'main' into binyli/nccl-algo
Binyang2014 Aug 23, 2025
1235a90
clean up
Binyang2014 Aug 23, 2025
030a4c2
WIP
Binyang2014 Aug 23, 2025
7bcd613
bug fix
Binyang2014 Aug 24, 2025
f1cde8b
fix compile error
Binyang2014 Aug 24, 2025
775f48e
WIP
Binyang2014 Aug 24, 2025
ca638b5
for amd
Binyang2014 Aug 24, 2025
646656b
WIP
Binyang2014 Aug 24, 2025
25f4ee2
WIP
Binyang2014 Aug 24, 2025
52211ce
WIP
Binyang2014 Aug 24, 2025
ffae384
add logs
Binyang2014 Aug 24, 2025
7be42b9
merge main
Binyang2014 Aug 25, 2025
f5b6f48
Merge branch 'main' into binyli/nccl-algo
Binyang2014 Aug 25, 2025
81ba828
WIP
Binyang2014 Aug 26, 2025
40f66e3
Update docs/guide/msccl_dsl_integration.md
Binyang2014 Aug 26, 2025
02f608a
address comments
Binyang2014 Aug 26, 2025
2313197
remove nccl.h from algorithm file
Binyang2014 Aug 26, 2025
1d6aee0
move algo to core lib
Binyang2014 Aug 26, 2025
3d3842b
WIP
Binyang2014 Aug 27, 2025
8e57b6b
WIP
Binyang2014 Aug 28, 2025
bea70b0
WIP
Binyang2014 Aug 28, 2025
b025bd3
WIP
Binyang2014 Aug 29, 2025
cd28260
WIP
Binyang2014 Aug 29, 2025
e2d33fb
example works
Binyang2014 Aug 29, 2025
d2941f1
add doc
Binyang2014 Aug 29, 2025
d1e636a
WIp
Binyang2014 Aug 29, 2025
33c6eb5
WIP
Binyang2014 Sep 2, 2025
44196ec
update doc
Binyang2014 Sep 2, 2025
0cbe6d7
update
Binyang2014 Sep 3, 2025
b51edcf
Merge branch 'main' into binyli/py-api
Binyang2014 Sep 4, 2025
04e53c8
refactor
Binyang2014 Sep 9, 2025
4f91a50
merge main
Binyang2014 Sep 9, 2025
6583661
WIP
Binyang2014 Sep 9, 2025
351b5b0
WIP
Binyang2014 Sep 9, 2025
3b3c2d4
Merge branch 'main' into binyli/py-api
Binyang2014 Sep 10, 2025
652b78c
WIP
Binyang2014 Sep 10, 2025
bfd1171
WIP
Binyang2014 Sep 11, 2025
dba7b5d
WIP
Binyang2014 Sep 11, 2025
bd38165
fix and update
Binyang2014 Sep 11, 2025
6c324a6
WIP
Binyang2014 Sep 11, 2025
c28d401
Merge branch 'binyli/nccl-algo' into binyli/py-api
Binyang2014 Sep 11, 2025
b1b0338
Merge branch 'main' into binyli/py-api
Binyang2014 Sep 11, 2025
6d6f9fa
WIP
Binyang2014 Sep 12, 2025
209ae57
WIP
Binyang2014 Sep 12, 2025
acea3e1
pass compile
Binyang2014 Sep 12, 2025
a65cbb1
WIP
Binyang2014 Sep 12, 2025
bfbdded
WIP
Binyang2014 Sep 12, 2025
3102fb5
Fix
Binyang2014 Sep 13, 2025
13d485e
WIP
Binyang2014 Sep 13, 2025
4ba2c0d
WIP
Binyang2014 Sep 15, 2025
eb8d926
WIP
Binyang2014 Sep 15, 2025
a2fa58e
update
Binyang2014 Sep 15, 2025
4e233fc
working with leak
Binyang2014 Sep 15, 2025
1b4023e
works for now
Binyang2014 Sep 15, 2025
ccf635f
WIP
Binyang2014 Sep 15, 2025
fd5fbc8
WIP
Binyang2014 Sep 16, 2025
e76073f
WIP
Binyang2014 Sep 16, 2025
1606dad
WIP
Binyang2014 Sep 23, 2025
c00f88f
WIP
Binyang2014 Sep 23, 2025
a1fc471
WIP
Binyang2014 Sep 23, 2025
a9ec4ef
merge main
Binyang2014 Sep 26, 2025
8e154d9
resolving conflicts
caiomcbr Oct 2, 2025
7fde9c1
wip
caiomcbr Oct 6, 2025
3db9f4b
Merge branch 'main' into binyli/py-api
caiomcbr Oct 6, 2025
ace7301
wip
caiomcbr Oct 6, 2025
f8ca314
wip
caiomcbr Oct 6, 2025
92b4d3a
wip
caiomcbr Oct 6, 2025
6ad10eb
Merge branch 'main' into binyli/py-api
Binyang2014 Oct 6, 2025
f05b857
lint
Binyang2014 Oct 6, 2025
1cb68ac
wip
caiomcbr Oct 7, 2025
175f49f
wip
caiomcbr Oct 7, 2025
1a3e93a
wip
caiomcbr Oct 7, 2025
0da9515
fix
Binyang2014 Oct 7, 2025
ef91b29
wip
caiomcbr Oct 7, 2025
15e018e
wip
caiomcbr Oct 7, 2025
0502327
update
Binyang2014 Oct 7, 2025
10b453b
fix hang issue
Oct 9, 2025
92ad551
wip
caiomcbr Oct 10, 2025
f595852
fix
Oct 10, 2025
7912487
bug fix
Binyang2014 Oct 12, 2025
a442f37
wip
caiomcbr Oct 15, 2025
9d7386a
wip
caiomcbr Oct 15, 2025
248461e
wip
caiomcbr Oct 15, 2025
3eefbeb
wip
caiomcbr Oct 15, 2025
43c2155
wip
caiomcbr Oct 16, 2025
be75d20
wip
caiomcbr Oct 16, 2025
2a0e39d
wip
caiomcbr Oct 16, 2025
dff8d86
wip
caiomcbr Oct 16, 2025
df9fcd0
WIP
Binyang2014 Oct 21, 2025
cbee2f6
Merge branch 'main' into binyli/py-api
Binyang2014 Oct 21, 2025
9b1afd7
address comments
Binyang2014 Oct 21, 2025
988935f
wip
caiomcbr Oct 21, 2025
c97b9c3
wip
caiomcbr Oct 22, 2025
cd9379d
wip
caiomcbr Oct 22, 2025
43a4417
wip
caiomcbr Oct 22, 2025
c3fea11
wip
caiomcbr Oct 22, 2025
305b16b
wip
caiomcbr Oct 24, 2025
a2b84da
wip
caiomcbr Oct 24, 2025
7a8e183
merge main
Binyang2014 Oct 27, 2025
fbe6911
fix test
Binyang2014 Oct 27, 2025
858c3b7
minor fix
Binyang2014 Oct 27, 2025
8e14e97
wip
caiomcbr Oct 27, 2025
35a7e04
update doc
Binyang2014 Oct 27, 2025
0d5cb6a
update the doc
Binyang2014 Oct 27, 2025
a01b5aa
Merge branch 'main' into binyli/py-api
Binyang2014 Oct 27, 2025
517767f
fix doc build
Binyang2014 Oct 27, 2025
08a8903
Merge branch 'main' into binyli/py-api
Binyang2014 Oct 27, 2025
f4a77c5
Fix rocm build issue
Binyang2014 Oct 27, 2025
2e79d28
Update python/mscclpp/__init__.py
Binyang2014 Oct 27, 2025
6af6fd0
Update python/mscclpp/__init__.py
Binyang2014 Oct 27, 2025
d18f276
Update python/mscclpp/language/default_algos/allreduce_2nodes.py
Binyang2014 Oct 27, 2025
87beb94
Update src/executor/execution_plan.cc
Binyang2014 Oct 28, 2025
f89a6f9
Fix
Binyang2014 Oct 28, 2025
42ef9cc
lint
Binyang2014 Oct 28, 2025
ceaf4d6
wip
caiomcbr Oct 28, 2025
488fda3
wip
caiomcbr Oct 29, 2025
860f5ea
Merge branch 'main' into binyli/py-api
chhwang Oct 29, 2025
1 change: 0 additions & 1 deletion apps/nccl/src/allreduce.hpp
@@ -1237,7 +1237,6 @@ class AllreduceNvlsPacket : public mscclpp::AlgorithmBuilder {

size_t scratchBufferSize_;
std::shared_ptr<char> scratchBuffer_;
const int nSegmentsForScratchBuffer_ = 2;
const size_t nvlsBufferSize_ = (1 << 30);

std::shared_ptr<uint32_t> deviceFlag_;
149 changes: 51 additions & 98 deletions apps/nccl/src/nccl.cu
@@ -120,7 +120,8 @@ static inline int mscclppNcclDlopenInit() {
return dlopenSuccess;
}

static inline void mscclppNcclDlopenFinalize() {
// No need to call this function, handle will be closed at program exit
[[maybe_unused]] static inline void mscclppNcclDlopenFinalize() {
if (mscclppNcclDlHandle) {
dlclose(mscclppNcclDlHandle);
}
@@ -159,17 +160,6 @@ static bool tryLoadNcclSharedLib() {
// Declare the global map to store associations between raw pointer and shared pointer
static std::unordered_map<void*, std::shared_ptr<char>> ptrMap;

struct planKey {
size_t minMessageSize;
size_t maxMessageSize;
bool isInPlace;
};

struct executionPlanInstance {
planKey key;
std::shared_ptr<mscclpp::ExecutionPlan> plan;
};

struct splitCommInfo {
int color;
int key;
@@ -179,23 +169,16 @@ struct splitCommInfo {
struct ncclComm {
std::shared_ptr<mscclpp::Communicator> comm;
std::shared_ptr<mscclpp::Executor> executor;
std::unordered_map<std::string, std::vector<executionPlanInstance>> executionPlans;
std::shared_ptr<mscclpp::AlgorithmCollection> algorithmCollection;
std::shared_ptr<char> scratchBuffer_;
const size_t scratchBufferSize_ = (1 << 27); // 128MB
std::shared_ptr<mscclpp::ExecutionPlanRegistry> planRegistry_;
int nRanksPerNode;
int worldSize;

void* mscclppNcclComm;
};

static std::pair<std::string, executionPlanInstance> loadExecutionPlan(const std::string& filename, int rank) {
std::shared_ptr<mscclpp::ExecutionPlan> plan = std::make_shared<mscclpp::ExecutionPlan>(filename, rank);
std::string collective = plan->collective();
planKey key{plan->minMessageSize(), plan->maxMessageSize(), plan->isInPlace()};
return std::make_pair(collective, executionPlanInstance{key, plan});
}

static ncclResult_t executeWithPlan(std::shared_ptr<mscclpp::Executor> executor, int rank, ncclDataType_t datatype,
const void* sendbuff, void* recvbuff, size_t sendBytes, size_t recvBytes,
std::shared_ptr<mscclpp::ExecutionPlan> plan, cudaStream_t stream) {
@@ -352,6 +335,20 @@ static mscclpp::Algorithm algoSelector(
return mscclpp::Algorithm();
}

std::shared_ptr<mscclpp::ExecutionPlanHandle> executionPlanDefaultSelector(
const std::vector<std::shared_ptr<mscclpp::ExecutionPlanHandle>> plans, const mscclpp::ExecutionRequest&) {
if (plans.empty()) {
INFO(MSCCLPP_NCCL, "No execution plans available for selection");
return nullptr;
}
for (auto plan : plans) {
if (plan->tags.find("default") == plan->tags.end()) {
return plan;
}
}
return plans[0];
}

NCCL_API ncclResult_t ncclCommInitRank(ncclComm_t* comm, int nranks, ncclUniqueId commId, int rank) {
INFO(MSCCLPP_NCCL, "Initializing NCCL communicator for rank %d, world_size=%d", rank, nranks);
if (comm == nullptr) {
@@ -371,29 +368,13 @@ NCCL_API ncclResult_t ncclCommInitRank(ncclComm_t* comm, int nranks, ncclUniqueI

commPtr->comm = mscclppComm;
commPtr->scratchBuffer_ = mscclpp::GpuBuffer<char>(commPtr->scratchBufferSize_).memory();
commPtr->executor = std::make_shared<mscclpp::Executor>(mscclppComm);
commPtr->executor = std::make_shared<mscclpp::Executor>(mscclppComm, commPtr->scratchBuffer_);
commPtr->planRegistry_ = mscclpp::ExecutionPlanRegistry::getInstance();

commPtr->nRanksPerNode = mscclppComm->bootstrap()->getNranksPerNode();
commPtr->worldSize = mscclppComm->bootstrap()->getNranks();

if (commPtr->worldSize == 1) {
*comm = commPtr;
return ncclSuccess;
}

const std::string& collectiveDir = mscclpp::env()->executionPlanDir;
if (collectiveDir != "") {
if (!std::filesystem::is_directory(collectiveDir)) {
WARN("The value of the environment variable %s is not a directory", collectiveDir.c_str());
return ncclInvalidArgument;
}
for (const auto& entry : std::filesystem::directory_iterator(collectiveDir)) {
if (entry.is_regular_file()) {
auto plan = loadExecutionPlan(entry.path(), rank);
commPtr->executionPlans[plan.first].push_back(plan.second);
}
}
}

commPtr->planRegistry_->loadDefaultPlans(rank);
commPtr->planRegistry_->setDefaultSelector(executionPlanDefaultSelector);
mscclpp::AlgorithmCollectionBuilder::getInstance()->setFallbackAlgorithmSelector(algoSelector);
registerCustomizedAlgo();
commPtr->algorithmCollection = mscclpp::AlgorithmCollectionBuilder::getInstance()->build();
@@ -462,12 +443,12 @@ NCCL_API ncclResult_t ncclCommDestroy(ncclComm_t comm) {
}
#endif

if (mscclppNcclDlopenSharedLib == true) {
mscclppNcclOps.CommDestroy(*reinterpret_cast<ncclComm_t*>(comm->mscclppNcclComm));
mscclppNcclDlopenFinalize();
delete static_cast<ncclComm_t*>(comm->mscclppNcclComm);
}
ncclComm_t* mscclppNcclCommPtr = reinterpret_cast<ncclComm_t*>(comm->mscclppNcclComm);
delete comm;
if (mscclppNcclCommPtr != nullptr) {
mscclppNcclOps.CommDestroy(*reinterpret_cast<ncclComm_t*>(mscclppNcclCommPtr));
delete static_cast<ncclComm_t*>(mscclppNcclCommPtr);
}
return ncclSuccess;
}

@@ -646,18 +627,13 @@ NCCL_API ncclResult_t ncclBroadcast(const void* sendbuff, void* recvbuff, size_t
*reinterpret_cast<ncclComm_t*>(comm->mscclppNcclComm), stream);
}

std::vector<executionPlanInstance>& plans = comm->executionPlans["broadcast"];
std::shared_ptr<mscclpp::ExecutionPlan> plan;
bool inPlace = sendbuff == recvbuff;
for (const auto& p : plans) {
if (bytes >= p.key.minMessageSize && bytes < p.key.maxMessageSize && inPlace == p.key.isInPlace) {
plan = p.plan;
break;
}
}

if (plan != nullptr) {
return executeWithPlan(comm->executor, rank, datatype, sendbuff, recvbuff, bytes, bytes, plan, stream);
static std::unordered_map<std::string, std::vector<uint64_t>> hints{{"root", {static_cast<uint64_t>(root)}}};
hints["root"][0] = static_cast<uint64_t>(root);
auto planHandle = comm->planRegistry_->select("broadcast", comm->comm->bootstrap()->getNranks(),
comm->comm->bootstrap()->getNranksPerNode(),
comm->comm->bootstrap()->getRank(), sendbuff, recvbuff, bytes, hints);
if (planHandle != nullptr) {
return executeWithPlan(comm->executor, rank, datatype, sendbuff, recvbuff, bytes, bytes, planHandle->plan, stream);
}
auto algo = comm->algorithmCollection->selectAlgorithm(
"broadcast", sendbuff, recvbuff, count * ncclTypeSize(datatype), datatype,
@@ -706,18 +682,11 @@ NCCL_API ncclResult_t ncclAllReduce(const void* sendbuff, void* recvbuff, size_t
*reinterpret_cast<ncclComm_t*>(comm->mscclppNcclComm), stream);
}

std::vector<executionPlanInstance>& plans = comm->executionPlans["allreduce"];
std::shared_ptr<mscclpp::ExecutionPlan> plan;
bool inPlace = sendbuff == recvbuff;
for (const auto& p : plans) {
if (bytes >= p.key.minMessageSize && bytes < p.key.maxMessageSize && inPlace == p.key.isInPlace) {
plan = p.plan;
break;
}
}

if (plan != nullptr) {
return executeWithPlan(comm->executor, rank, datatype, sendbuff, recvbuff, bytes, bytes, plan, stream);
auto planHandler = comm->planRegistry_->select("allreduce", comm->comm->bootstrap()->getNranks(),
comm->comm->bootstrap()->getNranksPerNode(),
comm->comm->bootstrap()->getRank(), sendbuff, recvbuff, bytes, {});
if (planHandler != nullptr) {
return executeWithPlan(comm->executor, rank, datatype, sendbuff, recvbuff, bytes, bytes, planHandler->plan, stream);
}

auto algo = comm->algorithmCollection->selectAlgorithm(
@@ -769,20 +738,12 @@ NCCL_API ncclResult_t ncclReduceScatter(const void* sendbuff, void* recvbuff, si
int rank = comm->comm->bootstrap()->getRank();
int nRank = comm->comm->bootstrap()->getNranks();

std::vector<executionPlanInstance>& plans = comm->executionPlans["reducescatter"];
std::shared_ptr<mscclpp::ExecutionPlan> plan;
void* basePtr = (char*)sendbuff + rank * bytes;
bool inPlace = basePtr == recvbuff;
const size_t totalBytes = bytes * nRank;
for (const auto& p : plans) {
if (totalBytes >= p.key.minMessageSize && totalBytes < p.key.maxMessageSize && inPlace == p.key.isInPlace) {
plan = p.plan;
break;
}
}

if (plan != nullptr) {
return executeWithPlan(comm->executor, rank, datatype, sendbuff, recvbuff, totalBytes, bytes, plan, stream);
auto planHandle = comm->planRegistry_->select("reducescatter", comm->comm->bootstrap()->getNranks(),
comm->comm->bootstrap()->getNranksPerNode(),
comm->comm->bootstrap()->getRank(), sendbuff, recvbuff, bytes, {});
if (planHandle != nullptr) {
return executeWithPlan(comm->executor, rank, datatype, sendbuff, recvbuff, bytes * nRank, bytes, planHandle->plan,
stream);
}

if (mscclppNcclDlopenSharedLib == true) {
@@ -821,20 +782,12 @@ NCCL_API ncclResult_t ncclAllGather(const void* sendbuff, void* recvbuff, size_t
*reinterpret_cast<ncclComm_t*>(comm->mscclppNcclComm), stream);
}

std::vector<executionPlanInstance>& plans = comm->executionPlans["allgather"];
std::shared_ptr<mscclpp::ExecutionPlan> plan;
void* basePtr = (char*)sendbuff - rank * bytes;
bool inPlace = basePtr == recvbuff;
const size_t totalBytes = bytes * nRank;
for (const auto& p : plans) {
if (totalBytes >= p.key.minMessageSize && totalBytes < p.key.maxMessageSize && inPlace == p.key.isInPlace) {
plan = p.plan;
break;
}
}

if (plan != nullptr) {
return executeWithPlan(comm->executor, rank, datatype, sendbuff, recvbuff, bytes, totalBytes, plan, stream);
auto planHandle = comm->planRegistry_->select("allgather", comm->comm->bootstrap()->getNranks(),
comm->comm->bootstrap()->getNranksPerNode(),
comm->comm->bootstrap()->getRank(), sendbuff, recvbuff, bytes, {});
if (planHandle != nullptr) {
return executeWithPlan(comm->executor, rank, datatype, sendbuff, recvbuff, bytes, bytes * nRank, planHandle->plan,
stream);
}

auto algo = comm->algorithmCollection->selectAlgorithm(
2 changes: 1 addition & 1 deletion docs/conf.py
@@ -49,7 +49,7 @@
"show-inheritance": True,
}
# only mock the C-extension when using the source tree
autodoc_mock_imports = ["mscclpp._version", "mscclpp._mscclpp", "cupy", "mpi4py", "numpy", "sortedcontainers"]
autodoc_mock_imports = ["mscclpp._version", "mscclpp._mscclpp", "blake3", "cupy", "mpi4py", "numpy", "sortedcontainers"]
autodoc_typehints = "description"
napoleon_google_docstring = True
napoleon_numpy_docstring = True
126 changes: 126 additions & 0 deletions docs/guide/mscclpp-dsl-integration.md
@@ -0,0 +1,126 @@
# MSCCL++ DSL Integration Guide

The MSCCL++ DSL (domain-specific language) lets you express collective algorithms concisely as Python functions.
MSCCL++ also provides Pythonic utilities to author, JIT-compile, register, and select execution plans. This guide walks through two integration paths: a customized MSCCL++ communicator, and NCCL interposition that accelerates existing PyTorch `backend="nccl"` workloads.

## Initial Setup

Run the following from the repository root after completing the basic project setup:

1. Install Python dependencies.
```bash
pip install -r ./python/<requirements_file>
```
Replace `<requirements_file>` with the file that matches your environment (e.g., `requirements_cuda11.txt`, `requirements_cuda12.txt`, or `requirements_rocm6.txt`).

2. Install the module and generate default algorithm plans.
```bash
pip install . && python3 -m mscclpp --install
```

## Integration Options

MSCCL++ DSL integrates into your training or inference workload in two ways:
1. **Custom MSCCL++ Communicator** — directly manage an MSCCL++ communicator and launch collectives with the MSCCL++ executor.
2. **NCCL Interposition** — keep using `backend="nccl"`; MSCCL++ intercepts NCCL calls at runtime for drop-in acceleration.

Both paths follow the same high-level flow:
1. Author (or reuse) a collective algorithm with the MSCCL++ DSL.
2. Compile it into an execution plan.
3. Register the plan with the MSCCL++ runtime.
4. Configure a selector to choose the plan for each collective call.

Below we show an AllReduce example and then detail each integration option.

### Example: AllReduce in the MSCCL++ DSL
The snippet below defines an AllReduce that uses NVLS for an intra-node reduce-scatter followed by a broadcast.
```python
# Imports are assumed for completeness; the DSL symbols are expected to come
# from mscclpp.language (check the example sources for the exact import list).
import mscclpp
from mscclpp.language import *


def allreduce_nvls(spec: mscclpp.AlgoSpec) -> CollectiveProgram:
    gpu_size = spec.world_size
    with CollectiveProgram(
        spec.name,
        spec.collective,
        gpu_size,
        instances=8,
        protocol=spec.protocol,
        num_threads_per_block=spec.num_threads_per_block,
        min_message_size=spec.min_message_size,
        max_message_size=spec.max_message_size,
    ) as program:
        # Create channels: one NVLS switch channel over all ranks, plus
        # peer-to-peer memory channels for synchronization
        nvls_chan = SwitchChannel(rank_list=[gpu for gpu in range(gpu_size)], buffer_type=BufferType.input)
        channels = {}
        for gpu in range(gpu_size):
            for peer in range(gpu_size):
                if peer != gpu:
                    channels[(peer, gpu)] = MemoryChannel(peer, gpu)

        # Synchronize to ensure all GPUs are ready
        for gpu in range(gpu_size):
            src_rank = gpu
            for peer in range(gpu_size):
                if peer != src_rank:
                    dst_rank = peer
                    channels[(dst_rank, src_rank)].signal(tb=0, relaxed=True)
            for peer in range(gpu_size):
                if peer != src_rank:
                    dst_rank = peer
                    channels[(dst_rank, src_rank)].wait(tb=0, relaxed=True, data_sync=SyncType.after)
        # Reduce and store the data
        for gpu in range(gpu_size):
            buffer_offset = gpu
            rank = Rank(gpu)
            input_buffer = rank.get_input_buffer()
            nvls_chan.at_rank(gpu).reduce(
                buffer_offset=buffer_offset, size=1, dst_chunk=input_buffer[gpu : gpu + 1], tb=0
            )
            nvls_chan.at_rank(gpu).broadcast(
                src_chunk=input_buffer[gpu : gpu + 1], buffer_offset=buffer_offset, size=1, tb=0
            )
        # Synchronize to ensure all GPUs have finished
        for gpu in range(gpu_size):
            src_rank = gpu
            for peer in range(gpu_size):
                if peer != src_rank:
                    dst_rank = peer
                    channels[(dst_rank, src_rank)].signal(tb=0, relaxed=True, data_sync=SyncType.before)
            for peer in range(gpu_size):
                if peer != src_rank:
                    dst_rank = peer
                    channels[(dst_rank, src_rank)].wait(tb=0, relaxed=True)

    return program
```
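
Once the function is defined, it still has to be compiled into an execution plan and registered with the runtime (steps 2 and 3 of the flow above). The sketch below only illustrates that shape: the helper names `compile()` and `register_plan()` are assumptions for illustration, not the confirmed API; see `examples/torch-integration/customized_comm.py` for the actual calls.

```python
# Illustrative sketch only: compile() and register_plan() are assumed names,
# not the confirmed MSCCL++ API. The AlgoSpec fields mirror those the
# example above reads; the constructor shown here is an assumption.
import mscclpp

spec = mscclpp.AlgoSpec(
    name="allreduce_nvls",
    collective="allreduce",
    world_size=8,
    protocol="Simple",            # assumed protocol label
    num_threads_per_block=1024,
    min_message_size=0,
    max_message_size=2**30,
)

program = allreduce_nvls(spec)    # author the program with the DSL
plan = program.compile()          # assumed: JIT-compile into an execution plan
mscclpp.register_plan(plan)       # assumed: register so a selector can pick it
```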

### Integrate with a Customized MSCCL++ Communicator
Use this path when you want a PyTorch-compatible interface with fine-grained control: you manage the communicator, compile and register DSL plans, and invoke collectives through a thin wrapper. The example below shows an AllReduce built on the MSCCL++ communicator and executor.
Example source directory:
```
examples/torch-integration
```
Key file: `customized_comm.py`.
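
At a high level, the example runs one process per GPU under `torchrun`, pins each process to its device, and bootstraps MSCCL++ from the `MSCCLPP_MASTER_ADDR`/`MSCCLPP_MASTER_PORT` environment variables. A minimal sketch of that per-process setup follows; the communicator and executor construction is elided, since the exact binding calls live in the example itself.

```python
# Per-process setup of the kind customized_comm.py performs (sketch only).
import os
import torch

rank = int(os.environ["RANK"])               # provided by torchrun
world_size = int(os.environ["WORLD_SIZE"])   # provided by torchrun
local_rank = int(os.environ["LOCAL_RANK"])   # provided by torchrun
torch.cuda.set_device(local_rank)            # one GPU per process

# MSCCL++ bootstrap address, read from the variables passed at launch.
master_addr = os.environ["MSCCLPP_MASTER_ADDR"]
master_port = int(os.environ["MSCCLPP_MASTER_PORT"])
# ... construct the MSCCL++ communicator/executor and register DSL plans
# here, as done in customized_comm.py.
```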


#### Launch (single node)
```bash
MSCCLPP_MASTER_ADDR=<master_ip> MSCCLPP_MASTER_PORT=<port> torchrun --nnodes=1 --nproc_per_node=8 customized_comm.py
```

### Integrate via NCCL Interposition
Keep your script as-is: initialize PyTorch with `backend="nccl"`, and MSCCL++ intercepts the NCCL calls at runtime for drop-in acceleration.
Example source directory:
```
examples/torch-integration
```
Key file: `dsl_with_nccl_api.py`.
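
The script itself can stay plain PyTorch; nothing MSCCL++-specific is required in the code. A minimal sketch of the kind of script this path accelerates:

```python
# Ordinary PyTorch NCCL usage; under LD_PRELOAD the NCCL calls are
# intercepted by libmscclpp_nccl.so and routed to matching DSL plans.
import os
import torch
import torch.distributed as dist

dist.init_process_group(backend="nccl")      # intercepted at runtime
local_rank = int(os.environ["LOCAL_RANK"])   # provided by torchrun
torch.cuda.set_device(local_rank)

x = torch.ones(1 << 20, device="cuda")       # one buffer per rank
dist.all_reduce(x)                           # served by a DSL plan when one matches
torch.cuda.synchronize()
print(f"rank {dist.get_rank()}: x[0] = {x[0].item()}")

dist.destroy_process_group()
```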

#### Launch with interposition
To run with NCCL interposition, preload the MSCCL++ shim so that it transparently intercepts the NCCL calls made by PyTorch's `nccl` backend.
```bash
LD_PRELOAD=<MSCCLPP_REPO>/build/apps/nccl/libmscclpp_nccl.so torchrun --nnodes=1 --nproc_per_node=8 dsl_with_nccl_api.py
```
## Notes
- When using NCCL interposition, the algorithm selection order is:
1. Check for registered DSL plans matching the collective call.
2. Check for a customized kernel implementation if no DSL plan fits.
3. Fall back to the default NCCL implementation (set `MSCCLPP_NCCL_LIB_PATH` to the original NCCL library).
1 change: 1 addition & 0 deletions docs/programming_guide.rst
@@ -13,3 +13,4 @@ This section provides advanced topics and best practices for using MSCCL++. It i
guide/cpp-examples
guide/mscclpp-dsl
guide/customized-algorithm-with-nccl-api
guide/mscclpp-dsl-integration