Skip to content

Commit cf86575

Browse files
authored
Merge pull request #2393 from amirsalarsafaei/ping-timeout
feat(pgxpool): acquire ping timeout
2 parents f6980e4 + 0f61c88 commit cf86575

File tree

3 files changed

+104
-1
lines changed

3 files changed

+104
-1
lines changed

pgxpool/helper_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package pgxpool_test
2+
3+
import (
4+
"context"
5+
"net"
6+
"time"
7+
8+
"github.com/jackc/pgx/v5/pgconn"
9+
)
10+
11+
// delayProxy is a that introduces a configurable delay on reads from the database connection.
12+
type delayProxy struct {
13+
net.Conn
14+
readDelay time.Duration
15+
}
16+
17+
func newDelayProxy(conn net.Conn, readDelay time.Duration) *delayProxy {
18+
p := &delayProxy{
19+
Conn: conn,
20+
readDelay: readDelay,
21+
}
22+
23+
return p
24+
}
25+
26+
func (dp *delayProxy) Read(b []byte) (int, error) {
27+
if dp.readDelay > 0 {
28+
time.Sleep(dp.readDelay)
29+
}
30+
31+
return dp.Conn.Read(b)
32+
}
33+
34+
func newDelayProxyDialFunc(readDelay time.Duration) pgconn.DialFunc {
35+
return func(ctx context.Context, network, addr string) (net.Conn, error) {
36+
conn, err := net.Dial(network, addr)
37+
return newDelayProxy(conn, readDelay), err
38+
}
39+
}

pgxpool/pool.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ type Pool struct {
9797
maxConnLifetimeJitter time.Duration
9898
maxConnIdleTime time.Duration
9999
healthCheckPeriod time.Duration
100+
pingTimeout time.Duration
100101

101102
healthCheckMu sync.Mutex
102103
healthCheckTimer *time.Timer
@@ -169,6 +170,10 @@ type Config struct {
169170
// MaxConnIdleTime is the duration after which an idle connection will be automatically closed by the health check.
170171
MaxConnIdleTime time.Duration
171172

173+
// PingTimeout is the maximum amount of time to wait for a connection to pong before considering it as unhealthy and
174+
// destroying it. If zero, the default is no timeout.
175+
PingTimeout time.Duration
176+
172177
// MaxConns is the maximum size of the pool. The default is the greater of 4 or runtime.NumCPU().
173178
MaxConns int32
174179

@@ -241,6 +246,7 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
241246
maxConnLifetime: config.MaxConnLifetime,
242247
maxConnLifetimeJitter: config.MaxConnLifetimeJitter,
243248
maxConnIdleTime: config.MaxConnIdleTime,
249+
pingTimeout: config.PingTimeout,
244250
healthCheckPeriod: config.HealthCheckPeriod,
245251
healthCheckChan: make(chan struct{}, 1),
246252
closeChan: make(chan struct{}),
@@ -614,7 +620,14 @@ func (p *Pool) Acquire(ctx context.Context) (c *Conn, err error) {
614620

615621
shouldPingParams := ShouldPingParams{Conn: cr.conn, IdleDuration: res.IdleDuration()}
616622
if p.shouldPing(ctx, shouldPingParams) {
617-
err := cr.conn.Ping(ctx)
623+
pingCtx := ctx
624+
if p.pingTimeout > 0 {
625+
var cancel context.CancelFunc
626+
pingCtx, cancel = context.WithTimeout(ctx, p.pingTimeout)
627+
defer cancel()
628+
}
629+
630+
err := cr.conn.Ping(pingCtx)
618631
if err != nil {
619632
res.Destroy()
620633
continue

pgxpool/pool_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1281,3 +1281,54 @@ func TestPoolSendBatchBatchCloseTwice(t *testing.T) {
12811281
assert.NoError(t, err)
12821282
}
12831283
}
1284+
1285+
func TestPoolAcquirePingTimeout(t *testing.T) {
1286+
t.Parallel()
1287+
1288+
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
1289+
defer cancel()
1290+
1291+
config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
1292+
require.NoError(t, err)
1293+
1294+
config.PingTimeout = 200 * time.Millisecond
1295+
config.ConnConfig.DialFunc = newDelayProxyDialFunc(500 * time.Millisecond)
1296+
1297+
var conID *uint32
1298+
// Only ping the connection with the original PID to force creation of a new connection
1299+
config.ShouldPing = func(_ context.Context, params pgxpool.ShouldPingParams) bool {
1300+
if conID != nil && params.Conn.PgConn().PID() == *conID {
1301+
return true
1302+
}
1303+
return false
1304+
}
1305+
1306+
// Limit to a single connection to ensure the same connection is reused
1307+
config.MinConns = 1
1308+
config.MaxConns = 1
1309+
1310+
pool, err := pgxpool.NewWithConfig(ctx, config)
1311+
require.NoError(t, err)
1312+
defer pool.Close()
1313+
1314+
c, err := pool.Acquire(ctx)
1315+
require.NoError(t, err)
1316+
require.EqualValues(t, 1, pool.Stat().TotalConns())
1317+
originalPID := c.Conn().PgConn().PID()
1318+
conID = &originalPID
1319+
1320+
c.Release()
1321+
require.EqualValues(t, 1, pool.Stat().TotalConns())
1322+
1323+
c, err = pool.Acquire(ctx)
1324+
require.NoError(t, err)
1325+
require.EqualValues(t, 1, pool.Stat().TotalConns())
1326+
newPID := c.Conn().PgConn().PID()
1327+
1328+
c.Release()
1329+
1330+
require.EqualValues(t, 1, pool.Stat().TotalConns())
1331+
assert.Nil(t, ctx.Err())
1332+
assert.NotEqualValues(t, originalPID, newPID,
1333+
"Expected new connection due to ping timeout, but got same connection")
1334+
}

0 commit comments

Comments
 (0)