Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 181 additions & 2 deletions docs/v3/advanced/database-maintenance.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -401,13 +401,192 @@ For example, you could run the retention flow daily at 2 AM to clean up old flow
- Review query plans with `EXPLAIN ANALYZE`

### "Connection limit reached"
- Implement connection pooling immediately
- Implement connection pooling with PgBouncer (see below)
- Check for connection leaks: connections in 'idle' state for hours
- Reduce Prefect worker/agent connection counts

## Using PgBouncer for connection pooling

For high-scale Prefect deployments with many workers or API instances, using PgBouncer as an external connection pooler can significantly reduce database connection overhead and improve performance.

### Why use PgBouncer?

When you have multiple Prefect servers and workers, each maintains its own connection pool to the database. With 10 API servers each using a pool of 5 connections, you're using 50 connections even if most are idle. PgBouncer consolidates these into a single, efficient pool.

### Setting up PgBouncer

1. **Install PgBouncer** (example using Docker):

```yaml
# docker-compose.yml
services:
postgres:
image: postgres:15
environment:
POSTGRES_USER: prefect
POSTGRES_PASSWORD: prefect
POSTGRES_DB: prefect

pgbouncer:
image: edoburu/pgbouncer:latest
environment:
DB_HOST: postgres
DB_PORT: 5432
DB_USER: prefect
DB_PASSWORD: prefect
DB_NAME: prefect
POOL_MODE: transaction # Recommended for Prefect
MAX_CLIENT_CONN: 1000
DEFAULT_POOL_SIZE: 25
LISTEN_PORT: 6432 # PgBouncer listening port
ports:
- "6432:6432"
depends_on:
- postgres
```

2. **Configure PgBouncer** (if not using Docker):

Create `/etc/pgbouncer/pgbouncer.ini`:

```ini
[databases]
prefect = host=localhost port=5432 dbname=prefect

[pgbouncer]
listen_addr = *
listen_port = 6432
pool_mode = transaction
max_client_conn = 1000
default_pool_size = 25
reserve_pool_size = 5
server_reset_query = DISCARD ALL
```

### Configuring Prefect for PgBouncer

To use PgBouncer with Prefect, configure SQLAlchemy to use NullPool by setting `pool_size` to None:

```python
# In your Prefect configuration
from prefect.server.database.configurations import AsyncPostgresConfiguration

config = AsyncPostgresConfiguration(
connection_url="postgresql+asyncpg://prefect:prefect@localhost:6432/prefect",
sqlalchemy_pool_size=None, # This enables NullPool
statement_cache_size=0 # Disable for transaction mode
)
```

For environment variable configuration:

```bash
# Connect through PgBouncer (note port 6432)
export PREFECT_SERVER_DATABASE_CONNECTION_URL="postgresql+asyncpg://prefect:prefect@localhost:6432/prefect"

# Disable statement caching for transaction mode compatibility
export PREFECT_SERVER_DATABASE_SQLALCHEMY_CONNECT_ARGS_STATEMENT_CACHE_SIZE=0

# IMPORTANT: pool_size cannot be set to None via environment variables due to
# Pydantic parsing limitations. You must configure it in code or use a settings file.
```

### Important considerations

1. **Pool Mode**: Use `transaction` mode for Prefect. This mode releases connections back to the pool after each transaction, providing the best connection reuse.

2. **Statement Caching**: Disable prepared statement caching when using transaction mode, as prepared statements don't persist across different backend connections.

3. **Connection Limits**: Set PgBouncer's `default_pool_size` based on your PostgreSQL `max_connections` and expected load. A good starting point is 25-50 connections.

4. **Monitoring**: Monitor both PgBouncer and PostgreSQL connections:

```sql
-- Check PgBouncer stats (connect to PgBouncer on port 6432)
SHOW POOLS;
SHOW STATS;

-- Check actual PostgreSQL connections (connect to PostgreSQL directly)
SELECT count(*), state, usename
FROM pg_stat_activity
GROUP BY state, usename;
```

### Example production configuration

For a production Prefect deployment with high availability:

```yaml
# docker-compose.yml
services:
prefect-api:
image: prefecthq/prefect:3-latest
deploy:
replicas: 3
environment:
PREFECT_SERVER_DATABASE_CONNECTION_URL: "postgresql+asyncpg://prefect:${DB_PASSWORD}@pgbouncer:6432/prefect"
PREFECT_SERVER_DATABASE_SQLALCHEMY_POOL_SIZE: "null" # Disable SQLAlchemy pooling
PREFECT_SERVER_DATABASE_SQLALCHEMY_CONNECT_ARGS_STATEMENT_CACHE_SIZE: "0"
depends_on:
- pgbouncer

prefect-worker:
image: prefecthq/prefect:3-latest
deploy:
replicas: 5
environment:
PREFECT_API_URL: "http://prefect-api:4200/api"
PREFECT_SERVER_DATABASE_CONNECTION_URL: "postgresql+asyncpg://prefect:${DB_PASSWORD}@pgbouncer:6432/prefect"
PREFECT_SERVER_DATABASE_SQLALCHEMY_POOL_SIZE: "null"
depends_on:
- pgbouncer
```

### Troubleshooting PgBouncer

**"prepared statement does not exist" errors**
- Ensure `statement_cache_size=0` is set in your configuration
- Verify PgBouncer is in `transaction` mode, not `session` mode

**"too many clients" errors**
- Increase `max_client_conn` in PgBouncer configuration
- Check for connection leaks in your application

**Performance degradation**
- Monitor PgBouncer's `SHOW POOLS` for `cl_waiting` (clients waiting for connections)
- Increase `default_pool_size` if connections are maxed out
- Check PostgreSQL for slow queries that hold connections

### Verified working configuration

Here's a complete, tested configuration for using Prefect with PgBouncer:

```python
# Example: Creating a Prefect database configuration with PgBouncer
from prefect.server.database.configurations import AsyncPostgresConfiguration
from prefect.settings import temporary_settings, PREFECT_SERVER_DATABASE_SQLALCHEMY_POOL_SIZE

# Use temporary settings to ensure pool_size is None
with temporary_settings({PREFECT_SERVER_DATABASE_SQLALCHEMY_POOL_SIZE: None}):
config = AsyncPostgresConfiguration(
connection_url="postgresql+asyncpg://user:pass@pgbouncer:6432/prefect",
sqlalchemy_pool_size=None, # Uses NullPool
statement_cache_size=0 # Required for transaction mode
)

# This configuration will:
# - Use NullPool (no connection pooling in SQLAlchemy)
# - Disable prepared statement caching
# - Let PgBouncer handle all connection pooling
```

This configuration has been tested with PgBouncer in transaction mode and correctly handles connection pooling at the PgBouncer level rather than the application level.

## Further reading

- [PostgreSQL documentation on VACUUM](https://www.postgresql.org/docs/current/sql-vacuum.html)
- [PostgreSQL routine maintenance](https://www.postgresql.org/docs/current/routine-vacuuming.html)
- [Monitoring PostgreSQL](https://www.postgresql.org/docs/current/monitoring-stats.html)
- [pg_repack extension](https://github.com/reorg/pg_repack)
- [pg_repack extension](https://github.com/reorg/pg_repack)
- [PgBouncer documentation](https://www.pgbouncer.org/)
- [PgBouncer configuration](https://www.pgbouncer.org/config.html)
4 changes: 2 additions & 2 deletions docs/v3/api-ref/settings-ref.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -951,9 +951,9 @@ Settings for controlling SQLAlchemy connection behavior
**TOML dotted key path**: `server.database.sqlalchemy.connect_args`

### `pool_size`
Controls connection pool size of database connection pools from the Prefect backend.
Controls connection pool size of database connection pools from the Prefect backend. Set to None/null to use NullPool for external connection poolers like PgBouncer.

**Type**: `integer`
**Type**: `integer | None`

**Default**: `5`

Expand Down
13 changes: 10 additions & 3 deletions schemas/settings.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -811,14 +811,21 @@
"supported_environment_variables": []
},
"pool_size": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": 5,
"description": "Controls connection pool size of database connection pools from the Prefect backend.",
"description": "Controls connection pool size of database connection pools from the Prefect backend. Set to None/null to use NullPool for external connection poolers like PgBouncer.",
"supported_environment_variables": [
"PREFECT_SERVER_DATABASE_SQLALCHEMY_POOL_SIZE",
"PREFECT_SQLALCHEMY_POOL_SIZE"
],
"title": "Pool Size",
"type": "integer"
"title": "Pool Size"
},
"pool_recycle": {
"default": 3600,
Expand Down
39 changes: 24 additions & 15 deletions src/prefect/server/database/configurations.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ def __init__(
)
self.sqlalchemy_pool_size: Optional[int] = (
sqlalchemy_pool_size
or get_current_settings().server.database.sqlalchemy.pool_size
if sqlalchemy_pool_size is not None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we ever use this as init kwargs within the codebase? I think it would be weird if passing None as an init kwarg is the one value that isn't respected, but I don't think we actually use these init kwargs anyway

else get_current_settings().server.database.sqlalchemy.pool_size
)
self.sqlalchemy_max_overflow: Optional[int] = (
sqlalchemy_max_overflow
Expand Down Expand Up @@ -233,11 +234,13 @@ async def engine(self) -> AsyncEngine:
self.timeout,
)
if cache_key not in ENGINES:
kwargs: dict[str, Any] = (
get_current_settings().server.database.sqlalchemy.model_dump(
mode="json", exclude={"connect_args"}
)
)
# Get base sqlalchemy settings
sqlalchemy_settings = get_current_settings().server.database.sqlalchemy

# Start with non-pool related settings
kwargs: dict[str, Any] = {
"pool_recycle": sqlalchemy_settings.pool_recycle,
}
connect_args: dict[str, Any] = {}

if self.timeout is not None:
Expand Down Expand Up @@ -285,23 +288,29 @@ async def engine(self) -> AsyncEngine:
if connect_args:
kwargs["connect_args"] = connect_args

if self.sqlalchemy_pool_size is not None:
kwargs["pool_size"] = self.sqlalchemy_pool_size
if self.sqlalchemy_pool_size is None:
# Use NullPool for external connection poolers like PgBouncer
from sqlalchemy.pool import NullPool

if self.sqlalchemy_max_overflow is not None:
kwargs["poolclass"] = NullPool
# NullPool doesn't support pool-related parameters, so don't add them
else:
# Regular pool configuration
kwargs["pool_size"] = self.sqlalchemy_pool_size
kwargs["pool_timeout"] = sqlalchemy_settings.pool_timeout
kwargs["max_overflow"] = self.sqlalchemy_max_overflow

engine = create_async_engine(
self.connection_url,
echo=self.echo,
# "pre-ping" connections upon checkout to ensure they have not been
# closed on the server side
pool_pre_ping=True,
kwargs["pool_pre_ping"] = True
# Use connections in LIFO order to help reduce connections
# after spiky load and in general increase the likelihood
# that a given connection pulled from the pool will be
# usable.
pool_use_lifo=True,
kwargs["pool_use_lifo"] = True

engine = create_async_engine(
self.connection_url,
echo=self.echo,
**kwargs,
)

Expand Down
7 changes: 4 additions & 3 deletions src/prefect/settings/models/server/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,18 @@ class SQLAlchemySettings(PrefectBaseSettings):
"""

model_config: ClassVar[SettingsConfigDict] = build_settings_config(
("server", "database", "sqlalchemy")
("server", "database", "sqlalchemy"),
env_parse_none_str="null", # Allow 'null' string to be parsed as None
)

connect_args: SQLAlchemyConnectArgsSettings = Field(
default_factory=SQLAlchemyConnectArgsSettings,
description="Settings for controlling SQLAlchemy connection behavior",
)

pool_size: int = Field(
pool_size: Optional[int] = Field(
default=5,
description="Controls connection pool size of database connection pools from the Prefect backend.",
description="Controls connection pool size of database connection pools from the Prefect backend. Set to None/null to use NullPool for external connection poolers like PgBouncer.",
validation_alias=AliasChoices(
AliasPath("pool_size"),
"prefect_server_database_sqlalchemy_pool_size",
Expand Down
45 changes: 45 additions & 0 deletions tests/server/database/test_dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from uuid import UUID

import pytest
import sqlalchemy as sa
from sqlalchemy.ext.asyncio import AsyncSession

from prefect.server.database import PrefectDBInterface, dependencies
Expand Down Expand Up @@ -209,3 +210,47 @@ async def coroutine_with_injected_db(

assert asyncio.iscoroutinefunction(coroutine_with_injected_db)
assert asyncio.run(coroutine_with_injected_db(42)) is self.db


class TestNullPoolConfiguration:
"""Test NullPool configuration for PgBouncer support."""

async def test_null_pool_size_uses_nullpool(self):
"""Test that setting pool_size to None uses NullPool."""
# Use temporary settings with pool_size=None
from prefect.settings import (
PREFECT_SERVER_DATABASE_SQLALCHEMY_POOL_SIZE,
temporary_settings,
)

with temporary_settings({PREFECT_SERVER_DATABASE_SQLALCHEMY_POOL_SIZE: None}):
# Clear any cached engines
from prefect.server.database.configurations import ENGINES

ENGINES.clear()

# Use a unique connection URL to avoid engine caching
import uuid

unique_db = f"test_nullpool_{uuid.uuid4().hex[:8]}"
config = AsyncPostgresConfiguration(
connection_url=f"postgresql+asyncpg://user:pass@localhost:5433/{unique_db}",
)

engine = await config.engine()
# When using NullPool, SQLAlchemy doesn't wrap it in AsyncAdaptedQueuePool
assert isinstance(engine.pool, sa.pool.NullPool)

await engine.dispose()

async def test_integer_pool_size_uses_regular_pool(self):
"""Test that setting pool_size to an integer uses regular pool."""
config = AsyncPostgresConfiguration(
connection_url="postgresql+asyncpg://user:pass@localhost/db",
sqlalchemy_pool_size=5,
)

engine = await config.engine()
assert not isinstance(engine.pool, sa.pool.NullPool)

await engine.dispose()
Loading