Skip to content

Commit 5543e7b

Browse files
Add unit tests for DebeziumEventDeserializationSchema and PostgresSchemaDataTypeInference
1 parent 74357f4 commit 5543e7b

File tree

2 files changed

+375
-0
lines changed

2 files changed

+375
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.postgres.source;
19+
20+
import org.apache.flink.cdc.common.types.DataType;
21+
import org.apache.flink.cdc.common.types.DataTypes;
22+
23+
import io.debezium.time.ZonedTimestamp;
24+
import org.apache.kafka.connect.data.Schema;
25+
import org.apache.kafka.connect.data.SchemaBuilder;
26+
import org.junit.jupiter.api.Test;
27+
28+
import java.lang.reflect.Method;
29+
30+
import static org.assertj.core.api.Assertions.assertThat;
31+
32+
/** Test cases for {@link PostgresSchemaDataTypeInference}. */
33+
class PostgresSchemaDataTypeInferenceTest {
34+
35+
private final PostgresSchemaDataTypeInference inference = new PostgresSchemaDataTypeInference();
36+
37+
private DataType inferString(Object value, Schema schema) {
38+
try {
39+
Method method =
40+
PostgresSchemaDataTypeInference.class.getDeclaredMethod(
41+
"inferString", Object.class, Schema.class);
42+
method.setAccessible(true);
43+
return (DataType) method.invoke(inference, value, schema);
44+
} catch (Exception e) {
45+
throw new RuntimeException(e);
46+
}
47+
}
48+
49+
@Test
50+
void testInferZonedTimestampWithZeroPrecision() {
51+
Schema zonedTimestampSchema =
52+
SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
53+
54+
DataType result = inferString("2020-07-17T18:00:22+00:00", zonedTimestampSchema);
55+
56+
assertThat(result).isEqualTo(DataTypes.TIMESTAMP_TZ(0));
57+
}
58+
59+
@Test
60+
void testInferZonedTimestampWithMillisecondPrecision() {
61+
Schema zonedTimestampSchema =
62+
SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
63+
64+
DataType result = inferString("2020-07-17T18:00:22.123+00:00", zonedTimestampSchema);
65+
66+
assertThat(result).isEqualTo(DataTypes.TIMESTAMP_TZ(3));
67+
}
68+
69+
@Test
70+
void testInferZonedTimestampWithMicrosecondPrecision() {
71+
Schema zonedTimestampSchema =
72+
SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
73+
74+
DataType result = inferString("2020-07-17T18:00:22.123456+00:00", zonedTimestampSchema);
75+
76+
assertThat(result).isEqualTo(DataTypes.TIMESTAMP_TZ(6));
77+
}
78+
79+
@Test
80+
void testInferZonedTimestampWithNanosecondPrecision() {
81+
Schema zonedTimestampSchema =
82+
SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
83+
84+
DataType result = inferString("2020-07-17T18:00:22.123456789+00:00", zonedTimestampSchema);
85+
86+
assertThat(result).isEqualTo(DataTypes.TIMESTAMP_TZ(9));
87+
}
88+
89+
@Test
90+
void testInferZonedTimestampWithDifferentTimezones() {
91+
Schema zonedTimestampSchema =
92+
SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
93+
94+
DataType result1 = inferString("2020-07-17T18:00:22+05:30", zonedTimestampSchema);
95+
DataType result2 = inferString("2020-07-17T18:00:22-08:00", zonedTimestampSchema);
96+
DataType result3 = inferString("2020-07-17T18:00:22Z", zonedTimestampSchema);
97+
98+
assertThat(result1).isEqualTo(DataTypes.TIMESTAMP_TZ(0));
99+
assertThat(result2).isEqualTo(DataTypes.TIMESTAMP_TZ(0));
100+
assertThat(result3).isEqualTo(DataTypes.TIMESTAMP_TZ(0));
101+
}
102+
103+
@Test
104+
void testInferZonedTimestampWithNullValue() {
105+
Schema zonedTimestampSchema =
106+
SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
107+
108+
DataType result = inferString(null, zonedTimestampSchema);
109+
110+
assertThat(result).isEqualTo(DataTypes.TIMESTAMP_TZ(0));
111+
}
112+
113+
@Test
114+
void testInferNonZonedTimestampString() {
115+
Schema regularStringSchema =
116+
SchemaBuilder.string().name("some.other.schema").optional().build();
117+
118+
DataType result = inferString("some string value", regularStringSchema);
119+
120+
assertThat(result).isEqualTo(DataTypes.STRING());
121+
}
122+
123+
@Test
124+
void testInferZonedTimestampWithVariousPrecisions() {
125+
Schema zonedTimestampSchema =
126+
SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
127+
128+
assertThat(inferString("2020-07-17T18:00:22+00:00", zonedTimestampSchema))
129+
.isEqualTo(DataTypes.TIMESTAMP_TZ(0));
130+
131+
assertThat(inferString("2020-07-17T18:00:22.1+00:00", zonedTimestampSchema))
132+
.isEqualTo(DataTypes.TIMESTAMP_TZ(3));
133+
134+
assertThat(inferString("2020-07-17T18:00:22.12+00:00", zonedTimestampSchema))
135+
.isEqualTo(DataTypes.TIMESTAMP_TZ(3));
136+
137+
assertThat(inferString("2020-07-17T18:00:22.123+00:00", zonedTimestampSchema))
138+
.isEqualTo(DataTypes.TIMESTAMP_TZ(3));
139+
140+
assertThat(inferString("2020-07-17T18:00:22.1234+00:00", zonedTimestampSchema))
141+
.isEqualTo(DataTypes.TIMESTAMP_TZ(6));
142+
143+
assertThat(inferString("2020-07-17T18:00:22.123456+00:00", zonedTimestampSchema))
144+
.isEqualTo(DataTypes.TIMESTAMP_TZ(6));
145+
146+
assertThat(inferString("2020-07-17T18:00:22.1234567+00:00", zonedTimestampSchema))
147+
.isEqualTo(DataTypes.TIMESTAMP_TZ(9));
148+
149+
assertThat(inferString("2020-07-17T18:00:22.123456789+00:00", zonedTimestampSchema))
150+
.isEqualTo(DataTypes.TIMESTAMP_TZ(9));
151+
}
152+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.debezium.event;
19+
20+
import org.apache.flink.cdc.common.data.ZonedTimestampData;
21+
import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
22+
23+
import io.debezium.time.ZonedTimestamp;
24+
import org.apache.kafka.connect.data.Schema;
25+
import org.apache.kafka.connect.data.SchemaBuilder;
26+
import org.junit.jupiter.api.BeforeEach;
27+
import org.junit.jupiter.api.Test;
28+
29+
import java.time.OffsetDateTime;
30+
import java.time.ZoneOffset;
31+
32+
import static org.assertj.core.api.Assertions.assertThat;
33+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
34+
35+
/** Test cases for {@link DebeziumEventDeserializationSchema}. */
36+
class DebeziumEventDeserializationSchemaTest {
37+
38+
private TestDebeziumEventDeserializationSchema deserializer;
39+
40+
@BeforeEach
41+
void setUp() {
42+
deserializer = new TestDebeziumEventDeserializationSchema();
43+
}
44+
45+
@Test
46+
void testConvertToZonedTimestampWithZonedTimestampSchema() throws Exception {
47+
Schema schema = SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
48+
49+
String timestampStr = "2020-07-17T18:00:22+00:00";
50+
Object result = deserializer.convertToZonedTimestamp(timestampStr, schema);
51+
52+
assertThat(result).isInstanceOf(ZonedTimestampData.class);
53+
ZonedTimestampData zonedTimestampData = (ZonedTimestampData) result;
54+
OffsetDateTime expected = OffsetDateTime.parse(timestampStr, ZonedTimestamp.FORMATTER);
55+
assertThat(zonedTimestampData.getZonedDateTime().toOffsetDateTime()).isEqualTo(expected);
56+
}
57+
58+
@Test
59+
void testConvertToZonedTimestampWithDifferentTimezones() throws Exception {
60+
Schema schema = SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
61+
62+
String timestamp1 = "2020-07-17T18:00:22+05:30";
63+
String timestamp2 = "2020-07-17T18:00:22-08:00";
64+
String timestamp3 = "2020-07-17T18:00:22Z";
65+
66+
ZonedTimestampData result1 =
67+
(ZonedTimestampData) deserializer.convertToZonedTimestamp(timestamp1, schema);
68+
ZonedTimestampData result2 =
69+
(ZonedTimestampData) deserializer.convertToZonedTimestamp(timestamp2, schema);
70+
ZonedTimestampData result3 =
71+
(ZonedTimestampData) deserializer.convertToZonedTimestamp(timestamp3, schema);
72+
73+
assertThat(result1.getZonedDateTime().toOffsetDateTime())
74+
.isEqualTo(OffsetDateTime.parse(timestamp1, ZonedTimestamp.FORMATTER));
75+
assertThat(result2.getZonedDateTime().toOffsetDateTime())
76+
.isEqualTo(OffsetDateTime.parse(timestamp2, ZonedTimestamp.FORMATTER));
77+
assertThat(result3.getZonedDateTime().toOffsetDateTime())
78+
.isEqualTo(OffsetDateTime.parse(timestamp3, ZonedTimestamp.FORMATTER));
79+
}
80+
81+
@Test
82+
void testConvertToZonedTimestampWithMillisecondPrecision() throws Exception {
83+
Schema schema = SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
84+
85+
String timestampStr = "2020-07-17T18:00:22.123+00:00";
86+
ZonedTimestampData result =
87+
(ZonedTimestampData) deserializer.convertToZonedTimestamp(timestampStr, schema);
88+
89+
assertThat(result.getZonedDateTime().toOffsetDateTime())
90+
.isEqualTo(OffsetDateTime.parse(timestampStr, ZonedTimestamp.FORMATTER));
91+
}
92+
93+
@Test
94+
void testConvertToZonedTimestampWithMicrosecondPrecision() throws Exception {
95+
Schema schema = SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
96+
97+
String timestampStr = "2020-07-17T18:00:22.123456+00:00";
98+
ZonedTimestampData result =
99+
(ZonedTimestampData) deserializer.convertToZonedTimestamp(timestampStr, schema);
100+
101+
assertThat(result.getZonedDateTime().toOffsetDateTime())
102+
.isEqualTo(OffsetDateTime.parse(timestampStr, ZonedTimestamp.FORMATTER));
103+
}
104+
105+
@Test
106+
void testConvertToZonedTimestampWithNanosecondPrecision() throws Exception {
107+
Schema schema = SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
108+
109+
String timestampStr = "2020-07-17T18:00:22.123456789+00:00";
110+
ZonedTimestampData result =
111+
(ZonedTimestampData) deserializer.convertToZonedTimestamp(timestampStr, schema);
112+
113+
assertThat(result.getZonedDateTime().toOffsetDateTime())
114+
.isEqualTo(OffsetDateTime.parse(timestampStr, ZonedTimestamp.FORMATTER));
115+
}
116+
117+
@Test
118+
void testConvertToZonedTimestampWithStandardIso8601Format() throws Exception {
119+
Schema schema = SchemaBuilder.string().name("some.other.schema").optional().build();
120+
121+
String timestampStr = "2020-07-17T18:00:22+00:00";
122+
ZonedTimestampData result =
123+
(ZonedTimestampData) deserializer.convertToZonedTimestamp(timestampStr, schema);
124+
125+
assertThat(result.getZonedDateTime().toOffsetDateTime())
126+
.isEqualTo(OffsetDateTime.parse(timestampStr));
127+
}
128+
129+
@Test
130+
void testConvertToZonedTimestampThrowsExceptionForNonString() {
131+
Schema schema = SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
132+
133+
assertThatThrownBy(() -> deserializer.convertToZonedTimestamp(12345, schema))
134+
.isInstanceOf(IllegalArgumentException.class)
135+
.hasMessageContaining("Unable to convert to TIMESTAMP WITH TIME ZONE");
136+
}
137+
138+
@Test
139+
void testConvertToZonedTimestampPreservesTimezoneOffset() throws Exception {
140+
Schema schema = SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
141+
142+
String timestampStr = "2020-07-17T18:00:22+05:30";
143+
ZonedTimestampData result =
144+
(ZonedTimestampData) deserializer.convertToZonedTimestamp(timestampStr, schema);
145+
146+
OffsetDateTime expected = OffsetDateTime.parse(timestampStr, ZonedTimestamp.FORMATTER);
147+
assertThat(result.getZonedDateTime().toOffsetDateTime()).isEqualTo(expected);
148+
assertThat(result.getZonedDateTime().toOffsetDateTime().getOffset())
149+
.isEqualTo(ZoneOffset.of("+05:30"));
150+
}
151+
152+
@Test
153+
void testConvertToZonedTimestampWithUtcTimezone() throws Exception {
154+
Schema schema = SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
155+
156+
String timestampStr = "2020-07-17T18:00:22Z";
157+
ZonedTimestampData result =
158+
(ZonedTimestampData) deserializer.convertToZonedTimestamp(timestampStr, schema);
159+
160+
OffsetDateTime expected = OffsetDateTime.parse(timestampStr, ZonedTimestamp.FORMATTER);
161+
assertThat(result.getZonedDateTime().toOffsetDateTime()).isEqualTo(expected);
162+
assertThat(result.getZonedDateTime().toOffsetDateTime().getOffset())
163+
.isEqualTo(ZoneOffset.UTC);
164+
}
165+
166+
@Test
167+
void testConvertToZonedTimestampRoundTrip() throws Exception {
168+
Schema schema = SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).optional().build();
169+
170+
String originalTimestamp = "2020-07-17T18:00:22.123456+05:30";
171+
ZonedTimestampData result =
172+
(ZonedTimestampData)
173+
deserializer.convertToZonedTimestamp(originalTimestamp, schema);
174+
175+
OffsetDateTime converted = result.getZonedDateTime().toOffsetDateTime();
176+
String roundTrip = ZonedTimestamp.FORMATTER.format(converted);
177+
178+
assertThat(roundTrip).isEqualTo(originalTimestamp);
179+
}
180+
181+
private static class TestDebeziumEventDeserializationSchema
182+
extends DebeziumEventDeserializationSchema {
183+
184+
public TestDebeziumEventDeserializationSchema() {
185+
super(
186+
new org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference(),
187+
DebeziumChangelogMode.ALL);
188+
}
189+
190+
public Object convertToZonedTimestamp(Object dbzObj, Schema schema) {
191+
return super.convertToZonedTimestamp(dbzObj, schema);
192+
}
193+
194+
@Override
195+
protected boolean isDataChangeRecord(org.apache.kafka.connect.source.SourceRecord record) {
196+
return false;
197+
}
198+
199+
@Override
200+
protected boolean isSchemaChangeRecord(
201+
org.apache.kafka.connect.source.SourceRecord record) {
202+
return false;
203+
}
204+
205+
@Override
206+
protected org.apache.flink.cdc.common.event.TableId getTableId(
207+
org.apache.kafka.connect.source.SourceRecord record) {
208+
return org.apache.flink.cdc.common.event.TableId.tableId("test", "test");
209+
}
210+
211+
@Override
212+
protected java.util.Map<String, String> getMetadata(
213+
org.apache.kafka.connect.source.SourceRecord record) {
214+
return java.util.Collections.emptyMap();
215+
}
216+
217+
@Override
218+
protected java.util.List<org.apache.flink.cdc.common.event.SchemaChangeEvent>
219+
deserializeSchemaChangeRecord(org.apache.kafka.connect.source.SourceRecord record) {
220+
return java.util.Collections.emptyList();
221+
}
222+
}
223+
}

0 commit comments

Comments
 (0)