Skip to content

Commit 344d4d4

Browse files
authored
feat: Add pg-cluster package (#240)
1 parent f3b6d4d commit 344d4d4

File tree

16 files changed

+538
-3
lines changed

16 files changed

+538
-3
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ Package Name | Version | Docs
4545
@databases/mysql-typed | [![NPM version](https://img.shields.io/npm/v/@databases/mysql-typed?style=for-the-badge)](https://www.npmjs.com/package/@databases/mysql-typed) | [https://www.atdatabases.org/docs/mysql-typed](https://www.atdatabases.org/docs/mysql-typed)
4646
@databases/pg | [![NPM version](https://img.shields.io/npm/v/@databases/pg?style=for-the-badge)](https://www.npmjs.com/package/@databases/pg) | [https://www.atdatabases.org/docs/pg](https://www.atdatabases.org/docs/pg)
4747
@databases/pg-bulk | [![NPM version](https://img.shields.io/npm/v/@databases/pg-bulk?style=for-the-badge)](https://www.npmjs.com/package/@databases/pg-bulk) | [https://www.atdatabases.org/docs/pg-bulk](https://www.atdatabases.org/docs/pg-bulk)
48+
@databases/pg-cluster | [![NPM version](https://img.shields.io/npm/v/@databases/pg-cluster?style=for-the-badge)](https://www.npmjs.com/package/@databases/pg-cluster) | [https://www.atdatabases.org/docs/pg-cluster](https://www.atdatabases.org/docs/pg-cluster)
4849
@databases/pg-migrations | [![NPM version](https://img.shields.io/npm/v/@databases/pg-migrations?style=for-the-badge)](https://www.npmjs.com/package/@databases/pg-migrations) | [https://www.atdatabases.org/docs/pg-migrations](https://www.atdatabases.org/docs/pg-migrations)
4950
@databases/pg-test | [![NPM version](https://img.shields.io/npm/v/@databases/pg-test?style=for-the-badge)](https://www.npmjs.com/package/@databases/pg-test) | [https://www.atdatabases.org/docs/pg-test](https://www.atdatabases.org/docs/pg-test)
5051
@databases/pg-typed | [![NPM version](https://img.shields.io/npm/v/@databases/pg-typed?style=for-the-badge)](https://www.npmjs.com/package/@databases/pg-typed) | [https://www.atdatabases.org/docs/pg-typed](https://www.atdatabases.org/docs/pg-typed)

docs/pg-cluster.md

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
---
2+
id: pg-cluster
3+
title: Postgres Node.js Cluster Connection
4+
sidebar_label: Cluster
5+
---
6+
7+
The `Cluster` object represents group of physical connections or connection pools to the underlying database in a primary & replicas setup.
8+
9+
### `Cluster.query(SQLQuery): Promise<any[]>`
10+
11+
Run an SQL Query and get a promise for an array of results. If your query contains multiple statements, only the results of the final statement are returned.
12+
13+
Write queries are executed in the primary connection, and read-only queries are executed in the replica connections.
14+
15+
```ts
16+
// query executed on a secondary connection
17+
const result = await cluster.query(sql`SELECT 1 + 1 AS a`);
18+
result[0].a;
19+
// => 2
20+
21+
// query executed on the primary connection
22+
await cluster.query(sql`UPDATE users SET active = true`);
23+
```
24+
25+
### `Cluster.query(SQLQuery[]): Promise<any[]>`
26+
27+
If you pass an array of SQLQueries, you will get an array in response where each element of the array is the results of one of the queries.
28+
29+
If there is at least one write query in the argument list, then the queries are executed in the primary connection, otherwise in the replica connections.
30+
31+
```ts
32+
// query executed on a secondary connection
33+
const [resultA, resultB] = await cluster.query([
34+
sql`SELECT 1 + 1 AS a`,
35+
sql`SELECT 1 + 1 AS b`,
36+
]);
37+
resultA[0].a + resultB[0].b;
38+
// => 4
39+
```
40+
41+
### `Cluster.queryStream(SQLQuery): AsyncIterable<any>`
42+
43+
Run an SQL Query and get an async iterable of the results. e.g.
44+
45+
```js
46+
for await (const record of cluster.queryStream(
47+
sql`SELECT * FROM massive_table`,
48+
)) {
49+
console.log(result);
50+
}
51+
```
52+
53+
Write queries are executed in the primary connection, and read-only queries are executed in the replica connections.
54+
55+
### `Cluster.queryNodeStream(SQLQuery): ReadableStream`
56+
57+
Run an SQL Query and get a node.js readable stream of the results. e.g.
58+
59+
```js
60+
const Stringifier = require('newline-json').Stringifier;
61+
62+
cluster
63+
.queryNodeStream(sql`SELECT * FROM massive_table`)
64+
.pipe(new Stringifier())
65+
.pipe(process.stdout);
66+
```
67+
68+
Write queries are executed in the primary connection, and read-only queries are executed in the replica connections.
69+
70+
### `Cluster.tx<T>(fn: (tx: Transaction) => Promise<T>, options?): Promise<T>`
71+
72+
Executes the callback `fn` within a transaction on the primary connection or replica connections (depending on `options.readOnly`).
73+
74+
A transaction wraps a regular task with 3 additional queries:
75+
76+
1. it executes `BEGIN` just before invoking the callback function
77+
2. it executes `ROLLBACK`, if the callback throws an error or returns a rejected promise
78+
3. it executes `COMMIT`, if the callback does throw any error and does not return a rejected promise
79+
80+
```ts
81+
// transaction executed on a replica connection
82+
const result = await cluster.tx(
83+
async (tx) => {
84+
const resultA = await tx.query(sql`SELECT 1 + 1 AS a`);
85+
const resultB = await tx.query(sql`SELECT 1 + 1 AS b`);
86+
return resultA[0].a + resultB[0].b;
87+
},
88+
{readOnly: true},
89+
);
90+
91+
// transaction executed on the primary connection
92+
const resultPrimary = await cluster.tx(async (tx) => {
93+
const resultA = await tx.query(sql`SELECT 1 + 1 AS a`);
94+
const resultB = await tx.query(sql`SELECT 1 + 1 AS b`);
95+
return resultA[0].a + resultB[0].b;
96+
});
97+
// => 4
98+
```
99+
100+
### `Cluster.task(fn): Promise<T>`
101+
102+
This method exists to mimic the API in `ConnectionPool.task`. It executes the `task` method on the primary connection.

docs/pg-connection-pool.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ A ConnectionPool represents a set of automatically managed physical connections
1010

1111
Acquires a connection from the pool. If the pool is 'full' and all connections are currently checked out, this will wait in a FIFO queue until a connection becomes available by it being released back to the pool.
1212

13-
Once a connetion has been acquired, `fn` is called with that connection.
13+
Once a connection has been acquired, `fn` is called with that connection.
1414

1515
When `fn` returns, the connection is returned to the pool.
1616

docs/pg-guide-connections.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,25 @@ process.once('SIGTERM', () => {
8181
});
8282
```
8383

84+
## Cluster connections
85+
86+
In a highly available Postgres cluster with a primary node & multiple replicas nodes (e.g. AWS RDS Aurora), write queries should be sent to the primary node, while read queries that do not require strong consistency can be distributed equally to the read replicas nodes.
87+
88+
To connect to such a cluster you can use the `@databases/pg-cluster` package like this:
89+
90+
```typescript
91+
// database.ts
92+
93+
import createConnectionPool from '@databases/pg';
94+
import createCluster from '@databases/pg-cluster';
95+
96+
const primary = createConnectionPool(process.env.MY_CUSTOM_PRIMARY_ENV_VAR);
97+
const replicas = [createConnectionPool(process.env.MY_CUSTOM_REPLICA_ENV_VAR)];
98+
99+
const db = createCluster(primary, replicas);
100+
export default db;
101+
```
102+
84103
## Connecting to/from cloud providers
85104

86105
You can normally follow the instructions from the cloud providers, but we have prepared the following guides to make things easier for these common platforms:

docs/pg-queryable.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@ title: Postgres Queryable
44
sidebar_label: Queryable
55
---
66

7-
There are three types of `Queryable` in postgres:
7+
There are four types of `Queryable` in postgres:
88

9+
- `Cluster` - represents a group of primary and replica connections to a database cluster
910
- `ConnectionPool` - represents a set of automatically managed connections to the database
1011
- `Connection` - represents a single physical connection to the database
1112
- `Transaction` - represents a transaction (or nested transaction) on a single physical connection to the database
1213

13-
All three share a common API, allowing you to write methods that can be used both inside and outside a transaction. e.g.
14+
All four share a common API, allowing you to write methods that can be used both inside and outside a transaction. e.g.
1415

1516
```ts
1617
import {Queryable} from '@databases/pg';

docs/pg-typed.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,34 @@ module.exports = {users, posts};
4444

4545
## Table
4646

47+
### initializer
48+
49+
The objects returned by the `tables` function are initialized with an optional argument represeting the connection(s) for the queries.
50+
51+
The initializer argument is a single `Queryable` (i.e. `ConnectionPool`, `Connection`, `Transaction` or `Cluster`).
52+
53+
```typescript
54+
import db, {users} from './database';
55+
56+
export async function initialize() {
57+
// These 2 queries are not run in a transaction
58+
await users(db).find().all();
59+
await users(db).insert({
60+
61+
favorite_color: `blue`,
62+
});
63+
64+
await db.tx(async db => {
65+
// These 2 queries are run in the same transaction
66+
await users(db).find().all();
67+
await users(db).insert({
68+
69+
favorite_color: `blue`,
70+
});
71+
});
72+
}
73+
```
74+
4775
### insert(...records)
4876

4977
Inserts records into the database table. If you pass multiple records to `insert`, they will all be added "atomically", i.e. either all of the records will be added, or none of them will be added.

docs/sidebars.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
"ids": [
2828
"pg-options",
2929
"pg-queryable",
30+
"pg-cluster",
3031
"pg-connection-pool",
3132
"pg-connection",
3233
"pg-transaction",

packages/pg-cluster/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# @databases/pg-cluster
2+
3+
For documentation, see https://www.atdatabases.org/docs/pg-cluster

packages/pg-cluster/package.json

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
{
2+
"name": "@databases/pg-cluster",
3+
"version": "0.0.0",
4+
"description": "",
5+
"main": "./lib/index.js",
6+
"types": "./lib/index.d.ts",
7+
"dependencies": {},
8+
"devDependencies": {
9+
"@databases/pg": "^0.0.0"
10+
},
11+
"peerDependencies": {
12+
"@databases/pg": "*"
13+
},
14+
"scripts": {},
15+
"repository": "https://github.com/ForbesLindesay/atdatabases/tree/master/packages/pg-cluster",
16+
"bugs": "https://github.com/ForbesLindesay/atdatabases/issues",
17+
"license": "MIT",
18+
"publishConfig": {
19+
"access": "public"
20+
},
21+
"files": [
22+
"lib/"
23+
],
24+
"homepage": "https://www.atdatabases.org/docs/pg-cluster"
25+
}

packages/pg-cluster/src/Cluster.ts

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
import {Readable} from 'stream';
2+
import {
3+
Connection,
4+
pgFormat,
5+
Queryable,
6+
QueryableType,
7+
Transaction,
8+
sql,
9+
SQLQuery,
10+
} from '@databases/pg';
11+
import AbortSignal from '@databases/pg/lib/types/AbortSignal';
12+
import TransactionOptions from '@databases/pg/lib/types/TransactionOptions';
13+
14+
export default class Cluster implements Queryable {
15+
public readonly type: QueryableType = QueryableType.Cluster;
16+
public readonly sql = sql;
17+
18+
private readonly _primary: Queryable;
19+
private readonly _replicas: Queryable[];
20+
21+
constructor(primary: Queryable, replicas: Queryable[]) {
22+
if (!replicas.length) {
23+
throw new Error(
24+
'You must provide at least one replica when using pg-cluster',
25+
);
26+
}
27+
this._primary = primary;
28+
this._replicas = replicas;
29+
}
30+
31+
private _getReplica(): Queryable {
32+
return this._replicas[Math.floor(Math.random() * this._replicas.length)];
33+
}
34+
35+
async query(query: SQLQuery): Promise<any[]>;
36+
async query(query: SQLQuery[]): Promise<any[][]>;
37+
async query(query: SQLQuery | SQLQuery[]): Promise<any[]> {
38+
if (Array.isArray(query)) {
39+
if (query.length === 0) {
40+
return [];
41+
}
42+
const hasWriteableQueries = query.some(isQueryWriteable);
43+
if (hasWriteableQueries) {
44+
return this._primary.query(query);
45+
} else {
46+
return this._getReplica().query(query);
47+
}
48+
} else {
49+
if (isQueryWriteable(query)) {
50+
return this._primary.query(query);
51+
} else {
52+
return this._getReplica().query(query);
53+
}
54+
}
55+
}
56+
57+
queryStream(
58+
query: SQLQuery,
59+
options: {batchSize?: number | undefined; signal?: AbortSignal | undefined},
60+
): AsyncIterable<any> {
61+
if (isQueryWriteable(query)) {
62+
return this._primary.queryStream(query, options);
63+
} else {
64+
return this._getReplica().queryStream(query, options);
65+
}
66+
}
67+
68+
queryNodeStream(
69+
query: SQLQuery,
70+
options?: {
71+
highWaterMark?: number | undefined;
72+
batchSize?: number | undefined;
73+
},
74+
): Readable {
75+
if (isQueryWriteable(query)) {
76+
return this._primary.queryNodeStream(query, options);
77+
} else {
78+
return this._getReplica().queryNodeStream(query, options);
79+
}
80+
}
81+
82+
async task<T>(
83+
fn: (connection: Connection | Transaction) => Promise<T>,
84+
): Promise<T> {
85+
return this._primary.task(fn);
86+
}
87+
88+
async tx<T>(
89+
fn: (connection: Transaction) => Promise<T>,
90+
options?: TransactionOptions,
91+
): Promise<T> {
92+
if (options?.readOnly) {
93+
return this._getReplica().tx(fn, options);
94+
} else {
95+
return this._primary.tx(fn, options);
96+
}
97+
}
98+
99+
async addPostCommitStep(fn: () => Promise<void>): Promise<void> {
100+
await fn();
101+
}
102+
}
103+
104+
const WRITEABLE_REGEX =
105+
/\b(alter|create|delete|drop|insert|truncate|update|vacuum)\b/i;
106+
107+
function isQueryWriteable(query: SQLQuery): boolean {
108+
const formatted = query.format(pgFormat);
109+
return WRITEABLE_REGEX.test(formatted.text);
110+
}

0 commit comments

Comments
 (0)