Skip to content

Commit 726c909

Browse files
committed
Cherry pick #450 repl: track updates to PRIMARY KEY correctly
1 parent 4c2f04f commit 726c909

File tree

2 files changed

+180
-23
lines changed

2 files changed

+180
-23
lines changed

pkg/repl/client.go

Lines changed: 69 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -387,31 +387,77 @@ func (c *Client) processRowsEvent(ev *replication.BinlogEvent, e *replication.Ro
387387
return nil // ignore event, it could be to a _new table.
388388
}
389389
eventType := parseEventType(ev.Header.EventType)
390-
var i = 0
391-
for _, row := range e.Rows {
392-
if eventType == eventTypeUpdate {
393-
// For update events there are always before and after images (i.e. e.Rows is always in pairs.)
394-
// We only need to capture one of the events, and since in MINIMAL RBR row
395-
// image the PK is only included in the before, we chose that one.
396-
i++
397-
if i%2 == 0 {
398-
continue
390+
391+
if eventType == eventTypeUpdate {
392+
// For update events there are always before and after images (i.e. e.Rows is always in pairs.)
393+
// With MINIMAL row image, the PK is only included in the before image for non-PK updates.
394+
// For PK updates, the after image also carries the PK columns that the statement assigned (not necessarily every PK column).
395+
for i := 0; i < len(e.Rows); i += 2 {
396+
beforeRow := e.Rows[i]
397+
afterRow := e.Rows[i+1]
398+
399+
// Always process the before image (guaranteed to have PK in minimal mode)
400+
beforeKey, err := sub.table.PrimaryKeyValues(beforeRow)
401+
if err != nil {
402+
return err
403+
}
404+
if len(beforeKey) == 0 {
405+
return fmt.Errorf("no primary key found for before row: %#v", beforeRow)
406+
}
407+
408+
// With MINIMAL row image, we need to reconstruct the after key
409+
// by combining the before key with any changed PK columns from the after image
410+
afterKey := make([]any, len(beforeKey))
411+
copy(afterKey, beforeKey) // Start with the before key
412+
413+
// Check if any PK columns were updated by examining the after row
414+
isPKUpdate := false
415+
afterRowSlice := afterRow
416+
417+
for pkIdx, pkCol := range sub.table.KeyColumns {
418+
// Find the position of this PK column in the table columns
419+
for colIdx, col := range sub.table.Columns {
420+
if col == pkCol {
421+
// If this column exists in the after image and is not nil, use it
422+
if colIdx < len(afterRowSlice) && afterRowSlice[colIdx] != nil {
423+
if fmt.Sprintf("%v", beforeKey[pkIdx]) != fmt.Sprintf("%v", afterRowSlice[colIdx]) {
424+
afterKey[pkIdx] = afterRowSlice[colIdx]
425+
isPKUpdate = true
426+
}
427+
}
428+
break
429+
}
430+
}
431+
}
432+
433+
if isPKUpdate {
434+
// This is a primary key update - track both delete and insert
435+
sub.keyHasChanged(beforeKey, true) // delete old key
436+
sub.keyHasChanged(afterKey, false) // insert new key
437+
} else {
438+
// Same PK, just a regular update
439+
sub.keyHasChanged(beforeKey, false)
399440
}
400441
}
401-
key, err := sub.table.PrimaryKeyValues(row)
402-
if err != nil {
403-
return err
404-
}
405-
if len(key) == 0 {
406-
return fmt.Errorf("no primary key found for row: %#v", row)
407-
}
408-
switch eventType {
409-
case eventTypeInsert, eventTypeUpdate:
410-
sub.keyHasChanged(key, false)
411-
case eventTypeDelete:
412-
sub.keyHasChanged(key, true)
413-
default:
414-
c.logger.Errorf("unknown event type: %v", ev.Header.EventType)
442+
} else {
443+
// For INSERT and DELETE events, process each row normally
444+
for _, row := range e.Rows {
445+
key, err := sub.table.PrimaryKeyValues(row)
446+
if err != nil {
447+
return err
448+
}
449+
if len(key) == 0 {
450+
// In theory this is unreachable since we mandate a PK on tables
451+
return fmt.Errorf("no primary key found for row: %#v", row)
452+
}
453+
switch eventType {
454+
case eventTypeInsert:
455+
sub.keyHasChanged(key, false)
456+
case eventTypeDelete:
457+
sub.keyHasChanged(key, true)
458+
default:
459+
c.logger.Errorf("unknown event type: %v", ev.Header.EventType)
460+
}
415461
}
416462
}
417463
return nil

pkg/repl/client_test.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717

1818
"github.com/cashapp/spirit/pkg/table"
1919
"github.com/stretchr/testify/assert"
20+
"github.com/stretchr/testify/require"
2021
)
2122

2223
func TestMain(m *testing.M) {
@@ -547,6 +548,116 @@ func TestSetDDLNotificationChannel(t *testing.T) {
547548
})
548549
}
549550

551+
// TestCompositePKUpdate tests that we correctly handle
552+
// the case when a PRIMARY KEY is moved.
553+
// See: https://github.com/block/spirit/issues/417
554+
func TestCompositePKUpdate(t *testing.T) {
555+
db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig())
556+
assert.NoError(t, err)
557+
defer db.Close()
558+
559+
// Drop tables if they exist
560+
testutils.RunSQL(t, "DROP TABLE IF EXISTS composite_pk_src, composite_pk_dst")
561+
562+
// Create a table with composite primary key similar to customer's message_groups table
563+
testutils.RunSQL(t, `CREATE TABLE composite_pk_src (
564+
organization_id BIGINT NOT NULL,
565+
from_id BIGINT NOT NULL DEFAULT 0,
566+
id BIGINT NOT NULL,
567+
message VARCHAR(255) NOT NULL,
568+
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
569+
PRIMARY KEY (organization_id, from_id, id),
570+
UNIQUE KEY idx_id (id)
571+
)`)
572+
573+
testutils.RunSQL(t, `CREATE TABLE composite_pk_dst (
574+
organization_id BIGINT NOT NULL,
575+
from_id BIGINT NOT NULL DEFAULT 0,
576+
id BIGINT NOT NULL,
577+
message VARCHAR(255) NOT NULL,
578+
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
579+
PRIMARY KEY (organization_id, from_id, id),
580+
UNIQUE KEY idx_id (id)
581+
)`)
582+
583+
// Insert initial test data in *both* source and destination tables
584+
testutils.RunSQL(t, `INSERT INTO composite_pk_src (organization_id, from_id, id, message) VALUES
585+
(1, 100, 1, 'message 1'),
586+
(1, 200, 2, 'message 2'),
587+
(1, 300, 3, 'message 3'),
588+
(2, 100, 4, 'message 4'),
589+
(2, 200, 5, 'message 5')`)
590+
testutils.RunSQL(t, `INSERT INTO composite_pk_dst SELECT * FROM composite_pk_src`)
591+
592+
// Set up table info
593+
t1 := table.NewTableInfo(db, "test", "composite_pk_src")
594+
assert.NoError(t, t1.SetInfo(t.Context()))
595+
t2 := table.NewTableInfo(db, "test", "composite_pk_dst")
596+
assert.NoError(t, t2.SetInfo(t.Context()))
597+
598+
// Create replication client
599+
logger := logrus.New()
600+
cfg, err := mysql2.ParseDSN(testutils.DSN())
601+
assert.NoError(t, err)
602+
client := NewClient(db, cfg.Addr, cfg.User, cfg.Passwd, &ClientConfig{
603+
Logger: logger,
604+
Concurrency: 4,
605+
TargetBatchTime: time.Second,
606+
ServerID: NewServerID(),
607+
})
608+
609+
// Add subscription - note that keyAboveWatermark is disabled for composite PKs
610+
assert.NoError(t, client.AddSubscription(t1, t2, nil))
611+
assert.NoError(t, client.Run(t.Context()))
612+
defer client.Close()
613+
614+
// Update the from_id (part of the primary key)
615+
testutils.RunSQL(t, `UPDATE composite_pk_src SET from_id = 999 WHERE id IN (1, 3)`)
616+
assert.NoError(t, client.BlockWait(t.Context()))
617+
618+
// The update should result in changes being tracked
619+
// With binlog_row_image=minimal and PK updates, we expect 4 changes (2 deletes + 2 inserts)
620+
deltaLen := client.GetDeltaLen()
621+
require.Equal(t, 4, deltaLen, "Should have tracked 4 changes for PK update (2 deletes + 2 inserts)")
622+
623+
// Flush the changes
624+
// This should update the destination table correctly
625+
assert.NoError(t, client.Flush(t.Context()))
626+
627+
// Verify the data was replicated correctly
628+
var count int
629+
630+
// Check that rows with new from_id exist in destination
631+
err = db.QueryRow(`SELECT COUNT(*) FROM composite_pk_dst
632+
WHERE organization_id = 1 AND from_id = 999 AND id IN (1, 3)`).Scan(&count)
633+
assert.NoError(t, err)
634+
assert.Equal(t, 2, count, "Rows with updated from_id should exist in destination")
635+
636+
// Check that rows with old from_id don't exist in destination
637+
err = db.QueryRow(`SELECT COUNT(*) FROM composite_pk_dst
638+
WHERE (organization_id = 1 AND from_id = 100 AND id = 1)
639+
OR (organization_id = 1 AND from_id = 300 AND id = 3)`).Scan(&count)
640+
assert.NoError(t, err)
641+
assert.Equal(t, 0, count, "Rows with old from_id should not exist in destination")
642+
643+
// Verify total row count
644+
err = db.QueryRow("SELECT COUNT(*) FROM composite_pk_dst").Scan(&count)
645+
assert.NoError(t, err)
646+
assert.Equal(t, 5, count, "Should have all 5 rows in destination")
647+
648+
// Now test another PK update
649+
testutils.RunSQL(t, `UPDATE composite_pk_src SET from_id = 888 WHERE id = 5`)
650+
assert.NoError(t, client.BlockWait(t.Context()))
651+
assert.Positive(t, client.GetDeltaLen(), "Should have tracked changes for second PK update")
652+
assert.NoError(t, client.Flush(t.Context()))
653+
654+
// Verify the second update
655+
err = db.QueryRow(`SELECT COUNT(*) FROM composite_pk_dst
656+
WHERE organization_id = 2 AND from_id = 888 AND id = 5`).Scan(&count)
657+
assert.NoError(t, err)
658+
assert.Equal(t, 1, count, "Row with updated from_id=888 should exist in destination")
659+
}
660+
550661
func TestAllChangesFlushed(t *testing.T) {
551662
srcTable, dstTable := setupTestTables(t)
552663

0 commit comments

Comments
 (0)