diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java index c6511a1e72b..d6db974c4cf 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java @@ -186,33 +186,40 @@ public int compareTo(BinlogOffset that) { String targetGtidSetStr = that.getGtidSet(); if (StringUtils.isNotEmpty(targetGtidSetStr)) { // The target offset uses GTIDs, so we ideally compare using GTIDs ... - if (StringUtils.isNotEmpty(gtidSetStr)) { - // Both have GTIDs, so base the comparison entirely on the GTID sets. - GtidSet gtidSet = new GtidSet(gtidSetStr); - GtidSet targetGtidSet = new GtidSet(targetGtidSetStr); - if (gtidSet.equals(targetGtidSet)) { - long restartSkipEvents = this.getRestartSkipEvents(); - long targetRestartSkipEvents = that.getRestartSkipEvents(); - return Long.compare(restartSkipEvents, targetRestartSkipEvents); - } + if (StringUtils.isEmpty(gtidSetStr)) { + // The target offset did use GTIDs while this did not use GTIDs. So, we assume + // that this offset is older since GTIDs are often enabled but rarely disabled. + // And if they are disabled, + // it is likely that this offset would not include GTIDs as we would be trying + // to read the binlog of a + // server that no longer has GTIDs. 
And if they are enabled, disabled, and + // re-enabled, per + // https://dev.mysql.com/doc/refman/5.7/en/replication-gtids-failover.html + // all properly configured slaves that + // use GTIDs should always have the complete set of GTIDs copied from the master, in + // which case + // again we know that this offset not having GTIDs is before the target offset ... + return -1; + } + // Both have GTIDs, so base the comparison entirely on the GTID sets. + GtidSet gtidSet = new GtidSet(gtidSetStr); + GtidSet targetGtidSet = new GtidSet(targetGtidSetStr); + if (!gtidSet.equals(targetGtidSet)) { // The GTIDs are not an exact match, so figure out if this is a subset of the target // offset // ... return gtidSet.isContainedWithin(targetGtidSet) ? -1 : 1; } - // The target offset did use GTIDs while this did not use GTIDs. So, we assume - // that this offset is older since GTIDs are often enabled but rarely disabled. - // And if they are disabled, - // it is likely that this offset would not include GTIDs as we would be trying - // to read the binlog of a - // server that no longer has GTIDs. And if they are enabled, disabled, and re-enabled, - // per - // https://dev.mysql.com/doc/refman/5.7/en/replication-gtids-failover.html all properly - // configured slaves that - // use GTIDs should always have the complete set of GTIDs copied from the master, in - // which case - // again we know that this offset not having GTIDs is before the target offset ... - return -1; + + // The GTIDs are the same, so compare the completed events in the transaction ... + long restartSkipEvents = this.getRestartSkipEvents(); + long targetRestartSkipEvents = that.getRestartSkipEvents(); + if (restartSkipEvents != targetRestartSkipEvents) { + return Long.compare(restartSkipEvents, targetRestartSkipEvents); + } + + // The completed events are the same, so compare the row number ... 
+ return Long.compare(this.getRestartSkipRows(), that.getRestartSkipRows()); } else if (StringUtils.isNotEmpty(gtidSetStr)) { // This offset has a GTID but the target offset does not, so per the previous paragraph // we diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java index 0a6417b3e62..38113ed7fbd 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java @@ -66,6 +66,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.testcontainers.lifecycle.Startables; import java.sql.Connection; @@ -80,6 +81,7 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -786,6 +788,94 @@ void testReadBinlogFromGtidSet() throws Exception { assertEqualsInOrder(Arrays.asList(expected), actual); } + /** + * In the bad case the remaining records are skipped, which causes an infinite wait for data + * that never arrives, so this test has a timeout to avoid hanging. 
+ */ + @Test + @Timeout(value = 600, unit = TimeUnit.SECONDS) + void testRestoreFromCheckpointWithGtidSetAndSkippingEventsAndRows() throws Exception { + // Preparations + customerDatabase.createAndInitialize(); + MySqlSourceConfig connectionConfig = getConfig(new String[] {"customers"}); + binaryLogClient = DebeziumUtils.createBinaryClient(connectionConfig.getDbzConfiguration()); + mySqlConnection = DebeziumUtils.createMySqlConnection(connectionConfig); + DataType dataType = + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.BIGINT()), + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("address", DataTypes.STRING()), + DataTypes.FIELD("phone_number", DataTypes.STRING())); + + // Capture the current binlog offset, and we will start the reader from here + BinlogOffset startingOffset = DebeziumUtils.currentBinlogOffset(mySqlConnection); + + // In this case, the binlog is: + // Event 0: QUERY,BEGIN + // Event 1: TABLE_MAP + // Event 2: Update id = 101 and id = 102 + // ROW 1 : Update id=101 + // ROW 2 : Update id=102 + // Event 3: TABLE_MAP + // Event 4: Update id = 103 and id = 109 + // ROW 1 : Update id=103 + // ROW 2 : Update id=109 + + // When a checkpoint is triggered + // after id=103 ,before id=109 , + // the position restored from checkpoint will be event=4 and row=1 + BinlogOffset checkpointOffset = + BinlogOffset.builder() + .setBinlogFilePosition("", 0) + .setGtidSet(startingOffset.getGtidSet()) + // Because the position restored from checkpoint + // will skip 4 events to drop the first update: + // QUERY / TABLE_MAP / EXT_UPDATE_ROWS / TABLE_MAP + .setSkipEvents(4) + // The position restored from checkpoint + // will skip 1 rows to drop the first + .setSkipRows(1) + .build(); + + // Create a new config to start reading from the offset captured above + MySqlSourceConfig sourceConfig = + getConfig( + StartupOptions.specificOffset(checkpointOffset), + new String[] {"customers"}); + + // Create reader and submit splits + MySqlBinlogSplit split = 
createBinlogSplit(sourceConfig); + BinlogSplitReader reader = createBinlogReader(sourceConfig); + reader.submitSplit(split); + + // Create some binlog events: + // Event 0: QUERY,BEGIN + // Event 1: TABLE_MAP + // Event 2: Update id = 101 and id = 102 + // ROW 1 : Update id=101 + // ROW 2 : Update id=102 + // Event 3: TABLE_MAP + // Event 4: Update id = 103 and id = 109 + // ROW 1 : Update id=103 + // ROW 2 : Update id=109 + // The event 0-3 will be dropped because skipEvents = 4. + // The row 1 in event 4 will be dropped because skipRows = 1. + // Only the update on 109 will be captured. + updateCustomersTableInBulk( + mySqlConnection, customerDatabase.qualifiedTableName("customers")); + + // Read with binlog split reader and validate + String[] expected = + new String[] { + "-U[109, user_4, Shanghai, 123567891234]", + "+U[109, user_4, Pittsburgh, 123567891234]" + }; + List actual = readBinlogSplits(dataType, reader, expected.length); + + reader.close(); + assertEqualsInOrder(Arrays.asList(expected), actual); + } + @Test void testReadBinlogFromTimestamp() throws Exception { // Preparations diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffsetTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffsetTest.java new file mode 100644 index 00000000000..58f34c16aaf --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffsetTest.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.source.offset; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +/** Unit test for {@link BinlogOffset}. */ +public class BinlogOffsetTest { + public static final String PART_OF_GTID_SET_1 = "abcd:1-4"; + public static final String PART_OF_GTID_SET_2 = "efgh:1-10"; + public static final String FULL_GTID_SET = + String.join(",", PART_OF_GTID_SET_1, PART_OF_GTID_SET_2); + + @Test + public void testCompareToWithGtidSet() { + // Test same GTID sets in different orders + BinlogOffset offset1 = BinlogOffset.builder().setGtidSet(FULL_GTID_SET).build(); + BinlogOffset offset2 = + BinlogOffset.builder() + .setGtidSet(String.join(",", PART_OF_GTID_SET_2, PART_OF_GTID_SET_1)) + .build(); + assetCompareTo(offset1, offset2, 0); + + // The test uses GTID instead of position for comparison. 
+ offset1 = + BinlogOffset.builder() + .setGtidSet(FULL_GTID_SET) + .setBinlogFilePosition("binlog.001", 123) + .build(); + offset2 = + BinlogOffset.builder() + .setGtidSet(String.join(",", PART_OF_GTID_SET_2, PART_OF_GTID_SET_1)) + .setBinlogFilePosition("binlog.001", 456) + .build(); + assetCompareTo(offset1, offset2, 0); + + // Test different GTID sets where one contains another + BinlogOffset offset3 = BinlogOffset.builder().setGtidSet(PART_OF_GTID_SET_1).build(); + BinlogOffset offset4 = + BinlogOffset.builder() + .setGtidSet("abcd:1-5") // Contains offset3's GTID set + .build(); + + // offset3 should be before offset4 + assetCompareTo(offset3, offset4, -1); + assetCompareTo(offset4, offset3, 1); + + // The test uses GTID instead of position for comparison. + offset3 = + BinlogOffset.builder() + .setGtidSet(PART_OF_GTID_SET_1) + .setBinlogFilePosition("binlog.001", 1000) + .build(); + offset4 = + BinlogOffset.builder() + .setGtidSet("abcd:1-5") // Contains offset3's GTID set + .setBinlogFilePosition("binlog.001", 23) + .build(); + assetCompareTo(offset3, offset4, -1); + assetCompareTo(offset4, offset3, 1); + + // Test completely different GTID sets + BinlogOffset offset5 = BinlogOffset.builder().setGtidSet(PART_OF_GTID_SET_1).build(); + BinlogOffset offset6 = BinlogOffset.builder().setGtidSet(PART_OF_GTID_SET_2).build(); + + // offsets don't contain each other, result is always 1 + assetCompareTo(offset5, offset6, 1); + assetCompareTo(offset6, offset5, 1); + } + + @Test + public void testCompareToWithGtidSetAndSkipEventsAndSkipRows() { + // Test same GTID but different skip events + BinlogOffset offset1 = + BinlogOffset.builder().setGtidSet(FULL_GTID_SET).setSkipEvents(5).build(); + BinlogOffset offset2 = + BinlogOffset.builder().setGtidSet(FULL_GTID_SET).setSkipEvents(10).build(); + + assetCompareTo(offset1, offset2, -1); + assetCompareTo(offset2, offset1, 1); + + // Test same GTID and skip events but different skip rows + BinlogOffset offset3 = + 
BinlogOffset.builder() + .setGtidSet(FULL_GTID_SET) + .setSkipEvents(5) + .setSkipRows(10) + .build(); + BinlogOffset offset4 = + BinlogOffset.builder() + .setGtidSet(FULL_GTID_SET) + .setSkipEvents(5) + .setSkipRows(20) + .build(); + + assetCompareTo(offset3, offset4, -1); + assetCompareTo(offset4, offset3, 1); + } + + @Test + public void testCompareToWithGtidSetExistence() { + // Test one offset has GTID set and another doesn't + BinlogOffset offsetWithGtid = + BinlogOffset.builder() + .setGtidSet(PART_OF_GTID_SET_1) + .setBinlogFilePosition("binlog.001", 123) + .build(); + BinlogOffset offsetWithoutGtid = + BinlogOffset.builder().setBinlogFilePosition("binlog.001", 456).build(); + + // When one has GTID and another doesn't, the one without GTID is considered older + assetCompareTo(offsetWithGtid, offsetWithoutGtid, 1); + assetCompareTo(offsetWithoutGtid, offsetWithGtid, -1); + + // Test the reverse scenario + BinlogOffset offsetWithGtid2 = + BinlogOffset.builder().setGtidSet(PART_OF_GTID_SET_2).build(); + BinlogOffset offsetWithoutGtid2 = + BinlogOffset.builder() + .setBinlogFilePosition("binlog.002", 789) + .setSkipEvents(5) + .build(); + + assetCompareTo(offsetWithGtid2, offsetWithoutGtid2, 1); + assetCompareTo(offsetWithoutGtid2, offsetWithGtid2, -1); + } + + @Test + public void testCompareToWithFilePosition() { + // Test same file position - should be equal + BinlogOffset offset1 = + BinlogOffset.builder().setBinlogFilePosition("binlog.001", 123).build(); + BinlogOffset offset2 = + BinlogOffset.builder().setBinlogFilePosition("binlog.001", 123).build(); + assetCompareTo(offset1, offset2, 0); + + // Test different file names + BinlogOffset offset3 = + BinlogOffset.builder().setBinlogFilePosition("binlog.001", 123).build(); + BinlogOffset offset4 = + BinlogOffset.builder().setBinlogFilePosition("binlog.002", 123).build(); + assetCompareTo(offset3, offset4, -1); + assetCompareTo(offset4, offset3, 1); + + // Test different positions in same file + BinlogOffset 
offset5 = + BinlogOffset.builder().setBinlogFilePosition("binlog.001", 100).build(); + BinlogOffset offset6 = + BinlogOffset.builder().setBinlogFilePosition("binlog.001", 200).build(); + assetCompareTo(offset5, offset6, -1); + assetCompareTo(offset6, offset5, 1); + } + + @Test + public void testCompareToWithFilePositionAndSkipEventsAndSkipRows() { + // Test with skip events + BinlogOffset offset1 = + BinlogOffset.builder() + .setBinlogFilePosition("binlog.001", 123) + .setSkipEvents(5) + .build(); + BinlogOffset offset2 = + BinlogOffset.builder() + .setBinlogFilePosition("binlog.001", 123) + .setSkipEvents(10) + .build(); + assetCompareTo(offset1, offset2, -1); + assetCompareTo(offset2, offset1, 1); + + // Test with skip rows + BinlogOffset offset3 = + BinlogOffset.builder() + .setBinlogFilePosition("binlog.001", 123) + .setSkipEvents(5) + .setSkipRows(10) + .build(); + BinlogOffset offset4 = + BinlogOffset.builder() + .setBinlogFilePosition("binlog.001", 123) + .setSkipEvents(5) + .setSkipRows(20) + .build(); + assetCompareTo(offset3, offset4, -1); + assetCompareTo(offset4, offset3, 1); + } + + @Test + public void testCompareToTimestampWithDifferentServerId() { + // Test different server IDs with different timestamps + BinlogOffset offset1 = + BinlogOffset.builder() + .setServerId(1L) + .setTimestampSec(1000L) + .setBinlogFilePosition("binlog.001", 123) + .build(); + BinlogOffset offset2 = + BinlogOffset.builder() + .setServerId(2L) + .setTimestampSec(2000L) + .setBinlogFilePosition("binlog.001", 123) + .build(); + + // Should compare based on timestamp since server IDs are different + assetCompareTo(offset1, offset2, -1); + assetCompareTo(offset2, offset1, 1); + + // Test same timestamps but different server IDs + BinlogOffset offset3 = + BinlogOffset.builder() + .setServerId(1L) + .setTimestampSec(1500L) + .setBinlogFilePosition("binlog.001", 432) + .build(); + BinlogOffset offset4 = + BinlogOffset.builder() + .setServerId(2L) + .setTimestampSec(1500L) + 
.setBinlogFilePosition("binlog.001", 123) + .build(); + + // Same timestamp, different server IDs - should compare based on timestamp (which are + // equal). + // Note that the binlog positions differ (432 vs 123) while the expected result is 0, so + // the comparison does not fall through to the file position, skip events or skip rows + // when the server IDs are different. + assetCompareTo(offset3, offset4, 0); + } + + private void assetCompareTo(BinlogOffset offset1, BinlogOffset offset2, int expected) { + // AssertJ expects the actual value in assertThat() and the expectation in isEqualTo(). + Assertions.assertThat(offset1.compareTo(offset2)).isEqualTo(expected); + } +}