Skip to content

Commit 129c287

Browse files
wip: uploads
1 parent a24258e commit 129c287

File tree

5 files changed

+215
-42
lines changed

5 files changed

+215
-42
lines changed

packages/lite-sdk/src/client/storage/BucketStorage.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,11 @@ export type SavedProgress = {
99
sinceLast: number;
1010
};
1111

12+
export interface LocalState {
13+
lastOpId: bigint;
14+
targetOpId: bigint;
15+
}
16+
1217
export type BucketOperationProgress = Record<string, SavedProgress>;
1318

1419
/**
@@ -54,6 +59,13 @@ export interface BucketStorage {
5459
*/
5560
getBucketStates: () => Promise<Array<BucketState>>;
5661

62+
/**
63+
* Get the current state of the local bucket.
64+
* FIXME maybe a better name?
65+
* @returns The local bucket state, containing bucket name, operation ID, checksums, etc.
66+
*/
67+
getLocalState: () => Promise<LocalState>;
68+
5769
/**
5870
* Synchronize the local database with a checkpoint.
5971
* Validates checksums, applies operations, and updates bucket states.

packages/lite-sdk/src/client/storage/MemoryBucketStorageImpl.ts

Lines changed: 31 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import type { BucketState, Checkpoint } from '@powersync/service-core';
22
import { Lock } from '../../utils/Lock.js';
33
import { CrudManager } from '../sync/CrudManager.js';
44
import { SystemDependencies } from '../system/SystemDependencies.js';
5-
import type { BucketStorage, SyncDataBatch } from './BucketStorage.js';
5+
import type { BucketStorage, LocalState, SyncDataBatch } from './BucketStorage.js';
66
import type { SyncOperation, SyncOperationsHandler } from './SyncOperationsHandler.js';
77
import { constructKey, toStringOrNull } from './bucketHelpers.js';
88
import { addChecksums, normalizeChecksum, subtractChecksums } from './checksumUtils.js';
@@ -23,6 +23,8 @@ export type MemoryBucketStorageImplOptions = {
2323

2424
export class MemoryBucketStorageImpl implements BucketStorage {
2525
protected ps_buckets: PSBucket[];
26+
// TODO cleanup
27+
protected localBucket: PSBucket;
2628
protected ps_oplog: PSOplog[];
2729
protected ps_updated_rows: PsUpdatedRows[];
2830
protected clientId: string;
@@ -43,6 +45,18 @@ export class MemoryBucketStorageImpl implements BucketStorage {
4345

4446
protected initDefaultState() {
4547
this.ps_buckets = [];
48+
this.localBucket = {
49+
id: 0,
50+
name: '$local',
51+
last_applied_op: 0n,
52+
last_op: 0n,
53+
target_op: 0n,
54+
add_checksum: 0n,
55+
op_checksum: 0n,
56+
pending_delete: false,
57+
count_at_last: 0,
58+
count_since_last: 0
59+
};
4660
this.ps_oplog = [];
4761
this.ps_updated_rows = [];
4862
this.lastSyncedAt = null;
@@ -76,6 +90,15 @@ export class MemoryBucketStorageImpl implements BucketStorage {
7690
});
7791
}
7892

93+
async getLocalState(): Promise<LocalState> {
94+
return await this.lock.runExclusive(async () => {
95+
return {
96+
lastOpId: this.localBucket.last_op,
97+
targetOpId: this.localBucket.target_op
98+
};
99+
});
100+
}
101+
79102
async hasCompletedSync(): Promise<boolean> {
80103
return await this.lock.runExclusive(async () => {
81104
return !!this.ps_buckets.find((b) => b.last_applied_op > 0);
@@ -90,18 +113,9 @@ export class MemoryBucketStorageImpl implements BucketStorage {
90113
if (await this.crudManager?.hasCrud()) {
91114
return;
92115
}
93-
const localBucket = this.ps_buckets.find((b) => b.name === '$local');
94-
if (!localBucket) {
95-
throw new Error('Local bucket not found');
96-
}
97-
localBucket.target_op = normalizeChecksum(writeCheckpoint);
116+
this.localBucket.target_op = normalizeChecksum(writeCheckpoint);
98117
} else {
99-
// Set the target to the max op id
100-
const localBucket = this.ps_buckets.find((b) => b.name === '$local');
101-
if (!localBucket) {
102-
throw new Error('Local bucket not found');
103-
}
104-
localBucket.target_op = normalizeChecksum(MAX_OP_ID);
118+
this.localBucket.target_op = normalizeChecksum(MAX_OP_ID);
105119
}
106120
});
107121
}
@@ -124,14 +138,8 @@ export class MemoryBucketStorageImpl implements BucketStorage {
124138
// SQL: SELECT target_op FROM ps_buckets WHERE name = '$local' AND target_op = CAST(? as INTEGER)
125139
// TODO: maybe store local state separately
126140
const shouldProceed = await this.lock.runExclusive(async () => {
127-
const localBucket = this.ps_buckets.find((b) => b.name === '$local');
128-
if (!localBucket) {
129-
// Nothing to update
130-
return false;
131-
}
132-
133141
const maxOpIdBigint = BigInt(MAX_OP_ID);
134-
if (localBucket.target_op !== maxOpIdBigint) {
142+
if (this.localBucket.target_op !== maxOpIdBigint) {
135143
// target_op is not MAX_OP_ID, nothing to update
136144
return false;
137145
}
@@ -157,16 +165,10 @@ export class MemoryBucketStorageImpl implements BucketStorage {
157165
// This is typically called after completing uploads which can
158166
// be concurrent.
159167
return await this.lock.runExclusive(async () => {
160-
const localBucket = this.ps_buckets.find((b) => b.name === '$local');
161-
if (!localBucket) {
162-
// Nothing to update
163-
return false;
164-
}
165-
166168
// Update the '$local' bucket's target_op to the new opId
167169
// SQL: UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'
168170
const opIdBigint = normalizeChecksum(opId);
169-
localBucket.target_op = opIdBigint;
171+
this.localBucket.target_op = opIdBigint;
170172
return true;
171173
});
172174
}
@@ -400,10 +402,7 @@ export class MemoryBucketStorageImpl implements BucketStorage {
400402

401403
// Update '$local' bucket if write_checkpoint is provided and it's a complete sync
402404
if (priority == null && checkpoint.write_checkpoint) {
403-
const localBucket = this.ps_buckets.find((b) => b.name === '$local');
404-
if (localBucket) {
405-
localBucket.last_op = normalizeChecksum(checkpoint.write_checkpoint);
406-
}
405+
this.localBucket.last_op = normalizeChecksum(checkpoint.write_checkpoint);
407406
}
408407
});
409408

@@ -452,7 +451,7 @@ export class MemoryBucketStorageImpl implements BucketStorage {
452451
if (priority == null) {
453452
const bucketToCount = new Map(checkpoint.buckets.map((b) => [b.bucket, b.count ?? 0]));
454453
for (const bucket of this.ps_buckets) {
455-
if (bucket.name !== '$local' && bucketToCount.has(bucket.name)) {
454+
if (bucketToCount.has(bucket.name)) {
456455
bucket.count_at_last = bucketToCount.get(bucket.name)!;
457456
bucket.count_since_last = 0;
458457
}
@@ -479,8 +478,7 @@ export class MemoryBucketStorageImpl implements BucketStorage {
479478
if (needsCheck) {
480479
// Check if '$local' bucket has target_op > last_op
481480
// SQL: SELECT 1 FROM ps_buckets WHERE target_op > last_op AND name = '$local'
482-
const localBucket = this.ps_buckets.find((b) => b.name === '$local');
483-
if (localBucket && localBucket.target_op > localBucket.last_op) {
481+
if (this.localBucket.target_op > this.localBucket.last_op) {
484482
return false;
485483
}
486484

packages/lite-sdk/src/client/sync/SyncClient.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { BucketRequest, StreamingSyncLine } from '@powersync/service-core';
1+
import type { BucketRequest, Checkpoint, StreamingSyncLine } from '@powersync/service-core';
22
import { BaseListener, BaseObserverInterface, Disposable } from '../../utils/BaseObserver.js';
33
import type { BucketStorage } from '../storage/BucketStorage.js';
44
import type { SystemDependencies } from '../system/SystemDependencies.js';
@@ -36,11 +36,17 @@ export interface SyncClientListener extends BaseListener {
3636
* Triggers whenever the status' members have changed in value
3737
*/
3838
statusChanged?: ((status: SyncStatus) => void) | undefined;
39+
/**
40+
* Triggers whenever a checkpoint is completed.
41+
* @param checkpoint - The checkpoint that was completed
42+
*/
43+
checkpointCompleted?: ((checkpoint: Checkpoint) => void) | undefined;
3944
}
4045

4146
export type CreateCheckpointResponse = {
4247
targetUpdated: boolean;
4348
targetCheckpoint?: string;
49+
waitForValidation: (signal?: AbortSignal) => Promise<void>;
4450
};
4551

4652
/**
@@ -71,8 +77,21 @@ export interface SyncClient extends BaseObserverInterface<SyncClientListener>, D
7177
* Sets the target write checkpoint.
7278
* @param customCheckpoint - Optional custom checkpoint to set the target to
7379
* defaults to creating a managed PowerSync checkpoint
80+
*
81+
* FIXME
82+
* This is a no-op if there are pending CRUD items.
83+
* This does not answer the generic question "have I synced to this point in time?"
84+
* It answers the question "have I synced to after uploads have been completed?"
7485
*/
7586
checkpoint: (customCheckpoint?: string) => Promise<CreateCheckpointResponse>;
87+
88+
/**
89+
* Triggers a CRUD upload.
90+
* This will perform an upload of the CRUD items if there are any.
91+
* If there are no CRUD items, it will do nothing.
92+
* If there are CRUD items, the {@link CrudManager} will be used to perform the upload.
93+
*/
94+
triggerCrudUpload: () => void;
7695
}
7796

7897
export type StreamOptions = {

packages/lite-sdk/src/client/sync/SyncClientImpl.ts

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { Checkpoint, CheckpointBucket } from '@powersync/service-core';
22
import { BaseObserver } from '../../utils/BaseObserver.js';
33
import type { BucketStorage } from '../storage/BucketStorage.js';
4+
import { normalizeChecksum } from '../storage/checksumUtils.js';
45
import { Connector } from './Connector.js';
56
import { CrudManager } from './CrudManager.js';
67
import type {
@@ -92,10 +93,15 @@ export class SyncClientImpl extends BaseObserver<SyncClientListener> implements
9293
const controller = new AbortController();
9394
this.abortController = controller;
9495

95-
while (!this.abortController.signal.aborted) {
96+
// Don't await this, this will run until the connection is aborted
97+
this.connectInternal(connector, controller.signal);
98+
}
99+
100+
protected async connectInternal(connector: Connector, signal: AbortSignal): Promise<void> {
101+
while (!signal.aborted) {
96102
try {
97103
this.updateSyncStatus({ connecting: true });
98-
await this.syncIteration(connector, controller.signal);
104+
await this.syncIteration(connector, signal);
99105
} catch (error) {
100106
this.updateSyncStatus({
101107
connected: false,
@@ -110,13 +116,16 @@ export class SyncClientImpl extends BaseObserver<SyncClientListener> implements
110116
}
111117
}
112118
}
113-
114119
disconnect() {
115120
this.abortController.abort();
116121
}
117122

118123
async checkpoint(customCheckpoint?: string): Promise<CreateCheckpointResponse> {
119124
let targetCheckpoint: string | undefined = customCheckpoint;
125+
// FIXME, could this be optimized?
126+
// If there are no crud items, we set the target to the custom checkpoint or the max op id
127+
// We then set the local target to the new checkpoint
128+
await this.bucketStorage.handleCrudUploaded(customCheckpoint);
120129
const targetUpdated = await this.bucketStorage.updateLocalTarget(async () => {
121130
if (targetCheckpoint) {
122131
return targetCheckpoint;
@@ -126,11 +135,39 @@ export class SyncClientImpl extends BaseObserver<SyncClientListener> implements
126135
});
127136
return {
128137
targetUpdated,
129-
targetCheckpoint
138+
targetCheckpoint,
139+
waitForValidation: async (signal?: AbortSignal) => {
140+
if (!targetCheckpoint || !targetUpdated) {
141+
// FIXME throw an error in this case?
142+
return;
143+
}
144+
const localBucketState = await this.bucketStorage.getLocalState();
145+
if (localBucketState.lastOpId >= normalizeChecksum(targetCheckpoint)!) {
146+
return;
147+
}
148+
149+
return new Promise((resolve, reject) => {
150+
let disposeListener: (() => void) | null = null;
151+
const signalListener = () => {
152+
reject(new Error('Aborted'));
153+
disposeListener?.();
154+
};
155+
disposeListener = this.registerListener({
156+
checkpointCompleted: (checkpoint) => {
157+
if (checkpoint.write_checkpoint && checkpoint.write_checkpoint >= targetCheckpoint!) {
158+
resolve();
159+
disposeListener?.();
160+
signal?.removeEventListener('abort', signalListener);
161+
}
162+
}
163+
});
164+
signal?.addEventListener('abort', disposeListener, { once: true });
165+
});
166+
}
130167
};
131168
}
132169

133-
protected triggerCrudUpload(): void {
170+
triggerCrudUpload(): void {
134171
if (!this.crudManager) {
135172
return;
136173
}
@@ -422,6 +459,10 @@ export class SyncClientImpl extends BaseObserver<SyncClientListener> implements
422459
lastSyncedAt: new Date()
423460
});
424461

462+
this.iterateListeners((listener) => {
463+
listener.checkpointCompleted?.(checkpoint);
464+
});
465+
425466
return { applied: true, endIteration: false };
426467
}
427468

0 commit comments

Comments
 (0)