20 commits
c4e1c73
Initial Version of all_to_all_v using dynamic execution plan template
seagater Sep 3, 2025
d253e92
Add executeDynamicAllToAllvWithMPI in dynamic_alltoallv_test to suppo…
seagater Sep 3, 2025
dba0f79
Use mscclpp execution engine
seagater Sep 3, 2025
1073321
Skip actual execution. Validate execution plan generation only
seagater Sep 3, 2025
88d88bd
Fix alginment errors; Local copies of data is correct; Need to fix in…
seagater Sep 4, 2025
77be833
Update
seagater Sep 4, 2025
e2e2797
Add DSL alltallv_dynamic to generate dynamic execution plan
seagater Sep 5, 2025
9663820
Revert to the version support basic dynamic execution plan template
seagater Sep 6, 2025
9676009
Support dsl generated dynamic execution plan
seagater Sep 6, 2025
942d254
Add concreate execution plan creation in createExecutionPlan
seagater Sep 6, 2025
a7842dc
Fix json template processing issue
seagater Sep 8, 2025
e8246fa
Remove unused test
seagater Sep 9, 2025
c832d13
Update test/CMakelists.txt for deleted tests
seagater Sep 10, 2025
34ac53a
Update alltoallv DSL APIs and alltoallv execution plan template
seagater Sep 10, 2025
6d51412
Directly use prefix dynamic_* for keys index, size and tbgroup_id in …
seagater Sep 10, 2025
452e7f1
Revise the processing code to support new dynamic execution plan API
seagater Sep 12, 2025
485d6c7
Expand threadblocks section in concrete execution plan
seagater Sep 13, 2025
f0b1672
Skip size consistency check for alltoallv
seagater Sep 14, 2025
76356e1
Add try-catch for all gpu deleters
seagater Sep 15, 2025
80cbba4
Merge branch 'main' into qinghuazhou/all_to_all_v_dsl
Binyang2014 Oct 2, 2025
248 changes: 248 additions & 0 deletions include/mscclpp/dynamic_execution_plan.hpp
@@ -0,0 +1,248 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

#ifndef MSCCLPP_DYNAMIC_EXECUTION_PLAN_HPP_
#define MSCCLPP_DYNAMIC_EXECUTION_PLAN_HPP_

#include <mscclpp/executor.hpp>
#include <string>
#include <vector>
#include <unordered_map>
#include <memory>

namespace mscclpp {

// Forward declaration
class Communicator;

/// Runtime parameters for dynamic execution plan
struct DynamicRuntimeParams {
int num_ranks; ///< Number of ranks
std::vector<size_t> send_sizes; ///< Send sizes per peer
std::vector<size_t> recv_sizes; ///< Receive sizes per peer
std::vector<size_t> send_offsets; ///< Send buffer offsets per peer
std::vector<size_t> recv_offsets; ///< Receive buffer offsets per peer
std::vector<int> peerRanks; ///< List of peer ranks (for compatibility)
size_t totalSendSize; ///< Total send buffer size
size_t totalRecvSize; ///< Total receive buffer size
int maxThreadBlocks; ///< Maximum thread blocks available
size_t blockSize; ///< Thread block processing size
};

/// Variable substitution context for dynamic plans
struct VariableContext {
std::unordered_map<std::string, std::string> variables;

void setVariable(const std::string& name, const std::string& value) {
variables[name] = value;
}

std::string substituteVariables(const std::string& template_str) const;
};

/// Dynamic threadblock template information
struct DynamicThreadblockInfo {
int tbgroup_id; ///< Thread block group ID
int num_threadblocks; ///< Number of thread blocks for this group
std::vector<int> peer_ranks; ///< Peer ranks handled by this thread block group
};

/// Dynamic operation template
struct DynamicOperationTemplate {
std::string type; ///< Operation type (put, get, copy, etc.)
std::string inputChunk; ///< Input chunk variable
std::string outputChunk; ///< Output chunk variable
std::string peer; ///< Peer rank variable
std::string channel; ///< Channel variable
std::string threadblockCount; ///< Thread block count variable
std::string size; ///< Size variable
std::string step; ///< Step ID variable
};

/// Dynamic GPU template
struct DynamicGpuTemplate {
int id; ///< GPU ID
std::string inputChunks; ///< Input chunks variable
std::string outputChunks; ///< Output chunks variable
int scratchChunks; ///< Scratch chunks count
std::vector<DynamicOperationTemplate> operationTemplates;
};

// Forward declaration
class DynamicExecutionPlan;

/// Utility class for dynamic all-to-allv operations
class DynamicAllToAllv {
public:
/// Constructor
/// @param plan Reference to the dynamic execution plan
DynamicAllToAllv(DynamicExecutionPlan& plan);

/// Execute dynamic all-to-allv with runtime message sizes using MSCCLPP execution engine
/// @param send_buff Send buffer
/// @param send_sizes Send sizes per peer
/// @param send_offsets Send buffer offsets per peer
/// @param recv_buff Receive buffer
/// @param recv_sizes Receive sizes per peer
/// @param recv_offsets Receive buffer offsets per peer
/// @param comm The communicator
/// @param executor The MSCCLPP executor
/// @param stream CUDA stream
void execute(
void* send_buff,
const std::vector<size_t>& send_sizes,
const std::vector<size_t>& send_offsets,
void* recv_buff,
const std::vector<size_t>& recv_sizes,
const std::vector<size_t>& recv_offsets,
std::shared_ptr<Communicator> comm,
std::shared_ptr<Executor> executor,
cudaStream_t stream);

/// Create runtime parameters from send/recv sizes (static method for compatibility)
/// @param sendSizes Send sizes per peer
/// @param recvSizes Receive sizes per peer
/// @return Runtime parameters structure
static DynamicRuntimeParams createRuntimeParams(
const std::vector<size_t>& sendSizes,
const std::vector<size_t>& recvSizes);

/// Execute method for compatibility (static)
/// @param comm The communicator
/// @param dynamicPlan The dynamic execution plan
/// @param sendBuffer Send buffer
/// @param recvBuffer Receive buffer
/// @param sendSizes Send sizes per peer
/// @param recvSizes Receive sizes per peer
/// @param tag Operation tag
/// @return True if successful, false otherwise
static bool execute(
std::shared_ptr<Communicator> comm,
std::shared_ptr<DynamicExecutionPlan> dynamicPlan,
void* sendBuffer, void* recvBuffer,
const std::vector<size_t>& sendSizes,
const std::vector<size_t>& recvSizes,
int tag = 0);

private:
DynamicExecutionPlan& plan_;
int rank_;
};

/// Dynamic execution plan that can be instantiated at runtime
class DynamicExecutionPlan {
public:
/// Constructor
/// @param planPath Path to the dynamic execution plan JSON file
/// @param rank The rank of this process
DynamicExecutionPlan(const std::string& planPath, int rank);

/// Destructor
~DynamicExecutionPlan();

/// Instantiate the dynamic plan with runtime parameters
/// @param params Runtime parameters for instantiation
/// @return Concrete execution plan as JSON string
std::string instantiate(const DynamicRuntimeParams& params);

/// Create a concrete ExecutionPlan object for the given parameters
/// @param params Runtime parameters for instantiation
/// @return Shared pointer to concrete ExecutionPlan
std::shared_ptr<ExecutionPlan> createExecutionPlan(const DynamicRuntimeParams& params);

/// Create a DynamicAllToAllv object
/// @return Unique pointer to DynamicAllToAllv
std::unique_ptr<DynamicAllToAllv> createAllToAllv();

/// Create a concrete execution plan file for the given parameters (for compatibility)
/// @param params Runtime parameters for instantiation
/// @param outputPath Path at which to write the concrete plan
/// @return Path to the created concrete plan file
std::string createConcretePlan(const DynamicRuntimeParams& params, const std::string& outputPath);

/// Get the collective name
std::string collective() const { return collective_; }

/// Get minimum message size
size_t minMessageSize() const { return minMessageSize_; }

/// Get maximum message size
size_t maxMessageSize() const { return maxMessageSize_; }

/// Check if this is a dynamic plan
bool isDynamic() const { return isDynamic_; }

/// Get the rank
int getRank() const { return rank_; }

/// Clean up temporary files created by this plan
void cleanup();

private:
void loadFromJson(const std::string& planPath);
int calculateThreadBlocks(size_t messageSize) const;

// Forward-declare a nested JsonType to avoid exposing nlohmann::json in the header
class JsonType;

// Core dynamic template processing methods
void processJsonTemplateVariables(JsonType& json_obj, const VariableContext& var_context);
void processDynamicTemplate(JsonType& json_obj, const DynamicRuntimeParams& params);
void processDynamicGpu(JsonType& gpu_json, const DynamicRuntimeParams& params, int gpu_id);
void processDynamicThreadblocks(JsonType& gpu_json, const DynamicRuntimeParams& params, int gpu_id);
void processDynamicThreadblock(JsonType& tb_json, const DynamicRuntimeParams& params,
int gpu_id, int tb_group_id);
void processDynamicOperations(JsonType& tb_json, const DynamicRuntimeParams& params,
int gpu_id, int tb_group_id);
void processDynamicOperation(JsonType& op_json, const DynamicRuntimeParams& params,
int gpu_id, int tb_group_id, int op_index);
void processDynamicBuffers(JsonType& op_json, const std::string& buffer_key,
const DynamicRuntimeParams& params, int gpu_id, int peer_id);
void processDynamicBufferObject(JsonType& buff_obj, const DynamicRuntimeParams& params, int op_index);

// JSON sanitization methods
void sanitizeJsonForSerialization(JsonType& json_obj);
void aggressivelySanitizeJson(JsonType& json_obj);
JsonType createSanitizedExecutionPlan();
void expandThreadblocks(JsonType& json_obj);
void validateAndFixBufferArrays(JsonType& json_obj); ///< Validate and repair buffer arrays in the generated plan

// Utility methods for dynamic processing
int calculateThreadBlocksForGroup(int tb_group_id, const DynamicRuntimeParams& params) const;
int getPeerRankForOperation(int gpu_id, int tb_group_id, int op_index,
const DynamicRuntimeParams& params) const;
size_t getChunkSizeForPeer(int peer_id, const DynamicRuntimeParams& params, bool is_send) const;
size_t getChunkOffsetForPeer(int peer_id, const DynamicRuntimeParams& params, bool is_send) const;
size_t getChunkIndexForScratchBuffer(int src_rank, int dst_rank) const;

// Template variable setup
void setupStandardVariables(VariableContext& var_context, const DynamicRuntimeParams& params);
void updateOperationWithRuntimeParams(JsonType& op,
const DynamicRuntimeParams& params,
const VariableContext& var_context);
void processOperationTemplates(JsonType& gpu_json,
const DynamicRuntimeParams& params,
const VariableContext& var_context);
void substituteOperationTemplateVariables(JsonType& operation_template,
const DynamicRuntimeParams& params,
const VariableContext& var_context);

int rank_; ///< Current rank
std::string name_; ///< Plan name
std::string collective_; ///< Collective operation name
std::string protocol_; ///< Protocol name
bool isDynamic_; ///< Whether this is a dynamic plan
size_t minMessageSize_; ///< Minimum message size
size_t maxMessageSize_; ///< Maximum message size
int numThreadsPerBlock_; ///< Number of threads per block
std::unordered_map<std::string, std::string> dynamicParams_; ///< Dynamic parameters
std::vector<DynamicGpuTemplate> gpuTemplates_; ///< GPU templates
std::string temp_file_path_; ///< Path to temporary file (for cleanup)

// Use a pointer to avoid including nlohmann/json.hpp in header
std::unique_ptr<JsonType> templateJson_; ///< Original template JSON from DSL
};

} // namespace mscclpp

#endif // MSCCLPP_DYNAMIC_EXECUTION_PLAN_HPP_
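For reference, here is a minimal usage sketch of the API declared above; it is not part of the diff. It assumes the declared signatures behave as documented, and the plan path, buffers, sizes, communicator, executor, and stream are placeholders supplied by the caller.

#include <mscclpp/dynamic_execution_plan.hpp>

#include <memory>
#include <vector>

// Hypothetical helper; all arguments are supplied by the caller and are not
// defined by this PR.
void runDynamicAllToAllv(std::shared_ptr<mscclpp::Communicator> comm,
                         std::shared_ptr<mscclpp::Executor> executor,
                         void* sendBuff, void* recvBuff,
                         const std::vector<size_t>& sendSizes,
                         const std::vector<size_t>& sendOffsets,
                         const std::vector<size_t>& recvSizes,
                         const std::vector<size_t>& recvOffsets,
                         int rank, cudaStream_t stream) {
  // Load the dynamic (templated) plan once; it is instantiated per call with
  // the message sizes known only at runtime. The JSON path is a placeholder.
  mscclpp::DynamicExecutionPlan plan("dynamic_alltoallv.json", rank);

  // Bind an all-to-allv helper to this plan.
  auto alltoallv = plan.createAllToAllv();

  // Instantiate the template for this call's sizes and run it through the
  // mscclpp execution engine.
  alltoallv->execute(sendBuff, sendSizes, sendOffsets,
                     recvBuff, recvSizes, recvOffsets,
                     comm, executor, stream);

  // Remove any temporary concrete-plan files produced during instantiation.
  plan.cleanup();
}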
27 changes: 24 additions & 3 deletions include/mscclpp/gpu_utils.hpp
Original file line number Diff line number Diff line change
@@ -179,20 +179,41 @@ Memory safeAlloc(Alloc alloc, size_t nelems, Args&&... args) {
/// @tparam T Type of each element in the allocated memory.
template <class T = void>
struct GpuDeleter {
void operator()(void* ptr) { gpuFree(ptr); }
  void operator()(void* ptr) {
    try {
      gpuFree(ptr);
    } catch (const std::exception& e) {
      // Suppress cleanup errors during program termination.
      // This is a known issue when the CUDA context is destroyed before memory cleanup.
    }
  }
};

/// A deleter that calls gpuFreeHost for use with std::unique_ptr or std::shared_ptr.
/// @tparam T Type of each element in the allocated memory.
template <class T = void>
struct GpuHostDeleter {
void operator()(void* ptr) { gpuFreeHost(ptr); }
  void operator()(void* ptr) {
    try {
      gpuFreeHost(ptr);
    } catch (const std::exception& e) {
      // Suppress cleanup errors during program termination.
      // This is a known issue when the CUDA context is destroyed before memory cleanup.
    }
  }
};

#if (CUDA_NVLS_API_AVAILABLE)
template <class T = void>
struct GpuPhysicalDeleter {
void operator()(void* ptr) { gpuFreePhysical(ptr); }
  void operator()(void* ptr) {
    try {
      gpuFreePhysical(ptr);
    } catch (const std::exception& e) {
      // Suppress cleanup errors during program termination.
      // This is a known issue when the CUDA context is destroyed before memory cleanup.
    }
  }
};
#endif // CUDA_NVLS_API_AVAILABLE

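For context on the try-catch blocks added above: the deleters run inside the noexcept destructors of std::unique_ptr and std::shared_ptr, so a free call that throws at program exit, after the CUDA context has already been destroyed, would otherwise call std::terminate. The following standalone sketch illustrates the same pattern using plain CUDA runtime calls rather than mscclpp's internal gpuFree helpers; the names here are illustrative, not the library's.

#include <cuda_runtime.h>

#include <memory>
#include <stdexcept>

// Illustrative deleter mirroring the try-catch added in the diff above.
struct SafeDeviceDeleter {
  void operator()(void* ptr) {
    try {
      if (cudaFree(ptr) != cudaSuccess) {
        throw std::runtime_error("cudaFree failed");  // stands in for gpuFree's error path
      }
    } catch (const std::exception&) {
      // Ignore: at program termination the CUDA context may already be gone,
      // and throwing from a deleter would abort the process.
    }
  }
};

using DeviceBuffer = std::unique_ptr<void, SafeDeviceDeleter>;

// Usage: DeviceBuffer buf(rawDevicePtr); the memory is freed on scope exit,
// and a failed cudaFree during teardown no longer terminates the program.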