Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -186,33 +186,40 @@ public int compareTo(BinlogOffset that) {
String targetGtidSetStr = that.getGtidSet();
if (StringUtils.isNotEmpty(targetGtidSetStr)) {
// The target offset uses GTIDs, so we ideally compare using GTIDs ...
if (StringUtils.isNotEmpty(gtidSetStr)) {
// Both have GTIDs, so base the comparison entirely on the GTID sets.
GtidSet gtidSet = new GtidSet(gtidSetStr);
GtidSet targetGtidSet = new GtidSet(targetGtidSetStr);
if (gtidSet.equals(targetGtidSet)) {
long restartSkipEvents = this.getRestartSkipEvents();
long targetRestartSkipEvents = that.getRestartSkipEvents();
return Long.compare(restartSkipEvents, targetRestartSkipEvents);
}
if (StringUtils.isEmpty(gtidSetStr)) {
// The target offset did use GTIDs while this did not use GTIDs. So, we assume
// that this offset is older since GTIDs are often enabled but rarely disabled.
// And if they are disabled,
// it is likely that this offset would not include GTIDs as we would be trying
// to read the binlog of a
// server that no longer has GTIDs. And if they are enabled, disabled, and
// re-enabled, per
// https://dev.mysql.com/doc/refman/5.7/en/replication-gtids-failover.html
// all properly configured slaves that
// use GTIDs should always have the complete set of GTIDs copied from the master, in
// which case
// again we know that this offset not having GTIDs is before the target offset ...
return -1;
}
// Both have GTIDs, so base the comparison entirely on the GTID sets.
GtidSet gtidSet = new GtidSet(gtidSetStr);
GtidSet targetGtidSet = new GtidSet(targetGtidSetStr);
if (!gtidSet.equals(targetGtidSet)) {
// The GTIDs are not an exact match, so figure out if this is a subset of the target
// offset
// ...
return gtidSet.isContainedWithin(targetGtidSet) ? -1 : 1;
}
// The target offset did use GTIDs while this did not use GTIDs. So, we assume
// that this offset is older since GTIDs are often enabled but rarely disabled.
// And if they are disabled,
// it is likely that this offset would not include GTIDs as we would be trying
// to read the binlog of a
// server that no longer has GTIDs. And if they are enabled, disabled, and re-enabled,
// per
// https://dev.mysql.com/doc/refman/5.7/en/replication-gtids-failover.html all properly
// configured slaves that
// use GTIDs should always have the complete set of GTIDs copied from the master, in
// which case
// again we know that this offset not having GTIDs is before the target offset ...
return -1;

// The GTIDs are the same, so compare the completed events in the transaction ...
long restartSkipEvents = this.getRestartSkipEvents();
long targetRestartSkipEvents = that.getRestartSkipEvents();
if (restartSkipEvents != targetRestartSkipEvents) {
return Long.compare(restartSkipEvents, targetRestartSkipEvents);
}

// The completed events are the same, so compare the row number ...
return Long.compare(this.getRestartSkipRows(), that.getRestartSkipRows());
} else if (StringUtils.isNotEmpty(gtidSetStr)) {
// This offset has a GTID but the target offset does not, so per the previous paragraph
// we
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.testcontainers.lifecycle.Startables;

import java.sql.Connection;
Expand All @@ -80,6 +81,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -786,6 +788,94 @@ void testReadBinlogFromGtidSet() throws Exception {
assertEqualsInOrder(Arrays.asList(expected), actual);
}

/**
* In a bad case, it will skip the rest records whitch causes infinite wait for empty data. So
* it should has a timeout to avoid it.
*/
@Test
@Timeout(value = 600, unit = TimeUnit.SECONDS)
void testRestoreFromCheckpointWithGtidSetAndSkippingEventsAndRows() throws Exception {
// Preparations
customerDatabase.createAndInitialize();
MySqlSourceConfig connectionConfig = getConfig(new String[] {"customers"});
binaryLogClient = DebeziumUtils.createBinaryClient(connectionConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(connectionConfig);
DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("address", DataTypes.STRING()),
DataTypes.FIELD("phone_number", DataTypes.STRING()));

// Capture the current binlog offset, and we will start the reader from here
BinlogOffset startingOffset = DebeziumUtils.currentBinlogOffset(mySqlConnection);

// In this case, the binlog is:
// Event 0: QUERY,BEGIN
// Event 1: TABLE_MAP
// Event 2: Update id = 101 and id = 102
// ROW 1 : Update id=101
// ROW 2 : Update id=102
// Event 3: TABLE_MAP
// Event 4: Update id = 103 and id = 109
// ROW 1 : Update id=103
// ROW 2 : Update id=109

// When a checkpoint is triggered
// after id=103 ,before id=109 ,
// the position restored from checkpoint will be event=4 and row=1
BinlogOffset checkpointOffset =
BinlogOffset.builder()
.setBinlogFilePosition("", 0)
.setGtidSet(startingOffset.getGtidSet())
// Because the position restored from checkpoint
// will skip 4 events to drop the first update:
// QUERY / TABLE_MAP / EXT_UPDATE_ROWS / TABLE_MAP
.setSkipEvents(4)
// The position restored from checkpoint
// will skip 1 rows to drop the first
.setSkipRows(1)
.build();

// Create a new config to start reading from the offset captured above
MySqlSourceConfig sourceConfig =
getConfig(
StartupOptions.specificOffset(checkpointOffset),
new String[] {"customers"});

// Create reader and submit splits
MySqlBinlogSplit split = createBinlogSplit(sourceConfig);
BinlogSplitReader reader = createBinlogReader(sourceConfig);
reader.submitSplit(split);

// Create some binlog events:
// Event 0: QUERY,BEGIN
// Event 1: TABLE_MAP
// Event 2: Update id = 101 and id = 102
// ROW 1 : Update id=101
// ROW 2 : Update id=102
// Event 3: TABLE_MAP
// Event 4: Update id = 103 and id = 109
// ROW 1 : Update id=103
// ROW 2 : Update id=109
// The event 0-3 will be dropped because skipEvents = 4.
// The row 1 in event 4 will be dropped because skipRows = 1.
// Only the update on 109 will be captured.
updateCustomersTableInBulk(
mySqlConnection, customerDatabase.qualifiedTableName("customers"));

// Read with binlog split reader and validate
String[] expected =
new String[] {
"-U[109, user_4, Shanghai, 123567891234]",
"+U[109, user_4, Pittsburgh, 123567891234]"
};
List<String> actual = readBinlogSplits(dataType, reader, expected.length);

reader.close();
assertEqualsInOrder(Arrays.asList(expected), actual);
}

@Test
void testReadBinlogFromTimestamp() throws Exception {
// Preparations
Expand Down
Loading