package com.linkedin.openhouse.catalog.e2e;

import static org.assertj.core.api.Assertions.*;

import com.linkedin.openhouse.jobs.spark.Operations;
import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
import java.util.Collections;
import java.util.List;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * E2E test verifying that Iceberg data-file compaction ({@code rewriteDataFiles}) can be
 * restricted to a single bucket partition of an OpenHouse table by filtering on
 * {@code Expressions.bucket("key", N)}.
 */
public class SparkRewriteBucketsTest extends OpenHouseSparkITest {
  // Table name within the "openhouse" catalog (NOTE: constant naming convention would be
  // TABLE_NAME; kept as-is in case the field is referenced elsewhere in the package).
  static final String tableName = "db.test_data_compaction_filter";

  private Operations ops;

  @BeforeEach
  public void setUp() throws Exception {
    ops = Operations.withCatalog(getSparkSession(), null);
  }

  @AfterEach
  public void cleanUp() throws Exception {
    // Qualify with the same "openhouse." catalog prefix used by CREATE/INSERT so the drop
    // targets the table this test created (the unqualified form relies on the session's
    // default catalog being "openhouse" — TODO confirm against the test harness defaults).
    sql("DROP TABLE IF EXISTS openhouse.%s", tableName);
  }

  @Test
  public void testBucketPartitionsCanBeFilteredInCompaction() throws NoSuchTableException {
    sql(
        "CREATE TABLE openhouse.%s (id int, key string) PARTITIONED BY (bucket(2, key))",
        tableName);
    // Two inserts produce two data files per bucket — enough to satisfy min-input-files=2.
    sql(
        "INSERT INTO openhouse.%s VALUES (0, 'a'), (1, 'b'), (2, 'c'), (3, 'd'), (4, 'e')",
        tableName);
    sql(
        "INSERT INTO openhouse.%s VALUES (5, 'a'), (6, 'b'), (7, 'c'), (8, 'd'), (9, 'e')",
        tableName);

    Table table = ops.getTable(tableName);

    // Compact bucket 0 only: its two input files collapse into one output file.
    RewriteDataFiles.Result result = rewriteBucket(table, 0);
    assertThat(result.rewrittenDataFilesCount()).isEqualTo(2);
    assertThat(result.addedDataFilesCount()).isEqualTo(1);

    table.refresh();

    // Compact bucket 1 only.
    result = rewriteBucket(table, 1);
    assertThat(result.rewrittenDataFilesCount()).isEqualTo(2);
    assertThat(result.addedDataFilesCount()).isEqualTo(1);

    table.refresh();

    // Re-running on an already-compacted bucket is a no-op: a single remaining file is below
    // the min-input-files=2 threshold.
    result = rewriteBucket(table, 1);
    assertThat(result.rewrittenDataFilesCount()).isEqualTo(0);
    assertThat(result.addedDataFilesCount()).isEqualTo(0);
  }

  /**
   * Runs bin-pack compaction restricted to a single bucket of the 2-bucket "key" partition.
   *
   * @param table the Iceberg table to compact
   * @param bucket the bucket ordinal (0 or 1) to restrict the rewrite to
   * @return the rewrite result with rewritten/added file counts
   */
  private RewriteDataFiles.Result rewriteBucket(Table table, int bucket) {
    SparkSession spark = ops.spark();
    return SparkActions.get(spark)
        .rewriteDataFiles(table)
        // Collections.singletonList disambiguates the Expressions.in overload (the bare
        // varargs form was the bug patch 2 of this series had to fix).
        .filter(Expressions.in(Expressions.bucket("key", 2), Collections.singletonList(bucket)))
        .binPack()
        .option("min-input-files", "2")
        .execute();
  }

  /**
   * Formats and runs a SQL statement against the test Spark session.
   *
   * @param query a {@link String#format} template
   * @param args template arguments
   * @return the collected result rows (possibly empty, never null)
   */
  protected List<Row> sql(String query, Object... args) {
    // collectAsList() already returns an empty list for empty results — no special-casing.
    return ops.spark().sql(String.format(query, args)).collectAsList();
  }
}