diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
index 9b4e0343f94b8..a5f3e72f47b7d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
@@ -249,7 +249,7 @@ object IntegratedUDFTestUtils extends SQLHelper {
     binaryPythonDataSource
   }
 
-  private lazy val pandasScalarFunc: Array[Byte] = if (shouldTestPandasUDFs) {
+  private lazy val pandasFunc: Array[Byte] = if (shouldTestPandasUDFs) {
     var binaryPandasFunc: Array[Byte] = null
     withTempPath { path =>
       Process(
@@ -272,29 +272,6 @@ object IntegratedUDFTestUtils extends SQLHelper {
     throw new RuntimeException(s"Python executable [$pythonExec] and/or pyspark are unavailable.")
   }
 
-  private lazy val pandasScalarIterFunc: Array[Byte] = if (shouldTestPandasUDFs) {
-    var binaryPandasFunc: Array[Byte] = null
-    withTempPath { path =>
-      Process(
-        Seq(
-          pythonExec,
-          "-c",
-          "from pyspark.sql.types import StringType; " +
-            "from pyspark.serializers import CloudPickleSerializer; " +
-            s"f = open('$path', 'wb');" +
-            "f.write(CloudPickleSerializer().dumps((" +
-            "lambda it: (x.apply(lambda v: None if v is None else str(v)) for x in it), " +
-            "StringType())))"),
-        None,
-        "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!!
-      binaryPandasFunc = Files.readAllBytes(path.toPath)
-    }
-    assert(binaryPandasFunc != null)
-    binaryPandasFunc
-  } else {
-    throw new RuntimeException(s"Python executable [$pythonExec] and/or pyspark are unavailable.")
-  }
-
   private lazy val pandasGroupedAggFunc: Array[Byte] = if (shouldTestPandasUDFs) {
     var binaryPandasFunc: Array[Byte] = null
     withTempPath { path =>
@@ -1403,7 +1380,7 @@ object IntegratedUDFTestUtils extends SQLHelper {
     private[IntegratedUDFTestUtils] lazy val udf = new UserDefinedPythonFunction(
       name = name,
       func = SimplePythonFunction(
-        command = pandasScalarFunc.toImmutableArraySeq,
+        command = pandasFunc.toImmutableArraySeq,
         envVars = workerEnv.clone().asInstanceOf[java.util.Map[String, String]],
         pythonIncludes = List.empty[String].asJava,
         pythonExec = pythonExec,
@@ -1433,60 +1410,6 @@ object IntegratedUDFTestUtils extends SQLHelper {
     val prettyName: String = "Scalar Pandas UDF"
   }
 
-  /**
-   * A Scalar Iterator Pandas UDF that takes one column, casts into string, executes the
-   * Python native function, and casts back to the type of input column.
-   *
-   * Virtually equivalent to:
-   *
-   * {{{
-   *   from pyspark.sql.functions import pandas_udf, PandasUDFType
-   *
-   *   df = spark.range(3).toDF("col")
-   *   scalar_iter_udf = pandas_udf(
-   *       lambda it: map(lambda x: x.apply(lambda v: str(v)), it),
-   *       "string",
-   *       PandasUDFType.SCALAR_ITER)
-   *   casted_col = scalar_iter_udf(df.col.cast("string"))
-   *   casted_col.cast(df.schema["col"].dataType)
-   * }}}
-   */
-  case class TestScalarIterPandasUDF(
-      name: String,
-      returnType: Option[DataType] = None) extends TestUDF {
-    private[IntegratedUDFTestUtils] lazy val udf = new UserDefinedPythonFunction(
-      name = name,
-      func = SimplePythonFunction(
-        command = pandasScalarIterFunc.toImmutableArraySeq,
-        envVars = workerEnv.clone().asInstanceOf[java.util.Map[String, String]],
-        pythonIncludes = List.empty[String].asJava,
-        pythonExec = pythonExec,
-        pythonVer = pythonVer,
-        broadcastVars = List.empty[Broadcast[PythonBroadcast]].asJava,
-        accumulator = null),
-      dataType = StringType,
-      pythonEvalType = PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
-      udfDeterministic = true) {
-
-      override def builder(e: Seq[Expression]): Expression = {
-        assert(e.length == 1, "Defined UDF only has one column")
-        val expr = e.head
-        val rt = returnType.getOrElse {
-          assert(expr.resolved, "column should be resolved to use the same type " +
-            "as input. Try df(name) or df.col(name)")
-          expr.dataType
-        }
-        val pythonUDF = new PythonUDFWithoutId(
-          super.builder(Cast(expr, StringType) :: Nil).asInstanceOf[PythonUDF])
-        Cast(pythonUDF, rt)
-      }
-    }
-
-    def apply(exprs: Column*): Column = udf(exprs: _*)
-
-    val prettyName: String = "Scalar Pandas Iterator UDF"
-  }
-
   /**
    * A Grouped Aggregate Pandas UDF that takes one column, executes the
    * Python native function calculating the count of the column using pandas.
@@ -1683,7 +1606,6 @@ object IntegratedUDFTestUtils extends SQLHelper {
   def registerTestUDF(testUDF: TestUDF, session: classic.SparkSession): Unit = testUDF match {
     case udf: TestPythonUDF => session.udf.registerPython(udf.name, udf.udf)
     case udf: TestScalarPandasUDF => session.udf.registerPython(udf.name, udf.udf)
-    case udf: TestScalarIterPandasUDF => session.udf.registerPython(udf.name, udf.udf)
     case udf: TestGroupedAggPandasUDF => session.udf.registerPython(udf.name, udf.udf)
    case udf: TestScalaUDF =>
       val registry = session.sessionState.functionRegistry
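For context, the helper removed above wrapped PySpark's scalar-iterator pandas UDF. Below is a minimal standalone sketch of that pattern, assembled from the deleted Scaladoc example and the null-safe lambda that pandasScalarIterFunc actually cloudpickled; the SparkSession setup is an assumption added here for completeness and is not part of the original test code.

```python
# A sketch of the SCALAR_ITER pandas UDF pattern that TestScalarIterPandasUDF
# exercised (session setup added for illustration only).
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()
df = spark.range(3).toDF("col")

# The UDF receives an iterator of pandas.Series batches and yields one
# transformed batch per input batch; this is the same null-safe lambda the
# removed pandasScalarIterFunc serialized with CloudPickleSerializer.
scalar_iter_udf = pandas_udf(
    lambda it: (x.apply(lambda v: None if v is None else str(v)) for x in it),
    "string",
    PandasUDFType.SCALAR_ITER)

# Mirror the removed builder: cast the input to string, apply the UDF, then
# cast the result back to the input column's original type.
casted_col = scalar_iter_udf(df.col.cast("string"))
df.select(casted_col.cast(df.schema["col"].dataType)).show()
```

Observably this produces the same results as the plain Scalar Pandas UDF variant that remains in the test matrix; only the batching interface differs.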
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index f87785600f7cd..a57c72f5fc155 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -120,9 +120,6 @@ import org.apache.spark.util.Utils
  *   - Scalar Pandas UDF test case with a Scalar Pandas UDF registered as the name 'udf'
  *     iff Python executable, pyspark, pandas and pyarrow are available.
  *
- *   - Scalar Iterator Pandas UDF test case with a Scalar Iterator Pandas UDF registered
- *     as the name 'udf' iff Python executable, pyspark, pandas and pyarrow are available.
- *
  * Therefore, UDF test cases should have single input and output files but executed by three
  * different types of UDFs. See 'udf/udf-inner-join.sql' as an example.
  *
@@ -196,12 +193,6 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper
           s"pandas and/or pyarrow were not available in [$pythonExec].") {
           /* Do nothing */
         }
-      case udfTestCase: SQLQueryTestSuite#UDFTest
-          if udfTestCase.udf.isInstanceOf[TestScalarIterPandasUDF] && !shouldTestPandasUDFs =>
-        ignore(s"${testCase.name} is skipped because pyspark," +
-          s"pandas and/or pyarrow were not available in [$pythonExec].") {
-          /* Do nothing */
-        }
       case udfTestCase: SQLQueryTestSuite#UDFTest
           if udfTestCase.udf.isInstanceOf[TestGroupedAggPandasUDF] &&
             !shouldTestPandasUDFs =>
@@ -406,10 +397,6 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper
           if udfTestCase.udf.isInstanceOf[TestScalarPandasUDF] && shouldTestPandasUDFs =>
         s"${testCase.name}${System.lineSeparator()}" +
           s"Python: $pythonVer Pandas: $pandasVer PyArrow: $pyarrowVer${System.lineSeparator()}"
-      case udfTestCase: SQLQueryTestSuite#UDFTest
-          if udfTestCase.udf.isInstanceOf[TestScalarIterPandasUDF] && shouldTestPandasUDFs =>
-        s"${testCase.name}${System.lineSeparator()}" +
-          s"Python: $pythonVer Pandas: $pandasVer PyArrow: $pyarrowVer${System.lineSeparator()}"
       case udfTestCase: SQLQueryTestSuite#UDFTest
           if udfTestCase.udf.isInstanceOf[TestGroupedAggPandasUDF] &&
             shouldTestPandasUDFs =>
@@ -459,14 +446,12 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper
     // Create test cases of test types that depend on the input filename.
     val newTestCases: Seq[TestCase] = if (file.getAbsolutePath.startsWith(
       s"$inputFilePath${File.separator}udf${File.separator}postgreSQL")) {
-      Seq(TestScalaUDF("udf"), TestPythonUDF("udf"),
-        TestScalarPandasUDF("udf"), TestScalarIterPandasUDF("udf")).map { udf =>
+      Seq(TestScalaUDF("udf"), TestPythonUDF("udf"), TestScalarPandasUDF("udf")).map { udf =>
         UDFPgSQLTestCase(
           s"$testCaseName - ${udf.prettyName}", absPath, resultFile, udf)
       }
     } else if (file.getAbsolutePath.startsWith(s"$inputFilePath${File.separator}udf")) {
-      Seq(TestScalaUDF("udf"), TestPythonUDF("udf"),
-        TestScalarPandasUDF("udf"), TestScalarIterPandasUDF("udf")).map { udf =>
+      Seq(TestScalaUDF("udf"), TestPythonUDF("udf"), TestScalarPandasUDF("udf")).map { udf =>
         UDFTestCase(
           s"$testCaseName - ${udf.prettyName}", absPath, resultFile, udf)
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index 2dd17136081eb..c70f21ae144b6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -278,12 +278,10 @@ class ContinuousSuite extends ContinuousSuiteBase {
       s"Result set ${results.toSet} are not a superset of $expected!")
   }
 
-  Seq(TestScalaUDF("udf"), TestPythonUDF("udf"),
-    TestScalarPandasUDF("udf"), TestScalarIterPandasUDF("udf")).foreach { udf =>
+  Seq(TestScalaUDF("udf"), TestPythonUDF("udf"), TestScalarPandasUDF("udf")).foreach { udf =>
     test(s"continuous mode with various UDFs - ${udf.prettyName}") {
       assume(
         shouldTestPandasUDFs && udf.isInstanceOf[TestScalarPandasUDF] ||
-          shouldTestPandasUDFs && udf.isInstanceOf[TestScalarIterPandasUDF] ||
           shouldTestPythonUDFs && udf.isInstanceOf[TestPythonUDF] ||
           udf.isInstanceOf[TestScalaUDF])