Skip to content

Commit 8f24e77

Browse files
Fix integration tests to expect TIMESTAMP_TZ instead of TIMESTAMP_LTZ
Update PostgresFullTypesITCase to expect TIMESTAMP_TZ for PostgreSQL TIMESTAMPTZ columns, matching the corrected schema inference behavior.
1 parent 664e296 commit 8f24e77

File tree

2 files changed

+16
-5
lines changed

2 files changed

+16
-5
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.flink.cdc.common.data.DateData;
2525
import org.apache.flink.cdc.common.data.DecimalData;
2626
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
27+
import org.apache.flink.cdc.common.data.ZonedTimestampData;
2728
import org.apache.flink.cdc.common.data.RecordData;
2829
import org.apache.flink.cdc.common.data.TimeData;
2930
import org.apache.flink.cdc.common.data.TimestampData;
@@ -68,6 +69,7 @@
6869
import java.time.Instant;
6970
import java.time.LocalDateTime;
7071
import java.time.LocalTime;
72+
import java.time.OffsetDateTime;
7173
import java.time.ZoneId;
7274
import java.util.ArrayList;
7375
import java.util.HashMap;
@@ -298,7 +300,8 @@ public void testTimeTypesWithTemporalModeAdaptive() throws Exception {
298300
TimestampData.fromLocalDateTime(
299301
LocalDateTime.parse("2020-07-17T18:00:22.123456")),
300302
TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
301-
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
303+
ZonedTimestampData.fromOffsetDateTime(
304+
OffsetDateTime.parse("2020-07-17T10:00:22Z")),
302305
};
303306

304307
List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
@@ -354,7 +357,8 @@ public void testTimeTypesWithTemporalModeMicroSeconds() throws Exception {
354357
TimestampData.fromLocalDateTime(
355358
LocalDateTime.parse("2020-07-17T18:00:22.123456")),
356359
TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
357-
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
360+
ZonedTimestampData.fromOffsetDateTime(
361+
OffsetDateTime.parse("2020-07-17T10:00:22Z")),
358362
};
359363

360364
List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
@@ -409,7 +413,8 @@ public void testTimeTypesWithTemporalModeConnect() throws Exception {
409413
TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22.123")),
410414
TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22.123")),
411415
TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
412-
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
416+
ZonedTimestampData.fromOffsetDateTime(
417+
OffsetDateTime.parse("2020-07-17T10:00:22Z")),
413418
};
414419

415420
List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
@@ -1042,7 +1047,7 @@ private Instant toInstant(String ts) {
10421047
DataTypes.TIMESTAMP(3),
10431048
DataTypes.TIMESTAMP(6),
10441049
DataTypes.TIMESTAMP(),
1045-
DataTypes.TIMESTAMP_LTZ(0));
1050+
DataTypes.TIMESTAMP_TZ(0));
10461051

10471052
private static final RowType HSTORE_TYPES_WITH_ADAPTIVE =
10481053
RowType.of(DataTypes.INT(), DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()));

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,8 @@ protected Object convertToZonedTimestamp(Object dbzObj, Schema schema) {
377377
String str = (String) dbzObj;
378378
// ZonedTimestamp type is encoded in string type with timezone offset
379379
// Format: ISO-8601 with timezone offset (e.g., "2020-07-17T18:00:22+00:00")
380+
// According to Debezium documentation, PostgreSQL TIMESTAMPTZ is ALWAYS encoded as String
381+
// with ZonedTimestamp.SCHEMA_NAME, regardless of time.precision.mode
380382
if (ZonedTimestamp.SCHEMA_NAME.equals(schema.name())) {
381383
// Parse using Debezium's ZonedTimestamp formatter
382384
OffsetDateTime offsetDateTime = OffsetDateTime.parse(str, ZonedTimestamp.FORMATTER);
@@ -391,7 +393,11 @@ protected Object convertToZonedTimestamp(Object dbzObj, Schema schema) {
391393
"Unable to convert to TIMESTAMP WITH TIME ZONE from unexpected value '"
392394
+ dbzObj
393395
+ "' of type "
394-
+ dbzObj.getClass().getName());
396+
+ dbzObj.getClass().getName()
397+
+ " with schema name '"
398+
+ (schema != null ? schema.name() : "null")
399+
+ "'. PostgreSQL TIMESTAMPTZ should always be encoded as String with "
400+
+ ZonedTimestamp.SCHEMA_NAME);
395401
}
396402

397403
protected Object convertToLocalTimeZoneTimestamp(Object dbzObj, Schema schema) {

0 commit comments

Comments
 (0)