Skip to content

Commit c22ac39

Browse files
authored
Merge branch 'main' into profiling-logs
2 parents d5fbef9 + d3808db commit c22ac39

File tree

5 files changed

+46
-17
lines changed

5 files changed

+46
-17
lines changed

.changeset/wicked-wolves-greet.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/node': patch
3+
---
4+
5+
Throw when database is used after being closed.

.github/workflows/build-packages.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,6 @@ jobs:
4343

4444
- name: Build
4545
run: pnpm build:packages
46+
47+
- name: Build (prod)
48+
run: pnpm build:packages:prod

packages/node/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@
7070
"async-mutex": "^0.5.0",
7171
"comlink": "^4.4.2",
7272
"undici": "^7.11.0",
73-
"bson": "^6.10.4"
73+
"bson": "^6.10.4"
7474
},
7575
"devDependencies": {
7676
"@powersync/drizzle-driver": "workspace:*",

packages/node/src/db/RemoteConnection.ts

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,49 +11,73 @@ export class RemoteConnection implements LockContext {
1111

1212
private readonly worker: Worker;
1313
private readonly comlink: Remote<AsyncDatabaseOpener>;
14-
readonly database: Remote<AsyncDatabase>;
14+
private readonly database: Remote<AsyncDatabase>;
15+
16+
private readonly notifyWorkerClosed = new AbortController();
1517

1618
constructor(worker: Worker, comlink: Remote<AsyncDatabaseOpener>, database: Remote<AsyncDatabase>) {
1719
this.worker = worker;
1820
this.comlink = comlink;
1921
this.database = database;
22+
23+
this.worker.once('exit', (_) => {
24+
this.notifyWorkerClosed.abort();
25+
});
2026
}
2127

2228
/**
2329
* Runs the inner function, but appends the stack trace where this function was called. This is useful for workers
2430
* because stack traces from worker errors are otherwise unrelated to the application issue that has caused them.
2531
*/
26-
private async recoverTrace<T>(inner: () => Promise<T>): Promise<T> {
32+
private withRemote<T>(inner: () => Promise<T>): Promise<T> {
2733
const trace = {};
2834
Error.captureStackTrace(trace);
35+
const controller = this.notifyWorkerClosed;
2936

30-
try {
31-
return await inner();
32-
} catch (e) {
33-
if (e instanceof Error && e.stack) {
34-
e.stack += (trace as any).stack;
37+
return new Promise((resolve, reject) => {
38+
if (controller.signal.aborted) {
39+
reject(new Error('Called operation on closed remote'));
3540
}
3641

37-
throw e;
38-
}
42+
function handleAbort() {
43+
reject(new Error('Remote peer closed with request in flight'));
44+
}
45+
46+
function completePromise(action: () => void) {
47+
controller!.signal.removeEventListener('abort', handleAbort);
48+
action();
49+
}
50+
51+
controller.signal.addEventListener('abort', handleAbort);
52+
53+
inner()
54+
.then((data) => completePromise(() => resolve(data)))
55+
.catch((e) => {
56+
if (e instanceof Error && e.stack) {
57+
e.stack += (trace as any).stack;
58+
}
59+
60+
return completePromise(() => reject(e));
61+
});
62+
});
3963
}
4064

4165
executeBatch(query: string, params: any[][] = []): Promise<QueryResult> {
42-
return this.recoverTrace(async () => {
66+
return this.withRemote(async () => {
4367
const result = await this.database.executeBatch(query, params ?? []);
4468
return RemoteConnection.wrapQueryResult(result);
4569
});
4670
}
4771

4872
execute(query: string, params?: any[] | undefined): Promise<QueryResult> {
49-
return this.recoverTrace(async () => {
73+
return this.withRemote(async () => {
5074
const result = await this.database.execute(query, params ?? []);
5175
return RemoteConnection.wrapQueryResult(result);
5276
});
5377
}
5478

5579
executeRaw(query: string, params?: any[] | undefined): Promise<any[][]> {
56-
return this.recoverTrace(async () => {
80+
return this.withRemote(async () => {
5781
return await this.database.executeRaw(query, params ?? []);
5882
});
5983
}

packages/node/src/db/WorkerConnectionPool.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -213,10 +213,7 @@ export class WorkerConnectionPool extends BaseObserver<DBAdapterListener> implem
213213
try {
214214
return await fn(this.writeConnection);
215215
} finally {
216-
const serializedUpdates = await this.writeConnection.database.executeRaw(
217-
"SELECT powersync_update_hooks('get');",
218-
[]
219-
);
216+
const serializedUpdates = await this.writeConnection.executeRaw("SELECT powersync_update_hooks('get');", []);
220217
const updates = JSON.parse(serializedUpdates[0][0] as string) as string[];
221218

222219
if (updates.length > 0) {

0 commit comments

Comments
 (0)