Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions apps/nccl/include/mscclpp/datatype_conversion.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

#ifndef MSCCLPP_DATATYPE_CONVERSION_HPP_
#define MSCCLPP_DATATYPE_CONVERSION_HPP_

#include <mscclpp/nccl.h>

#include <cstddef>
#include <mscclpp/executor.hpp>
#include <string>

// Convert ncclDataType_t to mscclpp::DataType
// Convert ncclDataType_t to mscclpp::DataType.
//
// Maps the NCCL-facing data type enum onto the executor's DataType enum for
// the types this app supports. The FP8 mappings are only compiled in when the
// toolchain provides FP8 types (__FP8_TYPES_EXIST__).
//
// Throws mscclpp::Error with ErrorCode::InvalidUsage for any ncclDataType_t
// that has no mscclpp::DataType equivalent.
inline mscclpp::DataType ncclDataTypeToMscclpp(ncclDataType_t dtype) {
  switch (dtype) {
    case ncclInt32:
      return mscclpp::DataType::INT32;
    case ncclUint32:
      return mscclpp::DataType::UINT32;
    case ncclFloat16:
      return mscclpp::DataType::FLOAT16;
    case ncclFloat32:
      return mscclpp::DataType::FLOAT32;
    case ncclBfloat16:
      return mscclpp::DataType::BFLOAT16;
#ifdef __FP8_TYPES_EXIST__
    case ncclFloat8e4m3:
      return mscclpp::DataType::FP8_E4M3;
    case ncclFloat8e5m2:
      return mscclpp::DataType::FP8_E5M2;
#endif
    default:
      // static_cast makes the int overload of std::to_string explicit instead
      // of relying on implicit enum-to-int promotion.
      throw mscclpp::Error("Unsupported ncclDataType_t: " + std::to_string(static_cast<int>(dtype)),
                           mscclpp::ErrorCode::InvalidUsage);
  }
}

// Get the size in bytes of a data type
// Get the size in bytes of a data type.
//
// Returns the per-element width for every mscclpp::DataType this app handles,
// and 0 for any type outside that set so callers can detect unsupported
// inputs without an exception.
inline size_t getDataTypeSize(mscclpp::DataType dtype) {
  if (dtype == mscclpp::DataType::FP8_E4M3 || dtype == mscclpp::DataType::FP8_E5M2) {
    return 1;
  }
  if (dtype == mscclpp::DataType::FLOAT16 || dtype == mscclpp::DataType::BFLOAT16) {
    return 2;
  }
  if (dtype == mscclpp::DataType::INT32 || dtype == mscclpp::DataType::UINT32 ||
      dtype == mscclpp::DataType::FLOAT32) {
    return 4;
  }
  return 0;
}

#endif // MSCCLPP_DATATYPE_CONVERSION_HPP_
55 changes: 29 additions & 26 deletions apps/nccl/src/allgather.cu
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <mscclpp/nccl.h>

#include <mscclpp/algorithm.hpp>
#include <mscclpp/datatype_conversion.hpp>
#include <mscclpp/env.hpp>
#include <mscclpp/gpu_utils.hpp>

Expand All @@ -23,10 +24,11 @@ void AllgatherAlgo6::initialize(std::shared_ptr<mscclpp::Communicator> comm,
}

ncclResult_t AllgatherAlgo6::allgatherKernelFunc(const std::shared_ptr<mscclpp::AlgorithmCtx> ctx, const void* input,
void* output, size_t count, ncclDataType_t dtype, cudaStream_t stream,
void* output, size_t count, mscclpp::DataType dtype,
cudaStream_t stream,
std::unordered_map<std::string, std::shared_ptr<void>>&) {
int nBlocks = 28;
const size_t bytes = count * ncclTypeSize(dtype);
const size_t bytes = count * getDataTypeSize(dtype);
const size_t nElem = bytes / sizeof(int);
int rank = ctx->rank;
if (bytes <= 32 * (1 << 20)) {
Expand Down Expand Up @@ -61,7 +63,7 @@ ncclResult_t AllgatherAlgo6::allgatherKernelFunc(const std::shared_ptr<mscclpp::

std::shared_ptr<mscclpp::AlgorithmCtx> AllgatherAlgo6::initAllgatherContext(std::shared_ptr<mscclpp::Communicator> comm,
const void*, void* output, size_t count,
ncclDataType_t dtype) {
mscclpp::DataType dtype) {
auto ctx = std::make_shared<mscclpp::AlgorithmCtx>();
ctx->rank = comm->bootstrap()->getRank();
ctx->workSize = comm->bootstrap()->getNranks();
Expand All @@ -70,7 +72,7 @@ std::shared_ptr<mscclpp::AlgorithmCtx> AllgatherAlgo6::initAllgatherContext(std:
// setup semaphores
ctx->memorySemaphores = this->memorySemaphores_;

size_t bytes = count * ncclTypeSize(dtype);
size_t bytes = count * getDataTypeSize(dtype);
size_t recvBytes;
CUdeviceptr recvBasePtr;
MSCCLPP_CUTHROW(cuMemGetAddressRange(&recvBasePtr, &recvBytes, (CUdeviceptr)output));
Expand Down Expand Up @@ -98,7 +100,7 @@ std::shared_ptr<mscclpp::AlgorithmCtx> AllgatherAlgo6::initAllgatherContext(std:
}

mscclpp::AlgorithmCtxKey AllgatherAlgo6::generateAllgatherContextKey(const void*, void* output, size_t,
ncclDataType_t) {
mscclpp::DataType) {
static int tag = 0;
if (disableChannelCache_) {
// always return a new key if channel cache is disabled
Expand All @@ -116,15 +118,15 @@ mscclpp::Algorithm AllgatherAlgo6::build() {
"default_allgather6", "allgather",
[self](std::shared_ptr<mscclpp::Communicator> comm,
std::unordered_map<std::string, std::shared_ptr<void>>& extras) { self->initialize(comm, extras); },
[self](const std::shared_ptr<mscclpp::AlgorithmCtx> ctx, const void* input, void* output, size_t count, int dtype,
cudaStream_t stream, std::unordered_map<std::string, std::shared_ptr<void>>& extras) {
return self->allgatherKernelFunc(ctx, input, output, count, static_cast<ncclDataType_t>(dtype), stream, extras);
[self](const std::shared_ptr<mscclpp::AlgorithmCtx> ctx, const void* input, void* output, size_t count,
mscclpp::DataType dtype, cudaStream_t stream,
std::unordered_map<std::string, std::shared_ptr<void>>& extras) {
return self->allgatherKernelFunc(ctx, input, output, count, dtype, stream, extras);
},
[self](std::shared_ptr<mscclpp::Communicator> comm, const void* input, void* output, size_t count, int dtype) {
return self->initAllgatherContext(comm, input, output, count, static_cast<ncclDataType_t>(dtype));
},
[self](const void* input, void* output, size_t count, int dtype) {
return self->generateAllgatherContextKey(input, output, count, static_cast<ncclDataType_t>(dtype));
[self](std::shared_ptr<mscclpp::Communicator> comm, const void* input, void* output, size_t count,
mscclpp::DataType dtype) { return self->initAllgatherContext(comm, input, output, count, dtype); },
[self](const void* input, void* output, size_t count, mscclpp::DataType dtype) {
return self->generateAllgatherContextKey(input, output, count, dtype);
});
return allgatherAlgo;
}
Expand All @@ -137,10 +139,11 @@ void AllgatherAlgo8::initialize(std::shared_ptr<mscclpp::Communicator> comm,
}

ncclResult_t AllgatherAlgo8::allgatherKernelFunc(const std::shared_ptr<mscclpp::AlgorithmCtx> ctx, const void* input,
void* output, size_t count, ncclDataType_t dtype, cudaStream_t stream,
void* output, size_t count, mscclpp::DataType dtype,
cudaStream_t stream,
std::unordered_map<std::string, std::shared_ptr<void>>&) {
int rank = ctx->rank;
const size_t bytes = count * ncclTypeSize(dtype);
const size_t bytes = count * getDataTypeSize(dtype);
const size_t nElem = bytes / sizeof(int);
if ((char*)input == (char*)output + rank * bytes) {
allgather8<false><<<56, 1024, 0, stream>>>((void*)input, this->scratchBuffer_.get(), (void*)output,
Expand All @@ -161,7 +164,7 @@ ncclResult_t AllgatherAlgo8::allgatherKernelFunc(const std::shared_ptr<mscclpp::

std::shared_ptr<mscclpp::AlgorithmCtx> AllgatherAlgo8::initAllgatherContext(std::shared_ptr<mscclpp::Communicator> comm,
const void* input, void*, size_t count,
ncclDataType_t dtype) {
mscclpp::DataType dtype) {
constexpr int nChannelsPerConnection = 56;

auto ctx = std::make_shared<mscclpp::AlgorithmCtx>();
Expand All @@ -172,7 +175,7 @@ std::shared_ptr<mscclpp::AlgorithmCtx> AllgatherAlgo8::initAllgatherContext(std:
// setup semaphores
ctx->memorySemaphores = std::move(setupMemorySemaphores(comm, this->conns_, nChannelsPerConnection));

size_t bytes = count * ncclTypeSize(dtype);
size_t bytes = count * getDataTypeSize(dtype);
// register the memory for the broadcast operation
mscclpp::RegisteredMemory localMemory = comm->registerMemory((void*)input, bytes, mscclpp::Transport::CudaIpc);
mscclpp::RegisteredMemory scratchMemory =
Expand All @@ -192,7 +195,7 @@ std::shared_ptr<mscclpp::AlgorithmCtx> AllgatherAlgo8::initAllgatherContext(std:
return ctx;
}

mscclpp::AlgorithmCtxKey AllgatherAlgo8::generateAllgatherContextKey(const void*, void*, size_t, ncclDataType_t) {
mscclpp::AlgorithmCtxKey AllgatherAlgo8::generateAllgatherContextKey(const void*, void*, size_t, mscclpp::DataType) {
// always return same key, non-zero copy algo
return mscclpp::AlgorithmCtxKey{nullptr, nullptr, 0, 0, 0};
}
Expand All @@ -203,15 +206,15 @@ mscclpp::Algorithm AllgatherAlgo8::build() {
"default_allgather8", "allgather",
[self](std::shared_ptr<mscclpp::Communicator> comm,
std::unordered_map<std::string, std::shared_ptr<void>>& extras) { self->initialize(comm, extras); },
[self](const std::shared_ptr<mscclpp::AlgorithmCtx> ctx, const void* input, void* output, size_t count, int dtype,
cudaStream_t stream, std::unordered_map<std::string, std::shared_ptr<void>>& extras) {
return self->allgatherKernelFunc(ctx, input, output, count, static_cast<ncclDataType_t>(dtype), stream, extras);
},
[self](std::shared_ptr<mscclpp::Communicator> comm, const void* input, void* output, size_t count, int dtype) {
return self->initAllgatherContext(comm, input, output, count, static_cast<ncclDataType_t>(dtype));
[self](const std::shared_ptr<mscclpp::AlgorithmCtx> ctx, const void* input, void* output, size_t count,
mscclpp::DataType dtype, cudaStream_t stream,
std::unordered_map<std::string, std::shared_ptr<void>>& extras) {
return self->allgatherKernelFunc(ctx, input, output, count, dtype, stream, extras);
},
[self](const void* input, void* output, size_t count, int dtype) {
return self->generateAllgatherContextKey(input, output, count, static_cast<ncclDataType_t>(dtype));
[self](std::shared_ptr<mscclpp::Communicator> comm, const void* input, void* output, size_t count,
mscclpp::DataType dtype) { return self->initAllgatherContext(comm, input, output, count, dtype); },
[self](const void* input, void* output, size_t count, mscclpp::DataType dtype) {
return self->generateAllgatherContextKey(input, output, count, dtype);
});
return allgatherAlgo;
}
13 changes: 7 additions & 6 deletions apps/nccl/src/allgather.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <mscclpp/algorithm.hpp>
#include <mscclpp/concurrency_device.hpp>
#include <mscclpp/core.hpp>
#include <mscclpp/executor.hpp>
#include <mscclpp/gpu.hpp>
#include <mscclpp/memory_channel.hpp>
#include <mscclpp/memory_channel_device.hpp>
Expand Down Expand Up @@ -222,12 +223,12 @@ class AllgatherAlgo6 : public mscclpp::AlgorithmBuilder {

void initialize(std::shared_ptr<mscclpp::Communicator> comm, std::unordered_map<std::string, std::shared_ptr<void>>&);
ncclResult_t allgatherKernelFunc(const std::shared_ptr<mscclpp::AlgorithmCtx> ctx, const void* input, void* output,
size_t count, [[maybe_unused]] ncclDataType_t dtype, cudaStream_t stream,
size_t count, mscclpp::DataType dtype, cudaStream_t stream,
std::unordered_map<std::string, std::shared_ptr<void>>& extras);

std::shared_ptr<mscclpp::AlgorithmCtx> initAllgatherContext(std::shared_ptr<mscclpp::Communicator> comm, const void*,
void* output, size_t, ncclDataType_t);
mscclpp::AlgorithmCtxKey generateAllgatherContextKey(const void*, void*, size_t, ncclDataType_t);
void* output, size_t, mscclpp::DataType);
mscclpp::AlgorithmCtxKey generateAllgatherContextKey(const void*, void*, size_t, mscclpp::DataType);
};

class AllgatherAlgo8 : public mscclpp::AlgorithmBuilder {
Expand All @@ -240,12 +241,12 @@ class AllgatherAlgo8 : public mscclpp::AlgorithmBuilder {
void initialize(std::shared_ptr<mscclpp::Communicator> comm,
std::unordered_map<std::string, std::shared_ptr<void>>& extras);
ncclResult_t allgatherKernelFunc(const std::shared_ptr<mscclpp::AlgorithmCtx> ctx, const void* input, void* output,
size_t count, [[maybe_unused]] ncclDataType_t dtype, cudaStream_t stream,
size_t count, mscclpp::DataType dtype, cudaStream_t stream,
std::unordered_map<std::string, std::shared_ptr<void>>& extras);

std::shared_ptr<mscclpp::AlgorithmCtx> initAllgatherContext(std::shared_ptr<mscclpp::Communicator> comm, const void*,
void* output, size_t, ncclDataType_t);
mscclpp::AlgorithmCtxKey generateAllgatherContextKey(const void*, void*, size_t, ncclDataType_t);
void* output, size_t, mscclpp::DataType);
mscclpp::AlgorithmCtxKey generateAllgatherContextKey(const void*, void*, size_t, mscclpp::DataType);

size_t scratchBufferSize_;
std::shared_ptr<char> scratchBuffer_;
Expand Down
Loading