Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions python/docs/source/reference/pyspark.sql/functions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,9 @@ Aggregate Functions
histogram_numeric
hll_sketch_agg
hll_union_agg
kll_sketch_agg_bigint
kll_sketch_agg_double
kll_sketch_agg_float
kurtosis
last
last_value
Expand Down Expand Up @@ -631,6 +634,21 @@ Misc Functions
current_user
hll_sketch_estimate
hll_union
kll_sketch_get_n_bigint
kll_sketch_get_n_double
kll_sketch_get_n_float
kll_sketch_get_quantile_bigint
kll_sketch_get_quantile_double
kll_sketch_get_quantile_float
kll_sketch_get_rank_bigint
kll_sketch_get_rank_double
kll_sketch_get_rank_float
kll_sketch_merge_bigint
kll_sketch_merge_double
kll_sketch_merge_float
kll_sketch_to_string_bigint
kll_sketch_to_string_double
kll_sketch_to_string_float
input_file_block_length
input_file_block_start
input_file_name
Expand Down
162 changes: 162 additions & 0 deletions python/pyspark/sql/connect/functions/builtin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4529,6 +4529,168 @@ def theta_intersection_agg(
theta_intersection_agg.__doc__ = pysparkfuncs.theta_intersection_agg.__doc__


def kll_sketch_agg_bigint(
    col: "ColumnOrName",
    k: Optional[Union[int, Column]] = None,
) -> Column:
    # Pass ``k`` along (wrapped with ``lit``) only when the caller supplied one;
    # otherwise the function is invoked with ``col`` alone.
    if k is not None:
        return _invoke_function_over_columns("kll_sketch_agg_bigint", col, lit(k))
    return _invoke_function_over_columns("kll_sketch_agg_bigint", col)


# Reuse the docstring of the same-named function in ``pysparkfuncs``.
kll_sketch_agg_bigint.__doc__ = pysparkfuncs.kll_sketch_agg_bigint.__doc__


def kll_sketch_agg_float(
    col: "ColumnOrName",
    k: Optional[Union[int, Column]] = None,
) -> Column:
    # Pass ``k`` along (wrapped with ``lit``) only when the caller supplied one;
    # otherwise the function is invoked with ``col`` alone.
    if k is not None:
        return _invoke_function_over_columns("kll_sketch_agg_float", col, lit(k))
    return _invoke_function_over_columns("kll_sketch_agg_float", col)


# Reuse the docstring of the same-named function in ``pysparkfuncs``.
kll_sketch_agg_float.__doc__ = pysparkfuncs.kll_sketch_agg_float.__doc__


def kll_sketch_agg_double(
    col: "ColumnOrName",
    k: Optional[Union[int, Column]] = None,
) -> Column:
    # Pass ``k`` along (wrapped with ``lit``) only when the caller supplied one;
    # otherwise the function is invoked with ``col`` alone.
    if k is not None:
        return _invoke_function_over_columns("kll_sketch_agg_double", col, lit(k))
    return _invoke_function_over_columns("kll_sketch_agg_double", col)


# Reuse the docstring of the same-named function in ``pysparkfuncs``.
kll_sketch_agg_double.__doc__ = pysparkfuncs.kll_sketch_agg_double.__doc__


def kll_sketch_to_string_bigint(col: "ColumnOrName") -> Column:
    # Invokes the function named ``kll_sketch_to_string_bigint`` over ``col``.
    return _invoke_function_over_columns("kll_sketch_to_string_bigint", col)


# Reuse the docstring of the same-named function in ``pysparkfuncs``.
kll_sketch_to_string_bigint.__doc__ = pysparkfuncs.kll_sketch_to_string_bigint.__doc__


def kll_sketch_to_string_float(col: "ColumnOrName") -> Column:
    # Invokes the function named ``kll_sketch_to_string_float`` over ``col``.
    return _invoke_function_over_columns("kll_sketch_to_string_float", col)


# Reuse the docstring of the same-named function in ``pysparkfuncs``.
kll_sketch_to_string_float.__doc__ = pysparkfuncs.kll_sketch_to_string_float.__doc__


def kll_sketch_to_string_double(col: "ColumnOrName") -> Column:
    # Invokes the function named ``kll_sketch_to_string_double`` over ``col``.
    return _invoke_function_over_columns("kll_sketch_to_string_double", col)


# Reuse the docstring of the same-named function in ``pysparkfuncs``.
kll_sketch_to_string_double.__doc__ = pysparkfuncs.kll_sketch_to_string_double.__doc__


def kll_sketch_get_n_bigint(col: "ColumnOrName") -> Column:
    # Invokes the function named ``kll_sketch_get_n_bigint`` over ``col``.
    return _invoke_function_over_columns("kll_sketch_get_n_bigint", col)


# Reuse the docstring of the same-named function in ``pysparkfuncs``.
kll_sketch_get_n_bigint.__doc__ = pysparkfuncs.kll_sketch_get_n_bigint.__doc__


def kll_sketch_get_n_float(col: "ColumnOrName") -> Column:
    # Invokes the function named ``kll_sketch_get_n_float`` over ``col``.
    return _invoke_function_over_columns("kll_sketch_get_n_float", col)


# Reuse the docstring of the same-named function in ``pysparkfuncs``.
kll_sketch_get_n_float.__doc__ = pysparkfuncs.kll_sketch_get_n_float.__doc__


def kll_sketch_get_n_double(col: "ColumnOrName") -> Column:
    # Invokes the function named ``kll_sketch_get_n_double`` over ``col``.
    return _invoke_function_over_columns("kll_sketch_get_n_double", col)


# Reuse the docstring of the same-named function in ``pysparkfuncs``.
kll_sketch_get_n_double.__doc__ = pysparkfuncs.kll_sketch_get_n_double.__doc__


def kll_sketch_merge_bigint(
    left: "ColumnOrName",
    right: "ColumnOrName",
) -> Column:
    # Invokes ``kll_sketch_merge_bigint`` over ``left`` and ``right``, in that order.
    return _invoke_function_over_columns("kll_sketch_merge_bigint", left, right)


# Reuse the docstring of the same-named function in ``pysparkfuncs``.
kll_sketch_merge_bigint.__doc__ = pysparkfuncs.kll_sketch_merge_bigint.__doc__


def kll_sketch_merge_float(
    left: "ColumnOrName",
    right: "ColumnOrName",
) -> Column:
    # Invokes ``kll_sketch_merge_float`` over ``left`` and ``right``, in that order.
    return _invoke_function_over_columns("kll_sketch_merge_float", left, right)


# Reuse the docstring of the same-named function in ``pysparkfuncs``.
kll_sketch_merge_float.__doc__ = pysparkfuncs.kll_sketch_merge_float.__doc__


def kll_sketch_merge_double(
    left: "ColumnOrName",
    right: "ColumnOrName",
) -> Column:
    # Invokes ``kll_sketch_merge_double`` over ``left`` and ``right``, in that order.
    return _invoke_function_over_columns("kll_sketch_merge_double", left, right)


# Reuse the docstring of the same-named function in ``pysparkfuncs``.
kll_sketch_merge_double.__doc__ = pysparkfuncs.kll_sketch_merge_double.__doc__


def kll_sketch_get_quantile_bigint(
    sketch: "ColumnOrName",
    rank: "ColumnOrName",
) -> Column:
    # Invokes ``kll_sketch_get_quantile_bigint`` over ``sketch`` and ``rank``,
    # in that order.
    return _invoke_function_over_columns("kll_sketch_get_quantile_bigint", sketch, rank)


# Reuse the docstring of the same-named function in ``pysparkfuncs``.
kll_sketch_get_quantile_bigint.__doc__ = pysparkfuncs.kll_sketch_get_quantile_bigint.__doc__


def kll_sketch_get_quantile_float(sketch: "ColumnOrName", rank: "ColumnOrName") -> Column:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that in the line linked below we don't check whether the type of the ranks column is supported; I'm not sure what would happen in that case. https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/kllExpressions.scala#L488

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at this, I think the fact that the class mixes in ImplicitCastInputTypes should ensure that both arguments either already have the required types or are coercible to them. I'll add a couple of negative tests just to make sure.

abstract class KllSketchGetQuantileBase
    extends BinaryExpression
        with CodegenFallback
        with ImplicitCastInputTypes {

  ...

  override def inputTypes: Seq[AbstractDataType] =
    Seq(
      BinaryType,
      TypeCollection(
        DoubleType,
        ArrayType(DoubleType, containsNull = false)))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah then the checkInputDataTypes function probably does the check when the expression is resolved. I missed that.

fn = "kll_sketch_get_quantile_float"
return _invoke_function_over_columns(fn, sketch, rank)


kll_sketch_get_quantile_float.__doc__ = pysparkfuncs.kll_sketch_get_quantile_float.__doc__


def kll_sketch_get_quantile_double(
    sketch: "ColumnOrName",
    rank: "ColumnOrName",
) -> Column:
    # Invokes ``kll_sketch_get_quantile_double`` over ``sketch`` and ``rank``,
    # in that order.
    return _invoke_function_over_columns("kll_sketch_get_quantile_double", sketch, rank)


# Reuse the docstring of the same-named function in ``pysparkfuncs``.
kll_sketch_get_quantile_double.__doc__ = pysparkfuncs.kll_sketch_get_quantile_double.__doc__


def kll_sketch_get_rank_bigint(
    sketch: "ColumnOrName",
    quantile: "ColumnOrName",
) -> Column:
    # Invokes ``kll_sketch_get_rank_bigint`` over ``sketch`` and ``quantile``,
    # in that order.
    return _invoke_function_over_columns("kll_sketch_get_rank_bigint", sketch, quantile)


# Reuse the docstring of the same-named function in ``pysparkfuncs``.
kll_sketch_get_rank_bigint.__doc__ = pysparkfuncs.kll_sketch_get_rank_bigint.__doc__


def kll_sketch_get_rank_float(
    sketch: "ColumnOrName",
    quantile: "ColumnOrName",
) -> Column:
    # Invokes ``kll_sketch_get_rank_float`` over ``sketch`` and ``quantile``,
    # in that order.
    return _invoke_function_over_columns("kll_sketch_get_rank_float", sketch, quantile)


# Reuse the docstring of the same-named function in ``pysparkfuncs``.
kll_sketch_get_rank_float.__doc__ = pysparkfuncs.kll_sketch_get_rank_float.__doc__


def kll_sketch_get_rank_double(
    sketch: "ColumnOrName",
    quantile: "ColumnOrName",
) -> Column:
    # Invokes ``kll_sketch_get_rank_double`` over ``sketch`` and ``quantile``,
    # in that order.
    return _invoke_function_over_columns("kll_sketch_get_rank_double", sketch, quantile)


# Reuse the docstring of the same-named function in ``pysparkfuncs``.
kll_sketch_get_rank_double.__doc__ = pysparkfuncs.kll_sketch_get_rank_double.__doc__


def theta_sketch_estimate(col: "ColumnOrName") -> Column:
    # Invokes the function named ``theta_sketch_estimate`` over ``col``.
    return _invoke_function_over_columns("theta_sketch_estimate", col)
Expand Down
Loading