Skip to content

Commit ff11c5a

Browse files
committed
Introducing compilableQueryWatch() util to common, using in Drizzle.
1 parent 42a6294 commit ff11c5a

File tree

3 files changed

+65
-48
lines changed

3 files changed

+65
-48
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import { CompilableQuery } from './../types/types.js';
2+
import { AbstractPowerSyncDatabase, SQLWatchOptions } from './AbstractPowerSyncDatabase.js';
3+
import { runOnSchemaChange } from './runOnSchemaChange.js';
4+
5+
export interface CompilableQueryWatchHandler<T> {
6+
onResult: (results: T[]) => void;
7+
onError?: (error: Error) => void;
8+
}
9+
10+
export function compilableQueryWatch<T>(
11+
db: AbstractPowerSyncDatabase,
12+
query: CompilableQuery<T>,
13+
handler: CompilableQueryWatchHandler<T>,
14+
options?: SQLWatchOptions
15+
): void {
16+
const { onResult, onError = (e: Error) => {} } = handler ?? {};
17+
if (!onResult) {
18+
throw new Error('onResult is required');
19+
}
20+
21+
const watchQuery = async (abortSignal: AbortSignal) => {
22+
try {
23+
const toSql = query.compile();
24+
const resolvedTables = await db.resolveTables(toSql.sql, toSql.parameters as [], options);
25+
26+
// Fetch initial data
27+
const result = await query.execute();
28+
onResult(result);
29+
30+
db.onChangeWithCallback(
31+
{
32+
onChange: async () => {
33+
try {
34+
const result = await query.execute();
35+
onResult(result);
36+
} catch (error: any) {
37+
onError(error);
38+
}
39+
},
40+
onError
41+
},
42+
{
43+
...(options ?? {}),
44+
tables: resolvedTables,
45+
// Override the abort signal since we intercept it
46+
signal: abortSignal
47+
}
48+
);
49+
} catch (error: any) {
50+
onError(error);
51+
}
52+
};
53+
54+
runOnSchemaChange(watchQuery, db, options);
55+
}

packages/common/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ export * from './client/connection/PowerSyncBackendConnector.js';
55
export * from './client/connection/PowerSyncCredentials.js';
66
export * from './client/sync/bucket/BucketStorageAdapter.js';
77
export { runOnSchemaChange } from './client/runOnSchemaChange.js';
8+
export { CompilableQueryWatchHandler, compilableQueryWatch } from './client/compilableQueryWatch.js';
89
export { UpdateType, CrudEntry, OpId } from './client/sync/bucket/CrudEntry.js';
910
export * from './client/sync/bucket/SqliteBucketStorage.js';
1011
export * from './client/sync/bucket/CrudBatch.js';
Lines changed: 9 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import {
22
AbstractPowerSyncDatabase,
3+
compilableQueryWatch,
4+
CompilableQueryWatchHandler,
35
QueryResult,
4-
runOnSchemaChange,
5-
SQLWatchOptions,
6-
WatchHandler
6+
SQLWatchOptions
77
} from '@powersync/common';
88
import { Query } from 'drizzle-orm';
99
import { DefaultLogger } from 'drizzle-orm/logger';
@@ -18,9 +18,10 @@ import { SQLiteTransaction } from 'drizzle-orm/sqlite-core';
1818
import { BaseSQLiteDatabase } from 'drizzle-orm/sqlite-core/db';
1919
import { SQLiteAsyncDialect } from 'drizzle-orm/sqlite-core/dialect';
2020
import type { DrizzleConfig } from 'drizzle-orm/utils';
21+
import { toCompilableQuery } from './../utils/compilableQuery';
2122
import { PowerSyncSQLiteSession, PowerSyncSQLiteTransactionConfig } from './sqlite-session';
2223

23-
type WatchQuery = { toSQL(): Query; execute(): Promise<any> };
24+
type WatchQuery<T> = { toSQL(): Query; execute(): Promise<T> };
2425

2526
export interface PowerSyncSQLiteDatabase<TSchema extends Record<string, unknown> = Record<string, never>>
2627
extends BaseSQLiteDatabase<'async', QueryResult, TSchema> {
@@ -31,7 +32,7 @@ export interface PowerSyncSQLiteDatabase<TSchema extends Record<string, unknown>
3132
config?: PowerSyncSQLiteTransactionConfig
3233
): Promise<T>;
3334

34-
watch(query: WatchQuery, handler?: WatchHandler, options?: SQLWatchOptions): void;
35+
watch<T>(query: WatchQuery<T>, handler?: CompilableQueryWatchHandler<T>, options?: SQLWatchOptions): void;
3536
}
3637

3738
export function wrapPowerSyncWithDrizzle<TSchema extends Record<string, unknown> = Record<string, never>>(
@@ -60,50 +61,10 @@ export function wrapPowerSyncWithDrizzle<TSchema extends Record<string, unknown>
6061
logger
6162
});
6263

63-
const watch = (query: WatchQuery, handler?: WatchHandler, options?: SQLWatchOptions): void => {
64-
const { onResult, onError = (e: Error) => {} } = handler ?? {};
65-
if (!onResult) {
66-
throw new Error('onResult is required');
67-
}
68-
69-
const watchQuery = async (abortSignal: AbortSignal) => {
70-
try {
71-
const toSql = query.toSQL();
72-
const resolvedTables = await db.resolveTables(toSql.sql, toSql.params, options);
73-
74-
// Fetch initial data
75-
const result = await query.execute();
76-
onResult(result);
77-
78-
db.onChangeWithCallback(
79-
{
80-
onChange: async () => {
81-
try {
82-
const result = await query.execute();
83-
onResult(result);
84-
} catch (error: any) {
85-
onError(error);
86-
}
87-
},
88-
onError
89-
},
90-
{
91-
...(options ?? {}),
92-
tables: resolvedTables,
93-
// Override the abort signal since we intercept it
94-
signal: abortSignal
95-
}
96-
);
97-
} catch (error: any) {
98-
onError(error);
99-
}
100-
};
101-
102-
runOnSchemaChange(watchQuery, db, options);
103-
};
104-
10564
const baseDatabase = new BaseSQLiteDatabase('async', dialect, session, schema) as PowerSyncSQLiteDatabase<TSchema>;
10665
return Object.assign(baseDatabase, {
107-
watch: (query: WatchQuery, handler?: WatchHandler, options?: SQLWatchOptions) => watch(query, handler, options)
66+
watch: <T>(query: WatchQuery<T>, handler: CompilableQueryWatchHandler<T>, options?: SQLWatchOptions) => {
67+
compilableQueryWatch(db, toCompilableQuery(query), handler, options);
68+
}
10869
});
10970
}

0 commit comments

Comments
 (0)