IntegratedUDFTestUtils.scala
@@ -249,7 +249,7 @@ object IntegratedUDFTestUtils extends SQLHelper {
     binaryPythonDataSource
   }
 
-  private lazy val pandasScalarFunc: Array[Byte] = if (shouldTestPandasUDFs) {
+  private lazy val pandasFunc: Array[Byte] = if (shouldTestPandasUDFs) {
     var binaryPandasFunc: Array[Byte] = null
     withTempPath { path =>
       Process(
@@ -272,29 +272,6 @@ object IntegratedUDFTestUtils extends SQLHelper {
     throw new RuntimeException(s"Python executable [$pythonExec] and/or pyspark are unavailable.")
   }
 
-  private lazy val pandasScalarIterFunc: Array[Byte] = if (shouldTestPandasUDFs) {
-    var binaryPandasFunc: Array[Byte] = null
-    withTempPath { path =>
-      Process(
-        Seq(
-          pythonExec,
-          "-c",
-          "from pyspark.sql.types import StringType; " +
-            "from pyspark.serializers import CloudPickleSerializer; " +
-            s"f = open('$path', 'wb');" +
-            "f.write(CloudPickleSerializer().dumps((" +
-            "lambda it: (x.apply(lambda v: None if v is None else str(v)) for x in it), " +
-            "StringType())))"),
-        None,
-        "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!!
-      binaryPandasFunc = Files.readAllBytes(path.toPath)
-    }
-    assert(binaryPandasFunc != null)
-    binaryPandasFunc
-  } else {
-    throw new RuntimeException(s"Python executable [$pythonExec] and/or pyspark are unavailable.")
-  }
-
   private lazy val pandasGroupedAggFunc: Array[Byte] = if (shouldTestPandasUDFs) {
     var binaryPandasFunc: Array[Byte] = null
     withTempPath { path =>
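
Note: the payload dropped above pickled a generator over an iterator of pandas Series (the SCALAR_ITER contract), while the surviving pandasFunc pickles a plain Series-to-Series lambda. A minimal sketch of that counterpart, reconstructed from the surrounding file rather than from this hunk, so treat the exact lambda as an assumption:

{{{
  // Counterpart kept for the plain scalar Pandas UDF: one pandas Series in,
  // one pandas Series out, instead of an iterator of Series on both sides.
  "from pyspark.sql.types import StringType; " +
    "from pyspark.serializers import CloudPickleSerializer; " +
    s"f = open('$path', 'wb');" +
    "f.write(CloudPickleSerializer().dumps((" +
    "lambda x: x.apply(lambda v: None if v is None else str(v)), " +
    "StringType())))"
}}}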
@@ -1403,7 +1380,7 @@ object IntegratedUDFTestUtils extends SQLHelper {
     private[IntegratedUDFTestUtils] lazy val udf = new UserDefinedPythonFunction(
       name = name,
       func = SimplePythonFunction(
-        command = pandasScalarFunc.toImmutableArraySeq,
+        command = pandasFunc.toImmutableArraySeq,
         envVars = workerEnv.clone().asInstanceOf[java.util.Map[String, String]],
         pythonIncludes = List.empty[String].asJava,
         pythonExec = pythonExec,
@@ -1433,60 +1410,6 @@ object IntegratedUDFTestUtils extends SQLHelper {
     val prettyName: String = "Scalar Pandas UDF"
   }
 
-  /**
-   * A Scalar Iterator Pandas UDF that takes one column, casts into string, executes the
-   * Python native function, and casts back to the type of input column.
-   *
-   * Virtually equivalent to:
-   *
-   * {{{
-   *   from pyspark.sql.functions import pandas_udf, PandasUDFType
-   *
-   *   df = spark.range(3).toDF("col")
-   *   scalar_iter_udf = pandas_udf(
-   *       lambda it: map(lambda x: x.apply(lambda v: str(v)), it),
-   *       "string",
-   *       PandasUDFType.SCALAR_ITER)
-   *   casted_col = scalar_iter_udf(df.col.cast("string"))
-   *   casted_col.cast(df.schema["col"].dataType)
-   * }}}
-   */
-  case class TestScalarIterPandasUDF(
-      name: String,
-      returnType: Option[DataType] = None) extends TestUDF {
-    private[IntegratedUDFTestUtils] lazy val udf = new UserDefinedPythonFunction(
-      name = name,
-      func = SimplePythonFunction(
-        command = pandasScalarIterFunc.toImmutableArraySeq,
-        envVars = workerEnv.clone().asInstanceOf[java.util.Map[String, String]],
-        pythonIncludes = List.empty[String].asJava,
-        pythonExec = pythonExec,
-        pythonVer = pythonVer,
-        broadcastVars = List.empty[Broadcast[PythonBroadcast]].asJava,
-        accumulator = null),
-      dataType = StringType,
-      pythonEvalType = PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
-      udfDeterministic = true) {
-
-      override def builder(e: Seq[Expression]): Expression = {
-        assert(e.length == 1, "Defined UDF only has one column")
-        val expr = e.head
-        val rt = returnType.getOrElse {
-          assert(expr.resolved, "column should be resolved to use the same type " +
-            "as input. Try df(name) or df.col(name)")
-          expr.dataType
-        }
-        val pythonUDF = new PythonUDFWithoutId(
-          super.builder(Cast(expr, StringType) :: Nil).asInstanceOf[PythonUDF])
-        Cast(pythonUDF, rt)
-      }
-    }
-
-    def apply(exprs: Column*): Column = udf(exprs: _*)
-
-    val prettyName: String = "Scalar Pandas Iterator UDF"
-  }
-
   /**
    * A Grouped Aggregate Pandas UDF that takes one column, executes the
    * Python native function calculating the count of the column using pandas.
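
Note: the builder removed above is the cast round-trip that the remaining TestScalarPandasUDF still performs: cast the input column to string, run the pickled Python function, then cast the result back to the input column's type. In public DataFrame API terms, a sketch mirroring the deleted Scaladoc example (`spark` and a registered test `udf` are assumed in scope):

{{{
  val df = spark.range(3).toDF("col")
  // cast in, run the Python function, cast back out:
  val roundTripped = udf(df("col").cast("string")).cast(df.schema("col").dataType)
}}}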
@@ -1683,7 +1606,6 @@ object IntegratedUDFTestUtils extends SQLHelper {
   def registerTestUDF(testUDF: TestUDF, session: classic.SparkSession): Unit = testUDF match {
     case udf: TestPythonUDF => session.udf.registerPython(udf.name, udf.udf)
     case udf: TestScalarPandasUDF => session.udf.registerPython(udf.name, udf.udf)
-    case udf: TestScalarIterPandasUDF => session.udf.registerPython(udf.name, udf.udf)
     case udf: TestGroupedAggPandasUDF => session.udf.registerPython(udf.name, udf.udf)
     case udf: TestScalaUDF =>
       val registry = session.sessionState.functionRegistry
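Note: with the TestScalarIterPandasUDF case gone, registerTestUDF dispatches over the four remaining variants. A usage sketch (`spark` is an assumed classic.SparkSession):

{{{
  registerTestUDF(TestScalaUDF("udf"), spark)
  registerTestUDF(TestPythonUDF("udf"), spark)
  registerTestUDF(TestScalarPandasUDF("udf"), spark)
  registerTestUDF(TestGroupedAggPandasUDF("udf"), spark)
}}}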
SQLQueryTestSuite.scala
@@ -120,9 +120,6 @@ import org.apache.spark.util.Utils
  *   - Scalar Pandas UDF test case with a Scalar Pandas UDF registered as the name 'udf'
  *     iff Python executable, pyspark, pandas and pyarrow are available.
  *
- *   - Scalar Iterator Pandas UDF test case with a Scalar Iterator Pandas UDF registered
- *     as the name 'udf' iff Python executable, pyspark, pandas and pyarrow are available.
- *
  * Therefore, UDF test cases should have single input and output files but executed by three
  * different types of UDFs. See 'udf/udf-inner-join.sql' as an example.
  *
@@ -196,12 +193,6 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper
s"pandas and/or pyarrow were not available in [$pythonExec].") {
/* Do nothing */
}
case udfTestCase: SQLQueryTestSuite#UDFTest
if udfTestCase.udf.isInstanceOf[TestScalarIterPandasUDF] && !shouldTestPandasUDFs =>
ignore(s"${testCase.name} is skipped because pyspark," +
s"pandas and/or pyarrow were not available in [$pythonExec].") {
/* Do nothing */
}
case udfTestCase: SQLQueryTestSuite#UDFTest
if udfTestCase.udf.isInstanceOf[TestGroupedAggPandasUDF] &&
!shouldTestPandasUDFs =>
@@ -406,10 +397,6 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper
         if udfTestCase.udf.isInstanceOf[TestScalarPandasUDF] && shouldTestPandasUDFs =>
       s"${testCase.name}${System.lineSeparator()}" +
         s"Python: $pythonVer Pandas: $pandasVer PyArrow: $pyarrowVer${System.lineSeparator()}"
-    case udfTestCase: SQLQueryTestSuite#UDFTest
-        if udfTestCase.udf.isInstanceOf[TestScalarIterPandasUDF] && shouldTestPandasUDFs =>
-      s"${testCase.name}${System.lineSeparator()}" +
-        s"Python: $pythonVer Pandas: $pandasVer PyArrow: $pyarrowVer${System.lineSeparator()}"
     case udfTestCase: SQLQueryTestSuite#UDFTest
         if udfTestCase.udf.isInstanceOf[TestGroupedAggPandasUDF] &&
           shouldTestPandasUDFs =>
@@ -459,14 +446,12 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper
     // Create test cases of test types that depend on the input filename.
     val newTestCases: Seq[TestCase] = if (file.getAbsolutePath.startsWith(
       s"$inputFilePath${File.separator}udf${File.separator}postgreSQL")) {
-      Seq(TestScalaUDF("udf"), TestPythonUDF("udf"),
-        TestScalarPandasUDF("udf"), TestScalarIterPandasUDF("udf")).map { udf =>
+      Seq(TestScalaUDF("udf"), TestPythonUDF("udf"), TestScalarPandasUDF("udf")).map { udf =>
         UDFPgSQLTestCase(
           s"$testCaseName - ${udf.prettyName}", absPath, resultFile, udf)
       }
     } else if (file.getAbsolutePath.startsWith(s"$inputFilePath${File.separator}udf")) {
-      Seq(TestScalaUDF("udf"), TestPythonUDF("udf"),
-        TestScalarPandasUDF("udf"), TestScalarIterPandasUDF("udf")).map { udf =>
+      Seq(TestScalaUDF("udf"), TestPythonUDF("udf"), TestScalarPandasUDF("udf")).map { udf =>
         UDFTestCase(
           s"$testCaseName - ${udf.prettyName}", absPath, resultFile, udf)
       }
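
Note: each udf/*.sql input now fans out into three test cases instead of four, named after each variant's prettyName. A sketch of the resulting names (only "Scalar Pandas UDF" appears in this diff; the other two prettyName values are assumptions based on the naming pattern):

{{{
  val names = Seq(TestScalaUDF("udf"), TestPythonUDF("udf"), TestScalarPandasUDF("udf"))
    .map(udf => s"$testCaseName - ${udf.prettyName}")
  // e.g. Seq("... - Scala UDF", "... - Regular Python UDF", "... - Scalar Pandas UDF")
}}}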
ContinuousSuite.scala
@@ -278,12 +278,10 @@ class ContinuousSuite extends ContinuousSuiteBase {
s"Result set ${results.toSet} are not a superset of $expected!")
}

Seq(TestScalaUDF("udf"), TestPythonUDF("udf"),
TestScalarPandasUDF("udf"), TestScalarIterPandasUDF("udf")).foreach { udf =>
Seq(TestScalaUDF("udf"), TestPythonUDF("udf"), TestScalarPandasUDF("udf")).foreach { udf =>
test(s"continuous mode with various UDFs - ${udf.prettyName}") {
assume(
shouldTestPandasUDFs && udf.isInstanceOf[TestScalarPandasUDF] ||
shouldTestPandasUDFs && udf.isInstanceOf[TestScalarIterPandasUDF] ||
shouldTestPythonUDFs && udf.isInstanceOf[TestPythonUDF] ||
udf.isInstanceOf[TestScalaUDF])
