From bce727101b86ad0cd808a12d0cc5d975d968fa76 Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Tue, 25 Nov 2025 19:55:59 +0800 Subject: [PATCH] [WIP][FLINK-38726][fluss] Bump Fluss version to 0.8.0-incubating --- .../pom.xml | 16 ++-- .../fluss/factory/FlussDataSinkFactory.java | 4 +- .../connectors/fluss/sink/CdcAsFlussRow.java | 10 +-- .../connectors/fluss/sink/FlussDataSink.java | 2 +- .../sink/FlussEventSerializationSchema.java | 16 ++-- .../fluss/sink/FlussMetaDataApplier.java | 16 ++-- .../connectors/fluss/sink/v2/FlussEvent.java | 2 +- .../fluss/sink/v2/FlussEventSerializer.java | 2 +- .../fluss/sink/v2/FlussRowWithOp.java | 4 +- .../connectors/fluss/sink/v2/FlussSink.java | 2 +- .../fluss/sink/v2/FlussSinkWriter.java | 24 +++--- .../sink/v2/metrics/WarppedFlussCounter.java | 4 +- .../sink/v2/metrics/WrappedFlussGauge.java | 4 +- .../v2/metrics/WrapperFlussHistogram.java | 8 +- .../sink/v2/metrics/WrapperFlussMeter.java | 4 +- .../metrics/WrapperFlussMetricRegistry.java | 16 ++-- .../fluss/utils/FlussConversions.java | 85 +++++++++---------- .../connectors/fluss/FlussPipelineITCase.java | 20 ++--- .../FlussEventSerializationSchemaTest.java | 8 +- .../fluss/sink/FlussMetadataApplierTest.java | 64 +++++++------- .../fluss/sink/v2/FlussSinkITCase.java | 8 +- .../flink-cdc-pipeline-e2e-tests/pom.xml | 4 +- .../cdc/pipeline/tests/FlussE2eITCase.java | 4 +- 23 files changed, 163 insertions(+), 164 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/pom.xml index 0ebfbc9d231..3bfc8c27897 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/pom.xml +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/pom.xml @@ -33,12 +33,12 @@ limitations under the License. - 0.7.0 + 0.8.0-incubating - com.alibaba.fluss + org.apache.fluss fluss-client ${fluss.version} @@ -60,34 +60,34 @@ limitations under the License. - com.alibaba.fluss + org.apache.fluss fluss-server ${fluss.version} test - com.alibaba.fluss + org.apache.fluss fluss-server ${fluss.version} test-jar test - com.alibaba.fluss + org.apache.fluss fluss-test-utils ${fluss.version} test - com.alibaba.fluss + org.apache.fluss fluss-flink-common ${fluss.version} test-jar test - com.alibaba.fluss + org.apache.fluss fluss-flink-1.20 ${fluss.version} test @@ -123,7 +123,7 @@ limitations under the License. false - com.alibaba.fluss:* + org.apache.fluss:* diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/factory/FlussDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/factory/FlussDataSinkFactory.java index 754eb693a39..58e469d2de6 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/factory/FlussDataSinkFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/factory/FlussDataSinkFactory.java @@ -23,8 +23,8 @@ import org.apache.flink.cdc.common.sink.DataSink; import org.apache.flink.cdc.connectors.fluss.sink.FlussDataSink; -import com.alibaba.fluss.config.ConfigOptions; -import com.alibaba.fluss.config.Configuration; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; import java.util.HashMap; import java.util.HashSet; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/CdcAsFlussRow.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/CdcAsFlussRow.java index 5891d58a721..81ddedfaaaf 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/CdcAsFlussRow.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/CdcAsFlussRow.java @@ -24,11 +24,11 @@ import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.TimestampData; -import com.alibaba.fluss.row.BinaryString; -import com.alibaba.fluss.row.Decimal; -import com.alibaba.fluss.row.InternalRow; -import com.alibaba.fluss.row.TimestampLtz; -import com.alibaba.fluss.row.TimestampNtz; +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.TimestampLtz; +import org.apache.fluss.row.TimestampNtz; import java.util.Map; import java.util.stream.Collectors; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussDataSink.java index e03caf8e6d7..4491e503211 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussDataSink.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussDataSink.java @@ -25,7 +25,7 @@ import org.apache.flink.cdc.common.sink.MetadataApplier; import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussSink; -import com.alibaba.fluss.config.Configuration; +import org.apache.fluss.config.Configuration; import java.util.List; import java.util.Map; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java index d946916b6c5..263cc54c85b 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java @@ -28,10 +28,10 @@ import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussEventSerializer; import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussRowWithOp; -import com.alibaba.fluss.client.Connection; -import com.alibaba.fluss.client.table.Table; -import com.alibaba.fluss.metadata.TablePath; -import com.alibaba.fluss.types.DataType; +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.table.Table; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.types.DataType; import java.io.IOException; import java.util.Collections; @@ -129,12 +129,12 @@ private TablePath getTablePath(TableId tableId) { private static class TableSchemaInfo { org.apache.flink.cdc.common.schema.Schema upstreamCdcSchema; - com.alibaba.fluss.metadata.Schema downStreamFlusstreamSchema; + org.apache.fluss.metadata.Schema downStreamFlusstreamSchema; Map indexMapping; private TableSchemaInfo( org.apache.flink.cdc.common.schema.Schema upstreamCdcSchema, - com.alibaba.fluss.metadata.Schema downStreamFlusstreamSchema) { + org.apache.fluss.metadata.Schema downStreamFlusstreamSchema) { this.upstreamCdcSchema = upstreamCdcSchema; this.downStreamFlusstreamSchema = downStreamFlusstreamSchema; this.indexMapping = @@ -144,8 +144,8 @@ private TableSchemaInfo( } static Map sanityCheckAndGenerateIndexMapping( - com.alibaba.fluss.metadata.Schema inferredFlussSchema, - com.alibaba.fluss.metadata.Schema currentFlussnewSchema) { + org.apache.fluss.metadata.Schema inferredFlussSchema, + org.apache.fluss.metadata.Schema currentFlussnewSchema) { List inferredSchemaColumnNames = inferredFlussSchema.getColumnNames(); Map reverseIndex = new HashMap<>(); for (int i = 0; i < inferredSchemaColumnNames.size(); i++) { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java index 9ab7a6f78d1..7ddb3842a67 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java @@ -26,14 +26,14 @@ import org.apache.flink.cdc.common.sink.MetadataApplier; import org.apache.flink.table.api.ValidationException; -import com.alibaba.fluss.client.Connection; -import com.alibaba.fluss.client.ConnectionFactory; -import com.alibaba.fluss.client.admin.Admin; -import com.alibaba.fluss.config.Configuration; -import com.alibaba.fluss.metadata.DatabaseDescriptor; -import com.alibaba.fluss.metadata.TableDescriptor; -import com.alibaba.fluss.metadata.TableInfo; -import com.alibaba.fluss.metadata.TablePath; +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metadata.DatabaseDescriptor; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePath; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEvent.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEvent.java index 75df49fa6d5..a31d07bd404 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEvent.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEvent.java @@ -17,7 +17,7 @@ package org.apache.flink.cdc.connectors.fluss.sink.v2; -import com.alibaba.fluss.metadata.TablePath; +import org.apache.fluss.metadata.TablePath; import java.util.List; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEventSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEventSerializer.java index dadf0d2ee14..c90cdc91369 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEventSerializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEventSerializer.java @@ -17,7 +17,7 @@ package org.apache.flink.cdc.connectors.fluss.sink.v2; -import com.alibaba.fluss.client.Connection; +import org.apache.fluss.client.Connection; import java.io.IOException; import java.io.Serializable; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussRowWithOp.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussRowWithOp.java index 0dedaf6cc52..2042dbdc440 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussRowWithOp.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussRowWithOp.java @@ -17,13 +17,13 @@ package org.apache.flink.cdc.connectors.fluss.sink.v2; -import com.alibaba.fluss.row.InternalRow; +import org.apache.fluss.row.InternalRow; import javax.annotation.Nullable; import java.util.Objects; -import static com.alibaba.fluss.utils.Preconditions.checkNotNull; +import static org.apache.fluss.utils.Preconditions.checkNotNull; /* This file is based on source code of Apache Fluss Project (https://fluss.apache.org/), licensed by the Apache * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSink.java index 4e6d789e90b..0bd74f0601e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSink.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSink.java @@ -22,7 +22,7 @@ import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; -import com.alibaba.fluss.config.Configuration; +import org.apache.fluss.config.Configuration; import java.io.IOException; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkWriter.java index dab9e6ed56e..8a3754e0e9e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkWriter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkWriter.java @@ -23,18 +23,18 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; -import com.alibaba.fluss.client.Connection; -import com.alibaba.fluss.client.ConnectionFactory; -import com.alibaba.fluss.client.table.Table; -import com.alibaba.fluss.client.table.writer.AppendWriter; -import com.alibaba.fluss.client.table.writer.TableWriter; -import com.alibaba.fluss.client.table.writer.UpsertWriter; -import com.alibaba.fluss.config.Configuration; -import com.alibaba.fluss.metadata.TablePath; -import com.alibaba.fluss.metrics.Gauge; -import com.alibaba.fluss.metrics.Metric; -import com.alibaba.fluss.metrics.MetricNames; -import com.alibaba.fluss.row.InternalRow; +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.table.Table; +import org.apache.fluss.client.table.writer.AppendWriter; +import org.apache.fluss.client.table.writer.TableWriter; +import org.apache.fluss.client.table.writer.UpsertWriter; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.metrics.Gauge; +import org.apache.fluss.metrics.Metric; +import org.apache.fluss.metrics.MetricNames; +import org.apache.fluss.row.InternalRow; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WarppedFlussCounter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WarppedFlussCounter.java index 8f3c1e3d6c0..2bf45c198e2 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WarppedFlussCounter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WarppedFlussCounter.java @@ -26,9 +26,9 @@ /** An implementation of Flink's {@link Counter} which wraps Fluss's Counter. */ public class WarppedFlussCounter implements Counter { - private final com.alibaba.fluss.metrics.Counter flussCounter; + private final org.apache.fluss.metrics.Counter flussCounter; - public WarppedFlussCounter(com.alibaba.fluss.metrics.Counter flussCounter) { + public WarppedFlussCounter(org.apache.fluss.metrics.Counter flussCounter) { this.flussCounter = flussCounter; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrappedFlussGauge.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrappedFlussGauge.java index 1b5df21d05b..45d7adaaf74 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrappedFlussGauge.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrappedFlussGauge.java @@ -26,9 +26,9 @@ /** An implementation of Flink's {@link Gauge} which wraps Fluss's Gauge. */ public class WrappedFlussGauge implements Gauge { - private final com.alibaba.fluss.metrics.Gauge flussGauge; + private final org.apache.fluss.metrics.Gauge flussGauge; - public WrappedFlussGauge(com.alibaba.fluss.metrics.Gauge flussGauge) { + public WrappedFlussGauge(org.apache.fluss.metrics.Gauge flussGauge) { this.flussGauge = flussGauge; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussHistogram.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussHistogram.java index a6ce22838ac..7d768bee76a 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussHistogram.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussHistogram.java @@ -27,9 +27,9 @@ /** An implementation of Flink's {@link Histogram} which wraps Fluss's Histogram. */ public class WrapperFlussHistogram implements Histogram { - private final com.alibaba.fluss.metrics.Histogram flussHistogram; + private final org.apache.fluss.metrics.Histogram flussHistogram; - public WrapperFlussHistogram(com.alibaba.fluss.metrics.Histogram flussHistogram) { + public WrapperFlussHistogram(org.apache.fluss.metrics.Histogram flussHistogram) { this.flussHistogram = flussHistogram; } @@ -53,10 +53,10 @@ public HistogramStatistics getStatistics() { private static class FlinkHistogramStatistics extends HistogramStatistics { - private final com.alibaba.fluss.metrics.HistogramStatistics flussHistogramStatistics; + private final org.apache.fluss.metrics.HistogramStatistics flussHistogramStatistics; public FlinkHistogramStatistics( - com.alibaba.fluss.metrics.HistogramStatistics flussHistogramStatistics) { + org.apache.fluss.metrics.HistogramStatistics flussHistogramStatistics) { this.flussHistogramStatistics = flussHistogramStatistics; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussMeter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussMeter.java index 40bac46a80a..0b415b20241 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussMeter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussMeter.java @@ -26,9 +26,9 @@ /** An implementation of Flink's {@link Meter} which wraps Fluss's Meter. */ public class WrapperFlussMeter implements Meter { - private final com.alibaba.fluss.metrics.Meter flussMeter; + private final org.apache.fluss.metrics.Meter flussMeter; - public WrapperFlussMeter(com.alibaba.fluss.metrics.Meter flussMeter) { + public WrapperFlussMeter(org.apache.fluss.metrics.Meter flussMeter) { this.flussMeter = flussMeter; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussMetricRegistry.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussMetricRegistry.java index 6b33202ef15..63908623d2c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussMetricRegistry.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussMetricRegistry.java @@ -19,14 +19,14 @@ import org.apache.flink.metrics.MetricGroup; -import com.alibaba.fluss.metrics.CharacterFilter; -import com.alibaba.fluss.metrics.Counter; -import com.alibaba.fluss.metrics.Gauge; -import com.alibaba.fluss.metrics.Histogram; -import com.alibaba.fluss.metrics.Meter; -import com.alibaba.fluss.metrics.Metric; -import com.alibaba.fluss.metrics.groups.AbstractMetricGroup; -import com.alibaba.fluss.metrics.registry.MetricRegistry; +import org.apache.fluss.metrics.CharacterFilter; +import org.apache.fluss.metrics.Counter; +import org.apache.fluss.metrics.Gauge; +import org.apache.fluss.metrics.Histogram; +import org.apache.fluss.metrics.Meter; +import org.apache.fluss.metrics.Metric; +import org.apache.fluss.metrics.groups.AbstractMetricGroup; +import org.apache.fluss.metrics.registry.MetricRegistry; import java.util.Collections; import java.util.HashMap; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversions.java index eb311dd0e7d..8c999d24b44 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversions.java @@ -39,9 +39,9 @@ import org.apache.flink.cdc.common.types.ZonedTimestampType; import org.apache.flink.util.CollectionUtil; -import com.alibaba.fluss.annotation.VisibleForTesting; -import com.alibaba.fluss.metadata.Schema; -import com.alibaba.fluss.metadata.TableDescriptor; +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableDescriptor; import javax.annotation.Nullable; @@ -85,7 +85,7 @@ public static TableDescriptor toFlussTable( .build(); } - public static com.alibaba.fluss.metadata.Schema toFlussSchema( + public static org.apache.fluss.metadata.Schema toFlussSchema( org.apache.flink.cdc.common.schema.Schema cdcSchema) { Schema.Builder schemBuilder = Schema.newBuilder(); if (!CollectionUtil.isNullOrEmpty(cdcSchema.primaryKeys())) { @@ -108,7 +108,7 @@ public static com.alibaba.fluss.metadata.Schema toFlussSchema( } @VisibleForTesting - private static com.alibaba.fluss.types.DataType toFlussType( + private static org.apache.fluss.types.DataType toFlussType( org.apache.flink.cdc.common.types.DataType flinkDataType) { return flinkDataType.accept(TO_FLUSS_TYPE_INSTANCE); } @@ -137,117 +137,116 @@ public static Boolean sameCdcColumnsIgnoreCommentAndDefaultValue( private static class CdcTypeToFlussType implements org.apache.flink.cdc.common.types.DataTypeVisitor< - com.alibaba.fluss.types.DataType> { + org.apache.fluss.types.DataType> { @Override - public com.alibaba.fluss.types.DataType visit(CharType charType) { - return new com.alibaba.fluss.types.CharType( - charType.isNullable(), charType.getLength()); + public org.apache.fluss.types.DataType visit(CharType charType) { + return new org.apache.fluss.types.CharType(charType.isNullable(), charType.getLength()); } @Override - public com.alibaba.fluss.types.DataType visit(VarCharType varCharType) { + public org.apache.fluss.types.DataType visit(VarCharType varCharType) { // fluss not support varchar type - return new com.alibaba.fluss.types.StringType(varCharType.isNullable()); + return new org.apache.fluss.types.StringType(varCharType.isNullable()); } @Override - public com.alibaba.fluss.types.DataType visit(BooleanType booleanType) { - return new com.alibaba.fluss.types.BooleanType(booleanType.isNullable()); + public org.apache.fluss.types.DataType visit(BooleanType booleanType) { + return new org.apache.fluss.types.BooleanType(booleanType.isNullable()); } @Override - public com.alibaba.fluss.types.DataType visit(BinaryType binaryType) { - return new com.alibaba.fluss.types.BinaryType( + public org.apache.fluss.types.DataType visit(BinaryType binaryType) { + return new org.apache.fluss.types.BinaryType( binaryType.isNullable(), binaryType.getLength()); } @Override - public com.alibaba.fluss.types.DataType visit(VarBinaryType varBinaryType) { + public org.apache.fluss.types.DataType visit(VarBinaryType varBinaryType) { // fluss not support varbinary type - return new com.alibaba.fluss.types.BytesType(varBinaryType.isNullable()); + return new org.apache.fluss.types.BytesType(varBinaryType.isNullable()); } @Override - public com.alibaba.fluss.types.DataType visit(DecimalType decimalType) { - return new com.alibaba.fluss.types.DecimalType( + public org.apache.fluss.types.DataType visit(DecimalType decimalType) { + return new org.apache.fluss.types.DecimalType( decimalType.isNullable(), decimalType.getPrecision(), decimalType.getScale()); } @Override - public com.alibaba.fluss.types.DataType visit(TinyIntType tinyIntType) { - return new com.alibaba.fluss.types.TinyIntType(tinyIntType.isNullable()); + public org.apache.fluss.types.DataType visit(TinyIntType tinyIntType) { + return new org.apache.fluss.types.TinyIntType(tinyIntType.isNullable()); } @Override - public com.alibaba.fluss.types.DataType visit(SmallIntType smallIntType) { - return new com.alibaba.fluss.types.SmallIntType(smallIntType.isNullable()); + public org.apache.fluss.types.DataType visit(SmallIntType smallIntType) { + return new org.apache.fluss.types.SmallIntType(smallIntType.isNullable()); } @Override - public com.alibaba.fluss.types.DataType visit(IntType intType) { - return new com.alibaba.fluss.types.IntType(intType.isNullable()); + public org.apache.fluss.types.DataType visit(IntType intType) { + return new org.apache.fluss.types.IntType(intType.isNullable()); } @Override - public com.alibaba.fluss.types.DataType visit(BigIntType bigIntType) { - return new com.alibaba.fluss.types.BigIntType(bigIntType.isNullable()); + public org.apache.fluss.types.DataType visit(BigIntType bigIntType) { + return new org.apache.fluss.types.BigIntType(bigIntType.isNullable()); } @Override - public com.alibaba.fluss.types.DataType visit(FloatType floatType) { - return new com.alibaba.fluss.types.FloatType(floatType.isNullable()); + public org.apache.fluss.types.DataType visit(FloatType floatType) { + return new org.apache.fluss.types.FloatType(floatType.isNullable()); } @Override - public com.alibaba.fluss.types.DataType visit(DoubleType doubleType) { - return new com.alibaba.fluss.types.DoubleType(doubleType.isNullable()); + public org.apache.fluss.types.DataType visit(DoubleType doubleType) { + return new org.apache.fluss.types.DoubleType(doubleType.isNullable()); } @Override - public com.alibaba.fluss.types.DataType visit(DateType dateType) { - return new com.alibaba.fluss.types.DateType(dateType.isNullable()); + public org.apache.fluss.types.DataType visit(DateType dateType) { + return new org.apache.fluss.types.DateType(dateType.isNullable()); } @Override - public com.alibaba.fluss.types.DataType visit(TimeType timeType) { - return new com.alibaba.fluss.types.TimeType( + public org.apache.fluss.types.DataType visit(TimeType timeType) { + return new org.apache.fluss.types.TimeType( timeType.isNullable(), timeType.getPrecision()); } @Override - public com.alibaba.fluss.types.DataType visit(TimestampType timestampType) { - return new com.alibaba.fluss.types.TimestampType( + public org.apache.fluss.types.DataType visit(TimestampType timestampType) { + return new org.apache.fluss.types.TimestampType( timestampType.isNullable(), timestampType.getPrecision()); } @Override - public com.alibaba.fluss.types.DataType visit(ZonedTimestampType zonedTimestampType) { + public org.apache.fluss.types.DataType visit(ZonedTimestampType zonedTimestampType) { throw new UnsupportedOperationException( "Unsupported data type in fluss " + zonedTimestampType); } @Override - public com.alibaba.fluss.types.DataType visit( + public org.apache.fluss.types.DataType visit( LocalZonedTimestampType localZonedTimestampType) { - return new com.alibaba.fluss.types.LocalZonedTimestampType( + return new org.apache.fluss.types.LocalZonedTimestampType( localZonedTimestampType.isNullable(), localZonedTimestampType.getPrecision()); } @Override - public com.alibaba.fluss.types.DataType visit(ArrayType arrayType) { + public org.apache.fluss.types.DataType visit(ArrayType arrayType) { throw new UnsupportedOperationException( "Unsupported data type in fluss version under 0.7: " + arrayType); } @Override - public com.alibaba.fluss.types.DataType visit(MapType mapType) { + public org.apache.fluss.types.DataType visit(MapType mapType) { throw new UnsupportedOperationException( "Unsupported data type in fluss version under 0.7: " + mapType); } @Override - public com.alibaba.fluss.types.DataType visit(RowType rowType) { + public org.apache.fluss.types.DataType visit(RowType rowType) { throw new UnsupportedOperationException( "Unsupported data type in fluss version under 0.7: " + rowType); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java index a625f0fd9b9..68f6c902e50 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java @@ -45,10 +45,10 @@ import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; -import com.alibaba.fluss.config.ConfigOptions; -import com.alibaba.fluss.config.MemorySize; -import com.alibaba.fluss.metadata.DataLakeFormat; -import com.alibaba.fluss.server.testutils.FlussClusterExtension; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.MemorySize; +import org.apache.fluss.metadata.DataLakeFormat; +import org.apache.fluss.server.testutils.FlussClusterExtension; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -62,12 +62,12 @@ import java.util.List; import java.util.Map; -import static com.alibaba.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS; -import static com.alibaba.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder; -import static com.alibaba.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE; import static org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_1; import static org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_2; import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL; +import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS; +import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder; +import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** ITCase for Fluss Pipeline. */ @@ -542,8 +542,8 @@ private void checkResult(TableId tableId, List expectedRows) { assertResultsIgnoreOrder(rowIter, expectedRows, true); } - private static com.alibaba.fluss.config.Configuration initConfig() { - com.alibaba.fluss.config.Configuration conf = new com.alibaba.fluss.config.Configuration(); + private static org.apache.fluss.config.Configuration initConfig() { + org.apache.fluss.config.Configuration conf = new org.apache.fluss.config.Configuration(); conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3); // set a shorter interval for testing purpose conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1)); @@ -560,7 +560,7 @@ private static com.alibaba.fluss.config.Configuration initConfig() { conf.setString("security.sasl.enabled.mechanisms", "plain"); conf.setString( "security.sasl.plain.jaas.config", - "com.alibaba.fluss.security.auth.sasl.plain.PlainLoginModule required " + "org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required " + " user_root=\"password\" " + " user_guest=\"password2\";"); return conf; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchemaTest.java index 7097a72ee45..bc0e4a1ca7e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchemaTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchemaTest.java @@ -41,10 +41,10 @@ import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussRowWithOp; import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; -import com.alibaba.fluss.client.Connection; -import com.alibaba.fluss.client.ConnectionFactory; -import com.alibaba.fluss.metadata.TablePath; -import com.alibaba.fluss.server.testutils.FlussClusterExtension; +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.testutils.FlussClusterExtension; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetadataApplierTest.java index 69e26bd5978..c7e4f09ffc9 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetadataApplierTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetadataApplierTest.java @@ -24,15 +24,15 @@ import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.common.types.IntType; -import com.alibaba.fluss.client.Connection; -import com.alibaba.fluss.client.ConnectionFactory; -import com.alibaba.fluss.client.admin.Admin; -import com.alibaba.fluss.exception.InvalidConfigException; -import com.alibaba.fluss.metadata.TableDescriptor; -import com.alibaba.fluss.metadata.TableInfo; -import com.alibaba.fluss.metadata.TablePath; -import com.alibaba.fluss.server.testutils.FlussClusterExtension; -import com.alibaba.fluss.types.RowType; +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.exception.InvalidConfigException; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.testutils.FlussClusterExtension; +import org.apache.fluss.types.RowType; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -47,7 +47,7 @@ import java.util.Map; import java.util.concurrent.ExecutionException; -import static com.alibaba.fluss.config.ConfigOptions.TABLE_REPLICATION_FACTOR; +import static org.apache.fluss.config.ConfigOptions.TABLE_REPLICATION_FACTOR; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -129,28 +129,28 @@ void testCreateTableAllTypes(boolean primaryKeyTable) throws Exception { DataTypes.TIMESTAMP_LTZ(6) }; - com.alibaba.fluss.types.DataType[] flussDataTypes = - new com.alibaba.fluss.types.DataType[] { - com.alibaba.fluss.types.DataTypes.BINARY(10), + org.apache.fluss.types.DataType[] flussDataTypes = + new org.apache.fluss.types.DataType[] { + org.apache.fluss.types.DataTypes.BINARY(10), // fluss not support binary, will be mapped to bytes - com.alibaba.fluss.types.DataTypes.BYTES(), - com.alibaba.fluss.types.DataTypes.BYTES(), - com.alibaba.fluss.types.DataTypes.BOOLEAN(), - com.alibaba.fluss.types.DataTypes.TINYINT(), - com.alibaba.fluss.types.DataTypes.SMALLINT(), - new com.alibaba.fluss.types.IntType(false), - com.alibaba.fluss.types.DataTypes.BIGINT(), - com.alibaba.fluss.types.DataTypes.FLOAT(), - com.alibaba.fluss.types.DataTypes.DOUBLE(), - com.alibaba.fluss.types.DataTypes.DECIMAL(38, 18), - com.alibaba.fluss.types.DataTypes.CHAR(10), + org.apache.fluss.types.DataTypes.BYTES(), + org.apache.fluss.types.DataTypes.BYTES(), + org.apache.fluss.types.DataTypes.BOOLEAN(), + org.apache.fluss.types.DataTypes.TINYINT(), + org.apache.fluss.types.DataTypes.SMALLINT(), + new org.apache.fluss.types.IntType(false), + org.apache.fluss.types.DataTypes.BIGINT(), + org.apache.fluss.types.DataTypes.FLOAT(), + org.apache.fluss.types.DataTypes.DOUBLE(), + org.apache.fluss.types.DataTypes.DECIMAL(38, 18), + org.apache.fluss.types.DataTypes.CHAR(10), // fluss not support varchar, will be mapped to string - com.alibaba.fluss.types.DataTypes.STRING(), - com.alibaba.fluss.types.DataTypes.STRING(), - com.alibaba.fluss.types.DataTypes.DATE(), - com.alibaba.fluss.types.DataTypes.TIME(), - com.alibaba.fluss.types.DataTypes.TIMESTAMP(3), - com.alibaba.fluss.types.DataTypes.TIMESTAMP_LTZ(6) + org.apache.fluss.types.DataTypes.STRING(), + org.apache.fluss.types.DataTypes.STRING(), + org.apache.fluss.types.DataTypes.DATE(), + org.apache.fluss.types.DataTypes.TIME(), + org.apache.fluss.types.DataTypes.TIMESTAMP(3), + org.apache.fluss.types.DataTypes.TIMESTAMP_LTZ(6) }; try (FlussMetaDataApplier applier = @@ -227,10 +227,10 @@ void testDropTableEvent() throws Exception { tablePath, TableDescriptor.builder() .schema( - com.alibaba.fluss.metadata.Schema.newBuilder() + org.apache.fluss.metadata.Schema.newBuilder() .column( "id", - com.alibaba.fluss.types.DataTypes.INT()) + org.apache.fluss.types.DataTypes.INT()) .build()) .build(), true) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java index 535d90244ca..93e8e52d0ea 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java @@ -40,7 +40,7 @@ import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; -import com.alibaba.fluss.server.testutils.FlussClusterExtension; +import org.apache.fluss.server.testutils.FlussClusterExtension; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -56,9 +56,9 @@ import java.util.Collections; import java.util.List; -import static com.alibaba.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS; -import static com.alibaba.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder; -import static com.alibaba.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE; +import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS; +import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder; +import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Integration tests for FlussSink. */ diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml index 0e905e5af78..51d98b7b6b1 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml @@ -42,7 +42,7 @@ limitations under the License. 1.6.8 1.6.1 2.3.9 - 0.7.0 + 0.8.0-incubating @@ -613,7 +613,7 @@ limitations under the License. - com.alibaba.fluss + org.apache.fluss fluss-flink-${flink.major.version} ${fluss.version} fluss-sql-connector.jar diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/FlussE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/FlussE2eITCase.java index c20808fa3c8..d2416a4c907 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/FlussE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/FlussE2eITCase.java @@ -64,7 +64,7 @@ public class FlussE2eITCase extends PipelineTestEnvironment { "remote.data.dir: /tmp/fluss/remote-data", "security.protocol.map: CLIENT:SASL, INTERNAL:PLAINTEXT", "security.sasl.enabled.mechanisms: PLAIN", - "security.sasl.plain.jaas.config: com.alibaba.fluss.security.auth.sasl.plain.PlainLoginModule required user_admin=\"admin-pass\" user_developer=\"developer-pass\";", + "security.sasl.plain.jaas.config: org.apache.fluss.fluss.security.auth.sasl.plain.PlainLoginModule required user_admin=\"admin-pass\" user_developer=\"developer-pass\";", "super.users: User:admin"); private static final List flussTabletServerProperties = @@ -78,7 +78,7 @@ public class FlussE2eITCase extends PipelineTestEnvironment { "remote.data.dir: /tmp/fluss/remote-data", "security.protocol.map: CLIENT:SASL, INTERNAL:PLAINTEXT", "security.sasl.enabled.mechanisms: PLAIN", - "security.sasl.plain.jaas.config: com.alibaba.fluss.security.auth.sasl.plain.PlainLoginModule required user_admin=\"admin-pass\" user_developer=\"developer-pass\";", + "security.sasl.plain.jaas.config: org.apache.fluss.fluss.security.auth.sasl.plain.PlainLoginModule required user_admin=\"admin-pass\" user_developer=\"developer-pass\";", "super.users: User:admin"); @Container