Conversation

@dtenedor dtenedor (Contributor) commented Nov 5, 2025

What changes were proposed in this pull request?

This PR adds DataFrame API support for the KLL quantile sketch functions that were previously added to Spark SQL in #52800. This lets users leverage KLL sketches through both the Scala and Python DataFrame APIs in addition to the existing SQL interface.

Key additions:

  1. Scala DataFrame API (sql/api/src/main/scala/org/apache/spark/sql/functions.scala):

    • 18 new functions covering aggregate, merge, quantile, and rank operations
    • Multiple overloads for each function supporting:
      • Column parameters for computed values
      • String parameters for column names
      • Int parameters for literal k values
      • Optional k parameters with sensible defaults
    • Functions for all three data type variants: bigint, float, double
  2. Python DataFrame API (python/pyspark/sql/functions/builtin.py):

    • 18 corresponding Python functions with:
      • Comprehensive docstrings with usage examples
      • Proper type hints (ColumnOrName, Optional[Union[int, Column]]); a signature sketch follows this list
      • Support for both column objects and column name strings
    • Added to PySpark documentation reference
  3. Python Spark Connect Support (python/pyspark/sql/connect/functions/builtin.py):

    • Full compatibility with Spark Connect architecture
    • All 18 functions properly registered
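
For illustration, the Python signatures take the shape sketched below (based on the type hints listed above; the exact defaults live in python/pyspark/sql/functions/builtin.py):

from typing import Optional, Union
from pyspark.sql import Column

# In PySpark's type stubs, ColumnOrName is Union[Column, str].
ColumnOrName = Union[Column, str]

def kll_sketch_agg_bigint(
    col: ColumnOrName, k: Optional[Union[int, Column]] = None
) -> Column: ...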

Why are the changes needed?

While the SQL API for KLL sketches was previously added, DataFrame API support is essential for full usability. Without it, users would be forced to write SQL expressions via expr() or selectExpr(), which is less ergonomic and loses type safety.
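
For example, a minimal sketch of the contrast in Python (df as in the examples below):

from pyspark.sql import functions as sf

# Without this PR: SQL strings only, with no API-level checking
sketch = df.selectExpr("kll_sketch_agg_bigint(value) AS sketch")

# With this PR: a composable, checked DataFrame function
sketch = df.agg(sf.kll_sketch_agg_bigint("value").alias("sketch"))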

Does this PR introduce any user-facing change?

Yes, this PR adds DataFrame API support for the 18 KLL sketch functions:

Scala DataFrame API Example:

import org.apache.spark.sql.functions._
import spark.implicits._ // for toDF and the $"..." column syntax

// Create sketch with default k
val df = Seq(1, 2, 3, 4, 5).toDF("value")
val sketch = df.agg(kll_sketch_agg_bigint($"value"))

// Create sketch with custom k value
val sketch2 = df.agg(kll_sketch_agg_bigint("value", 400))

// Get median (0.5 quantile)
val sketchDf = df.agg(kll_sketch_agg_bigint($"value").alias("sketch"))
val median = sketchDf.select(kll_sketch_get_quantile_bigint($"sketch", lit(0.5)))

// Get multiple quantiles
val quantiles = sketchDf.select(
  kll_sketch_get_quantile_bigint($"sketch", array(lit(0.25), lit(0.5), lit(0.75)))
)

// Merge sketches
val merged = sketchDf.select(
  kll_sketch_merge_bigint($"sketch", $"sketch").alias("merged")
)

// Get count of items
val count = sketchDf.select(kll_sketch_get_n_bigint($"sketch"))

Python DataFrame API Example:

from pyspark.sql import functions as sf

# Create sketch with default k
df = spark.createDataFrame([1, 2, 3, 4, 5], "INT")
sketch = df.agg(sf.kll_sketch_agg_bigint("value"))

# Create sketch with custom k value
sketch2 = df.agg(sf.kll_sketch_agg_bigint("value", 400))

# Get median (0.5 quantile)
sketch_df = df.agg(sf.kll_sketch_agg_bigint("value").alias("sketch"))
median = sketch_df.select(sf.kll_sketch_get_quantile_bigint("sketch", sf.lit(0.5)))

# Get multiple quantiles
quantiles = sketch_df.select(
    sf.kll_sketch_get_quantile_bigint("sketch", sf.array(sf.lit(0.25), sf.lit(0.5), sf.lit(0.75)))
)

# Merge sketches
merged = sketch_df.select(
    sf.kll_sketch_merge_bigint("sketch", "sketch").alias("merged")
)

# Get count of items
count = sketch_df.select(sf.kll_sketch_get_n_bigint("sketch"))
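
The rank direction (value to rank) works the same way, assuming kll_sketch_get_rank_bigint mirrors the quantile getter's signature:

# Get the normalized rank of a value (inverse of the quantile lookup)
rank = sketch_df.select(sf.kll_sketch_get_rank_bigint("sketch", sf.lit(3)))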

How was this patch tested?

  1. Scala Unit Tests (DataFrameAggregateSuite):

    • kll_sketch_agg_{bigint,float,double} with default and explicit k values
    • kll_sketch_to_string functions for all data types
    • kll_sketch_get_n functions for all data types
    • kll_sketch_merge operations
    • kll_sketch_get_quantile with single rank and array of ranks
    • kll_sketch_get_rank operations
    • Null value handling tests
  2. Python Unit Tests (test_functions.py):

    • Comprehensive tests mirroring Scala tests
    • Tests for Column object and string column name overloads
    • Tests for optional k parameter
    • Array input tests for quantile/rank functions
    • Null handling validation
    • Type checking (bytes/bytearray for sketches, str for to_string, int/float for values); see the test sketch below
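
As an illustration of the result-type checks, a minimal sketch (the test name and exact assertions here are illustrative, not the actual test code):

from pyspark.sql import functions as sf

def test_kll_sketch_result_types(spark):
    df = spark.range(1, 6).toDF("value")
    row = (
        df.agg(sf.kll_sketch_agg_bigint("value").alias("sketch"))
          .select(
              "sketch",
              sf.kll_sketch_to_string_bigint("sketch").alias("repr"),
              sf.kll_sketch_get_n_bigint("sketch").alias("n"))
          .first())
    assert isinstance(row["sketch"], (bytes, bytearray))
    assert isinstance(row["repr"], str)
    assert row["n"] == 5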

Was this patch authored or co-authored using generative AI tooling?

Yes. IDE assistance used claude-4.5-sonnet, with manual validation and integration.

@dtenedor dtenedor (Contributor, Author) commented Nov 5, 2025

cc @cboumalh: here is the DataFrame API support for the new functions added in #52800. Do you want to take a look?

  *
  * @group agg_funcs
- * @since 4.1.0
+ * @since 4.2.0
Contributor:

Do we want to add something in the PR description about these versioning changes?

Contributor Author:

My mistake on this version change; it looks like 4.1.0 is the correct version after all. Reverted that.

kll_sketch_get_quantile_bigint.__doc__ = pysparkfuncs.kll_sketch_get_quantile_bigint.__doc__


def kll_sketch_get_quantile_float(sketch: "ColumnOrName", rank: "ColumnOrName") -> Column:
Contributor:

Noticed that in the line below we don't check whether the ranks column type is unsupported; not sure what would happen in that case. https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/kllExpressions.scala#L488

Contributor Author:

Looking at this, I think the class mixing in ImplicitCastInputTypes should make sure that both arguments either have the required types or are coercible to them. I'll add a couple of negative tests just to make sure.

abstract class KllSketchGetQuantileBase
    extends BinaryExpression
        with CodegenFallback
        with ImplicitCastInputTypes {

  ...

  override def inputTypes: Seq[AbstractDataType] =
    Seq(
      BinaryType,
      TypeCollection(
        DoubleType,
        ArrayType(DoubleType, containsNull = false)))
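
For instance, a negative test along these lines (a sketch, reusing sketch_df and sf from the PR description):

import pytest
from pyspark.errors import AnalysisException
from pyspark.sql import functions as sf

# An unsupported rank type (a struct is not coercible to DOUBLE or
# ARRAY<DOUBLE>) should be rejected when the expression is resolved.
with pytest.raises(AnalysisException):
    sketch_df.select(
        sf.kll_sketch_get_quantile_bigint("sketch", sf.struct(sf.lit(1)))
    ).collect()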

Contributor:

Ah, then the checkInputDataTypes function probably does the check when the expression is resolved. I missed that.

@cboumalh cboumalh (Contributor) left a comment

LGTM. I just have the comment above, but all looks good. Thank you!

@dtenedor dtenedor (Contributor, Author) commented Nov 6, 2025

Thanks for your review!
I just did another manual review as well.
Everything looks good, merging to master.

@dtenedor dtenedor closed this in f191d65 Nov 6, 2025
@dongjoon-hyun dongjoon-hyun (Member) commented

Hi, @dtenedor. I don't see any committers' approval yet.

> Thanks for your review! I just did another manual review as well. Everything looks good, merging to master.

[Screenshot taken 2025-11-06 at 14:51]

@dongjoon-hyun dongjoon-hyun (Member) left a comment

Hi, @cloud-fan. It would be great if someone from Databricks gave a post-commit approval for this, because the code is targeting Apache Spark 4.1.0, as shown below. @dtenedor does not seem to have had a proper chance of community review yet.

  /**
   * Aggregate function: returns the compact binary representation of the Datasketches
   * KllLongsSketch built with the values in the input column. The optional k parameter controls
   * the size and accuracy of the sketch (default 200, range 8-65535).
   *
   * @group agg_funcs
   * @since 4.1.0
   */
  def kll_sketch_agg_bigint(e: Column, k: Column): Column =
    Column.fn("kll_sketch_agg_bigint", e, k)

@dongjoon-hyun dongjoon-hyun (Member) commented

In addition, if this is not technically targeting Apache Spark 4.1.0, please update the incorrect since tags.

@cloud-fan cloud-fan (Contributor) commented

The change LGTM; it just adds DataFrame functions to match the SQL ones.

Given that the related SQL functions are in 4.1, I think it makes sense to backport this to 4.1 as well. @dtenedor, what do you think?

@dongjoon-hyun dongjoon-hyun (Member) commented

Thank you, @cloud-fan!
