From a791d1b8abd648d9080eeacc408f34fa2f3423cc Mon Sep 17 00:00:00 2001 From: MaoZiming Date: Thu, 6 Nov 2025 19:07:38 +0000 Subject: [PATCH 1/3] first attempt at ep low latency connectionless --- ep/deep_ep_wrapper/deep_ep.egg-info/PKG-INFO | 11 ++++ .../deep_ep.egg-info/SOURCES.txt | 11 ++++ .../deep_ep.egg-info/dependency_links.txt | 1 + .../deep_ep.egg-info/requires.txt | 1 + .../deep_ep.egg-info/top_level.txt | 1 + ep/include/proxy_ctx.hpp | 9 +++ ep/include/rdma.hpp | 12 ++++ ep/src/proxy.cpp | 5 +- ep/src/rdma.cpp | 59 ++++++++++++++++--- 9 files changed, 99 insertions(+), 11 deletions(-) create mode 100644 ep/deep_ep_wrapper/deep_ep.egg-info/PKG-INFO create mode 100644 ep/deep_ep_wrapper/deep_ep.egg-info/SOURCES.txt create mode 100644 ep/deep_ep_wrapper/deep_ep.egg-info/dependency_links.txt create mode 100644 ep/deep_ep_wrapper/deep_ep.egg-info/requires.txt create mode 100644 ep/deep_ep_wrapper/deep_ep.egg-info/top_level.txt diff --git a/ep/deep_ep_wrapper/deep_ep.egg-info/PKG-INFO b/ep/deep_ep_wrapper/deep_ep.egg-info/PKG-INFO new file mode 100644 index 000000000..23d15d580 --- /dev/null +++ b/ep/deep_ep_wrapper/deep_ep.egg-info/PKG-INFO @@ -0,0 +1,11 @@ +Metadata-Version: 2.4 +Name: deep_ep +Version: 0.1.0 +Summary: A wrapper package for uccl.ep with additional functionality +Author: whn09 +Requires-Python: >=3.6 +Requires-Dist: uccl +Dynamic: author +Dynamic: requires-dist +Dynamic: requires-python +Dynamic: summary diff --git a/ep/deep_ep_wrapper/deep_ep.egg-info/SOURCES.txt b/ep/deep_ep_wrapper/deep_ep.egg-info/SOURCES.txt new file mode 100644 index 000000000..848c59d74 --- /dev/null +++ b/ep/deep_ep_wrapper/deep_ep.egg-info/SOURCES.txt @@ -0,0 +1,11 @@ +README.md +setup.py +deep_ep/__init__.py +deep_ep/buffer.py +deep_ep/test_internode.py +deep_ep/utils.py +deep_ep.egg-info/PKG-INFO +deep_ep.egg-info/SOURCES.txt +deep_ep.egg-info/dependency_links.txt +deep_ep.egg-info/requires.txt +deep_ep.egg-info/top_level.txt \ No newline at end of file diff --git a/ep/deep_ep_wrapper/deep_ep.egg-info/dependency_links.txt b/ep/deep_ep_wrapper/deep_ep.egg-info/dependency_links.txt new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/ep/deep_ep_wrapper/deep_ep.egg-info/dependency_links.txt @@ -0,0 +1 @@ + diff --git a/ep/deep_ep_wrapper/deep_ep.egg-info/requires.txt b/ep/deep_ep_wrapper/deep_ep.egg-info/requires.txt new file mode 100644 index 000000000..62ef3c50c --- /dev/null +++ b/ep/deep_ep_wrapper/deep_ep.egg-info/requires.txt @@ -0,0 +1 @@ +uccl diff --git a/ep/deep_ep_wrapper/deep_ep.egg-info/top_level.txt b/ep/deep_ep_wrapper/deep_ep.egg-info/top_level.txt new file mode 100644 index 000000000..53d742763 --- /dev/null +++ b/ep/deep_ep_wrapper/deep_ep.egg-info/top_level.txt @@ -0,0 +1 @@ +deep_ep diff --git a/ep/include/proxy_ctx.hpp b/ep/include/proxy_ctx.hpp index f87f70ce0..42eea6618 100644 --- a/ep/include/proxy_ctx.hpp +++ b/ep/include/proxy_ctx.hpp @@ -53,6 +53,15 @@ struct ProxyCtx { uint32_t dst_ack_qpn; struct ibv_ah* dst_ah = nullptr; + // Connectionless SRD support: multiple AHs and QPNs for different remote NICs + // These are used in fast mode when use_normal_mode = false + std::vector dst_ah_per_nic; // AH for each remote NIC + std::vector dst_qpn_per_nic; // QPN for each remote NIC + std::vector dst_ack_qpn_per_nic; // Ack QPN for each remote NIC + std::vector remote_addr_per_nic; // Remote addr for each NIC + std::vector remote_rkey_per_nic; // Remote rkey for each NIC + std::vector remote_len_per_nic; // Remote len for each NIC + // Remote memory uintptr_t remote_addr = 0; // Base address of remote rdma_buffer uint64_t remote_len = 0; diff --git a/ep/include/rdma.hpp b/ep/include/rdma.hpp index 92176e1d6..b0476377a 100644 --- a/ep/include/rdma.hpp +++ b/ep/include/rdma.hpp @@ -29,6 +29,14 @@ struct RDMAConnectionInfo { // #ifdef EFA uint32_t num_rings; uint32_t data_qp_num[kChannelPerProxy]; + + // Connectionless SRD: support for multiple NICs per node + // In fast mode (use_normal_mode=false), we can use a single QP + // with different AH and QPN for each remote NIC + uint32_t num_nics; // Number of NICs on this node + uint8_t gid_per_nic[MAX_NUM_GPUS][16]; // GID for each NIC + uint32_t qp_num_per_nic[MAX_NUM_GPUS]; // QPN for each NIC + uint32_t ack_qp_num_per_nic[MAX_NUM_GPUS]; // Ack QPN for each NIC // #endif }; @@ -301,6 +309,10 @@ void modify_qp_to_rtr(ProxyCtx& S, RDMAConnectionInfo* remote, void modify_qp_to_rts(ProxyCtx& S, RDMAConnectionInfo* local_info); void modify_qp_to_init(ProxyCtx& S); + +// Create Address Handle for connectionless SRD communication +struct ibv_ah* create_ah(ProxyCtx& S, uint8_t* remote_gid); + void local_poll_completions(ProxyCtx& S, std::unordered_set& acked_wrs, int thread_idx, std::vector& ctx_by_tag); diff --git a/ep/src/proxy.cpp b/ep/src/proxy.cpp index 405dc73ac..caec38aae 100644 --- a/ep/src/proxy.cpp +++ b/ep/src/proxy.cpp @@ -44,7 +44,7 @@ LocalBarrier* map_local_barrier_shm(std::string const& name, bool* out_owner) { perror("shm_open(existing)"); return nullptr; } - struct stat st {}; + struct stat st{}; int tries = 1000; while (tries-- > 0) { if (fstat(fd, &st) == 0 && static_cast(st.st_size) >= kSize) @@ -279,6 +279,7 @@ void Proxy::init_common() { } } usleep(50 * 1000); + if (cfg_.use_normal_mode) { // if (cfg_.thread_idx != 0) { // return; @@ -435,7 +436,7 @@ void Proxy::run_dual() { void Proxy::notify_gpu_completion(uint64_t& my_tail) { if (acked_wrs_.empty()) return; - // Mark all acked command slots in each ring's bitmask + // Mark all acked command slots in each ring's bitmask #ifdef USE_MSCCLPP_FIFO_BACKEND // FIFO path: pop in order using the pending deque and the completion set. for (size_t rb_idx = 0; rb_idx < cfg_.d2h_queues.size(); ++rb_idx) { diff --git a/ep/src/rdma.cpp b/ep/src/rdma.cpp index 5d040b139..e515e1e47 100644 --- a/ep/src/rdma.cpp +++ b/ep/src/rdma.cpp @@ -420,6 +420,12 @@ void create_per_thread_qp(ProxyCtx& S, void* gpu_buffer, size_t size, local_info->psn = 0; local_info->ack_psn = 0; fill_local_gid(S, local_info); + + // Initialize multi-NIC info for fast mode (will be populated later) + local_info->num_nics = 0; + memset(local_info->gid_per_nic, 0, sizeof(local_info->gid_per_nic)); + memset(local_info->qp_num_per_nic, 0, sizeof(local_info->qp_num_per_nic)); + memset(local_info->ack_qp_num_per_nic, 0, sizeof(local_info->ack_qp_num_per_nic)); } void modify_qp_to_init(ProxyCtx& S) { @@ -492,9 +498,25 @@ struct ibv_ah* create_ah(ProxyCtx& S, uint8_t* remote_gid) { void modify_qp_to_rtr(ProxyCtx& S, RDMAConnectionInfo* remote, bool use_normal_mode) { #ifdef EFA + // For connectionless SRD, store primary connection info S.dst_qpn = remote->qp_num; S.dst_ack_qpn = remote->recv_ack_qp_num; S.dst_ah = create_ah(S, remote->gid); + + // In fast mode (use_normal_mode=false), support multiple NICs per node + // Store AH and QPN for each remote NIC for connectionless SRD + if (!use_normal_mode && remote->num_nics > 0) { + S.dst_ah_per_nic.resize(remote->num_nics); + S.dst_qpn_per_nic.resize(remote->num_nics); + S.dst_ack_qpn_per_nic.resize(remote->num_nics); + + for (uint32_t nic_idx = 0; nic_idx < remote->num_nics; ++nic_idx) { + // Create AH for each NIC + S.dst_ah_per_nic[nic_idx] = create_ah(S, remote->gid_per_nic[nic_idx]); + S.dst_qpn_per_nic[nic_idx] = remote->qp_num_per_nic[nic_idx]; + S.dst_ack_qpn_per_nic[nic_idx] = remote->ack_qp_num_per_nic[nic_idx]; + } + } #endif if (use_normal_mode) { @@ -1022,17 +1044,35 @@ static void post_rdma_async_batched_fast_mode( qpx->comp_mask = 0; qpx->wr_flags = IBV_SEND_SIGNALED; + // Connectionless SRD: select which remote NIC to target + // If multi-NIC info is available, round-robin to distribute load + struct ibv_ah* selected_ah = ctx->dst_ah; + uint32_t selected_qpn = ctx->dst_qpn; + uintptr_t selected_remote_addr = ctx->remote_addr; + uint32_t selected_remote_rkey = ctx->remote_rkey; + uint64_t selected_remote_len = ctx->remote_len; + + if (!ctx->dst_ah_per_nic.empty()) { + // Use the work request ID to select a NIC for load balancing + size_t nic_idx = wrs_to_post[i] % ctx->dst_ah_per_nic.size(); + selected_ah = ctx->dst_ah_per_nic[nic_idx]; + selected_qpn = ctx->dst_qpn_per_nic[nic_idx]; + selected_remote_addr = ctx->remote_addr_per_nic[nic_idx]; + selected_remote_rkey = ctx->remote_rkey_per_nic[nic_idx]; + selected_remote_len = ctx->remote_len_per_nic[nic_idx]; + } + uint64_t remote_addr = - ctx->remote_addr + (cmd.req_rptr ? cmd.req_rptr : 0); - uint64_t remote_end = ctx->remote_addr + ctx->remote_len; + selected_remote_addr + (cmd.req_rptr ? cmd.req_rptr : 0); + uint64_t remote_end = selected_remote_addr + selected_remote_len; - if (remote_addr < ctx->remote_addr || + if (remote_addr < selected_remote_addr || remote_addr + cmd.bytes > remote_end) { fprintf(stderr, "[ERROR] Remote write OOB: addr=0x%llx len=%u (base=0x%llx, " "size=%zu), cmd.req_rptr: 0x%llx\n", (unsigned long long)remote_addr, cmd.bytes, - (unsigned long long)ctx->remote_addr, (size_t)ctx->remote_len, + (unsigned long long)selected_remote_addr, (size_t)selected_remote_len, (unsigned long long)cmd.req_rptr); cudaError_t err = cudaDeviceSynchronize(); if (err != cudaSuccess) { @@ -1052,7 +1092,7 @@ static void post_rdma_async_batched_fast_mode( get_low_latency(cmd.cmd_type), cmd.expert_idx, 1, my_rank) .GetImmData(); - ibv_wr_rdma_write_imm(qpx, ctx->remote_rkey, remote_addr, htonl(imm)); + ibv_wr_rdma_write_imm(qpx, selected_remote_rkey, remote_addr, htonl(imm)); #else if (cmd.atomic_offset > 0 && cmd.atomic_val > 0) { int v = static_cast(cmd.atomic_val); @@ -1064,20 +1104,21 @@ static void post_rdma_async_batched_fast_mode( AtomicsImm::Pack(true, false, cmd.atomic_val, cmd.atomic_offset, get_low_latency(cmd.cmd_type)) .GetImmData(); - ibv_wr_rdma_write_imm(qpx, ctx->remote_rkey, remote_addr, htonl(imm)); + ibv_wr_rdma_write_imm(qpx, selected_remote_rkey, remote_addr, htonl(imm)); } else if (j + 1 == k) { uint32_t imm = WriteImm::Pack(get_is_combine(cmd.cmd_type), get_low_latency(cmd.cmd_type), cmd.expert_idx, k, my_rank) .GetImmData(); - ibv_wr_rdma_write_imm(qpx, ctx->remote_rkey, remote_addr, htonl(imm)); + ibv_wr_rdma_write_imm(qpx, selected_remote_rkey, remote_addr, htonl(imm)); } else { - ibv_wr_rdma_write(qpx, ctx->remote_rkey, remote_addr); + ibv_wr_rdma_write(qpx, selected_remote_rkey, remote_addr); } #endif uintptr_t laddr = cmd.req_lptr + reinterpret_cast(ctx->mr->addr); - ibv_wr_set_ud_addr(qpx, ctx->dst_ah, ctx->dst_qpn, QKEY); + + ibv_wr_set_ud_addr(qpx, selected_ah, selected_qpn, QKEY); ibv_wr_set_sge(qpx, ctx->mr->lkey, laddr, static_cast(cmd.bytes)); } From c79e8de69aed51e46568ff4778fd4c2e9f311e15 Mon Sep 17 00:00:00 2001 From: MaoZiming Date: Thu, 6 Nov 2025 19:08:30 +0000 Subject: [PATCH 2/3] remove extra wrapper fileds --- ep/deep_ep_wrapper/deep_ep.egg-info/PKG-INFO | 11 ----------- ep/deep_ep_wrapper/deep_ep.egg-info/SOURCES.txt | 11 ----------- .../deep_ep.egg-info/dependency_links.txt | 1 - ep/deep_ep_wrapper/deep_ep.egg-info/requires.txt | 1 - ep/deep_ep_wrapper/deep_ep.egg-info/top_level.txt | 1 - 5 files changed, 25 deletions(-) delete mode 100644 ep/deep_ep_wrapper/deep_ep.egg-info/PKG-INFO delete mode 100644 ep/deep_ep_wrapper/deep_ep.egg-info/SOURCES.txt delete mode 100644 ep/deep_ep_wrapper/deep_ep.egg-info/dependency_links.txt delete mode 100644 ep/deep_ep_wrapper/deep_ep.egg-info/requires.txt delete mode 100644 ep/deep_ep_wrapper/deep_ep.egg-info/top_level.txt diff --git a/ep/deep_ep_wrapper/deep_ep.egg-info/PKG-INFO b/ep/deep_ep_wrapper/deep_ep.egg-info/PKG-INFO deleted file mode 100644 index 23d15d580..000000000 --- a/ep/deep_ep_wrapper/deep_ep.egg-info/PKG-INFO +++ /dev/null @@ -1,11 +0,0 @@ -Metadata-Version: 2.4 -Name: deep_ep -Version: 0.1.0 -Summary: A wrapper package for uccl.ep with additional functionality -Author: whn09 -Requires-Python: >=3.6 -Requires-Dist: uccl -Dynamic: author -Dynamic: requires-dist -Dynamic: requires-python -Dynamic: summary diff --git a/ep/deep_ep_wrapper/deep_ep.egg-info/SOURCES.txt b/ep/deep_ep_wrapper/deep_ep.egg-info/SOURCES.txt deleted file mode 100644 index 848c59d74..000000000 --- a/ep/deep_ep_wrapper/deep_ep.egg-info/SOURCES.txt +++ /dev/null @@ -1,11 +0,0 @@ -README.md -setup.py -deep_ep/__init__.py -deep_ep/buffer.py -deep_ep/test_internode.py -deep_ep/utils.py -deep_ep.egg-info/PKG-INFO -deep_ep.egg-info/SOURCES.txt -deep_ep.egg-info/dependency_links.txt -deep_ep.egg-info/requires.txt -deep_ep.egg-info/top_level.txt \ No newline at end of file diff --git a/ep/deep_ep_wrapper/deep_ep.egg-info/dependency_links.txt b/ep/deep_ep_wrapper/deep_ep.egg-info/dependency_links.txt deleted file mode 100644 index 8b1378917..000000000 --- a/ep/deep_ep_wrapper/deep_ep.egg-info/dependency_links.txt +++ /dev/null @@ -1 +0,0 @@ - diff --git a/ep/deep_ep_wrapper/deep_ep.egg-info/requires.txt b/ep/deep_ep_wrapper/deep_ep.egg-info/requires.txt deleted file mode 100644 index 62ef3c50c..000000000 --- a/ep/deep_ep_wrapper/deep_ep.egg-info/requires.txt +++ /dev/null @@ -1 +0,0 @@ -uccl diff --git a/ep/deep_ep_wrapper/deep_ep.egg-info/top_level.txt b/ep/deep_ep_wrapper/deep_ep.egg-info/top_level.txt deleted file mode 100644 index 53d742763..000000000 --- a/ep/deep_ep_wrapper/deep_ep.egg-info/top_level.txt +++ /dev/null @@ -1 +0,0 @@ -deep_ep From 6fdb3ec945a2528db568c7116d9d36fc9e19fc4b Mon Sep 17 00:00:00 2001 From: MaoZiming Date: Thu, 6 Nov 2025 19:21:03 +0000 Subject: [PATCH 3/3] format and clean --- ep/include/proxy_ctx.hpp | 13 ++++++------- ep/include/rdma.hpp | 17 ++++++----------- ep/src/proxy.cpp | 6 +++--- ep/src/rdma.cpp | 33 +++++++++++++++------------------ 4 files changed, 30 insertions(+), 39 deletions(-) diff --git a/ep/include/proxy_ctx.hpp b/ep/include/proxy_ctx.hpp index 42eea6618..744cd4662 100644 --- a/ep/include/proxy_ctx.hpp +++ b/ep/include/proxy_ctx.hpp @@ -54,13 +54,12 @@ struct ProxyCtx { struct ibv_ah* dst_ah = nullptr; // Connectionless SRD support: multiple AHs and QPNs for different remote NICs - // These are used in fast mode when use_normal_mode = false - std::vector dst_ah_per_nic; // AH for each remote NIC - std::vector dst_qpn_per_nic; // QPN for each remote NIC - std::vector dst_ack_qpn_per_nic; // Ack QPN for each remote NIC - std::vector remote_addr_per_nic; // Remote addr for each NIC - std::vector remote_rkey_per_nic; // Remote rkey for each NIC - std::vector remote_len_per_nic; // Remote len for each NIC + std::vector dst_ah_per_nic; + std::vector dst_qpn_per_nic; + std::vector dst_ack_qpn_per_nic; + std::vector remote_addr_per_nic; + std::vector remote_rkey_per_nic; + std::vector remote_len_per_nic; // Remote memory uintptr_t remote_addr = 0; // Base address of remote rdma_buffer diff --git a/ep/include/rdma.hpp b/ep/include/rdma.hpp index b0476377a..acb24288c 100644 --- a/ep/include/rdma.hpp +++ b/ep/include/rdma.hpp @@ -20,7 +20,7 @@ struct RDMAConnectionInfo { uint32_t ack_qp_num; uint32_t recv_ack_qp_num; uint32_t ack_psn; - uint32_t rkey; // Memory region key + uint32_t rkey; // Memory region keyf uintptr_t addr; // Buffer address uint64_t len; uint16_t lid; // Local ID @@ -29,15 +29,11 @@ struct RDMAConnectionInfo { // #ifdef EFA uint32_t num_rings; uint32_t data_qp_num[kChannelPerProxy]; - - // Connectionless SRD: support for multiple NICs per node - // In fast mode (use_normal_mode=false), we can use a single QP - // with different AH and QPN for each remote NIC - uint32_t num_nics; // Number of NICs on this node - uint8_t gid_per_nic[MAX_NUM_GPUS][16]; // GID for each NIC - uint32_t qp_num_per_nic[MAX_NUM_GPUS]; // QPN for each NIC - uint32_t ack_qp_num_per_nic[MAX_NUM_GPUS]; // Ack QPN for each NIC - // #endif + + uint32_t num_nics; + uint8_t gid_per_nic[MAX_NUM_GPUS][16]; + uint32_t qp_num_per_nic[MAX_NUM_GPUS]; + uint32_t ack_qp_num_per_nic[MAX_NUM_GPUS]; }; struct PendingUpdate { @@ -310,7 +306,6 @@ void modify_qp_to_rts(ProxyCtx& S, RDMAConnectionInfo* local_info); void modify_qp_to_init(ProxyCtx& S); -// Create Address Handle for connectionless SRD communication struct ibv_ah* create_ah(ProxyCtx& S, uint8_t* remote_gid); void local_poll_completions(ProxyCtx& S, diff --git a/ep/src/proxy.cpp b/ep/src/proxy.cpp index caec38aae..fe3f684a8 100644 --- a/ep/src/proxy.cpp +++ b/ep/src/proxy.cpp @@ -44,7 +44,7 @@ LocalBarrier* map_local_barrier_shm(std::string const& name, bool* out_owner) { perror("shm_open(existing)"); return nullptr; } - struct stat st{}; + struct stat st {}; int tries = 1000; while (tries-- > 0) { if (fstat(fd, &st) == 0 && static_cast(st.st_size) >= kSize) @@ -279,7 +279,7 @@ void Proxy::init_common() { } } usleep(50 * 1000); - + if (cfg_.use_normal_mode) { // if (cfg_.thread_idx != 0) { // return; @@ -436,7 +436,7 @@ void Proxy::run_dual() { void Proxy::notify_gpu_completion(uint64_t& my_tail) { if (acked_wrs_.empty()) return; - // Mark all acked command slots in each ring's bitmask + // Mark all acked command slots in each ring's bitmask #ifdef USE_MSCCLPP_FIFO_BACKEND // FIFO path: pop in order using the pending deque and the completion set. for (size_t rb_idx = 0; rb_idx < cfg_.d2h_queues.size(); ++rb_idx) { diff --git a/ep/src/rdma.cpp b/ep/src/rdma.cpp index e515e1e47..bc82f632a 100644 --- a/ep/src/rdma.cpp +++ b/ep/src/rdma.cpp @@ -420,12 +420,12 @@ void create_per_thread_qp(ProxyCtx& S, void* gpu_buffer, size_t size, local_info->psn = 0; local_info->ack_psn = 0; fill_local_gid(S, local_info); - - // Initialize multi-NIC info for fast mode (will be populated later) + local_info->num_nics = 0; memset(local_info->gid_per_nic, 0, sizeof(local_info->gid_per_nic)); memset(local_info->qp_num_per_nic, 0, sizeof(local_info->qp_num_per_nic)); - memset(local_info->ack_qp_num_per_nic, 0, sizeof(local_info->ack_qp_num_per_nic)); + memset(local_info->ack_qp_num_per_nic, 0, + sizeof(local_info->ack_qp_num_per_nic)); } void modify_qp_to_init(ProxyCtx& S) { @@ -498,20 +498,16 @@ struct ibv_ah* create_ah(ProxyCtx& S, uint8_t* remote_gid) { void modify_qp_to_rtr(ProxyCtx& S, RDMAConnectionInfo* remote, bool use_normal_mode) { #ifdef EFA - // For connectionless SRD, store primary connection info S.dst_qpn = remote->qp_num; S.dst_ack_qpn = remote->recv_ack_qp_num; S.dst_ah = create_ah(S, remote->gid); - - // In fast mode (use_normal_mode=false), support multiple NICs per node - // Store AH and QPN for each remote NIC for connectionless SRD + if (!use_normal_mode && remote->num_nics > 0) { S.dst_ah_per_nic.resize(remote->num_nics); S.dst_qpn_per_nic.resize(remote->num_nics); S.dst_ack_qpn_per_nic.resize(remote->num_nics); - + for (uint32_t nic_idx = 0; nic_idx < remote->num_nics; ++nic_idx) { - // Create AH for each NIC S.dst_ah_per_nic[nic_idx] = create_ah(S, remote->gid_per_nic[nic_idx]); S.dst_qpn_per_nic[nic_idx] = remote->qp_num_per_nic[nic_idx]; S.dst_ack_qpn_per_nic[nic_idx] = remote->ack_qp_num_per_nic[nic_idx]; @@ -1044,16 +1040,13 @@ static void post_rdma_async_batched_fast_mode( qpx->comp_mask = 0; qpx->wr_flags = IBV_SEND_SIGNALED; - // Connectionless SRD: select which remote NIC to target - // If multi-NIC info is available, round-robin to distribute load struct ibv_ah* selected_ah = ctx->dst_ah; uint32_t selected_qpn = ctx->dst_qpn; uintptr_t selected_remote_addr = ctx->remote_addr; uint32_t selected_remote_rkey = ctx->remote_rkey; uint64_t selected_remote_len = ctx->remote_len; - + if (!ctx->dst_ah_per_nic.empty()) { - // Use the work request ID to select a NIC for load balancing size_t nic_idx = wrs_to_post[i] % ctx->dst_ah_per_nic.size(); selected_ah = ctx->dst_ah_per_nic[nic_idx]; selected_qpn = ctx->dst_qpn_per_nic[nic_idx]; @@ -1072,7 +1065,8 @@ static void post_rdma_async_batched_fast_mode( "[ERROR] Remote write OOB: addr=0x%llx len=%u (base=0x%llx, " "size=%zu), cmd.req_rptr: 0x%llx\n", (unsigned long long)remote_addr, cmd.bytes, - (unsigned long long)selected_remote_addr, (size_t)selected_remote_len, + (unsigned long long)selected_remote_addr, + (size_t)selected_remote_len, (unsigned long long)cmd.req_rptr); cudaError_t err = cudaDeviceSynchronize(); if (err != cudaSuccess) { @@ -1092,7 +1086,8 @@ static void post_rdma_async_batched_fast_mode( get_low_latency(cmd.cmd_type), cmd.expert_idx, 1, my_rank) .GetImmData(); - ibv_wr_rdma_write_imm(qpx, selected_remote_rkey, remote_addr, htonl(imm)); + ibv_wr_rdma_write_imm(qpx, selected_remote_rkey, remote_addr, + htonl(imm)); #else if (cmd.atomic_offset > 0 && cmd.atomic_val > 0) { int v = static_cast(cmd.atomic_val); @@ -1104,20 +1099,22 @@ static void post_rdma_async_batched_fast_mode( AtomicsImm::Pack(true, false, cmd.atomic_val, cmd.atomic_offset, get_low_latency(cmd.cmd_type)) .GetImmData(); - ibv_wr_rdma_write_imm(qpx, selected_remote_rkey, remote_addr, htonl(imm)); + ibv_wr_rdma_write_imm(qpx, selected_remote_rkey, remote_addr, + htonl(imm)); } else if (j + 1 == k) { uint32_t imm = WriteImm::Pack(get_is_combine(cmd.cmd_type), get_low_latency(cmd.cmd_type), cmd.expert_idx, k, my_rank) .GetImmData(); - ibv_wr_rdma_write_imm(qpx, selected_remote_rkey, remote_addr, htonl(imm)); + ibv_wr_rdma_write_imm(qpx, selected_remote_rkey, remote_addr, + htonl(imm)); } else { ibv_wr_rdma_write(qpx, selected_remote_rkey, remote_addr); } #endif uintptr_t laddr = cmd.req_lptr + reinterpret_cast(ctx->mr->addr); - + ibv_wr_set_ud_addr(qpx, selected_ah, selected_qpn, QKEY); ibv_wr_set_sge(qpx, ctx->mr->lkey, laddr, static_cast(cmd.bytes));