Commit f191d65

[SPARK-54199][SQL] Add DataFrame API support for new KLL quantiles sketch functions
### What changes were proposed in this pull request?

This PR adds DataFrame API support for the KLL quantile sketch functions that were previously added to Spark SQL in #52800. This lets users leverage KLL sketches through both Scala and Python DataFrame APIs in addition to the existing SQL interface.

**Key additions:**

1. **Scala DataFrame API** (`sql/api/src/main/scala/org/apache/spark/sql/functions.scala`):
   - 18 new functions covering aggregate, merge, quantile, and rank operations
   - Multiple overloads for each function supporting:
     - `Column` parameters for computed values
     - `String` parameters for column names
     - `Int` parameters for literal k values
     - Optional k parameters with sensible defaults
   - Functions for all three data type variants: bigint, float, double

2. **Python DataFrame API** (`python/pyspark/sql/functions/builtin.py`):
   - 18 corresponding Python functions with:
     - Comprehensive docstrings with usage examples
     - Proper type hints (`ColumnOrName`, `Optional[Union[int, Column]]`)
     - Support for both column objects and column name strings
   - Added to PySpark documentation reference

3. **Python Spark Connect Support** (`python/pyspark/sql/connect/functions/builtin.py`):
   - Full compatibility with Spark Connect architecture
   - All 18 functions properly registered

### Why are the changes needed?

While the SQL API for KLL sketches was previously added, DataFrame API support is essential for full usability. Without it, users would be forced to use SQL expressions via `expr()` or `selectExpr()`, which is less ergonomic and less type-safe.

### Does this PR introduce any user-facing change?

Yes, this PR adds DataFrame API support for the 18 KLL sketch functions:

**Scala DataFrame API Example:**

```scala
import org.apache.spark.sql.functions._
import spark.implicits._ // needed for toDF and $"..." outside spark-shell

// Create sketch with default k
val df = Seq(1, 2, 3, 4, 5).toDF("value")
val sketch = df.agg(kll_sketch_agg_bigint($"value"))

// Create sketch with custom k value
val sketch2 = df.agg(kll_sketch_agg_bigint("value", 400))

// Get median (0.5 quantile)
val sketchDf = df.agg(kll_sketch_agg_bigint($"value").alias("sketch"))
val median = sketchDf.select(kll_sketch_get_quantile_bigint($"sketch", lit(0.5)))

// Get multiple quantiles
val quantiles = sketchDf.select(
  kll_sketch_get_quantile_bigint($"sketch", array(lit(0.25), lit(0.5), lit(0.75)))
)

// Merge sketches
val merged = sketchDf.select(
  kll_sketch_merge_bigint($"sketch", $"sketch").alias("merged")
)

// Get count of items
val count = sketchDf.select(kll_sketch_get_n_bigint($"sketch"))
```

**Python DataFrame API Example:**

```python
from pyspark.sql import functions as sf

# Create sketch with default k
df = spark.createDataFrame([1, 2, 3, 4, 5], "INT")
sketch = df.agg(sf.kll_sketch_agg_bigint("value"))

# Create sketch with custom k value
sketch2 = df.agg(sf.kll_sketch_agg_bigint("value", 400))

# Get median (0.5 quantile)
sketch_df = df.agg(sf.kll_sketch_agg_bigint("value").alias("sketch"))
median = sketch_df.select(sf.kll_sketch_get_quantile_bigint("sketch", sf.lit(0.5)))

# Get multiple quantiles
quantiles = sketch_df.select(
    sf.kll_sketch_get_quantile_bigint("sketch", sf.array(sf.lit(0.25), sf.lit(0.5), sf.lit(0.75)))
)

# Merge sketches
merged = sketch_df.select(
    sf.kll_sketch_merge_bigint("sketch", "sketch").alias("merged")
)

# Get count of items
count = sketch_df.select(sf.kll_sketch_get_n_bigint("sketch"))
```

### How was this patch tested?

1. **Scala Unit Tests** (`DataFrameAggregateSuite`):
   - `kll_sketch_agg_{bigint,float,double}` with default and explicit k values
   - `kll_sketch_to_string` functions for all data types
   - `kll_sketch_get_n` functions for all data types
   - `kll_sketch_merge` operations
   - `kll_sketch_get_quantile` with single rank and array of ranks
   - `kll_sketch_get_rank` operations
   - Null value handling tests

2. **Python Unit Tests** (`test_functions.py`):
   - Comprehensive tests mirroring Scala tests
   - Tests for Column object and string column name overloads
   - Tests for optional k parameter
   - Array input tests for quantile/rank functions
   - Null handling validation
   - Type checking (bytes/bytearray for sketches, str for to_string, int/float for values)

### Was this patch authored or co-authored using generative AI tooling?

Yes, IDE assistance used `claude-4.5-sonnet` with manual validation and integration.

Closes #52900 from dtenedor/dataframe-api-kll-functions.

Authored-by: Daniel Tenedorio <[email protected]>
Signed-off-by: Daniel Tenedorio <[email protected]>
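
The quantile and rank functions described above answer inverse questions about the same distribution. As a rough illustration, the following pure-Python sketch computes both exactly on a small list. It does not use a KLL sketch or Spark: the helper names `exact_rank` and `exact_quantile` are hypothetical, and the inclusive rank convention (fraction of items less than or equal to a value) is an assumption about the intended semantics. The Spark functions return approximations of values like these.

```python
import math
from typing import Sequence


def exact_rank(values: Sequence[float], x: float) -> float:
    """Fraction of items <= x (inclusive convention, assumed here)."""
    if not values:
        raise ValueError("values must be non-empty")
    return sum(1 for v in values if v <= x) / len(values)


def exact_quantile(values: Sequence[float], rank: float) -> float:
    """Smallest item whose inclusive rank is at least `rank`."""
    if not values:
        raise ValueError("values must be non-empty")
    if not 0.0 <= rank <= 1.0:
        raise ValueError("rank must be in [0, 1]")
    ordered = sorted(values)
    # Position of the first item with (position + 1) / n >= rank.
    index = max(math.ceil(rank * len(ordered)) - 1, 0)
    return ordered[index]


data = [1, 2, 3, 4, 5]
median = exact_quantile(data, 0.5)      # quantile query: rank -> value
rank_of_median = exact_rank(data, median)  # rank query: value -> rank
```

On this data the median query returns the middle item, and asking for the rank of that item returns a fraction at or above the requested 0.5, which is the round trip the `get_quantile` and `get_rank` functions are meant to approximate.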
1 parent 37b1995 commit f191d65

10 files changed: +1955 -0 lines changed
python/docs/source/reference/pyspark.sql/functions.rst

Lines changed: 18 additions & 0 deletions
@@ -458,6 +458,9 @@ Aggregate Functions
     histogram_numeric
     hll_sketch_agg
     hll_union_agg
+    kll_sketch_agg_bigint
+    kll_sketch_agg_double
+    kll_sketch_agg_float
     kurtosis
     last
     last_value
@@ -631,6 +634,21 @@ Misc Functions
     current_user
     hll_sketch_estimate
     hll_union
+    kll_sketch_get_n_bigint
+    kll_sketch_get_n_double
+    kll_sketch_get_n_float
+    kll_sketch_get_quantile_bigint
+    kll_sketch_get_quantile_double
+    kll_sketch_get_quantile_float
+    kll_sketch_get_rank_bigint
+    kll_sketch_get_rank_double
+    kll_sketch_get_rank_float
+    kll_sketch_merge_bigint
+    kll_sketch_merge_double
+    kll_sketch_merge_float
+    kll_sketch_to_string_bigint
+    kll_sketch_to_string_double
+    kll_sketch_to_string_float
     input_file_block_length
     input_file_block_start
     input_file_name

python/pyspark/sql/connect/functions/builtin.py

Lines changed: 162 additions & 0 deletions
@@ -4529,6 +4529,168 @@ def theta_intersection_agg(
 theta_intersection_agg.__doc__ = pysparkfuncs.theta_intersection_agg.__doc__
 
 
+def kll_sketch_agg_bigint(
+    col: "ColumnOrName",
+    k: Optional[Union[int, Column]] = None,
+) -> Column:
+    fn = "kll_sketch_agg_bigint"
+    if k is None:
+        return _invoke_function_over_columns(fn, col)
+    else:
+        return _invoke_function_over_columns(fn, col, lit(k))
+
+
+kll_sketch_agg_bigint.__doc__ = pysparkfuncs.kll_sketch_agg_bigint.__doc__
+
+
+def kll_sketch_agg_float(
+    col: "ColumnOrName",
+    k: Optional[Union[int, Column]] = None,
+) -> Column:
+    fn = "kll_sketch_agg_float"
+    if k is None:
+        return _invoke_function_over_columns(fn, col)
+    else:
+        return _invoke_function_over_columns(fn, col, lit(k))
+
+
+kll_sketch_agg_float.__doc__ = pysparkfuncs.kll_sketch_agg_float.__doc__
+
+
+def kll_sketch_agg_double(
+    col: "ColumnOrName",
+    k: Optional[Union[int, Column]] = None,
+) -> Column:
+    fn = "kll_sketch_agg_double"
+    if k is None:
+        return _invoke_function_over_columns(fn, col)
+    else:
+        return _invoke_function_over_columns(fn, col, lit(k))
+
+
+kll_sketch_agg_double.__doc__ = pysparkfuncs.kll_sketch_agg_double.__doc__
+
+
+def kll_sketch_to_string_bigint(col: "ColumnOrName") -> Column:
+    fn = "kll_sketch_to_string_bigint"
+    return _invoke_function_over_columns(fn, col)
+
+
+kll_sketch_to_string_bigint.__doc__ = pysparkfuncs.kll_sketch_to_string_bigint.__doc__
+
+
+def kll_sketch_to_string_float(col: "ColumnOrName") -> Column:
+    fn = "kll_sketch_to_string_float"
+    return _invoke_function_over_columns(fn, col)
+
+
+kll_sketch_to_string_float.__doc__ = pysparkfuncs.kll_sketch_to_string_float.__doc__
+
+
+def kll_sketch_to_string_double(col: "ColumnOrName") -> Column:
+    fn = "kll_sketch_to_string_double"
+    return _invoke_function_over_columns(fn, col)
+
+
+kll_sketch_to_string_double.__doc__ = pysparkfuncs.kll_sketch_to_string_double.__doc__
+
+
+def kll_sketch_get_n_bigint(col: "ColumnOrName") -> Column:
+    fn = "kll_sketch_get_n_bigint"
+    return _invoke_function_over_columns(fn, col)
+
+
+kll_sketch_get_n_bigint.__doc__ = pysparkfuncs.kll_sketch_get_n_bigint.__doc__
+
+
+def kll_sketch_get_n_float(col: "ColumnOrName") -> Column:
+    fn = "kll_sketch_get_n_float"
+    return _invoke_function_over_columns(fn, col)
+
+
+kll_sketch_get_n_float.__doc__ = pysparkfuncs.kll_sketch_get_n_float.__doc__
+
+
+def kll_sketch_get_n_double(col: "ColumnOrName") -> Column:
+    fn = "kll_sketch_get_n_double"
+    return _invoke_function_over_columns(fn, col)
+
+
+kll_sketch_get_n_double.__doc__ = pysparkfuncs.kll_sketch_get_n_double.__doc__
+
+
+def kll_sketch_merge_bigint(left: "ColumnOrName", right: "ColumnOrName") -> Column:
+    fn = "kll_sketch_merge_bigint"
+    return _invoke_function_over_columns(fn, left, right)
+
+
+kll_sketch_merge_bigint.__doc__ = pysparkfuncs.kll_sketch_merge_bigint.__doc__
+
+
+def kll_sketch_merge_float(left: "ColumnOrName", right: "ColumnOrName") -> Column:
+    fn = "kll_sketch_merge_float"
+    return _invoke_function_over_columns(fn, left, right)
+
+
+kll_sketch_merge_float.__doc__ = pysparkfuncs.kll_sketch_merge_float.__doc__
+
+
+def kll_sketch_merge_double(left: "ColumnOrName", right: "ColumnOrName") -> Column:
+    fn = "kll_sketch_merge_double"
+    return _invoke_function_over_columns(fn, left, right)
+
+
+kll_sketch_merge_double.__doc__ = pysparkfuncs.kll_sketch_merge_double.__doc__
+
+
+def kll_sketch_get_quantile_bigint(sketch: "ColumnOrName", rank: "ColumnOrName") -> Column:
+    fn = "kll_sketch_get_quantile_bigint"
+    return _invoke_function_over_columns(fn, sketch, rank)
+
+
+kll_sketch_get_quantile_bigint.__doc__ = pysparkfuncs.kll_sketch_get_quantile_bigint.__doc__
+
+
+def kll_sketch_get_quantile_float(sketch: "ColumnOrName", rank: "ColumnOrName") -> Column:
+    fn = "kll_sketch_get_quantile_float"
+    return _invoke_function_over_columns(fn, sketch, rank)
+
+
+kll_sketch_get_quantile_float.__doc__ = pysparkfuncs.kll_sketch_get_quantile_float.__doc__
+
+
+def kll_sketch_get_quantile_double(sketch: "ColumnOrName", rank: "ColumnOrName") -> Column:
+    fn = "kll_sketch_get_quantile_double"
+    return _invoke_function_over_columns(fn, sketch, rank)
+
+
+kll_sketch_get_quantile_double.__doc__ = pysparkfuncs.kll_sketch_get_quantile_double.__doc__
+
+
+def kll_sketch_get_rank_bigint(sketch: "ColumnOrName", quantile: "ColumnOrName") -> Column:
+    fn = "kll_sketch_get_rank_bigint"
+    return _invoke_function_over_columns(fn, sketch, quantile)
+
+
+kll_sketch_get_rank_bigint.__doc__ = pysparkfuncs.kll_sketch_get_rank_bigint.__doc__
+
+
+def kll_sketch_get_rank_float(sketch: "ColumnOrName", quantile: "ColumnOrName") -> Column:
+    fn = "kll_sketch_get_rank_float"
+    return _invoke_function_over_columns(fn, sketch, quantile)
+
+
+kll_sketch_get_rank_float.__doc__ = pysparkfuncs.kll_sketch_get_rank_float.__doc__
+
+
+def kll_sketch_get_rank_double(sketch: "ColumnOrName", quantile: "ColumnOrName") -> Column:
+    fn = "kll_sketch_get_rank_double"
+    return _invoke_function_over_columns(fn, sketch, quantile)
+
+
+kll_sketch_get_rank_double.__doc__ = pysparkfuncs.kll_sketch_get_rank_double.__doc__
+
+
 def theta_sketch_estimate(col: "ColumnOrName") -> Column:
     fn = "theta_sketch_estimate"
     return _invoke_function_over_columns(fn, col)
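
The three `kll_sketch_agg_*` wrappers in the hunk above share one pattern: when `k` is `None` the argument is left out of the call entirely, and otherwise it is wrapped with `lit(k)`. The sketch below isolates that dispatch so it can run without Spark. The `lit` and `_invoke_function_over_columns` definitions here are simplified stand-ins written for illustration, not the real PySpark helpers, and they return plain tuples instead of `Column` objects.

```python
from typing import Any, Optional, Tuple


def lit(value: Any) -> Tuple[str, Any]:
    """Stand-in for pyspark's lit(): tags a Python value as a literal."""
    return ("lit", value)


def _invoke_function_over_columns(name: str, *cols: Any) -> Tuple[str, Tuple[Any, ...]]:
    """Stand-in that records the function name and the arguments it received."""
    return (name, cols)


def kll_sketch_agg_bigint(col: Any, k: Optional[Any] = None) -> Tuple[str, Tuple[Any, ...]]:
    fn = "kll_sketch_agg_bigint"
    if k is None:
        # Omit k so that the default is chosen downstream, not in this wrapper.
        return _invoke_function_over_columns(fn, col)
    # An explicit k is forwarded as a literal second argument.
    return _invoke_function_over_columns(fn, col, lit(k))


default_call = kll_sketch_agg_bigint("value")
custom_call = kll_sketch_agg_bigint("value", 400)
```

Leaving `k` out, instead of substituting a default inside the wrapper, keeps the default value defined in a single place on the receiving side, so the Python wrapper does not have to be kept in sync with it.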

python/pyspark/sql/functions/__init__.py

Lines changed: 18 additions & 0 deletions
@@ -372,6 +372,9 @@
     "histogram_numeric",
     "hll_sketch_agg",
     "hll_union_agg",
+    "kll_sketch_agg_bigint",
+    "kll_sketch_agg_double",
+    "kll_sketch_agg_float",
     "kurtosis",
     "last",
     "last_value",
@@ -495,6 +498,21 @@
     "input_file_block_start",
     "input_file_name",
     "java_method",
+    "kll_sketch_get_n_bigint",
+    "kll_sketch_get_n_double",
+    "kll_sketch_get_n_float",
+    "kll_sketch_get_quantile_bigint",
+    "kll_sketch_get_quantile_double",
+    "kll_sketch_get_quantile_float",
+    "kll_sketch_get_rank_bigint",
+    "kll_sketch_get_rank_double",
+    "kll_sketch_get_rank_float",
+    "kll_sketch_merge_bigint",
+    "kll_sketch_merge_double",
+    "kll_sketch_merge_float",
+    "kll_sketch_to_string_bigint",
+    "kll_sketch_to_string_double",
+    "kll_sketch_to_string_float",
     "monotonically_increasing_id",
     "raise_error",
     "reflect",
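
The names exported above follow a regular scheme: `kll_sketch_<operation>_<type>`, with six operations and three type suffixes. A minimal sketch, assuming only that naming scheme, regenerates the list and confirms that it yields the 18 names mentioned in the commit message. The `OPERATIONS` and `TYPES` constants are introduced here for illustration and do not appear in the patch.

```python
from itertools import product

# Operation and type suffixes as they appear in the exported names.
OPERATIONS = ["agg", "get_n", "get_quantile", "get_rank", "merge", "to_string"]
TYPES = ["bigint", "double", "float"]

# Every combination of operation and type, in the alphabetical order used above.
names = [f"kll_sketch_{op}_{t}" for op, t in product(OPERATIONS, TYPES)]

# The patch lists the agg functions with the aggregate functions
# and the remaining fifteen in a separate group.
aggregate_names = [n for n in names if n.startswith("kll_sketch_agg_")]
misc_names = [n for n in names if n not in aggregate_names]

print(len(names), len(aggregate_names), len(misc_names))
```

On a PySpark build that includes this commit, a list like `names` could be compared against the module's exported names to catch a function that was added but not exported.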
