Skip to content

Commit c69e999

Browse files
fedimserDmytro Fedoriaka
authored andcommitted
[SPARK-54117][SS] Throw better error to indicate that TWS is only supported with RocksDB state store provider
### What changes were proposed in this pull request? Change error message when user uses TransformWithState with state store provider other than RocksDB state store provider. ### Why are the changes needed? Improves user experience by making it clear that they need to use RocksDBStateStoreProvider. ### Does this PR introduce _any_ user-facing change? Yes, changes error message when user uses TransformWithState with HDFSBackedStateStoreProvider. Old error message: `[UNSUPPORTED_FEATURE.STATE_STORE_MULTIPLE_COLUMN_FAMILIES] The feature is not supported: Creating multiple column families with HDFSBackedStateStoreProvider is not supported.` New error message: `[UNSUPPORTED_FEATURE.STORE_BACKEND_NOT_SUPPORTED_FOR_TWS] The feature is not supported: Store backend HDFSBackedStateStoreProvider is not supported by TransformWithState operator. Please use RocksDBStateStoreProvider.` ### How was this patch tested? Unit tests: TransformWithStateSuite, TransformWithStateValidationSuite. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #52822 from fedimser/fedimser/tws-rocksdb-error. Lead-authored-by: Dmytro Fedoriaka <[email protected]> Co-authored-by: Dmytro Fedoriaka <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
1 parent a8a4a77 commit c69e999

File tree

6 files changed

+35
-3
lines changed

6 files changed

+35
-3
lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6646,6 +6646,11 @@
66466646
"State TTL with <stateStoreProvider> is not supported. Please use RocksDBStateStoreProvider."
66476647
]
66486648
},
6649+
"STORE_BACKEND_NOT_SUPPORTED_FOR_TWS" : {
6650+
"message" : [
6651+
"Store backend <stateStoreProvider> is not supported by TransformWithState operator. Please use RocksDBStateStoreProvider."
6652+
]
6653+
},
66496654
"TABLE_OPERATION" : {
66506655
"message" : [
66516656
"Table <tableName> does not support <operation>. Please check the current catalog and namespace to make sure the qualified table name is expected, and also check the catalog implementation which is configured by \"spark.sql.catalog\"."

sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkExec.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ case class TransformWithStateInPySparkExec(
181181
*/
182182
override protected def doExecute(): RDD[InternalRow] = {
183183
metrics
184+
validateStateStoreProvider(isStreaming)
184185

185186
if (!hasInitialState) {
186187
if (isStreaming) {

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExec.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,7 @@ case class TransformWithStateExec(
374374
metrics // force lazy init at driver
375375

376376
validateTimeMode()
377+
validateStateStoreProvider(isStreaming)
377378

378379
if (hasInitialState) {
379380
val storeConf = new StateStoreConf(session.sessionState.conf)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExecBase.scala

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ import org.apache.spark.sql.catalyst.plans.physical.Distribution
2424
import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
2525
import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorCustomMetric, StatefulOperatorCustomSumMetric, StatefulOperatorPartitioning, StateStoreWriter, WatermarkSupport}
2626
import org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.statefulprocessor.ImplicitGroupingKeyTracker
27-
import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadata, TransformWithStateUserFunctionException}
27+
import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadata, RocksDBStateStoreProvider, StateStoreErrors, TransformWithStateUserFunctionException}
28+
import org.apache.spark.sql.internal.SQLConf
2829
import org.apache.spark.sql.streaming.{OutputMode, TimeMode}
2930
import org.apache.spark.sql.types.{BinaryType, StructType}
3031
import org.apache.spark.util.NextIterator
@@ -53,6 +54,12 @@ abstract class TransformWithStateExecBase(
5354

5455
override def operatorStateMetadataVersion: Int = 2
5556

57+
// Supported state store providers for TransformWithState.
58+
// TransformWithState currently supports only RocksDBStateStoreProvider.
59+
private val SUPPORTED_STATE_STORE_PROVIDERS = Set(
60+
classOf[RocksDBStateStoreProvider].getName
61+
)
62+
5663
override def supportsSchemaEvolution: Boolean = true
5764

5865
override def left: SparkPlan = child
@@ -216,6 +223,14 @@ abstract class TransformWithStateExecBase(
216223
}
217224
}
218225

226+
/** Validates that the configured state store provider is supported by TransformWithState. */
227+
protected def validateStateStoreProvider(isStreaming: Boolean): Unit = {
228+
val providerName = conf.getConf(SQLConf.STATE_STORE_PROVIDER_CLASS)
229+
if (isStreaming && !SUPPORTED_STATE_STORE_PROVIDERS.contains(providerName)) {
230+
throw StateStoreErrors.storeBackendNotSupportedForTWS(providerName)
231+
}
232+
}
233+
219234
/**
220235
* Executes a block of code with standardized error handling for StatefulProcessor operations.
221236
* Rethrows SparkThrowables directly and wraps other exceptions in

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ object StateStoreErrors {
6565
new StateStoreRemovingColumnFamiliesNotSupportedException(stateStoreProvider)
6666
}
6767

68+
def storeBackendNotSupportedForTWS(stateStoreProvider: String):
69+
StateStoreBackendNotSupportedForTWSException = {
70+
new StateStoreBackendNotSupportedForTWSException(stateStoreProvider)
71+
}
72+
6873
def cannotUseColumnFamilyWithInvalidName(operationName: String, colFamilyName: String):
6974
StateStoreCannotUseColumnFamilyWithInvalidName = {
7075
new StateStoreCannotUseColumnFamilyWithInvalidName(operationName, colFamilyName)
@@ -330,6 +335,11 @@ class StateStoreRemovingColumnFamiliesNotSupportedException(stateStoreProvider:
330335
errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_REMOVING_COLUMN_FAMILIES",
331336
messageParameters = Map("stateStoreProvider" -> stateStoreProvider))
332337

338+
class StateStoreBackendNotSupportedForTWSException(stateStoreProvider: String)
339+
extends SparkUnsupportedOperationException(
340+
errorClass = "UNSUPPORTED_FEATURE.STORE_BACKEND_NOT_SUPPORTED_FOR_TWS",
341+
messageParameters = Map("stateStoreProvider" -> stateStoreProvider))
342+
333343
class StateStoreCannotUseColumnFamilyWithInvalidName(operationName: String, colFamilyName: String)
334344
extends SparkUnsupportedOperationException(
335345
errorClass = "STATE_STORE_CANNOT_USE_COLUMN_FAMILY_WITH_INVALID_NAME",

sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2570,7 +2570,7 @@ class TransformWithStateValidationSuite extends StateStoreMetricsTest {
25702570

25712571
testStream(result, OutputMode.Update())(
25722572
AddData(inputData, "a"),
2573-
ExpectFailure[StateStoreMultipleColumnFamiliesNotSupportedException] { t =>
2573+
ExpectFailure[StateStoreBackendNotSupportedForTWSException] { t =>
25742574
assert(t.getMessage.contains("not supported"))
25752575
}
25762576
)
@@ -2836,7 +2836,7 @@ class TransformWithStateValidationSuite extends StateStoreMetricsTest {
28362836
)
28372837
testStream(result, OutputMode.Update())(
28382838
AddData(inputData, InitInputRow("a", "add", -1.0)),
2839-
ExpectFailure[StateStoreMultipleColumnFamiliesNotSupportedException] {
2839+
ExpectFailure[StateStoreBackendNotSupportedForTWSException] {
28402840
(t: Throwable) => {
28412841
assert(t.getMessage.contains("not supported"))
28422842
}

0 commit comments

Comments
 (0)