Skip to content

Commit 1465751

Browse files
authored
Support rcmode for write operations (#577)
* Enable rcmode for write * Fix format * Remove stray print
1 parent 172b7c2 commit 1465751

File tree

2 files changed

+41
-7
lines changed

2 files changed

+41
-7
lines changed

p2p/uccl_engine.cc

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ void listener_thread_func(uccl_conn_t* conn) {
110110

111111
uint64_t mr_id = 0;
112112
switch (md.op) {
113-
case UCCL_READ: {
113+
case UCCL_RW_RC: {
114114
tx_msg_t tx_data = md.data.tx_data;
115115
auto local_mem_iter = mem_reg_info.find(tx_data.data_ptr);
116116
if (local_mem_iter == mem_reg_info.end()) {
@@ -188,7 +188,7 @@ void listener_thread_func(uccl_conn_t* conn) {
188188
#endif
189189
break;
190190
}
191-
case UCCL_VECTOR_READ: {
191+
case UCCL_VECTOR_RW_RC: {
192192
size_t count = md.data.vector_data.count;
193193

194194
tx_msg_t* tx_data_array = new tx_msg_t[count];
@@ -468,6 +468,21 @@ int uccl_engine_write(uccl_conn_t* conn, uccl_mr_t* mr, void const* data,
468468
: -1;
469469
}
470470

471+
int uccl_engine_write_rc(uccl_conn_t* conn, uccl_mr_t* mr, void const* data,
472+
size_t size, void* slot_item_ptr,
473+
uint64_t* transfer_id) {
474+
if (!conn || !mr || !data) return -1;
475+
476+
FifoItem slot_item;
477+
slot_item = *static_cast<FifoItem*>(slot_item_ptr);
478+
479+
return conn->engine->endpoint->write_async(conn->conn_id, mr->mr_id,
480+
const_cast<void*>(data), size,
481+
slot_item, transfer_id)
482+
? 0
483+
: -1;
484+
}
485+
471486
int uccl_engine_recv(uccl_conn_t* conn, uccl_mr_t* mr, void* data,
472487
size_t data_size) {
473488
if (!conn || !mr || !data) return -1;
@@ -584,10 +599,16 @@ int uccl_engine_send_tx_md_vector(uccl_conn_t* conn, md_t* md_array,
584599
size_t count) {
585600
if (!conn || !md_array || count == 0) return -1;
586601

602+
// Check UCCL_RCMODE environment variable
603+
bool rc_mode = false;
604+
char const* rc_mode_env = getenv("UCCL_RCMODE");
605+
if (rc_mode_env != nullptr) {
606+
rc_mode = (std::strcmp(rc_mode_env, "1") == 0);
607+
}
587608
// Determine the operation type based on the first item
588-
uccl_msg_type op_type =
589-
(md_array[0].op == UCCL_READ) ? UCCL_VECTOR_READ : UCCL_VECTOR_WRITE;
590-
609+
uccl_msg_type op_type = (rc_mode || md_array[0].op == UCCL_RW_RC)
610+
? UCCL_VECTOR_RW_RC
611+
: UCCL_VECTOR_WRITE;
591612
md_t vector_md;
592613
vector_md.op = op_type;
593614
vector_md.data.vector_data.count = count;

p2p/uccl_engine.h

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ typedef struct uccl_mr uccl_mr_t;
1717

1818
// UCCL operation types
1919
enum uccl_msg_type {
20-
UCCL_READ = 0,
20+
UCCL_RW_RC = 0, // Used by both READ/WRITE in RCMODE
2121
UCCL_WRITE = 1,
22-
UCCL_VECTOR_READ = 2,
22+
UCCL_VECTOR_RW_RC = 2, // Used by both READ/WRITE in RCMODE
2323
UCCL_VECTOR_WRITE = 3,
2424
UCCL_FIFO = 4,
2525
UCCL_NOTIFY = 5
@@ -145,6 +145,19 @@ int uccl_engine_get_fifo_item(uccl_conn_t* conn, int id, void* fifo_item);
145145
int uccl_engine_write(uccl_conn_t* conn, uccl_mr_t* mr, void const* data,
146146
size_t size, uint64_t* transfer_id);
147147

148+
/**
149+
* Send data with RC mode (Non blocking).
150+
* @param conn Connection handle.
151+
* @param mr Memory region handle.
152+
* @param data Pointer to the data to send.
153+
* @param size Size of the data.
154+
* @param slot_item_ptr Pointer to the slot item.
155+
* @param transfer_id Pointer to store the transfer ID.
156+
* @return 0 on success, non-zero on failure.
157+
*/
158+
int uccl_engine_write_rc(uccl_conn_t* conn, uccl_mr_t* mr, void const* data,
159+
size_t size, void* slot_item_ptr,
160+
uint64_t* transfer_id);
148161
/**
149162
* Receive data (blocking).
150163
* @param conn Connection handle.

0 commit comments

Comments
 (0)