Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 67 additions & 12 deletions contrib/interconnect/udp/ic_udpifc.c
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ int
#define UDPIC_FLAGS_DISORDER (32)
#define UDPIC_FLAGS_DUPLICATE (64)
#define UDPIC_FLAGS_CAPACITY (128)
#define UDPIC_FLAGS_FULL (256)

/*
* ConnHtabBin
Expand Down Expand Up @@ -835,7 +836,7 @@ static void initUdpManager(mudp_manager_t mptr);
static inline void checkNetworkTimeout(ICBuffer *buf, uint64 now, bool *networkTimeoutIsLogged);

static void checkExpiration(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *triggerConn, uint64 now);
static void checkDeadlock(ChunkTransportStateEntry *pChunkEntry, MotionConn *conn);
static void checkDeadlock(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry, MotionConn *conn);

static bool cacheFuturePacket(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len);
static void cleanupStartupCache(void);
Expand Down Expand Up @@ -5220,6 +5221,12 @@ handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChun
shouldSendBuffers |= (handleAckForDisorderPkt(transportStates, &pEntry->entry, &ackConn->mConn, pkt));
break;
}
else if (pkt->flags & UDPIC_FLAGS_FULL)
{
if (DEBUG1 >= log_min_messages)
write_log("Recv buff is full [seq %d] from route %d; srcpid %d dstpid %d cmd %d flags 0x%x connseq %d", pkt->seq, ackConn->route, pkt->srcPid, pkt->dstPid, pkt->icId, pkt->flags, ackConn->conn_info.seq);
break;
}

/*
* don't get out of the loop if pkt->seq equals to
Expand Down Expand Up @@ -6099,6 +6106,7 @@ checkExpiration(ChunkTransportState *transportStates,
if (pollAcks(transportStates, pEntryUdp->txfd, wait_time))
{
handleAcks(transportStates, pEntry, false);
curBuf->nRetry = 0;
break;
}

Expand All @@ -6120,6 +6128,12 @@ checkExpiration(ChunkTransportState *transportStates,
};
}

if (loop_ack > Gp_interconnect_min_retries_before_timeout / 5)
write_log("Resending packet (seq %d) to %s (pid %d cid %d) with %d retries in %lu seconds",
curBuf->pkt->seq, curBuf->conn->remoteHostAndPort,
curBuf->pkt->dstPid, curBuf->pkt->dstContentId, curBuf->nRetry,
(now - curBuf->sentTime) / 1000 / 1000);

currBuffConn = CONTAINER_OF(curBuf->conn, MotionConnUDP, mConn);

retransmits++;
Expand Down Expand Up @@ -6214,7 +6228,7 @@ checkExpiration(ChunkTransportState *transportStates,
*
*/
static void
checkDeadlock(ChunkTransportStateEntry *pChunkEntry, MotionConn *mConn)
checkDeadlock(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry, MotionConn *mConn)
{
uint64 deadlockCheckTime;
ChunkTransportStateEntryUDP *pEntry = NULL;
Expand Down Expand Up @@ -6251,17 +6265,31 @@ checkDeadlock(ChunkTransportStateEntry *pChunkEntry, MotionConn *mConn)
ic_control_info.lastDeadlockCheckTime = now;
ic_statistics.statusQueryMsgNum++;

if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE && pollAcks(transportStates, pEntry->txfd, 50))
{
handleAcks(transportStates, pChunkEntry, false);
conn->deadlockCheckBeginTime = now;
}

/* check network error. */
if ((now - conn->deadlockCheckBeginTime) > ((uint64) Gp_interconnect_transmit_timeout * 1000 * 1000))
if ((now - conn->deadlockCheckBeginTime) > ((uint64) Gp_interconnect_transmit_timeout * 100 * 1000))
{
ereport(ERROR,
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("interconnect encountered a network error, please check your network"),
errdetail("Did not get any response from %s (pid %d cid %d) in %d seconds.",
conn->mConn.remoteHostAndPort,
conn->conn_info.dstPid,
conn->conn_info.dstContentId,
Gp_interconnect_transmit_timeout)));
write_log("Did not get any response from %s (pid %d cid %d) in 600 seconds.",conn->mConn.remoteHostAndPort,
conn->conn_info.dstPid,
conn->conn_info.dstContentId);

if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_TIMER)
conn->capacity += 1;

if ((now - conn->deadlockCheckBeginTime) > ((uint64) Gp_interconnect_transmit_timeout * 1000 * 1000))
ereport(ERROR,
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("interconnect encountered a network error, please check your network"),
errdetail("Did not get any response from %s (pid %d cid %d) in %d seconds.",
conn->mConn.remoteHostAndPort,
conn->conn_info.dstPid,
conn->conn_info.dstContentId,
Gp_interconnect_transmit_timeout)));
}
}
}
Expand Down Expand Up @@ -6393,7 +6421,7 @@ checkExceptions(ChunkTransportState *transportStates,

if ((retry & 0x3) == 2)
{
checkDeadlock(pEntry, conn);
checkDeadlock(transportStates, pEntry, conn);
checkRxThreadError();
ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
}
Expand Down Expand Up @@ -6543,6 +6571,9 @@ SendChunkUDPIFC(ChunkTransportState *transportStates,
}
checkExceptions(transportStates, &pEntry->entry, &conn->mConn, retry++, timeout);
doCheckExpiration = false;

if (!doCheckExpiration && icBufferListLength(&conn->unackQueue) == 0 && conn->capacity > 0 && icBufferListLength(&conn->sndQueue) > 0)
sendBuffers(transportStates, &pEntry->entry, &conn->mConn);
}

conn->mConn.pBuff = (uint8 *) conn->curBuff->pkt;
Expand Down Expand Up @@ -7136,6 +7167,30 @@ handleDataPacket(MotionConn *mConn, icpkthdr *pkt, struct sockaddr_storage *peer
logPkt("Interconnect error: received a packet when the queue is full ", pkt);
ic_statistics.disorderedPktNum++;
conn->stat_count_dropped++;

if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_TIMER && rx_control_info.mainWaitingState.waiting &&
rx_control_info.mainWaitingState.waitingNode == pkt->motNodeId &&
rx_control_info.mainWaitingState.waitingQuery == pkt->icId)
{
if (rx_control_info.mainWaitingState.waitingRoute == ANY_ROUTE)
{
if (rx_control_info.mainWaitingState.reachRoute == ANY_ROUTE)
rx_control_info.mainWaitingState.reachRoute = conn->route;
}
else if (rx_control_info.mainWaitingState.waitingRoute == conn->route)
{
if (DEBUG2 >= log_min_messages)
write_log("rx thread: main_waiting waking it route %d", rx_control_info.mainWaitingState.waitingRoute);
rx_control_info.mainWaitingState.reachRoute = conn->route;
}
/* WAKE MAIN THREAD HERE */
*wakeup_mainthread = true;
}

if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE)
{
setAckSendParam(param, &conn->mConn, UDPIC_FLAGS_FULL, conn->conn_info.seq - 1, conn->conn_info.extraSeq);
}
return false;
}

Expand Down
Loading