Skip to content

Commit e413d8a

Browse files
committed
Track state of rows in TX to be able to delete rows on a move out
1 parent b5103ff commit e413d8a

File tree

1 file changed

+90
-22
lines changed

1 file changed

+90
-22
lines changed

packages/electric-db-collection/src/electric.ts

Lines changed: 90 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -883,7 +883,7 @@ function createElectricSync<T extends Row<unknown>>(
883883
}
884884

885885
rowTagSet.add(tag)
886-
addTagToIndex(parsedTag, rowId, tagIndex, tagLength)
886+
addTagToIndex(parsedTag, rowId, tagIndex, tagLength)
887887
}
888888
}
889889

@@ -1015,6 +1015,7 @@ function createElectricSync<T extends Row<unknown>>(
10151015
begin: () => void,
10161016
write: (message: Omit<ChangeMessage<T>, `key`>) => void,
10171017
transactionStarted: boolean,
1018+
transactionState: Map<RowId, T>
10181019
): boolean => {
10191020
if (tagLength === undefined) {
10201021
debug(
@@ -1038,12 +1039,15 @@ function createElectricSync<T extends Row<unknown>>(
10381039
txStarted = true
10391040
}
10401041

1041-
const rowValue = collection.get(rowId)
1042+
// Get row value from transaction state (uncommitted) or collection
1043+
const rowValue = transactionState.get(rowId) ?? collection.get(rowId)
10421044
if (rowValue !== undefined) {
10431045
write({
10441046
type: `delete`,
10451047
value: rowValue,
10461048
})
1049+
// Remove from transaction state since we're deleting it
1050+
transactionState.delete(rowId)
10471051
}
10481052
}
10491053
}
@@ -1164,6 +1168,10 @@ function createElectricSync<T extends Row<unknown>>(
11641168
syncMode === `progressive` && !hasReceivedUpToDate
11651169
const bufferedMessages: Array<Message<T>> = [] // Buffer change messages during initial sync
11661170

1171+
// Track row state during the current transaction to access uncommitted row values
1172+
// This allows us to handle partial updates correctly by merging with existing state
1173+
const transactionState = new Map<RowId, T>()
1174+
11671175
/**
11681176
* Process a change message: handle tags and write the mutation
11691177
*/
@@ -1178,20 +1186,62 @@ function createElectricSync<T extends Row<unknown>>(
11781186
const hasTags = tags || removedTags
11791187

11801188
const rowId = collection.getKeyFromItem(changeMessage.value)
1189+
const operation = changeMessage.headers.operation
1190+
1191+
if (operation === `insert`) {
1192+
// For insert, store the full row in transaction state
1193+
transactionState.set(rowId, changeMessage.value)
1194+
1195+
if (hasTags) {
1196+
processTagsForChangeMessage(tags, removedTags, rowId)
1197+
}
11811198

1182-
if (changeMessage.headers.operation === `delete`) {
1199+
write({
1200+
type: `insert`,
1201+
value: changeMessage.value,
1202+
metadata: {
1203+
...changeMessage.headers,
1204+
},
1205+
})
1206+
} else if (operation === `update`) {
1207+
// For update, merge with existing state (from transaction state or collection)
1208+
const existingValue =
1209+
transactionState.get(rowId) ?? collection.get(rowId)
1210+
1211+
// Merge the update with existing value (handles partial updates)
1212+
const updatedValue =
1213+
existingValue !== undefined
1214+
? Object.assign({}, existingValue, changeMessage.value)
1215+
: changeMessage.value
1216+
1217+
// Store the merged result in transaction state
1218+
transactionState.set(rowId, updatedValue)
1219+
1220+
if (hasTags) {
1221+
processTagsForChangeMessage(tags, removedTags, rowId)
1222+
}
1223+
1224+
write({
1225+
type: `update`,
1226+
value: updatedValue,
1227+
metadata: {
1228+
...changeMessage.headers,
1229+
},
1230+
})
1231+
} else {
1232+
// Operation is delete
11831233
clearTagsForRow(rowId)
1184-
} else if (hasTags) {
1185-
processTagsForChangeMessage(tags, removedTags, rowId)
1234+
// Remove from transaction state
1235+
transactionState.delete(rowId)
1236+
1237+
write({
1238+
type: `delete`,
1239+
value: changeMessage.value,
1240+
metadata: {
1241+
...changeMessage.headers,
1242+
},
1243+
})
11861244
}
1187-
1188-
write({
1189-
type: changeMessage.headers.operation,
1190-
value: changeMessage.value,
1191-
metadata: {
1192-
...changeMessage.headers,
1193-
},
1194-
})
11951245
}
11961246

11971247
// Create deduplicated loadSubset wrapper for non-eager modes
@@ -1213,7 +1263,7 @@ function createElectricSync<T extends Row<unknown>>(
12131263

12141264
for (const message of messages) {
12151265
// Add message to current batch buffer (for race condition handling)
1216-
if (isChangeMessage(message)) {
1266+
if (isChangeMessage(message) || isMoveOutMessage(message)) {
12171267
currentBatchMessages.setState((currentBuffer) => {
12181268
const newBuffer = [...currentBuffer, message]
12191269
// Limit buffer size for safety
@@ -1281,14 +1331,20 @@ function createElectricSync<T extends Row<unknown>>(
12811331
} else if (isUpToDateMessage(message)) {
12821332
hasUpToDate = true
12831333
} else if (isMoveOutMessage(message)) {
1284-
// Handle move-out event: remove matching tags from rows
1285-
transactionStarted = processMoveOutEvent(
1286-
message.headers.patterns,
1287-
collection,
1288-
begin,
1289-
write,
1290-
transactionStarted,
1291-
)
1334+
// Handle move-out event: buffer if buffering, otherwise process immediately
1335+
if (isBufferingInitialSync()) {
1336+
bufferedMessages.push(message)
1337+
} else {
1338+
// Normal processing: process move-out immediately
1339+
transactionStarted = processMoveOutEvent(
1340+
message.headers.patterns,
1341+
collection,
1342+
begin,
1343+
write,
1344+
transactionStarted,
1345+
transactionState
1346+
)
1347+
}
12921348
} else if (isMustRefetchMessage(message)) {
12931349
debug(
12941350
`${collectionId ? `[${collectionId}] ` : ``}Received must-refetch message, starting transaction with truncate`,
@@ -1347,11 +1403,22 @@ function createElectricSync<T extends Row<unknown>>(
13471403
} else if (isSnapshotEndMessage(bufferedMsg)) {
13481404
// Extract snapshots from buffered messages (will be committed to store after transaction)
13491405
newSnapshots.push(parseSnapshotMessage(bufferedMsg))
1406+
} else if (isMoveOutMessage(bufferedMsg)) {
1407+
// Process buffered move-out messages during atomic swap
1408+
processMoveOutEvent(
1409+
bufferedMsg.headers.patterns,
1410+
collection,
1411+
begin,
1412+
write,
1413+
transactionStarted,
1414+
transactionState
1415+
)
13501416
}
13511417
}
13521418

13531419
// Commit the atomic swap
13541420
commit()
1421+
transactionState.clear()
13551422

13561423
// Exit buffering phase by marking that we've received up-to-date
13571424
// isBufferingInitialSync() will now return false
@@ -1371,6 +1438,7 @@ function createElectricSync<T extends Row<unknown>>(
13711438
if (transactionStarted && shouldCommit) {
13721439
commit()
13731440
transactionStarted = false
1441+
transactionState.clear()
13741442
}
13751443
}
13761444

0 commit comments

Comments
 (0)