diff --git a/contrib/interconnect/udp/ic_faultinjection.h b/contrib/interconnect/udp/ic_faultinjection.h index c3aa69a2dcc..f0602a19027 100644 --- a/contrib/interconnect/udp/ic_faultinjection.h +++ b/contrib/interconnect/udp/ic_faultinjection.h @@ -83,6 +83,7 @@ typedef enum { FINC_OS_NET_INTERFACE = 19, FINC_OS_MEM_INTERFACE = 20, FINC_OS_CREATE_THREAD = 21, + FINC_PKT_TOO_LONG = 22, /* These are used to inject network faults. */ FINC_NET_RECV_ERROR = 23, @@ -301,6 +302,13 @@ testmode_sendto(const char *caller_name, int socket, const void *buffer, errno = EFAULT; return -1; + case FINC_PKT_TOO_LONG: + if (!FINC_HAS_FAULT(fault_type) || !is_pkt) + break; + write_log("inject fault to sendto: FINC_PKT_TOO_LONG"); + errno = EMSGSIZE; + return -1; + default: break; } @@ -481,25 +489,6 @@ testmode_getsockname(const char *caller_name, int socket, return getsockname(socket, address, address_len); } -/* - * testmode_getsockopt - * getsockopt function with faults injected - */ -static int -testmode_getsockopt(const char *caller_name, int socket, int level, - int option_name, void *restrict option_value, - socklen_t *restrict option_len) -{ - if (FINC_HAS_FAULT(FINC_OS_NET_INTERFACE) && - testmode_inject_fault(gp_udpic_fault_inject_percent)) - { - write_log("inject fault to getsockopt: FINC_OS_NET_INTERFACE"); - errno = EFAULT; - return -1; - } - - return getsockopt(socket, level, option_name, option_value, option_len); -} /* * testmode_setsockopt @@ -657,9 +646,6 @@ testmode_pthread_create(const char *caller_name, pthread_t *thread, #define getsockname(socket, address, address_len) \ testmode_getsockname(PG_FUNCNAME_MACRO, socket, address, address_len) -#define getsockopt(socket, level, option_name, option_value, option_len) \ - testmode_getsockopt(PG_FUNCNAME_MACRO, socket, level, option_name, option_value, option_len) - #define setsockopt(socket, level, option_name, option_value, option_len) \ testmode_setsockopt(PG_FUNCNAME_MACRO, socket, level, option_name, option_value, option_len) diff --git a/contrib/interconnect/udp/ic_udpifc.c b/contrib/interconnect/udp/ic_udpifc.c index 548f71116c0..3da9bde0af1 100644 --- a/contrib/interconnect/udp/ic_udpifc.c +++ b/contrib/interconnect/udp/ic_udpifc.c @@ -210,6 +210,8 @@ int #define UDPIC_FLAGS_DUPLICATE (64) #define UDPIC_FLAGS_CAPACITY (128) +#define UDPIC_MIN_BUF_SIZE (128 * 1024) + /* * ConnHtabBin * @@ -246,17 +248,6 @@ struct ConnHashTable (a)->srcPid == (b)->srcPid && \ (a)->dstPid == (b)->dstPid && (a)->icId == (b)->icId)) - -/* - * Cursor IC table definition. - * - * For cursor case, there may be several concurrent interconnect - * instances on QD. The table is used to track the status of the - * instances, which is quite useful for "ACK the past and NAK the future" paradigm. - * - */ -#define CURSOR_IC_TABLE_SIZE (128) - /* * CursorICHistoryEntry * @@ -289,8 +280,9 @@ struct CursorICHistoryEntry typedef struct CursorICHistoryTable CursorICHistoryTable; struct CursorICHistoryTable { + uint32 size; uint32 count; - CursorICHistoryEntry *table[CURSOR_IC_TABLE_SIZE]; + CursorICHistoryEntry **table; }; /* @@ -713,6 +705,9 @@ typedef struct ICStatistics /* Statistics for UDP interconnect. */ static ICStatistics ic_statistics; +/* Cached sockaddr of the listening udp socket */ +static struct sockaddr_storage udp_dummy_packet_sockaddr; + /* UDP listen fd */ int UDP_listenerFd; @@ -740,10 +735,15 @@ static void setRxThreadError(int eno); static void resetRxThreadError(void); static void SendDummyPacket(void); +static void ConvertToIPv4MappedAddr(struct sockaddr_storage *sockaddr, socklen_t *o_len); +#if defined(__darwin__) +#define s6_addr32 __u6_addr.__u6_addr32 +static void ConvertIPv6WildcardToLoopback(struct sockaddr_storage* dest); +#endif static void getSockAddr(struct sockaddr_storage *peer, socklen_t *peer_len, const char *listenerAddr, int listenerPort); -static void setXmitSocketOptions(int txfd); -static uint32 setSocketBufferSize(int fd, int type, int expectedSize, int leastSize); -static void setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamily); +static uint32 setUDPSocketBufferSize(int ic_socket, int buffer_type); +static void setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, + int *txFamily, struct sockaddr_storage *listenerSockaddr); static ChunkTransportStateEntry *startOutgoingUDPConnections(ChunkTransportState *transportStates, ExecSlice *sendSlice, int *pOutgoingCount); @@ -848,6 +848,8 @@ static inline void logPkt(char *prefix, icpkthdr *pkt); static void aggregateStatistics(ChunkTransportStateEntry *pChunkEntry); static inline bool pollAcks(ChunkTransportState *transportStates, int fd, int timeout); + +static ssize_t sendtoWithRetry(int socket, const void *message, size_t length, int flags, const struct sockaddr *dest_addr, socklen_t dest_len, int retry, const char *errDetail); static TupleChunkListItem receiveChunksUDPIFCLoop(ChunkTransportState *pTransportStates, ChunkTransportStateEntry *pEntry, int16 *srcRoute, MotionConn *conn, WaitEventSet *waitset, int nevent); @@ -1341,8 +1343,13 @@ estimateRTT(MotionConn *mConn , uint32_t mrtt) static void initCursorICHistoryTable(CursorICHistoryTable *t) { + MemoryContext old; t->count = 0; - memset(t->table, 0, sizeof(t->table)); + t->size = Gp_interconnect_cursor_ic_table_size; + + old = MemoryContextSwitchTo(ic_control_info.memContext); + t->table = palloc0(sizeof(struct CursorICHistoryEntry *) * t->size); + MemoryContextSwitchTo(old); } /* @@ -1354,7 +1361,7 @@ addCursorIcEntry(CursorICHistoryTable *t, uint32 icId, uint32 cid) { MemoryContext old; CursorICHistoryEntry *p; - uint32 index = icId % CURSOR_IC_TABLE_SIZE; + uint32 index = icId % t->size; old = MemoryContextSwitchTo(ic_control_info.memContext); p = palloc0(sizeof(struct CursorICHistoryEntry)); @@ -1384,7 +1391,7 @@ static void updateCursorIcEntry(CursorICHistoryTable *t, uint32 icId, uint8 status) { struct CursorICHistoryEntry *p; - uint8 index = icId % CURSOR_IC_TABLE_SIZE; + uint8 index = icId % t->size; for (p = t->table[index]; p; p = p->next) { @@ -1405,7 +1412,7 @@ static CursorICHistoryEntry * getCursorIcEntry(CursorICHistoryTable *t, uint32 icId) { struct CursorICHistoryEntry *p; - uint8 index = icId % CURSOR_IC_TABLE_SIZE; + uint8 index = icId % t->size; for (p = t->table[index]; p; p = p->next) { @@ -1427,7 +1434,7 @@ pruneCursorIcEntry(CursorICHistoryTable *t, uint32 icId) { uint8 index; - for (index = 0; index < CURSOR_IC_TABLE_SIZE; index++) + for (index = 0; index < t->size; index++) { struct CursorICHistoryEntry *p, *q; @@ -1476,7 +1483,7 @@ purgeCursorIcEntry(CursorICHistoryTable *t) { uint8 index; - for (index = 0; index < CURSOR_IC_TABLE_SIZE; index++) + for (index = 0; index < t->size; index++) { struct CursorICHistoryEntry *trash; @@ -1571,49 +1578,40 @@ resetRxThreadError() pg_atomic_write_u32(&ic_control_info.eno, 0); } - /* * setupUDPListeningSocket * Setup udp listening socket. */ static void -setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamily) -{ - int errnoSave; - int fd = -1; - const char *fun; +setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamily, struct sockaddr_storage *listenerSockaddr) +{ + struct addrinfo *addrs = NULL; + struct addrinfo *addr; + struct addrinfo hints; + int ret; + int ic_socket = PGINVALID_SOCKET; + struct sockaddr_storage ic_socket_addr; + int tries = 0; + struct sockaddr_storage listenerAddr; + socklen_t listenerAddrlen = sizeof(ic_socket_addr); + uint32 socketSendBufferSize; + uint32 socketRecvBufferSize; - - /* - * At the moment, we don't know which of IPv6 or IPv4 is wanted, or even - * supported, so just ask getaddrinfo... - * - * Perhaps just avoid this and try socket with AF_INET6 and AF_INT? - * - * Most implementation of getaddrinfo are smart enough to only return a - * particular address family if that family is both enabled, and at least - * one network adapter has an IP address of that family. - */ - struct addrinfo hints; - struct addrinfo *addrs, - *rp; - int s; - struct sockaddr_storage our_addr; - socklen_t our_addr_len; - char service[32]; - - snprintf(service, 32, "%d", 0); memset(&hints, 0, sizeof(struct addrinfo)); hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */ - hints.ai_protocol = 0; /* Any protocol - UDP implied for network use due to SOCK_DGRAM */ + hints.ai_protocol = 0; + hints.ai_addrlen = 0; + hints.ai_addr = NULL; + hints.ai_canonname = NULL; + hints.ai_next = NULL; + hints.ai_flags |= AI_NUMERICHOST; #ifdef USE_ASSERT_CHECKING if (gp_udpic_network_disable_ipv6) hints.ai_family = AF_INET; #endif - fun = "getaddrinfo"; if (Gp_interconnect_address_type == INTERCONNECT_ADDRESS_TYPE_UNICAST) { Assert(interconnect_address && strlen(interconnect_address) > 0); @@ -1630,134 +1628,133 @@ setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamil (errmsg("getaddrinfo called with wildcard address"))); } - s = getaddrinfo(interconnect_address, service, &hints, &addrs); - if (s != 0) - elog(ERROR, "getaddrinfo says %s", gai_strerror(s)); - /* - * getaddrinfo() returns a list of address structures, one for each valid - * address and family we can use. - * - * Try each address until we successfully bind. If socket (or bind) fails, - * we (close the socket and) try the next address. This can happen if the - * system supports IPv6, but IPv6 is disabled from working, or if it - * supports IPv6 and IPv4 is disabled. + * Restrict what IP address we will listen on to just the one that was + * used to create this QE session. */ + Assert(interconnect_address && strlen(interconnect_address) > 0); + ret = pg_getaddrinfo_all(interconnect_address, NULL, &hints, &addrs); + if (ret || !addrs) + { + ereport(LOG, + (errmsg("could not resolve address for UDP IC socket %s: %s", + interconnect_address, + gai_strerror(ret)))); + goto startup_failed; + } /* - * If there is both an AF_INET6 and an AF_INET choice, we prefer the - * AF_INET6, because on UNIX it can receive either protocol, whereas - * AF_INET can only get IPv4. Otherwise we'd need to bind two sockets, - * one for each protocol. - * - * Why not just use AF_INET6 in the hints? That works perfect if we know - * this machine supports IPv6 and IPv6 is enabled, but we don't know that. + * On some platforms, pg_getaddrinfo_all() may return multiple addresses + * only one of which will actually work (eg, both IPv6 and IPv4 addresses + * when kernel will reject IPv6). Worse, the failure may occur at the + * bind() or perhaps even connect() stage. So we must loop through the + * results till we find a working combination. We will generate DEBUG + * messages, but no error, for bogus combinations. */ - -#ifndef __darwin__ -#ifdef HAVE_IPV6 - if (addrs->ai_family == AF_INET && addrs->ai_next != NULL && addrs->ai_next->ai_family == AF_INET6) + for (addr = addrs; addr != NULL; addr = addr->ai_next) { - /* - * We got both an INET and INET6 possibility, but we want to prefer - * the INET6 one if it works. Reverse the order we got from - * getaddrinfo so that we try things in our preferred order. If we got - * more possibilities (other AFs??), I don't think we care about them, - * so don't worry if the list is more that two, we just rearrange the - * first two. - */ - struct addrinfo *temp = addrs->ai_next; /* second node */ - addrs->ai_next = addrs->ai_next->ai_next; /* point old first node to - * third node if any */ - temp->ai_next = addrs; /* point second node to first */ - addrs = temp; /* start the list with the old second node */ - elog(DEBUG1, "Have both IPv6 and IPv4 choices"); - } -#endif +#ifdef HAVE_UNIX_SOCKETS + /* Ignore AF_UNIX sockets, if any are returned. */ + if (addr->ai_family == AF_UNIX) + continue; #endif - for (rp = addrs; rp != NULL; rp = rp->ai_next) - { - fun = "socket"; - - /* - * getaddrinfo gives us all the parameters for the socket() call as - * well as the parameters for the bind() call. - */ - elog(DEBUG1, "receive socket ai_family %d ai_socktype %d ai_protocol %d", rp->ai_family, rp->ai_socktype, rp->ai_protocol); - fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); - if (fd == -1) - continue; - elog(DEBUG1, "receive socket %d ai_family %d ai_socktype %d ai_protocol %d", fd, rp->ai_family, rp->ai_socktype, rp->ai_protocol); + ereportif(++tries > 1 && gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG, DEBUG3, + errmsg("trying another address for UDP interconnect socket")); - fun = "fcntl(O_NONBLOCK)"; - if (!pg_set_noblock(fd)) + ic_socket = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); + if (ic_socket == PGINVALID_SOCKET) { - if (fd >= 0) - { - closesocket(fd); - fd = -1; - } + ereportif(gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG, DEBUG3, + (errcode_for_socket_access(), + errmsg("could not create UDP interconnect socket: %m"))); continue; } - fun = "bind"; - elog(DEBUG1, "bind addrlen %d fam %d", rp->ai_addrlen, rp->ai_addr->sa_family); - if (bind(fd, rp->ai_addr, rp->ai_addrlen) == 0) + /* + * Bind the socket to a kernel assigned ephemeral port on the + * interconnect_address. + */ + if (bind(ic_socket, addr->ai_addr, addr->ai_addrlen) < 0) { - *txFamily = rp->ai_family; - break; /* Success */ + ereportif(gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG, DEBUG3, + (errcode_for_socket_access(), + errmsg("could not bind UDP interconnect socket: %m"))); + closesocket(ic_socket); + ic_socket = PGINVALID_SOCKET; + continue; } - if (fd >= 0) + /* Call getsockname() to eventually obtain the assigned ephemeral port */ + if (getsockname(ic_socket, (struct sockaddr *) &listenerAddr, &listenerAddrlen) < 0) { - closesocket(fd); - fd = -1; + ereportif(gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG, DEBUG3, + (errcode_for_socket_access(), + errmsg("could not get address of socket for UDP interconnect: %m"))); + closesocket(ic_socket); + ic_socket = PGINVALID_SOCKET; + continue; } - } - if (rp == NULL) - { /* No address succeeded */ - goto error; + /* If we get here, we have a working socket */ + break; } - freeaddrinfo(addrs); /* No longer needed */ + if (!addr || ic_socket == PGINVALID_SOCKET) + goto startup_failed; + + /* Memorize the socket fd, kernel assigned port and address family */ + *listenerSocketFd = ic_socket; + if (listenerAddr.ss_family == AF_INET6) + { + *listenerPort = ntohs(((struct sockaddr_in6 *) &listenerAddr)->sin6_port); + *txFamily = AF_INET6; + } + else + { + *listenerPort = ntohs(((struct sockaddr_in *) &listenerAddr)->sin_port); + *txFamily = AF_INET; + } /* - * Get our socket address (IP and Port), which we will save for others to - * connected to. + * cache the successful sockaddr of the listening socket, so + * we can use this information to connect to the listening socket. */ - MemSet(&our_addr, 0, sizeof(our_addr)); - our_addr_len = sizeof(our_addr); + if (listenerSockaddr != NULL) + memcpy(listenerSockaddr, &listenerAddr, sizeof(struct sockaddr_storage)); - fun = "getsockname"; - if (getsockname(fd, (struct sockaddr *) &our_addr, &our_addr_len) < 0) - goto error; - - Assert(our_addr.ss_family == AF_INET || our_addr.ss_family == AF_INET6); - - *listenerSocketFd = fd; + /* Set up socket non-blocking mode */ + if (!pg_set_noblock(ic_socket)) + { + ereport(LOG, + (errcode_for_socket_access(), + errmsg("could not set UDP interconnect socket to nonblocking mode: %m"))); + goto startup_failed; + } - if (our_addr.ss_family == AF_INET6) - *listenerPort = ntohs(((struct sockaddr_in6 *) &our_addr)->sin6_port); - else - *listenerPort = ntohs(((struct sockaddr_in *) &our_addr)->sin_port); + /* Set up the socket's send and receive buffer sizes. */ + socketRecvBufferSize = setUDPSocketBufferSize(ic_socket, SO_RCVBUF); + if (socketRecvBufferSize == -1) + goto startup_failed; + ic_control_info.socketRecvBufferSize = socketRecvBufferSize; - setXmitSocketOptions(fd); + socketSendBufferSize = setUDPSocketBufferSize(ic_socket, SO_SNDBUF); + if (socketSendBufferSize == -1) + goto startup_failed; + ic_control_info.socketSendBufferSize = socketSendBufferSize; + pg_freeaddrinfo_all(hints.ai_family, addrs); return; -error: - errnoSave = errno; - if (fd >= 0) - closesocket(fd); - errno = errnoSave; +startup_failed: + if (addrs) + pg_freeaddrinfo_all(hints.ai_family, addrs); + if (ic_socket != PGINVALID_SOCKET) + closesocket(ic_socket); ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), - errmsg("interconnect error: Could not set up udp listener socket"), - errdetail("%s: %m", fun))); - return; + errmsg("interconnect error: Could not set up udp interconnect socket: %m"))); } /* @@ -1859,8 +1856,8 @@ InitMotionUDPIFC(int *listenerSocketFd, int32 *listenerPort) /* * setup listening socket and sending socket for Interconnect. */ - setupUDPListeningSocket(listenerSocketFd, listenerPort, &txFamily); - setupUDPListeningSocket(&ICSenderSocket, &ICSenderPort, &ICSenderFamily); + setupUDPListeningSocket(listenerSocketFd, listenerPort, &txFamily, &udp_dummy_packet_sockaddr); + setupUDPListeningSocket(&ICSenderSocket, &ICSenderPort, &ICSenderFamily, NULL); /* Initialize receive control data. */ resetMainThreadWaiting(&rx_control_info.mainWaitingState); @@ -1968,6 +1965,8 @@ static inline void CleanupMotionUDPIFC(void) ICSenderPort = 0; ICSenderFamily = 0; + memset(&udp_dummy_packet_sockaddr, 0, sizeof(udp_dummy_packet_sockaddr)); + #ifdef USE_ASSERT_CHECKING /* @@ -2227,9 +2226,6 @@ destroyConnHashTable(ConnHashTable *ht) /* * sendControlMessage * Helper function to send a control message. - * - * It is different from sendOnce which retries on interrupts... - * Here, we leave it to retransmit logic to handle these cases. */ static inline void sendControlMessage(icpkthdr *pkt, int fd, struct sockaddr *addr, socklen_t peerLen) @@ -2250,13 +2246,23 @@ sendControlMessage(icpkthdr *pkt, int fd, struct sockaddr *addr, socklen_t peerL if (gp_interconnect_full_crc) addCRC(pkt); - n = sendto(fd, (const char *) pkt, pkt->len, 0, addr, peerLen); - - /* - * No need to handle EAGAIN here: no-space just means that we dropped the - * packet: our ordinary retransmit mechanism will handle that case - */ - + /* retry 10 times for sending control message */ + int counter = 0; + while (counter < 10) + { + counter++; + n = sendto(fd, (const char *) pkt, pkt->len, 0, addr, peerLen); + if (n < 0) + { + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) + continue; + else { + write_log("sendcontrolmessage: got errno %d", errno); + return; + } + } + break; + } if (n < pkt->len) write_log("sendcontrolmessage: got error %d errno %d seq %d", n, errno, pkt->seq); } @@ -2558,81 +2564,46 @@ freeRxBuffer(RxBufferPool *p, icpkthdr *buf) } /* - * setSocketBufferSize - * Set socket buffer size. + * Set UDP IC send/receive socket buffer size. + * + * We must carefully size the UDP IC socket's send/receive buffers. If the size + * is too small, say 128K, and send queue depth and receive queue depth are + * large, then there might be a lot of dropped/reordered packets. We start + * trying from a size of 2MB (unless Gp_udp_bufsize_k is specified), and + * gradually back off to UDPIC_MIN_BUF_SIZE. For a given size setting to be + * successful, the corresponding UDP kernel buffer size params must be adequate. + * */ static uint32 -setSocketBufferSize(int fd, int type, int expectedSize, int leastSize) +setUDPSocketBufferSize(int ic_socket, int buffer_type) { - int bufSize; - int errnoSave; - socklen_t skLen = 0; - const char *fun; + int expected_size; + int curr_size; + ACCEPT_TYPE_ARG3 option_len = 0; - fun = "getsockopt"; - skLen = sizeof(bufSize); - if (getsockopt(fd, SOL_SOCKET, type, (char *) &bufSize, &skLen) < 0) - goto error; + Assert(buffer_type == SO_SNDBUF || buffer_type == SO_RCVBUF); - elog(DEBUG1, "UDP-IC: xmit default buffer size %d bytes", bufSize); + expected_size = (Gp_udp_bufsize_k ? Gp_udp_bufsize_k * 1024 : 2048 * 1024); - /* - * We'll try the expected size first, and fall back to least size if that - * doesn't work. - */ - - bufSize = expectedSize; - fun = "setsockopt"; - while (setsockopt(fd, SOL_SOCKET, type, (const char *) &bufSize, skLen) < 0) + curr_size = expected_size; + option_len = sizeof(curr_size); + while (setsockopt(ic_socket, SOL_SOCKET, buffer_type, (const char *) &curr_size, option_len) < 0) { - bufSize = bufSize >> 1; - if (bufSize < leastSize) - goto error; + ereportif(gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG, DEBUG3, + (errmsg("UDP-IC: setsockopt %s failed to set buffer size = %d bytes: %m", + buffer_type == SO_SNDBUF ? "send": "receive", + curr_size))); + curr_size = curr_size >> 1; + if (curr_size < UDPIC_MIN_BUF_SIZE) + return -1; } - elog(DEBUG1, "UDP-IC: xmit use buffer size %d bytes", bufSize); - - return bufSize; - -error: - errnoSave = errno; - if (fd >= 0) - closesocket(fd); - errno = errnoSave; - ereport(ERROR, - (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), - errmsg("interconnect error: Could not set up udp listener socket"), - errdetail("%s: %m", fun))); - /* Make GCC not complain. */ - return 0; -} - -/* - * setXmitSocketOptions - * Set transmit socket options. - */ -static void -setXmitSocketOptions(int txfd) -{ - uint32 bufSize = 0; - - /* - * The Gp_udp_bufsize_k guc should be set carefully. - * - * If it is small, such as 128K, and send queue depth and receive queue - * depth are large, then it is possible OS can not handle all of the UDP - * packets GPDB delivered to it. OS will introduce a lot of packet losses - * and disordered packets. - * - * In order to set Gp_udp_bufsize_k to a larger value, the OS UDP buffer - * should be set to a large enough value. - * - */ - bufSize = (Gp_udp_bufsize_k != 0 ? Gp_udp_bufsize_k * 1024 : 2048 * 1024); - - ic_control_info.socketRecvBufferSize = setSocketBufferSize(txfd, SO_RCVBUF, bufSize, 128 * 1024); - ic_control_info.socketSendBufferSize = setSocketBufferSize(txfd, SO_SNDBUF, bufSize, 128 * 1024); + ereportif(gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG, DEBUG3, + (errmsg("UDP-IC: socket %s current buffer size = %d bytes", + buffer_type == SO_SNDBUF ? "send": "receive", + curr_size))); + return curr_size; } #if defined(USE_ASSERT_CHECKING) || defined(AMS_VERBOSE_LOGGING) @@ -3307,30 +3278,8 @@ setupOutgoingUDPConnection(ChunkTransportState *transportStates, ChunkTransportS */ if (pEntry->txfd_family == AF_INET6) { - struct sockaddr_storage temp; - const struct sockaddr_in *in = (const struct sockaddr_in *) &conn->peer; - struct sockaddr_in6 *in6_new = (struct sockaddr_in6 *) &temp; - - memset(&temp, 0, sizeof(temp)); - elog(DEBUG1, "We are inet6, remote is inet. Converting to v4 mapped address."); - - /* Construct a V4-to-6 mapped address. */ - temp.ss_family = AF_INET6; - in6_new->sin6_family = AF_INET6; - in6_new->sin6_port = in->sin_port; - in6_new->sin6_flowinfo = 0; - - memset(&in6_new->sin6_addr, '\0', sizeof(in6_new->sin6_addr)); - /* in6_new->sin6_addr.s6_addr16[5] = 0xffff; */ - ((uint16 *) &in6_new->sin6_addr)[5] = 0xffff; - /* in6_new->sin6_addr.s6_addr32[3] = in->sin_addr.s_addr; */ - memcpy(((char *) &in6_new->sin6_addr) + 12, &(in->sin_addr), 4); - in6_new->sin6_scope_id = 0; - - /* copy it back */ - memcpy(&conn->peer, &temp, sizeof(struct sockaddr_in6)); - conn->peer_len = sizeof(struct sockaddr_in6); + ConvertToIPv4MappedAddr(&conn->peer, &conn->peer_len); } else { @@ -3545,30 +3494,65 @@ SetupUDPIFCInterconnect_Internal(SliceTable *sliceTable) set_test_mode(); #endif + /* Prune the QD's history table if it is too large */ if (Gp_role == GP_ROLE_DISPATCH) { - DistributedTransactionId distTransId = 0; - TransactionId localTransId = 0; - TransactionId subtransId = 0; - - GetAllTransactionXids(&(distTransId), - &(localTransId), - &(subtransId)); + CursorICHistoryTable *ich_table = &rx_control_info.cursorHistoryTable; + DistributedTransactionId distTransId = getDistributedTransactionId(); - /* - * Prune only when we are not in the save transaction and there is a - * large number of entries in the table - */ - if (distTransId != rx_control_info.lastDXatId && rx_control_info.cursorHistoryTable.count > (2 * CURSOR_IC_TABLE_SIZE)) + if (ich_table->count > (2 * ich_table->size)) { - if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG) - elog(DEBUG1, "prune cursor history table (count %d), icid %d", rx_control_info.cursorHistoryTable.count, sliceTable->ic_instance_id); - pruneCursorIcEntry(&rx_control_info.cursorHistoryTable, sliceTable->ic_instance_id); - } + /* + * distTransId != lastDXatId + * Means the last transaction is finished, it's ok to make a prune. + */ + if (distTransId != rx_control_info.lastDXatId) + { + if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG) + elog(DEBUG1, "prune cursor history table (count %d), icid %d, prune_id %d", + ich_table->count, sliceTable->ic_instance_id, sliceTable->ic_instance_id); + pruneCursorIcEntry(ich_table, sliceTable->ic_instance_id); + } + /* + * distTransId == lastDXatId and they are not InvalidTransactionId(0) + * Means current (non Read-Only) transaction isn't finished, should not prune. + */ + else if (rx_control_info.lastDXatId != InvalidTransactionId) + { + ; + } + /* + * distTransId == lastDXatId and they are InvalidTransactionId(0) + * Means they are the same transaction or different Read-Only transactions. + * + * For the latter, it's hard to get a perfect timepoint to prune: prune eagerly may + * cause problems (pruned current Txn's Ic instances), but prune in low frequency + * causes memory leak. + * + * So, we choose a simple algorithm to prune it here. And if it mistakenly prune out + * the still-in-used Ic instance (with lower id), the query may hang forever. + * Then user have to set a bigger gp_interconnect_cursor_ic_table_size value and + * try the query again, it is a workaround. + * + * More backgrounds please see: https://github.com/greenplum-db/gpdb/pull/16458 + */ + else + { + if (sliceTable->ic_instance_id > ich_table->size) + { + uint32 prune_id = sliceTable->ic_instance_id - ich_table->size; + Assert(prune_id < sliceTable->ic_instance_id); - addCursorIcEntry(&rx_control_info.cursorHistoryTable, sliceTable->ic_instance_id, gp_command_count); + if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG) + elog(DEBUG1, "prune cursor history table (count %d), icid %d, prune_id %d", + ich_table->count, sliceTable->ic_instance_id, prune_id); + pruneCursorIcEntry(ich_table, prune_id); + } + } + } - /* save the latest transaction id. */ + addCursorIcEntry(ich_table, sliceTable->ic_instance_id, gp_command_count); + /* save the latest transaction id */ rx_control_info.lastDXatId = distTransId; } @@ -5371,34 +5355,25 @@ prepareXmit(MotionConn *mConn) } /* - * sendOnce - * Send a packet. + * sendtoWithRetry + * Retry sendto logic and send the packets. */ -static void -sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry, ICBuffer *buf, MotionConn *mConn) +static ssize_t +sendtoWithRetry(int socket, const void *message, size_t length, + int flags, const struct sockaddr *dest_addr, + socklen_t dest_len, int retry, const char *errDetail) { int32 n; - ChunkTransportStateEntryUDP *pEntry = NULL; - MotionConnUDP *conn = NULL; - - pEntry = CONTAINER_OF(pChunkEntry, ChunkTransportStateEntryUDP, entry); - Assert(pEntry); - - conn = CONTAINER_OF(mConn, MotionConnUDP, mConn); - -#ifdef USE_ASSERT_CHECKING - if (testmode_inject_fault(gp_udpic_dropxmit_percent)) - { -#ifdef AMS_VERBOSE_LOGGING - write_log("THROW PKT with seq %d srcpid %d despid %d", buf->pkt->seq, buf->pkt->srcPid, buf->pkt->dstPid); -#endif - return; - } -#endif + int count = 0; xmit_retry: - n = sendto(pEntry->txfd, buf->pkt, buf->pkt->len, 0, - (struct sockaddr *) &conn->peer, conn->peer_len); + /* + * If given retry count is positive, retry up to the limited times. + * Otherwise, retry for unlimited times until succeed. + */ + if (retry > 0 && ++count > retry) + return n; + n = sendto(socket, message, length, flags, dest_addr, dest_len); if (n < 0) { int save_errno = errno; @@ -5406,8 +5381,15 @@ sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkE if (errno == EINTR) goto xmit_retry; - if (errno == EAGAIN) /* no space ? not an error. */ - return; + /* + * EAGAIN: no space ? not an error. + * + * EFAULT: In Linux system call, it only happens when copying a socket + * address into kernel space failed, which is less likely to happen, + * but mocked heavily by our fault injection in regression tests. + */ + if (errno == EAGAIN || errno == EFAULT) + return n; /* * If Linux iptables (nf_conntrack?) drops an outgoing packet, it may @@ -5419,20 +5401,65 @@ sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkE ereport(LOG, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), errmsg("Interconnect error writing an outgoing packet: %m"), - errdetail("error during sendto() for Remote Connection: contentId=%d at %s", - conn->mConn.remoteContentId, conn->mConn.remoteHostAndPort))); - return; + errdetail("error during sendto() %s", errDetail))); + return n; + } + + /* + * If the OS can detect an MTU issue on the host network interfaces, we + * would get EMSGSIZE here. So, bail with a HINT about checking MTU. + */ + if (errno == EMSGSIZE) + { + ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), + errmsg("Interconnect error writing an outgoing packet: %m"), + errdetail("error during sendto() call (error:%d).\n" + "%s", save_errno, errDetail), + errhint("check if interface MTU is equal across the cluster and lower than gp_max_packet_size"))); } ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), errmsg("Interconnect error writing an outgoing packet: %m"), errdetail("error during sendto() call (error:%d).\n" - "For Remote Connection: contentId=%d at %s", - save_errno, conn->mConn.remoteContentId, - conn->mConn.remoteHostAndPort))); + "%s", save_errno, errDetail))); /* not reached */ } + return n; +} + +/* + * sendOnce + * Send a packet. + */ +static void +sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry, ICBuffer *buf, MotionConn *mConn) +{ + int32 n; + ChunkTransportStateEntryUDP *pEntry = NULL; + MotionConnUDP *conn = NULL; + + pEntry = CONTAINER_OF(pChunkEntry, ChunkTransportStateEntryUDP, entry); + Assert(pEntry); + + conn = CONTAINER_OF(mConn, MotionConnUDP, mConn); + +#ifdef USE_ASSERT_CHECKING + if (testmode_inject_fault(gp_udpic_dropxmit_percent)) + { +#ifdef AMS_VERBOSE_LOGGING + write_log("THROW PKT with seq %d srcpid %d despid %d", buf->pkt->seq, buf->pkt->srcPid, buf->pkt->dstPid); +#endif + return; + } +#endif + + char errDetail[100]; + snprintf(errDetail, sizeof(errDetail), "For Remote Connection: contentId=%d at %s", + conn->mConn.remoteContentId, + conn->mConn.remoteHostAndPort); + n = sendtoWithRetry(pEntry->txfd, buf->pkt, buf->pkt->len, 0, + (struct sockaddr *) &conn->peer, conn->peer_len, -1, errDetail); if (n != buf->pkt->len) { if (DEBUG1 >= log_min_messages) @@ -5444,7 +5471,6 @@ sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkE logPkt("PKT DETAILS ", buf->pkt); #endif } - return; } @@ -5928,7 +5954,7 @@ checkNetworkTimeout(ICBuffer *buf, uint64 now, bool *networkTimeoutIsLogged) ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), errmsg("interconnect encountered a network error, please check your network"), - errdetail("Failed to send packet (seq %d) to %s (pid %d cid %d) after %d retries in %d seconds.", + errdetail("Failed to send packet (seq %u) to %s (pid %d cid %d) after %u retries in %d seconds.", buf->pkt->seq, buf->conn->remoteHostAndPort, buf->pkt->dstPid, buf->pkt->dstContentId, buf->nRetry, Gp_interconnect_transmit_timeout))); @@ -5949,7 +5975,7 @@ checkNetworkTimeout(ICBuffer *buf, uint64 now, bool *networkTimeoutIsLogged) { ereport(WARNING, (errmsg("interconnect may encountered a network error, please check your network"), - errdetail("Failed to send packet (seq %d) to %s (pid %d cid %d) after %d retries.", + errdetail("Failing to send packet (seq %u) to %s (pid %d cid %d) after %u retries.", buf->pkt->seq, buf->conn->remoteHostAndPort, buf->pkt->dstPid, buf->pkt->dstContentId, buf->nRetry))); @@ -7993,109 +8019,119 @@ WaitInterconnectQuitUDPIFC(void) } /* - * Send a dummy packet to interconnect thread to exit poll() immediately + * If the socket was created AF_INET6, but the address we want to + * send to is IPv4 (AF_INET), we need to change the address + * format. On Linux, this is not necessary: glibc automatically + * handles this. But on MAC OSX and Solaris, we need to convert + * the IPv4 address to IPv4-mapped IPv6 address in AF_INET6 format. + * + * The comment above relies on getaddrinfo() via function getSockAddr to get + * the correct V4-mapped address. We need to be careful here as we need to + * ensure that the platform we are using is POSIX 1003-2001 compliant. + * Just to be on the safeside, we'll be keeping this function for + * now to be used for all platforms and not rely on POSIX. + * + * Since this can be called in a signal handler, we avoid the use of + * async-signal unsafe functions such as memset/memcpy */ static void -SendDummyPacket(void) +ConvertToIPv4MappedAddr(struct sockaddr_storage *sockaddr, socklen_t *o_len) { - int sockfd = -1; - int ret; - struct addrinfo *addrs = NULL; - struct addrinfo *rp; - struct addrinfo hint; - uint16 udp_listener; - char port_str[32] = {0}; - char *dummy_pkt = "stop it"; - int counter; + const struct sockaddr_in *in = (const struct sockaddr_in *) sockaddr; + struct sockaddr_storage temp = {0}; + struct sockaddr_in6 *in6_new = (struct sockaddr_in6 *) &temp; - /* - * Get address info from interconnect udp listener port - */ - udp_listener = GetListenPortUDP(); - snprintf(port_str, sizeof(port_str), "%d", udp_listener); + /* Construct a IPv4-to-IPv6 mapped address. */ + temp.ss_family = AF_INET6; + in6_new->sin6_family = AF_INET6; + in6_new->sin6_port = in->sin_port; + in6_new->sin6_flowinfo = 0; - MemSet(&hint, 0, sizeof(hint)); - hint.ai_socktype = SOCK_DGRAM; - hint.ai_family = AF_UNSPEC; /* Allow for IPv4 or IPv6 */ + ((uint16 *) &in6_new->sin6_addr)[5] = 0xffff; - /* Never do name resolution */ -#ifdef AI_NUMERICSERV - hint.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV; -#else - hint.ai_flags = AI_NUMERICHOST; + in6_new->sin6_addr.s6_addr32[3] = in->sin_addr.s_addr; + in6_new->sin6_scope_id = 0; + + /* copy it back */ + *sockaddr = temp; + *o_len = sizeof(struct sockaddr_in6); +} + +#if defined(__darwin__) +/* macos does not accept :: as the destination, we will need to covert this to the IPv6 loopback */ +static void +ConvertIPv6WildcardToLoopback(struct sockaddr_storage* dest) +{ + char address[INET6_ADDRSTRLEN]; + /* we want to terminate our own process, so this should be local */ + const struct sockaddr_in6 *in6 = (const struct sockaddr_in6 *) &udp_dummy_packet_sockaddr; + inet_ntop(AF_INET6, &in6->sin6_addr, address, sizeof(address)); + if (strcmp("::", address) == 0) + ((struct sockaddr_in6 *)dest)->sin6_addr = in6addr_loopback; +} #endif - ret = pg_getaddrinfo_all(interconnect_address, port_str, &hint, &addrs); - if (ret || !addrs) - { - elog(LOG, "send dummy packet failed, pg_getaddrinfo_all(): %m"); - goto send_error; - } +/* + * Send a dummy packet to interconnect thread to exit poll() immediately + */ +static void +SendDummyPacket(void) +{ + int ret; + char *dummy_pkt = "stop it"; + int counter; + struct sockaddr_storage dest; + socklen_t dest_len; - for (rp = addrs; rp != NULL; rp = rp->ai_next) - { - /* Create socket according to pg_getaddrinfo_all() */ - sockfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); - if (sockfd < 0) - continue; + Assert(udp_dummy_packet_sockaddr.ss_family == AF_INET || udp_dummy_packet_sockaddr.ss_family == AF_INET6); + Assert(ICSenderFamily == AF_INET || ICSenderFamily == AF_INET6); - if (!pg_set_noblock(sockfd)) - { - if (sockfd >= 0) - { - closesocket(sockfd); - sockfd = -1; - } - continue; - } - break; + dest = udp_dummy_packet_sockaddr; + dest_len = (ICSenderFamily == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6); + + if (ICSenderFamily == AF_INET6) + { +#if defined(__darwin__) + if (udp_dummy_packet_sockaddr.ss_family == AF_INET6) + ConvertIPv6WildcardToLoopback(&dest); +#endif + if (udp_dummy_packet_sockaddr.ss_family == AF_INET) + ConvertToIPv4MappedAddr(&dest, &dest_len); } - if (rp == NULL) + if (ICSenderFamily == AF_INET && udp_dummy_packet_sockaddr.ss_family == AF_INET6) { - elog(LOG, "send dummy packet failed, create socket failed: %m"); - goto send_error; + /* the size of AF_INET6 is bigger than the side of IPv4, so + * converting from IPv6 to IPv4 may potentially not work. */ + ereport(LOG, errmsg("sending dummy packet failed: cannot send from AF_INET to receiving on AF_INET6")); + return; } /* - * Send a dummy package to the interconnect listener, try 10 times + * Send a dummy package to the interconnect listener, try 10 times. + * We don't want to close the socket at the end of this function, since + * the socket will eventually close during the motion layer cleanup. */ - counter = 0; while (counter < 10) { counter++; - ret = sendto(sockfd, dummy_pkt, strlen(dummy_pkt), 0, rp->ai_addr, rp->ai_addrlen); + ret = sendto(ICSenderSocket, dummy_pkt, strlen(dummy_pkt), 0, (struct sockaddr *) &dest, dest_len); if (ret < 0) { if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) continue; else { - elog(LOG, "send dummy packet failed, sendto failed: %m"); - goto send_error; + ereport(LOG, errmsg("send dummy packet failed, sendto failed: %m")); + return; } } break; } if (counter >= 10) - { - elog(LOG, "send dummy packet failed, sendto failed: %m"); - goto send_error; - } - - pg_freeaddrinfo_all(hint.ai_family, addrs); - closesocket(sockfd); - return; - -send_error: - - if (addrs) - pg_freeaddrinfo_all(hint.ai_family, addrs); - if (sockfd != -1) - closesocket(sockfd); - return; + ereport(LOG, errmsg("send dummy packet failed, sendto failed with 10 times: %m")); } void logChunkParseDetails(MotionConn *conn, uint32 ic_instance_id) diff --git a/src/backend/cdb/cdbvars.c b/src/backend/cdb/cdbvars.c index 7984a9d61a4..c6540a8a8d5 100644 --- a/src/backend/cdb/cdbvars.c +++ b/src/backend/cdb/cdbvars.c @@ -201,6 +201,7 @@ int Gp_interconnect_queue_depth = 4; /* max number of messages * we drop. */ int Gp_interconnect_snd_queue_depth = 2; int Gp_interconnect_mem_size = 10; +int Gp_interconnect_cursor_ic_table_size = 128; int Gp_interconnect_timer_period = 5; int Gp_interconnect_timer_checking_period = 20; int Gp_interconnect_default_rtt = 20; diff --git a/src/backend/utils/misc/guc_gp.c b/src/backend/utils/misc/guc_gp.c index 18d33966de2..dce6ae61505 100644 --- a/src/backend/utils/misc/guc_gp.c +++ b/src/backend/utils/misc/guc_gp.c @@ -3782,6 +3782,17 @@ struct config_int ConfigureNamesInt_gp[] = NULL, NULL, NULL }, + { + {"gp_interconnect_cursor_ic_table_size", PGC_USERSET, GP_ARRAY_TUNING, + gettext_noop("Sets the size of Cursor History Table in the UDP interconnect"), + gettext_noop("You can try to increase it when a UDF which contains many concurrent " + "cursor queries hangs. The default value is 128.") + }, + &Gp_interconnect_cursor_ic_table_size, + 128, 128, 102400, + NULL, NULL, NULL + }, + { {"gp_interconnect_timer_period", PGC_USERSET, GP_ARRAY_TUNING, gettext_noop("Sets the timer period (in ms) for UDP interconnect"), diff --git a/src/include/cdb/cdbvars.h b/src/include/cdb/cdbvars.h index fb94a3960b5..abbddb3cd8b 100644 --- a/src/include/cdb/cdbvars.h +++ b/src/include/cdb/cdbvars.h @@ -363,6 +363,16 @@ extern int Gp_interconnect_queue_depth; * */ extern int Gp_interconnect_snd_queue_depth; + +/* + * Cursor IC table size. + * + * For cursor case, there may be several concurrent interconnect + * instances on QD. The table is used to track the status of the + * instances, which is quite useful for "ACK the past and NAK the future" paradigm. + * + */ +extern int Gp_interconnect_cursor_ic_table_size; extern int Gp_interconnect_timer_period; extern int Gp_interconnect_timer_checking_period; extern int Gp_interconnect_default_rtt; diff --git a/src/include/utils/sync_guc_name.h b/src/include/utils/sync_guc_name.h index ca9f7ef45f8..6d09f49155f 100644 --- a/src/include/utils/sync_guc_name.h +++ b/src/include/utils/sync_guc_name.h @@ -74,6 +74,7 @@ "gp_initial_bad_row_limit", "gp_interconnect_address_type", "gp_interconnect_cache_future_packets", + "gp_interconnect_cursor_ic_table_size", "gp_interconnect_debug_retry_interval", "gp_interconnect_default_rtt", "gp_interconnect_fc_method", diff --git a/src/test/regress/expected/icudp/icudp_full.out b/src/test/regress/expected/icudp/icudp_full.out index 2f7a73594e8..62782df6e64 100644 --- a/src/test/regress/expected/icudp/icudp_full.out +++ b/src/test/regress/expected/icudp/icudp_full.out @@ -544,6 +544,14 @@ SELECT system_call_fault_injection_test(); (1 row) +-- inject faults for errMsgSize when packet is too long. +SET gp_udpic_fault_inject_bitmap = 4194304; +SELECT system_call_fault_injection_test(); + system_call_fault_injection_test +---------------------------------- + +(1 row) + -- disable ipv6 may increase the code coverage. SET gp_udpic_network_disable_ipv6 = 1; SELECT system_call_fault_injection_test(); diff --git a/src/test/regress/init_file b/src/test/regress/init_file index 728f6d8ea42..d300d253b94 100644 --- a/src/test/regress/init_file +++ b/src/test/regress/init_file @@ -88,6 +88,9 @@ m/^WARNING: table ".*" contains rows in segment .*, which is outside the # of s # The following output is generated by \d on foreign tables, so ignore it. m/Distributed by: \(.*\)/ m/Distributed randomly/ +# The following output is an interconnect network warning, but still not error out, so ignore it. +m/WARNING: interconnect may encountered a network error, please check your network/ +m/Failing to send packet/ # directory_table test output is sensitive to the user running the tests m/^NOTICE:.*storage user mapping for .* does not exist for storage server/ diff --git a/src/test/regress/sql/icudp/icudp_full.sql b/src/test/regress/sql/icudp/icudp_full.sql index 1436a31cf55..6fd07f29aba 100644 --- a/src/test/regress/sql/icudp/icudp_full.sql +++ b/src/test/regress/sql/icudp/icudp_full.sql @@ -276,6 +276,10 @@ $$; SET gp_udpic_fault_inject_bitmap = 524288; SELECT system_call_fault_injection_test(); +-- inject faults for errMsgSize when packet is too long. +SET gp_udpic_fault_inject_bitmap = 4194304; +SELECT system_call_fault_injection_test(); + -- disable ipv6 may increase the code coverage. SET gp_udpic_network_disable_ipv6 = 1; SELECT system_call_fault_injection_test();