|
| 1 | +import * as SQLite from '@journeyapps/wa-sqlite'; |
| 2 | +import { BaseObserver, BatchedUpdateNotification } from '@powersync/common'; |
| 3 | +import { Mutex } from 'async-mutex'; |
| 4 | +import { OnTableChangeCallback, WASQLExecuteResult } from '../../../shared/types'; |
| 5 | + |
| 6 | +/** |
| 7 | + * List of currently tested virtual filesystems |
| 8 | + */ |
| 9 | +export enum WASQLiteVFS { |
| 10 | + IDBBatchAtomicVFS = 'IDBBatchAtomicVFS', |
| 11 | + OPFSCoopSyncVFS = 'OPFSCoopSyncVFS', |
| 12 | + AccessHandlePoolVFS = 'AccessHandlePoolVFS' |
| 13 | +} |
| 14 | + |
| 15 | +export type WASQLiteConnectionListener = { |
| 16 | + tablesUpdated: (event: BatchedUpdateNotification) => void; |
| 17 | +}; |
| 18 | + |
| 19 | +// FIXME there are no types for Module |
| 20 | +export type SQLiteModule = Parameters<typeof SQLite.Factory>[0]; |
| 21 | +export type WASQLiteModuleFactoryOptions = { dbFileName: string }; |
| 22 | + |
| 23 | +export type WASQLiteModuleFactory = ( |
| 24 | + options: WASQLiteModuleFactoryOptions |
| 25 | +) => Promise<{ module: SQLiteModule; vfs: SQLiteVFS }>; |
| 26 | + |
| 27 | +export type WASQLiteOpenOptions = { |
| 28 | + dbFileName: string; |
| 29 | + vfs?: WASQLiteVFS; |
| 30 | +}; |
| 31 | + |
| 32 | +export const AsyncWASQLiteModuleFactory = async () => { |
| 33 | + const { default: factory } = await import('@journeyapps/wa-sqlite/dist/wa-sqlite-async.mjs'); |
| 34 | + return factory(); |
| 35 | +}; |
| 36 | + |
| 37 | +export const SyncWASQLiteModuleFactory = async () => { |
| 38 | + const { default: factory } = await import('@journeyapps/wa-sqlite/dist/wa-sqlite.mjs'); |
| 39 | + return factory(); |
| 40 | +}; |
| 41 | + |
| 42 | +export const DEFAULT_MODULE_FACTORIES = { |
| 43 | + [WASQLiteVFS.IDBBatchAtomicVFS]: async (options: WASQLiteModuleFactoryOptions) => { |
| 44 | + const module = await AsyncWASQLiteModuleFactory(); |
| 45 | + const { IDBBatchAtomicVFS } = await import('@journeyapps/wa-sqlite/src/examples/IDBBatchAtomicVFS.js'); |
| 46 | + return { |
| 47 | + module, |
| 48 | + // @ts-expect-error The types for this static method are missing upstream |
| 49 | + vfs: await IDBBatchAtomicVFS.create(options.dbFileName, module, { lockPolicy: 'exclusive' }) |
| 50 | + }; |
| 51 | + }, |
| 52 | + [WASQLiteVFS.AccessHandlePoolVFS]: async (options: WASQLiteModuleFactoryOptions) => { |
| 53 | + const module = await SyncWASQLiteModuleFactory(); |
| 54 | + // @ts-expect-error The types for this static method are missing upstream |
| 55 | + const { AccessHandlePoolVFS } = await import('@journeyapps/wa-sqlite/src/examples/AccessHandlePoolVFS.js'); |
| 56 | + return { |
| 57 | + module, |
| 58 | + vfs: await AccessHandlePoolVFS.create(options.dbFileName, module) |
| 59 | + }; |
| 60 | + }, |
| 61 | + [WASQLiteVFS.OPFSCoopSyncVFS]: async (options: WASQLiteModuleFactoryOptions) => { |
| 62 | + const module = await SyncWASQLiteModuleFactory(); |
| 63 | + // @ts-expect-error The types for this static method are missing upstream |
| 64 | + const { OPFSCoopSyncVFS } = await import('@journeyapps/wa-sqlite/src/examples/OPFSCoopSyncVFS.js'); |
| 65 | + return { |
| 66 | + module, |
| 67 | + vfs: await OPFSCoopSyncVFS.create(options.dbFileName, module) |
| 68 | + }; |
| 69 | + } |
| 70 | +}; |
| 71 | + |
| 72 | +export class WASqliteConnection extends BaseObserver<WASQLiteConnectionListener> { |
| 73 | + private _sqliteAPI: SQLiteAPI | null = null; |
| 74 | + private _dbP: number | null = null; |
| 75 | + private _moduleFactory: WASQLiteModuleFactory; |
| 76 | + |
| 77 | + protected updatedTables: Set<string>; |
| 78 | + protected updateTimer: ReturnType<typeof setTimeout> | null; |
| 79 | + protected statementMutex: Mutex; |
| 80 | + |
| 81 | + constructor(protected options: WASQLiteOpenOptions) { |
| 82 | + super(); |
| 83 | + this.updatedTables = new Set(); |
| 84 | + this.updateTimer = null; |
| 85 | + this.statementMutex = new Mutex(); |
| 86 | + this._moduleFactory = DEFAULT_MODULE_FACTORIES[this.options.vfs ?? WASQLiteVFS.IDBBatchAtomicVFS]; |
| 87 | + } |
| 88 | + |
| 89 | + protected get sqliteAPI() { |
| 90 | + if (!this._sqliteAPI) { |
| 91 | + throw new Error(`Initialization has not completed`); |
| 92 | + } |
| 93 | + return this._sqliteAPI; |
| 94 | + } |
| 95 | + |
| 96 | + protected get dbP() { |
| 97 | + if (!this._dbP) { |
| 98 | + throw new Error(`Initialization has not completed`); |
| 99 | + } |
| 100 | + return this._dbP; |
| 101 | + } |
| 102 | + |
| 103 | + protected async openDB() { |
| 104 | + this._dbP = await this.sqliteAPI.open_v2(this.options.dbFileName); |
| 105 | + return this._dbP; |
| 106 | + } |
| 107 | + |
| 108 | + protected async openSQLiteAPI(): Promise<SQLiteAPI> { |
| 109 | + const { module, vfs } = await this._moduleFactory({ dbFileName: this.options.dbFileName }); |
| 110 | + const sqlite3 = SQLite.Factory(module); |
| 111 | + sqlite3.vfs_register(vfs, true); |
| 112 | + /** |
| 113 | + * Register the PowerSync core SQLite extension |
| 114 | + */ |
| 115 | + module.ccall('powersync_init_static', 'int', []); |
| 116 | + |
| 117 | + return sqlite3; |
| 118 | + } |
| 119 | + |
| 120 | + async init() { |
| 121 | + this._sqliteAPI = await this.openSQLiteAPI(); |
| 122 | + await this.openDB(); |
| 123 | + |
| 124 | + this.sqliteAPI.update_hook(this.dbP, (updateType: number, dbName: string | null, tableName: string | null) => { |
| 125 | + if (!tableName) { |
| 126 | + return; |
| 127 | + } |
| 128 | + this.updatedTables.add(tableName); |
| 129 | + if (this.updateTimer == null) { |
| 130 | + this.updateTimer = setTimeout(() => this.fireUpdates(), 0); |
| 131 | + } |
| 132 | + }); |
| 133 | + } |
| 134 | + |
| 135 | + fireUpdates() { |
| 136 | + this.updateTimer = null; |
| 137 | + const event: BatchedUpdateNotification = { tables: [...this.updatedTables], groupedUpdates: {}, rawUpdates: [] }; |
| 138 | + this.updatedTables.clear(); |
| 139 | + this.iterateListeners((cb) => cb.tablesUpdated?.(event)); |
| 140 | + } |
| 141 | + |
| 142 | + /** |
| 143 | + * This executes SQL statements in a batch. |
| 144 | + */ |
| 145 | + async executeBatch(sql: string, bindings?: any[][]): Promise<WASQLExecuteResult> { |
| 146 | + return this.acquireExecuteLock(async (): Promise<WASQLExecuteResult> => { |
| 147 | + let affectedRows = 0; |
| 148 | + |
| 149 | + try { |
| 150 | + await this.executeSingleStatement('BEGIN TRANSACTION'); |
| 151 | + |
| 152 | + const wrappedBindings = bindings ? bindings : []; |
| 153 | + for await (const stmt of this.sqliteAPI.statements(this.dbP, sql)) { |
| 154 | + if (stmt === null) { |
| 155 | + return { |
| 156 | + rowsAffected: 0, |
| 157 | + rows: { _array: [], length: 0 } |
| 158 | + }; |
| 159 | + } |
| 160 | + |
| 161 | + //Prepare statement once |
| 162 | + for (const binding of wrappedBindings) { |
| 163 | + // TODO not sure why this is needed currently, but booleans break |
| 164 | + for (let i = 0; i < binding.length; i++) { |
| 165 | + const b = binding[i]; |
| 166 | + if (typeof b == 'boolean') { |
| 167 | + binding[i] = b ? 1 : 0; |
| 168 | + } |
| 169 | + } |
| 170 | + |
| 171 | + if (bindings) { |
| 172 | + this.sqliteAPI.bind_collection(stmt, binding); |
| 173 | + } |
| 174 | + const result = await this.sqliteAPI.step(stmt); |
| 175 | + if (result === SQLite.SQLITE_DONE) { |
| 176 | + //The value returned by sqlite3_changes() immediately after an INSERT, UPDATE or DELETE statement run on a view is always zero. |
| 177 | + affectedRows += this.sqliteAPI.changes(this.dbP); |
| 178 | + } |
| 179 | + |
| 180 | + this.sqliteAPI.reset(stmt); |
| 181 | + } |
| 182 | + } |
| 183 | + |
| 184 | + await this.executeSingleStatement('COMMIT'); |
| 185 | + } catch (err) { |
| 186 | + await this.executeSingleStatement('ROLLBACK'); |
| 187 | + return { |
| 188 | + rowsAffected: 0, |
| 189 | + rows: { _array: [], length: 0 } |
| 190 | + }; |
| 191 | + } |
| 192 | + const result = { |
| 193 | + rowsAffected: affectedRows, |
| 194 | + rows: { _array: [], length: 0 } |
| 195 | + }; |
| 196 | + |
| 197 | + return result; |
| 198 | + }); |
| 199 | + } |
| 200 | + |
| 201 | + /** |
| 202 | + * This executes single SQL statements inside a requested lock. |
| 203 | + */ |
| 204 | + async execute(sql: string | TemplateStringsArray, bindings?: any[]): Promise<WASQLExecuteResult> { |
| 205 | + // Running multiple statements on the same connection concurrently should not be allowed |
| 206 | + return this.acquireExecuteLock(async () => { |
| 207 | + return this.executeSingleStatement(sql, bindings); |
| 208 | + }); |
| 209 | + } |
| 210 | + |
| 211 | + async close() { |
| 212 | + return this.sqliteAPI.close(this.dbP); |
| 213 | + } |
| 214 | + |
| 215 | + registerOnTableChange(callback: OnTableChangeCallback) { |
| 216 | + return this.registerListener({ |
| 217 | + tablesUpdated: (event) => callback(event) |
| 218 | + }); |
| 219 | + } |
| 220 | + |
| 221 | + /** |
| 222 | + * This requests a lock for executing statements. |
| 223 | + * Should only be used internally. |
| 224 | + */ |
| 225 | + protected acquireExecuteLock = <T>(callback: () => Promise<T>): Promise<T> => { |
| 226 | + return this.statementMutex.runExclusive(callback); |
| 227 | + }; |
| 228 | + |
| 229 | + /** |
| 230 | + * This executes a single statement using SQLite3. |
| 231 | + */ |
| 232 | + protected async executeSingleStatement( |
| 233 | + sql: string | TemplateStringsArray, |
| 234 | + bindings?: any[] |
| 235 | + ): Promise<WASQLExecuteResult> { |
| 236 | + const results = []; |
| 237 | + for await (const stmt of this.sqliteAPI.statements(this.dbP, sql as string)) { |
| 238 | + let columns; |
| 239 | + const wrappedBindings = bindings ? [bindings] : [[]]; |
| 240 | + for (const binding of wrappedBindings) { |
| 241 | + // TODO not sure why this is needed currently, but booleans break |
| 242 | + binding.forEach((b, index, arr) => { |
| 243 | + if (typeof b == 'boolean') { |
| 244 | + arr[index] = b ? 1 : 0; |
| 245 | + } |
| 246 | + }); |
| 247 | + |
| 248 | + this.sqliteAPI.reset(stmt); |
| 249 | + if (bindings) { |
| 250 | + this.sqliteAPI.bind_collection(stmt, binding); |
| 251 | + } |
| 252 | + |
| 253 | + const rows = []; |
| 254 | + while ((await this.sqliteAPI.step(stmt)) === SQLite.SQLITE_ROW) { |
| 255 | + const row = this.sqliteAPI.row(stmt); |
| 256 | + rows.push(row); |
| 257 | + } |
| 258 | + |
| 259 | + columns = columns ?? this.sqliteAPI.column_names(stmt); |
| 260 | + if (columns.length) { |
| 261 | + results.push({ columns, rows }); |
| 262 | + } |
| 263 | + } |
| 264 | + |
| 265 | + // When binding parameters, only a single statement is executed. |
| 266 | + if (bindings) { |
| 267 | + break; |
| 268 | + } |
| 269 | + } |
| 270 | + |
| 271 | + const rows: Record<string, any>[] = []; |
| 272 | + for (const resultSet of results) { |
| 273 | + for (const row of resultSet.rows) { |
| 274 | + const outRow: Record<string, any> = {}; |
| 275 | + resultSet.columns.forEach((key, index) => { |
| 276 | + outRow[key] = row[index]; |
| 277 | + }); |
| 278 | + rows.push(outRow); |
| 279 | + } |
| 280 | + } |
| 281 | + |
| 282 | + const result = { |
| 283 | + insertId: this.sqliteAPI.last_insert_id(this.dbP), |
| 284 | + rowsAffected: this.sqliteAPI.changes(this.dbP), |
| 285 | + rows: { |
| 286 | + _array: rows, |
| 287 | + length: rows.length |
| 288 | + } |
| 289 | + }; |
| 290 | + |
| 291 | + return result; |
| 292 | + } |
| 293 | +} |
0 commit comments