
Commit ab77c07

fix bug
1 parent 59d0917 commit ab77c07

4 files changed: +58 −47 lines changed

src/executor/execution_plan.cc

Lines changed: 30 additions & 18 deletions
@@ -148,7 +148,8 @@ std::vector<BufferType> ExecutionPlan::Impl::getConnectedBufferTypes(int rank) c
  }
  return std::vector<BufferType>(bufferTypes.begin(), bufferTypes.end());
}
-size_t ExecutionPlan::Impl::getScratchBufferSize(int rank, size_t inputSize, size_t outputSize) const {
+
+void ExecutionPlan::Impl::calcScratchBufferSizeAndOffset(int rank, size_t inputSize, size_t outputSize, int flag) {
  size_t sizePerRank;
  if (this->inputChunks.at(rank) != 0)
    sizePerRank = inputSize / this->inputChunks.at(rank);
@@ -157,15 +158,18 @@ size_t ExecutionPlan::Impl::getScratchBufferSize(int rank, size_t inputSize, siz
  else
    throw mscclpp::Error("Output or Input chunks must be greater than 0", mscclpp::ErrorCode::ExecutorError);

-  size_t scratchBufferSize = sizePerRank * this->scratchChunks.at(rank);
+  this->scratchBufferSize = sizePerRank * this->scratchChunks.at(rank);
  if (this->isUsingPacket) {
-    scratchBufferSize *= 2; /* data + flag */
+    this->scratchBufferSize *= 2; /* data + flag */
  }
  if (this->isUsingDoubleScratchBuffer) {
-    scratchBufferSize *= 2; /* double buffer */
+    this->scratchBufferSize *= 2; /* double buffer */
  }
-  return scratchBufferSize;
+  this->scratchBufferOffset = (this->isUsingDoubleScratchBuffer && (flag % 2) == 0) ? (this->scratchBufferSize / 2) : 0;
}
+
+size_t ExecutionPlan::Impl::getScratchBufferSize() const { return this->scratchBufferSize; }
+
std::vector<Operation> ExecutionPlan::Impl::getOperations(int rank, int threadblock) const {
  return this->operations.at(rank)[threadblock];
}
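The core of the fix is the hunk above: the plan now computes both the scratch size and, when double buffering is enabled, which half of the scratch region the current execution should use, driven by the parity of a per-execution flag. The sketch below is illustrative C++, not part of the diff, and the names are assumptions rather than the library's API; it restates the sizing and offset rule that calcScratchBufferSizeAndOffset encodes.

#include <cstddef>

// Minimal sketch of the sizing/offset rule, assuming hypothetical inputs.
struct ScratchLayout {
  std::size_t size;    // total bytes to allocate for the scratch buffer
  std::size_t offset;  // byte offset this execution should start at
};

ScratchLayout calcScratchLayout(std::size_t sizePerRank, std::size_t scratchChunks,
                                bool usingPacket, bool usingDoubleBuffer, int flag) {
  std::size_t size = sizePerRank * scratchChunks;
  if (usingPacket) size *= 2;        // each element stores data + flag
  if (usingDoubleBuffer) size *= 2;  // two halves, used alternately
  // Even flags select the second half, odd flags the first, so two
  // consecutive executions never reuse the same scratch region.
  std::size_t offset = (usingDoubleBuffer && flag % 2 == 0) ? size / 2 : 0;
  return {size, offset};
}

Because the offset is derived at plan-load time, the operation offsets set up in the hunks below can bake it in, and the host no longer has to shift the scratch pointer at kernel launch.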
@@ -174,10 +178,9 @@ int ExecutionPlan::Impl::getThreadblockCount(int rank) const { return this->oper

int ExecutionPlan::Impl::getNThreadsPerBlock() const { return this->nThreadsPerBlock; }

-bool ExecutionPlan::Impl::getIsUsingDoubleScratchBuffer() const { return this->isUsingDoubleScratchBuffer; }
-
-void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize, size_t outputSize, size_t contsSrcOffset,
-                                            size_t constDstOffset) {
+void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize, size_t outputSize, size_t constSrcOffset,
+                                            size_t constDstOffset, int selfRank, size_t inputBufferSize,
+                                            size_t outputBufferSize, int flag) {
  std::ifstream file(this->planPath);
  json obj = json::parse(file);
  if (this->name != obj["name"]) {
@@ -202,11 +205,13 @@ void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize, size_t outputSize,

  this->inputSize = inputSize;
  this->outputSize = outputSize;
-  this->setupOperations(gpus, contsSrcOffset, constDstOffset);
+  this->calcScratchBufferSizeAndOffset(selfRank, inputBufferSize, outputBufferSize, flag);
+  this->setupOperations(gpus, constSrcOffset, constDstOffset);
}

-void ExecutionPlan::Impl::lightLoadExecutionPlan(size_t inputSize, size_t outputSize, size_t contsSrcOffset,
-                                                 size_t constDstOffset) {
+void ExecutionPlan::Impl::lightLoadExecutionPlan(size_t inputSize, size_t outputSize, size_t constSrcOffset,
+                                                 size_t constDstOffset, int selfRank, size_t inputBufferSize,
+                                                 size_t outputBufferSize, int flag) {
  std::ifstream file(this->planPath);
  json obj = json::parse(file);
  if (this->name != obj["name"]) {
@@ -229,7 +234,8 @@ void ExecutionPlan::Impl::lightLoadExecutionPlan(size_t inputSize, size_t output

  this->inputSize = inputSize;
  this->outputSize = outputSize;
-  this->setupOperations(gpus, contsSrcOffset, constDstOffset);
+  this->calcScratchBufferSizeAndOffset(selfRank, inputBufferSize, outputBufferSize, flag);
+  this->setupOperations(gpus, constSrcOffset, constDstOffset);
}

// Construct the channel info. Step 1. Flatten SM and PROXY channels into separate vectors.
@@ -299,7 +305,7 @@ void ExecutionPlan::Impl::setupChannels(const json& gpus) {
    }
  }

-void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t contsSrcOffset, size_t constDstOffset) {
+void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t constSrcOffset, size_t constDstOffset) {
  // setup threadblocks and operations
  for (const auto& gpu : gpus) {
    int rank = gpu["id"];
@@ -334,7 +340,7 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t contsSrcOffse
              channelIndexes[{srcBufferType, dstBufferType, operation.channelType}][op["i_cids"][i]["id"]];
          operation.inputOffsets[i] =
              this->getOffset(rank, this->inputSize, this->outputSize, (uint32_t)op["i_cids"][i]["off"]) +
-              (srcBufferType != BufferType::SCRATCH ? contsSrcOffset : 0);
+              (srcBufferType != BufferType::SCRATCH ? constSrcOffset : this->scratchBufferOffset);
          chunkIndexes.push_back((uint32_t)op["i_cids"][i]["off"]);
        }
      }
@@ -345,7 +351,7 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t contsSrcOffse
        for (int i = 0; i < operation.nInputs; i++) {
          operation.inputOffsets[i] =
              this->getOffset(rank, this->inputSize, this->outputSize, (uint32_t)op["srcs"][i]["off"]) +
-              (operation.inputBufferType != BufferType::SCRATCH ? contsSrcOffset : 0);
+              (operation.inputBufferType != BufferType::SCRATCH ? constSrcOffset : this->scratchBufferOffset);
          chunkIndexes.push_back((uint32_t)op["srcs"][i]["off"]);
        }
      }
@@ -358,7 +364,7 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t contsSrcOffse
              channelIndexes[{srcBufferType, dstBufferType, operation.channelType}][op["o_cids"][i]["id"]];
          operation.outputOffsets[i] =
              this->getOffset(rank, this->inputSize, this->outputSize, (uint32_t)op["o_cids"][i]["off"]) +
-              (dstBufferType != BufferType::SCRATCH ? constDstOffset : 0);
+              (dstBufferType != BufferType::SCRATCH ? constDstOffset : this->scratchBufferOffset);
          chunkIndexes.push_back((uint32_t)op["o_cids"][i]["off"]);
        }
      }
@@ -369,7 +375,7 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t contsSrcOffse
        for (int i = 0; i < operation.nOutputs; i++) {
          operation.outputOffsets[i] =
              this->getOffset(rank, this->inputSize, this->outputSize, (uint32_t)op["dsts"][i]["off"]) +
-              (operation.outputBufferType != BufferType::SCRATCH ? constDstOffset : 0);
+              (operation.outputBufferType != BufferType::SCRATCH ? constDstOffset : this->scratchBufferOffset);
          chunkIndexes.push_back((uint32_t)op["dsts"][i]["off"]);
        }
      }
@@ -378,13 +384,19 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t contsSrcOffse
        }
        if (op.contains("srcoff")) {
          operation.srcOffset = this->getOffset(rank, this->inputSize, this->outputSize, (uint32_t)op["srcoff"]);
+          if (operation.srcBufferType == BufferType::SCRATCH) {
+            operation.srcOffset += this->scratchBufferOffset;
+          }
          chunkIndexes.push_back((uint32_t)op["srcoff"]);
        }
        if (op.contains("dstbuff")) {
          operation.dstBufferType = convertToBufferType(op["dstbuff"]);
        }
        if (op.contains("dstoff")) {
          operation.dstOffset = this->getOffset(rank, this->inputSize, this->outputSize, (uint32_t)op["dstoff"]);
+          if (operation.dstBufferType == BufferType::SCRATCH) {
+            operation.dstOffset += this->scratchBufferOffset;
+          }
          chunkIndexes.push_back((uint32_t)op["dstoff"]);
        }
        if (op.contains("cnt")) {
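Net effect of the setupOperations changes: any offset that targets the SCRATCH buffer now has this->scratchBufferOffset folded in when the plan is loaded, while non-scratch offsets keep the constant send/recv offsets as before. The fragment below is an illustrative restatement of that rule, not library code; BufferKind and the parameter names are assumptions.

#include <cstddef>

enum class BufferKind { Input, Output, Scratch };

std::size_t effectiveOffset(std::size_t chunkOffset, BufferKind buf,
                            std::size_t constOffset, std::size_t scratchBufferOffset) {
  // Non-scratch buffers keep the caller-supplied constant offset (how far
  // sendbuff/recvbuff sits from its allocation base); scratch accesses are
  // shifted into the half selected for this execution.
  return chunkOffset + (buf != BufferKind::Scratch ? constOffset : scratchBufferOffset);
}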

src/executor/executor.cc

Lines changed: 18 additions & 23 deletions
@@ -66,7 +66,6 @@ struct ExecutionContext {
  size_t scratchBufferSize;
  std::shared_ptr<char> deviceExecutionPlansBuffer;
  int nthreadsPerBlock;
-  bool isUsingDoubleScratchBuffer;
};

struct Executor::Impl {
@@ -83,11 +82,13 @@ struct Executor::Impl {

  ExecutionContext setupExecutionContext(int rank, void* sendbuff, void* recvbuff, size_t inputMessageSize,
                                         size_t outputMessageSize, size_t contsSrcOffset, size_t constDstOffset,
-                                         size_t sendBufferSize, size_t recvBufferSize, const ExecutionPlan& plan) {
+                                         size_t sendBufferSize, size_t recvBufferSize, const ExecutionPlan& plan,
+                                         int flag) {
    ExecutionContextKey key = {sendbuff, recvbuff, sendBufferSize, recvBufferSize, plan.impl_->name};
    if (this->contexts.find(key) != this->contexts.end()) {
      plan.impl_->operationsReset();
-      plan.impl_->lightLoadExecutionPlan(inputMessageSize, outputMessageSize, contsSrcOffset, constDstOffset);
+      plan.impl_->lightLoadExecutionPlan(inputMessageSize, outputMessageSize, contsSrcOffset, constDstOffset, rank,
+                                         sendBufferSize, recvBufferSize, flag);
      this->setupDeviceExecutionPlan(this->contexts[key], rank, plan);
      this->contexts[key].deviceExecutionPlansBuffer =
          allocExtSharedCuda<char>(this->contexts[key].deviceExecutionPlans.size() * sizeof(DeviceExecutionPlan));
@@ -98,16 +99,16 @@ struct Executor::Impl {
    }

    plan.impl_->reset();
-    plan.impl_->loadExecutionPlan(inputMessageSize, outputMessageSize, contsSrcOffset, constDstOffset);
+    plan.impl_->loadExecutionPlan(inputMessageSize, outputMessageSize, contsSrcOffset, constDstOffset, rank,
+                                  sendBufferSize, recvBufferSize, flag);

    ExecutionContext context;
-    size_t scratchBufferSize = plan.impl_->getScratchBufferSize(rank, sendBufferSize, recvBufferSize);
+    size_t scratchBufferSize = plan.impl_->getScratchBufferSize();
    std::shared_ptr<char> scratchBuffer = allocExtSharedCuda<char>(scratchBufferSize);
    context.scratchBuffer = scratchBuffer;
    context.scratchBufferSize = scratchBufferSize;
    context.proxyService = std::make_shared<ProxyService>();
    context.nthreadsPerBlock = plan.impl_->getNThreadsPerBlock();
-    context.isUsingDoubleScratchBuffer = plan.impl_->getIsUsingDoubleScratchBuffer();
    this->setupConnections(context, rank, plan);
    this->setupRegisteredMemories(context, sendbuff, recvbuff, sendBufferSize, recvBufferSize, rank, plan);
    this->setupChannels(context, sendbuff, recvbuff, sendBufferSize, recvBufferSize, rank, plan);
@@ -305,13 +306,8 @@ struct Executor::Impl {
  }

  void launchKernel(ExecutionContext& context, int rank, void* sendbuff, void* recvbuff, DataType dataType,
-                    cudaStream_t stream, PacketType packetType) {
-    static uint32_t flag = 0;
+                    cudaStream_t stream, PacketType packetType, uint32_t flag) {
    int nthreadblocks = context.deviceExecutionPlans.size();
-    char* kernelScratchBufferPtr = context.scratchBuffer.get();
-    if (context.isUsingDoubleScratchBuffer && (flag % 2)) {
-      kernelScratchBufferPtr += context.scratchBufferSize / 2;
-    }
#if defined(ENABLE_NPKIT)
#if defined(__HIP_PLATFORM_AMD__)
    if (nthreadblocks > NPKIT_MAX_NUM_GPU_THREADBLOCKS) {
@@ -327,16 +323,14 @@
#endif
    switch (packetType) {
      case PacketType::LL16:
-        ExecutionKernel::launchKernel<LL16Packet>(rank, nthreadblocks, context.nthreadsPerBlock, sendbuff, recvbuff,
-                                                  (void*)kernelScratchBufferPtr, dataType,
-                                                  (DeviceExecutionPlan*)context.deviceExecutionPlansBuffer.get(),
-                                                  sharedMemSize, stream, ++flag);
+        ExecutionKernel::launchKernel<LL16Packet>(
+            rank, nthreadblocks, context.nthreadsPerBlock, sendbuff, recvbuff, (void*)context.scratchBuffer.get(),
+            dataType, (DeviceExecutionPlan*)context.deviceExecutionPlansBuffer.get(), sharedMemSize, stream, flag);
        break;
      case PacketType::LL8:
-        ExecutionKernel::launchKernel<LL8Packet>(rank, nthreadblocks, context.nthreadsPerBlock, sendbuff, recvbuff,
-                                                 (void*)kernelScratchBufferPtr, dataType,
-                                                 (DeviceExecutionPlan*)context.deviceExecutionPlansBuffer.get(),
-                                                 sharedMemSize, stream, ++flag);
+        ExecutionKernel::launchKernel<LL8Packet>(
+            rank, nthreadblocks, context.nthreadsPerBlock, sendbuff, recvbuff, (void*)context.scratchBuffer.get(),
+            dataType, (DeviceExecutionPlan*)context.deviceExecutionPlansBuffer.get(), sharedMemSize, stream, flag);
        break;
      default:
        throw Error("Invalid packet type", ErrorCode::ExecutorError);
@@ -349,17 +343,18 @@ Executor::Executor(std::shared_ptr<Communicator> comm) : impl_(std::make_unique<
void Executor::execute(int rank, void* sendbuff, void* recvbuff, size_t sendBuffSize,
                       [[maybe_unused]] size_t recvBuffSize, DataType dataType, const ExecutionPlan& plan,
                       cudaStream_t stream, PacketType packetType) {
+  static uint32_t flag = 1;
  size_t sendBytes, recvBytes;
  CUdeviceptr sendBasePtr, recvBasePtr;
  MSCCLPP_CUTHROW(cuMemGetAddressRange(&sendBasePtr, &sendBytes, (CUdeviceptr)sendbuff));
  MSCCLPP_CUTHROW(cuMemGetAddressRange(&recvBasePtr, &recvBytes, (CUdeviceptr)recvbuff));
  size_t offsetIn = (char*)sendbuff - (char*)sendBasePtr;
  size_t offsetOut = (char*)recvbuff - (char*)recvBasePtr;
-
  ExecutionContext context =
      this->impl_->setupExecutionContext(rank, (void*)sendBasePtr, (void*)recvBasePtr, sendBuffSize, recvBuffSize,
-                                         offsetIn, offsetOut, sendBytes, recvBytes, plan);
-  this->impl_->launchKernel(context, rank, sendbuff, recvbuff, dataType, stream, packetType);
+                                         offsetIn, offsetOut, sendBytes, recvBytes, plan, flag);
+  this->impl_->launchKernel(context, rank, sendbuff, recvbuff, dataType, stream, packetType, flag);
+  flag++;
}

Executor::~Executor() = default;
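Taken together, the flag's lifecycle is now: Executor::execute keeps a function-local counter starting at 1, passes the same value both to plan loading (where its parity selects the scratch half) and to the kernel launch (where it serves as the packet flag), then increments it. The sketch below is illustrative only; FakePlan stands in for ExecutionPlan::Impl and is not the real API. It shows how back-to-back calls alternate between the two halves.

#include <cstdint>
#include <cstdio>

struct FakePlan {
  uint64_t scratchBufferOffset = 0;
  void load(uint64_t scratchBufferSize, bool doubleBuffer, uint32_t flag) {
    // Same parity rule as calcScratchBufferSizeAndOffset.
    scratchBufferOffset = (doubleBuffer && flag % 2 == 0) ? scratchBufferSize / 2 : 0;
  }
};

void execute(FakePlan& plan) {
  static uint32_t flag = 1;  // starts at 1, as in Executor::execute
  plan.load(/*scratchBufferSize=*/4096, /*doubleBuffer=*/true, flag);
  std::printf("flag=%u -> scratch offset %llu\n", flag,
              (unsigned long long)plan.scratchBufferOffset);
  flag++;  // the next call uses the other half
}

int main() {
  FakePlan plan;
  execute(plan);  // flag=1 -> offset 0
  execute(plan);  // flag=2 -> offset 2048
}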

src/include/execution_kernel.hpp

Lines changed: 2 additions & 2 deletions
@@ -545,7 +545,7 @@ class ExecutionKernel {
  template <typename PacketType>
  static void launchKernel(int rank, int nthreadblocks, int nthreads, void* src, void* dst, void* scratch,
                           DataType dataType, DeviceExecutionPlan* plan, size_t sharedMemSize, cudaStream_t stream,
-                           uint32_t flag = 0) {
+                           uint32_t flag) {
    switch (dataType) {
      case DataType::INT32:
        executionKernel<int32_t, PacketType><<<nthreadblocks, nthreads, sharedMemSize, stream>>>(
@@ -603,7 +603,7 @@
  template <typename PacketType>
  static void launchKernel(int rank, int nthreadblocks, int nthreads, void* src, void* dst, void* scratch,
                           DataType dataType, DeviceExecutionPlan* plan, size_t sharedMemSize, cudaStream_t stream,
-                           uint32_t flag = 0);
+                           uint32_t flag);
#endif  // !defined(MSCCLPP_DEVICE_HIP)
};
}  // namespace mscclpp

src/include/execution_plan.hpp

Lines changed: 8 additions & 4 deletions
@@ -65,14 +65,15 @@ struct ExecutionPlan::Impl {
  std::vector<ChannelInfo> getUnpairedChannelInfos(int rank, int worldSize, ChannelType channelType);
  std::vector<int> getConnectedPeers(int rank) const;
  std::vector<BufferType> getConnectedBufferTypes(int rank) const;
-  size_t getScratchBufferSize(int rank, size_t inputSize, size_t outputSize) const;
+  size_t getScratchBufferSize() const;
  std::vector<Operation> getOperations(int rank, int threadblock) const;
  int getThreadblockCount(int rank) const;
  int getNThreadsPerBlock() const;
-  bool getIsUsingDoubleScratchBuffer() const;

-  void loadExecutionPlan(size_t inputSize, size_t outputSize, size_t contsSrcOffset, size_t constDstOffset);
-  void lightLoadExecutionPlan(size_t inputSize, size_t outputSize, size_t contsSrcOffset, size_t constDstOffset);
+  void loadExecutionPlan(size_t inputSize, size_t outputSize, size_t contsSrcOffset, size_t constDstOffset, int rank,
+                         size_t inputBufferSize, size_t outputBufferSize, int flag);
+  void lightLoadExecutionPlan(size_t inputSize, size_t outputSize, size_t contsSrcOffset, size_t constDstOffset,
+                              int rank, size_t inputBufferSize, size_t outputBufferSize, int flag);
  void setupChannels(const nlohmann::json& gpus);
  void setupOperations(const nlohmann::json& gpus, size_t contsSrcOffset, size_t constDstOffset);

@@ -98,12 +99,15 @@ struct ExecutionPlan::Impl {
  size_t outputSize;
  int nThreadsPerBlock;
  bool isUsingDoubleScratchBuffer;
+  size_t scratchBufferSize;
+  size_t scratchBufferOffset;

 private:
  std::pair<size_t, u_int32_t> calcSizePerRank(int rank, size_t inputSize, size_t outputSize) const;
  size_t getOffset(int rank, size_t inputSize, size_t outputSize, uint32_t chunkIndex, uint32_t alignment = 16) const;
  size_t getNChunkSize(int rank, size_t inputSize, size_t outputSize, uint32_t nChunks,
                       const std::vector<uint32_t> offsets) const;
+  void calcScratchBufferSizeAndOffset(int rank, size_t inputSize, size_t outputSize, int flag);
};

}  // namespace mscclpp
