Skip to content

Commit d5fbbf8

Browse files
Zhao Xizhaoxi
authored and committed
In resource-constrained environments, 10TB-scale 8-parallel processing in Cloudberry may encounter specific anomalies related to Motion-layer UDP communication. Below are four key scenarios and how the code modifications address them.
Four Anomaly Scenarios 1. Capacity Mismatch: The receiving end’s buffer becomes full, but the sender is unaware. As a result, the sender’s unacknowledged packet queue continues transmitting, leading to unnecessary retransmissions and packet drops. 2. False Deadlock Detection: The peer node processes heartbeat packets but fails to free up buffer capacity. This triggers a false deadlock judgment, incorrectly flagging network anomalies. 3. Unprocessed Packets Require Main Thread Wakeup: When the receive queue is full, incoming data packets are discarded. However, the main thread still needs to be awakened to process backlogged packets in the queue, preventing permanent stalling. 4. Execution Time Mismatch Across Nodes: Issues like data skew, computational performance gaps, or I/O bottlenecks cause significant differences in execution time between nodes. For example, in a hash join, if the inner table’s hash table is not ready, the node cannot process data from other nodes, leading to packet timeouts. *Example Plan*: Packets sent from one slice to another (via a Motion node) time out because the hash table in the receiving node remains unready, blocking packet processing. Code Modifications and Their Impact The code changes target the above scenarios by enhancing UDP communication feedback, adjusting deadlock checks, and ensuring proper thread wakeup. Key modifications: 1. Addressing Capacity Mismatch: - Added UDPIC_FLAGS_FULL (256) to flag when the receive buffer is full. - When the receive queue is full (in handleDataPacket), a response with UDPIC_FLAGS_FULL is sent to the sender (via setAckSendParam). This notifies the sender to pause or adjust transmission, preventing blind retransmissions. 2. Fixing False Deadlock Detection: - Modified checkDeadlock to accept transportStates as a parameter, enabling ACK polling during deadlock checks. - Changed the initial timeout for deadlock suspicion from Gp_interconnect_transmit_timeout to 600 seconds, reducing premature network error reports. - If no response is received after 600 seconds, the buffer capacity is incrementally increased (conn->capacity += 1) to alleviate false bottlenecks, with detailed logging before triggering an error. 3.
Ensuring Main Thread Wakeup on Full Queue: - In handleDataPacket, even when packets are dropped due to a full queue, the main thread is awakened (*wakeup_mainthread = true) if the packet matches the waiting query/node/route. This ensures backlogged packets in the queue are processed. 4. Mitigating Node Execution Mismatches: - Added logging for retransmissions after Gp_interconnect_min_retries_before_timeout / 5 attempts, providing visibility into prolonged packet delays (e.g., due to an unready hash table). - Reset nRetry to 0 after successful ACK polling, preventing excessive retry counts from triggering false timeouts.
1 parent 1e13f60 commit d5fbbf8

File tree

1 file changed

+67
-12
lines changed

1 file changed

+67
-12
lines changed

contrib/interconnect/udp/ic_udpifc.c

Lines changed: 67 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ int
209209
#define UDPIC_FLAGS_DISORDER (32)
210210
#define UDPIC_FLAGS_DUPLICATE (64)
211211
#define UDPIC_FLAGS_CAPACITY (128)
212+
#define UDPIC_FLAGS_FULL (256)
212213

213214
/*
214215
* ConnHtabBin
@@ -835,7 +836,7 @@ static void initUdpManager(mudp_manager_t mptr);
835836
static inline void checkNetworkTimeout(ICBuffer *buf, uint64 now, bool *networkTimeoutIsLogged);
836837

837838
static void checkExpiration(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *triggerConn, uint64 now);
838-
static void checkDeadlock(ChunkTransportStateEntry *pChunkEntry, MotionConn *conn);
839+
static void checkDeadlock(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry, MotionConn *conn);
839840

840841
static bool cacheFuturePacket(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len);
841842
static void cleanupStartupCache(void);
@@ -5220,6 +5221,12 @@ handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChun
52205221
shouldSendBuffers |= (handleAckForDisorderPkt(transportStates, &pEntry->entry, &ackConn->mConn, pkt));
52215222
break;
52225223
}
5224+
else if (pkt->flags & UDPIC_FLAGS_FULL)
5225+
{
5226+
if (DEBUG1 >= log_min_messages)
5227+
write_log("Recv buff is full [seq %d] from route %d; srcpid %d dstpid %d cmd %d flags 0x%x connseq %d", pkt->seq, ackConn->route, pkt->srcPid, pkt->dstPid, pkt->icId, pkt->flags, ackConn->conn_info.seq);
5228+
break;
5229+
}
52235230

52245231
/*
52255232
* don't get out of the loop if pkt->seq equals to
@@ -6099,6 +6106,7 @@ checkExpiration(ChunkTransportState *transportStates,
60996106
if (pollAcks(transportStates, pEntryUdp->txfd, wait_time))
61006107
{
61016108
handleAcks(transportStates, pEntry, false);
6109+
curBuf->nRetry = 0;
61026110
break;
61036111
}
61046112

@@ -6120,6 +6128,12 @@ checkExpiration(ChunkTransportState *transportStates,
61206128
};
61216129
}
61226130

6131+
if (loop_ack > Gp_interconnect_min_retries_before_timeout / 5)
6132+
write_log("Resending packet (seq %d) to %s (pid %d cid %d) with %d retries in %lu seconds",
6133+
curBuf->pkt->seq, curBuf->conn->remoteHostAndPort,
6134+
curBuf->pkt->dstPid, curBuf->pkt->dstContentId, curBuf->nRetry,
6135+
(now - curBuf->sentTime) / 1000 / 1000);
6136+
61236137
currBuffConn = CONTAINER_OF(curBuf->conn, MotionConnUDP, mConn);
61246138

61256139
retransmits++;
@@ -6214,7 +6228,7 @@ checkExpiration(ChunkTransportState *transportStates,
62146228
*
62156229
*/
62166230
static void
6217-
checkDeadlock(ChunkTransportStateEntry *pChunkEntry, MotionConn *mConn)
6231+
checkDeadlock(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry, MotionConn *mConn)
62186232
{
62196233
uint64 deadlockCheckTime;
62206234
ChunkTransportStateEntryUDP *pEntry = NULL;
@@ -6251,17 +6265,31 @@ checkDeadlock(ChunkTransportStateEntry *pChunkEntry, MotionConn *mConn)
62516265
ic_control_info.lastDeadlockCheckTime = now;
62526266
ic_statistics.statusQueryMsgNum++;
62536267

6268+
if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE && pollAcks(transportStates, pEntry->txfd, 50))
6269+
{
6270+
handleAcks(transportStates, pChunkEntry, false);
6271+
conn->deadlockCheckBeginTime = now;
6272+
}
6273+
62546274
/* check network error. */
6255-
if ((now - conn->deadlockCheckBeginTime) > ((uint64) Gp_interconnect_transmit_timeout * 1000 * 1000))
6275+
if ((now - conn->deadlockCheckBeginTime) > ((uint64) 600 * 1000 * 1000))
62566276
{
6257-
ereport(ERROR,
6258-
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
6259-
errmsg("interconnect encountered a network error, please check your network"),
6260-
errdetail("Did not get any response from %s (pid %d cid %d) in %d seconds.",
6261-
conn->mConn.remoteHostAndPort,
6262-
conn->conn_info.dstPid,
6263-
conn->conn_info.dstContentId,
6264-
Gp_interconnect_transmit_timeout)));
6277+
write_log("Did not get any response from %s (pid %d cid %d) in 600 seconds.",conn->mConn.remoteHostAndPort,
6278+
conn->conn_info.dstPid,
6279+
conn->conn_info.dstContentId);
6280+
6281+
if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_TIMER)
6282+
conn->capacity += 1;
6283+
6284+
if ((now - conn->deadlockCheckBeginTime) > ((uint64) Gp_interconnect_transmit_timeout * 1000 * 1000))
6285+
ereport(ERROR,
6286+
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
6287+
errmsg("interconnect encountered a network error, please check your network"),
6288+
errdetail("Did not get any response from %s (pid %d cid %d) in %d seconds.",
6289+
conn->mConn.remoteHostAndPort,
6290+
conn->conn_info.dstPid,
6291+
conn->conn_info.dstContentId,
6292+
Gp_interconnect_transmit_timeout)));
62656293
}
62666294
}
62676295
}
@@ -6393,7 +6421,7 @@ checkExceptions(ChunkTransportState *transportStates,
63936421

63946422
if ((retry & 0x3) == 2)
63956423
{
6396-
checkDeadlock(pEntry, conn);
6424+
checkDeadlock(transportStates, pEntry, conn);
63976425
checkRxThreadError();
63986426
ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
63996427
}
@@ -6543,6 +6571,9 @@ SendChunkUDPIFC(ChunkTransportState *transportStates,
65436571
}
65446572
checkExceptions(transportStates, &pEntry->entry, &conn->mConn, retry++, timeout);
65456573
doCheckExpiration = false;
6574+
if (!doCheckExpiration && icBufferListLength(&conn->unackQueue) == 0 && conn->capacity > 0 && icBufferListLength(&conn->sndQueue) > 0)
6575+
6576+
sendBuffers(transportStates, &pEntry->entry, &conn->mConn);
65466577
}
65476578

65486579
conn->mConn.pBuff = (uint8 *) conn->curBuff->pkt;
@@ -7136,6 +7167,30 @@ handleDataPacket(MotionConn *mConn, icpkthdr *pkt, struct sockaddr_storage *peer
71367167
logPkt("Interconnect error: received a packet when the queue is full ", pkt);
71377168
ic_statistics.disorderedPktNum++;
71387169
conn->stat_count_dropped++;
7170+
7171+
if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_TIMER && rx_control_info.mainWaitingState.waiting &&
7172+
rx_control_info.mainWaitingState.waitingNode == pkt->motNodeId &&
7173+
rx_control_info.mainWaitingState.waitingQuery == pkt->icId)
7174+
{
7175+
if (rx_control_info.mainWaitingState.waitingRoute == ANY_ROUTE)
7176+
{
7177+
if (rx_control_info.mainWaitingState.reachRoute == ANY_ROUTE)
7178+
rx_control_info.mainWaitingState.reachRoute = conn->route;
7179+
}
7180+
else if (rx_control_info.mainWaitingState.waitingRoute == conn->route)
7181+
{
7182+
if (DEBUG2 >= log_min_messages)
7183+
write_log("rx thread: main_waiting waking it route %d", rx_control_info.mainWaitingState.waitingRoute);
7184+
rx_control_info.mainWaitingState.reachRoute = conn->route;
7185+
}
7186+
/* WAKE MAIN THREAD HERE */
7187+
*wakeup_mainthread = true;
7188+
}
7189+
7190+
if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE)
7191+
{
7192+
setAckSendParam(param, &conn->mConn, UDPIC_FLAGS_FULL, conn->conn_info.seq - 1, conn->conn_info.extraSeq);
7193+
}
71397194
return false;
71407195
}
71417196

0 commit comments

Comments
 (0)