diff --git a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala new file mode 100644 index 000000000000..9082e4f0e1bf --- /dev/null +++ b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.procedure + +class RescaleProcedureTest extends RescaleProcedureTestBase {} diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala new file mode 100644 index 000000000000..9082e4f0e1bf --- /dev/null +++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.procedure + +class RescaleProcedureTest extends RescaleProcedureTestBase {} diff --git a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala new file mode 100644 index 000000000000..9082e4f0e1bf --- /dev/null +++ b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.procedure + +class RescaleProcedureTest extends RescaleProcedureTestBase {} diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java index 45341e9b9402..a6b08f2de4c3 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java @@ -45,6 +45,7 @@ import org.apache.paimon.spark.procedure.RenameTagProcedure; import org.apache.paimon.spark.procedure.RepairProcedure; import org.apache.paimon.spark.procedure.ReplaceTagProcedure; +import org.apache.paimon.spark.procedure.RescaleProcedure; import org.apache.paimon.spark.procedure.ResetConsumerProcedure; import org.apache.paimon.spark.procedure.RollbackProcedure; import org.apache.paimon.spark.procedure.RollbackToTimestampProcedure; @@ -91,6 +92,7 @@ private static Map> initProcedureBuilders() { procedureBuilders.put("create_branch", CreateBranchProcedure::builder); procedureBuilders.put("delete_branch", DeleteBranchProcedure::builder); procedureBuilders.put("compact", CompactProcedure::builder); + procedureBuilders.put("rescale", RescaleProcedure::builder); procedureBuilders.put("migrate_database", MigrateDatabaseProcedure::builder); procedureBuilders.put("migrate_table", MigrateTableProcedure::builder); procedureBuilders.put("remove_orphan_files", RemoveOrphanFilesProcedure::builder); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RescaleProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RescaleProcedure.java new file mode 100644 index 000000000000..3cfefd03d7f3 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RescaleProcedure.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.procedure; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.partition.PartitionPredicate; +import org.apache.paimon.spark.commands.PaimonSparkWriter; +import org.apache.paimon.spark.util.ScanPlanHelper$; +import org.apache.paimon.table.BucketMode; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.snapshot.SnapshotReader; +import org.apache.paimon.utils.ParameterUtils; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.PaimonUtils; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.spark.sql.types.DataTypes.IntegerType; +import static org.apache.spark.sql.types.DataTypes.StringType; + +/** + * Rescale procedure. Usage: + * + *

+ *  CALL sys.rescale(table => 'databaseName.tableName', [bucket_num => 16], [partition => 'dt=20250217,hh=08'], [scan_parallelism => 8], [sink_parallelism => 16])
+ * 
+ */ +public class RescaleProcedure extends BaseProcedure { + + private static final Logger LOG = LoggerFactory.getLogger(RescaleProcedure.class); + + private static final ProcedureParameter[] PARAMETERS = + new ProcedureParameter[] { + ProcedureParameter.required("table", StringType), + ProcedureParameter.optional("bucket_num", IntegerType), + ProcedureParameter.optional("partition", StringType), + ProcedureParameter.optional("scan_parallelism", IntegerType), + ProcedureParameter.optional("sink_parallelism", IntegerType), + }; + + private static final StructType OUTPUT_TYPE = + new StructType( + new StructField[] { + new StructField("result", DataTypes.BooleanType, true, Metadata.empty()) + }); + + protected RescaleProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public StructType outputType() { + return OUTPUT_TYPE; + } + + @Override + public InternalRow[] call(InternalRow args) { + Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); + Integer bucketNum = args.isNullAt(1) ? null : args.getInt(1); + String partition = blank(args, 2) ? null : args.getString(2); + Integer scanParallelism = args.isNullAt(3) ? null : args.getInt(3); + Integer sinkParallelism = args.isNullAt(4) ? null : args.getInt(4); + + return modifyPaimonTable( + tableIdent, + table -> { + checkArgument(table instanceof FileStoreTable); + FileStoreTable fileStoreTable = (FileStoreTable) table; + + Optional optionalSnapshot = fileStoreTable.latestSnapshot(); + if (!optionalSnapshot.isPresent()) { + throw new IllegalArgumentException( + "Table " + + table.fullName() + + " has no snapshot. No need to rescale."); + } + Snapshot snapshot = optionalSnapshot.get(); + + Map dynamicOptions = new HashMap<>(); + dynamicOptions.put( + CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), + String.valueOf(snapshot.id())); + fileStoreTable = fileStoreTable.copy(dynamicOptions); + + Map partitionMap = new HashMap<>(); + if (partition != null) { + List> partitions = + ParameterUtils.getPartitions(partition.split(";")); + checkArgument( + partitions.size() == 1, + "Rescale only supports one partition at a time, but got %d partitions", + partitions.size()); + partitionMap = partitions.get(0); + } + + PartitionPredicate partitionPredicate = + PartitionPredicate.fromMap( + fileStoreTable.schema().logicalPartitionType(), + partitionMap, + fileStoreTable.coreOptions().partitionDefaultName()); + + int finalBucketNum; + if (bucketNum == null) { + checkArgument( + fileStoreTable.coreOptions().bucket() != BucketMode.POSTPONE_BUCKET, + "When rescaling postpone bucket tables, you must provide the resulting bucket number."); + finalBucketNum = + currentBucketNum(fileStoreTable, snapshot, partitionPredicate); + } else { + finalBucketNum = bucketNum; + } + + int finalScanParallelism = + scanParallelism == null ? finalBucketNum : scanParallelism; + + execute( + fileStoreTable, + finalBucketNum, + partitionPredicate, + partitionMap, + finalScanParallelism, + sinkParallelism, + tableIdent); + + InternalRow internalRow = newInternalRow(true); + return new InternalRow[] {internalRow}; + }); + } + + private void execute( + FileStoreTable table, + int bucketNum, + PartitionPredicate partitionPredicate, + Map partitionMap, + int scanParallelism, + @Nullable Integer sinkParallelism, + Identifier tableIdent) { + DataSourceV2Relation relation = createRelation(tableIdent); + + SnapshotReader snapshotReader = table.newSnapshotReader(); + snapshotReader.withPartitionFilter(partitionPredicate); + List dataSplits = snapshotReader.read().dataSplits(); + + if (dataSplits.isEmpty()) { + LOG.info("No data splits found for the specified partition. No need to rescale."); + return; + } + + Dataset datasetForRead = + PaimonUtils.createDataset( + spark(), + ScanPlanHelper$.MODULE$.createNewScanPlan( + dataSplits.toArray(new DataSplit[0]), relation)); + + Dataset datasetForWrite = datasetForRead.repartition(scanParallelism); + + Map bucketOptions = new HashMap<>(table.options()); + bucketOptions.put(CoreOptions.BUCKET.key(), String.valueOf(bucketNum)); + FileStoreTable rescaledTable = table.copy(table.schema().copy(bucketOptions)); + + int finalSinkParallelism = sinkParallelism == null ? bucketNum : sinkParallelism; + + PaimonSparkWriter writer = PaimonSparkWriter.apply(rescaledTable); + if (partitionMap.isEmpty()) { + writer.writeBuilder().withOverwrite(); + } else { + writer.writeBuilder().withOverwrite(partitionMap); + } + datasetForWrite = datasetForWrite.repartition(finalSinkParallelism); + writer.commit(writer.write(datasetForWrite)); + } + + private int currentBucketNum( + FileStoreTable table, Snapshot snapshot, PartitionPredicate partitionPredicate) { + Iterator it = + table.newSnapshotReader() + .withSnapshot(snapshot) + .withPartitionFilter(partitionPredicate) + .onlyReadRealBuckets() + .readFileIterator(); + checkArgument( + it.hasNext(), + "The specified partition does not have any data files. No need to rescale."); + return it.next().totalBuckets(); + } + + private boolean blank(InternalRow args, int index) { + return args.isNullAt(index) + || (args.getString(index) == null || args.getString(index).trim().isEmpty()); + } + + @Override + public String description() { + return "This procedure rescales one partition of a table by changing the bucket number."; + } + + public static ProcedureBuilder builder() { + return new BaseProcedure.Builder() { + @Override + public RescaleProcedure doBuild() { + return new RescaleProcedure(tableCatalog()); + } + }; + } +} diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTestBase.scala new file mode 100644 index 000000000000..cc10cd5dea38 --- /dev/null +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTestBase.scala @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.procedure + +import org.apache.paimon.Snapshot.CommitKind +import org.apache.paimon.partition.PartitionPredicate +import org.apache.paimon.spark.PaimonSparkTestBase +import org.apache.paimon.table.FileStoreTable +import org.apache.paimon.table.source.DataSplit + +import org.apache.spark.sql.Row +import org.assertj.core.api.Assertions + +import java.util.{Collections, Map => JMap} + +import scala.collection.JavaConverters._ + +/** Test rescale procedure. See [[RescaleProcedure]]. */ +abstract class RescaleProcedureTestBase extends PaimonSparkTestBase { + + import testImplicits._ + + // ----------------------- Basic Rescale ----------------------- + + test("Paimon Procedure: rescale non-partitioned table") { + withTable("T") { + spark.sql(s""" + |CREATE TABLE T (id INT, value STRING) + |TBLPROPERTIES ('primary-key'='id', 'bucket'='2') + |""".stripMargin) + + val table = loadTable("T") + + spark.sql(s"INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')") + + val initialBuckets = getBucketCount(table) + Assertions.assertThat(initialBuckets).isEqualTo(2) + + val initialData = spark.sql("SELECT * FROM T ORDER BY id").collect() + val initialSnapshotId = lastSnapshotId(table) + + checkAnswer(spark.sql("CALL sys.rescale(table => 'T', bucket_num => 4)"), Row(true) :: Nil) + + val reloadedTable = loadTable("T") + + Assertions.assertThat(lastSnapshotCommand(reloadedTable)).isEqualTo(CommitKind.OVERWRITE) + Assertions.assertThat(lastSnapshotId(reloadedTable)).isGreaterThan(initialSnapshotId) + + val newBuckets = getBucketCount(reloadedTable) + Assertions.assertThat(newBuckets).isEqualTo(4) + + val afterData = spark.sql("SELECT * FROM T ORDER BY id").collect() + Assertions.assertThat(afterData).containsExactlyElementsOf(initialData) + + val dataSplits = reloadedTable.newSnapshotReader.read.dataSplits.asScala.toList + dataSplits.foreach( + split => { + Assertions.assertThat(split.bucket()).isLessThan(4) + }) + } + } + + test("Paimon Procedure: rescale partitioned table") { + withTable("T") { + spark.sql(s""" + |CREATE TABLE T (id INT, value STRING, pt STRING) + |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='2') + |PARTITIONED BY (pt) + |""".stripMargin) + + val table = loadTable("T") + + spark.sql( + s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p1'), (3, 'c', 'p2'), (4, 'd', 'p2')") + + val initialData = spark.sql("SELECT * FROM T ORDER BY id").collect() + val initialSnapshotId = lastSnapshotId(table) + + checkAnswer( + spark.sql("CALL sys.rescale(table => 'T', bucket_num => 4, partition => 'pt=\"p1\"')"), + Row(true) :: Nil) + + val reloadedTable = loadTable("T") + Assertions.assertThat(lastSnapshotCommand(reloadedTable)).isEqualTo(CommitKind.OVERWRITE) + Assertions.assertThat(lastSnapshotId(reloadedTable)).isGreaterThan(initialSnapshotId) + + val afterData = spark.sql("SELECT * FROM T ORDER BY id").collect() + Assertions.assertThat(afterData).containsExactlyElementsOf(initialData) + + val p1PartitionPredicate = PartitionPredicate.fromMap( + reloadedTable.schema().logicalPartitionType(), + Collections.singletonMap("pt", "p1"), + reloadedTable.coreOptions().partitionDefaultName()) + val p1Splits = reloadedTable.newSnapshotReader + .withPartitionFilter(p1PartitionPredicate) + .read + .dataSplits + .asScala + .toList + p1Splits.foreach( + split => { + Assertions.assertThat(split.bucket()).isLessThan(4) + }) + + val p2PartitionPredicate = PartitionPredicate.fromMap( + reloadedTable.schema().logicalPartitionType(), + Collections.singletonMap("pt", "p2"), + reloadedTable.coreOptions().partitionDefaultName()) + val p2Splits = reloadedTable.newSnapshotReader + .withPartitionFilter(p2PartitionPredicate) + .read + .dataSplits + .asScala + .toList + p2Splits.foreach( + split => { + Assertions.assertThat(split.bucket()).isLessThan(2) + }) + } + } + + test("Paimon Procedure: rescale with scan_parallelism and sink_parallelism") { + withTable("T") { + spark.sql(s""" + |CREATE TABLE T (id INT, value STRING) + |TBLPROPERTIES ('primary-key'='id', 'bucket'='2') + |""".stripMargin) + + val table = loadTable("T") + + spark.sql(s"INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')") + + val initialData = spark.sql("SELECT * FROM T ORDER BY id").collect() + + checkAnswer( + spark.sql( + "CALL sys.rescale(table => 'T', bucket_num => 4, scan_parallelism => 3, sink_parallelism => 5)"), + Row(true) :: Nil) + + val afterData = spark.sql("SELECT * FROM T ORDER BY id").collect() + Assertions.assertThat(afterData).containsExactlyElementsOf(initialData) + + val reloadedTable = loadTable("T") + + val newBuckets = getBucketCount(reloadedTable) + Assertions.assertThat(newBuckets).isEqualTo(4) + } + } + + test("Paimon Procedure: rescale without bucket_num (use current bucket)") { + withTable("T") { + spark.sql(s""" + |CREATE TABLE T (id INT, value STRING) + |TBLPROPERTIES ('primary-key'='id', 'bucket'='3') + |""".stripMargin) + + val table = loadTable("T") + + spark.sql(s"INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')") + + val initialBuckets = getBucketCount(table) + Assertions.assertThat(initialBuckets).isEqualTo(3) + + val initialData = spark.sql("SELECT * FROM T ORDER BY id").collect() + + checkAnswer(spark.sql("CALL sys.rescale(table => 'T')"), Row(true) :: Nil) + + val afterData = spark.sql("SELECT * FROM T ORDER BY id").collect() + Assertions.assertThat(afterData).containsExactlyElementsOf(initialData) + + val reloadedTable = loadTable("T") + + val newBuckets = getBucketCount(reloadedTable) + Assertions.assertThat(newBuckets).isEqualTo(3) + } + } + + test("Paimon Procedure: rescale partition with multiple partitions") { + withTable("T") { + spark.sql(s""" + |CREATE TABLE T (id INT, value STRING, dt STRING, hh INT) + |TBLPROPERTIES ('primary-key'='id, dt, hh', 'bucket'='2') + |PARTITIONED BY (dt, hh) + |""".stripMargin) + + val table = loadTable("T") + + spark.sql(s"INSERT INTO T VALUES (1, 'a', '2024-01-01', 0), (2, 'b', '2024-01-01', 0)") + spark.sql(s"INSERT INTO T VALUES (3, 'c', '2024-01-01', 1), (4, 'd', '2024-01-01', 1)") + spark.sql(s"INSERT INTO T VALUES (5, 'e', '2024-01-02', 0), (6, 'f', '2024-01-02', 0)") + + val initialData = spark.sql("SELECT * FROM T ORDER BY id").collect() + val initialSnapshotId = lastSnapshotId(table) + + checkAnswer( + spark.sql( + "CALL sys.rescale(table => 'T', bucket_num => 4, partition => 'dt=\"2024-01-01\",hh=0')"), + Row(true) :: Nil) + + Assertions.assertThat(lastSnapshotCommand(table)).isEqualTo(CommitKind.OVERWRITE) + Assertions.assertThat(lastSnapshotId(table)).isGreaterThan(initialSnapshotId) + + val afterData = spark.sql("SELECT * FROM T ORDER BY id").collect() + Assertions.assertThat(afterData).containsExactlyElementsOf(initialData) + + val reloadedTable = loadTable("T") + + val targetPartitionMap: JMap[String, String] = new java.util.HashMap[String, String]() + targetPartitionMap.put("dt", "2024-01-01") + targetPartitionMap.put("hh", "0") + val targetPartitionPredicate = PartitionPredicate.fromMap( + reloadedTable.schema().logicalPartitionType(), + targetPartitionMap, + reloadedTable.coreOptions().partitionDefaultName()) + val targetPartitionSplits = reloadedTable.newSnapshotReader + .withPartitionFilter(targetPartitionPredicate) + .read + .dataSplits + .asScala + .toList + targetPartitionSplits.foreach( + split => { + Assertions.assertThat(split.bucket()).isLessThan(4) + }) + } + } + + test("Paimon Procedure: rescale table with no snapshot") { + withTable("T") { + spark.sql(s""" + |CREATE TABLE T (id INT, value STRING) + |TBLPROPERTIES ('primary-key'='id', 'bucket'='2') + |""".stripMargin) + + val table = loadTable("T") + + assert(intercept[IllegalArgumentException] { + spark.sql("CALL sys.rescale(table => 'T', bucket_num => 4)") + }.getMessage.contains("has no snapshot")) + } + } + + test("Paimon Procedure: rescale postpone bucket table requires bucket_num") { + withTable("T") { + spark.sql(s""" + |CREATE TABLE T (id INT, value STRING) + |TBLPROPERTIES ('primary-key'='id', 'bucket'='-1') + |""".stripMargin) + + val table = loadTable("T") + + spark.sql(s"INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')") + + assert( + intercept[IllegalArgumentException] { + spark.sql("CALL sys.rescale(table => 'T')") + }.getMessage.contains( + "When rescaling postpone bucket tables, you must provide the resulting bucket number")) + + checkAnswer(spark.sql("CALL sys.rescale(table => 'T', bucket_num => 4)"), Row(true) :: Nil) + + val afterData = spark.sql("SELECT * FROM T ORDER BY id").collect() + Assertions.assertThat(afterData.length).isEqualTo(3) + } + } + + test("Paimon Procedure: rescale empty partition") { + withTable("T") { + spark.sql(s""" + |CREATE TABLE T (id INT, value STRING, pt STRING) + |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='2') + |PARTITIONED BY (pt) + |""".stripMargin) + + val table = loadTable("T") + + spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p1')") + + val initialSnapshotId = lastSnapshotId(table) + + checkAnswer( + spark.sql("CALL sys.rescale(table => 'T', bucket_num => 4, partition => 'pt=\"p2\"')"), + Row(true) :: Nil) + + Assertions.assertThat(lastSnapshotId(table)).isEqualTo(initialSnapshotId) + } + } + + test("Paimon Procedure: rescale reduces bucket count") { + withTable("T") { + spark.sql(s""" + |CREATE TABLE T (id INT, value STRING) + |TBLPROPERTIES ('primary-key'='id', 'bucket'='4') + |""".stripMargin) + + val table = loadTable("T") + + spark.sql( + s"INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e'), (6, 'f'), (7, 'g'), (8, 'h')") + + val initialData = spark.sql("SELECT * FROM T ORDER BY id").collect() + val initialBuckets = getBucketCount(table) + Assertions.assertThat(initialBuckets).isEqualTo(4) + + checkAnswer(spark.sql("CALL sys.rescale(table => 'T', bucket_num => 2)"), Row(true) :: Nil) + + val afterData = spark.sql("SELECT * FROM T ORDER BY id").collect() + Assertions.assertThat(afterData).containsExactlyElementsOf(initialData) + + val reloadedTable = loadTable("T") + + val newBuckets = getBucketCount(reloadedTable) + Assertions.assertThat(newBuckets).isEqualTo(2) + + val dataSplits = reloadedTable.newSnapshotReader.read.dataSplits.asScala.toList + dataSplits.foreach( + split => { + Assertions.assertThat(split.bucket()).isLessThan(2) + }) + } + } + + test("Paimon Procedure: rescale aware bucket table") { + Seq(1, 2, 3).foreach( + initialBucket => { + withTable("T") { + spark.sql(s""" + |CREATE TABLE T (id INT, value STRING) + |TBLPROPERTIES ('primary-key'='id', 'bucket'='$initialBucket') + |""".stripMargin) + + val table = loadTable("T") + + for (i <- 1 to 10) { + spark.sql(s"INSERT INTO T VALUES ($i, 'value$i')") + } + + val initialData = spark.sql("SELECT * FROM T ORDER BY id").collect() + val initialBuckets = getBucketCount(table) + Assertions.assertThat(initialBuckets).isEqualTo(initialBucket) + + val targetBucket = if (initialBucket == 1) 3 else 1 + checkAnswer( + spark.sql(s"CALL sys.rescale(table => 'T', bucket_num => $targetBucket)"), + Row(true) :: Nil) + + val afterData = spark.sql("SELECT * FROM T ORDER BY id").collect() + Assertions.assertThat(afterData).containsExactlyElementsOf(initialData) + + val reloadedTable = loadTable("T") + + val newBuckets = getBucketCount(reloadedTable) + Assertions.assertThat(newBuckets).isEqualTo(targetBucket) + } + }) + } + + // ----------------------- Helper Methods ----------------------- + + def getBucketCount(table: FileStoreTable): Int = { + val bucketOption = table.coreOptions().bucket() + if (bucketOption == -1) { + val dataSplits = table.newSnapshotReader.read.dataSplits.asScala.toList + if (dataSplits.isEmpty) { + -1 + } else { + dataSplits.map(_.bucket()).max + 1 + } + } else { + bucketOption + } + } + + def lastSnapshotCommand(table: FileStoreTable): CommitKind = { + table.snapshotManager().latestSnapshot().commitKind() + } + + def lastSnapshotId(table: FileStoreTable): Long = { + table.snapshotManager().latestSnapshotId() + } +}