@@ -20,7 +20,8 @@ package org.apache.spark.sql.execution.datasources.parquet
2020import java .sql .{Date , Timestamp }
2121
2222import org .apache .parquet .filter2 .predicate ._
23- import org .apache .parquet .filter2 .predicate .FilterApi ._
23+ import org .apache .parquet .filter2 .predicate .Operators .{Column , SupportsEqNotEq , SupportsLtGt }
24+ import org .apache .parquet .hadoop .metadata .ColumnPath
2425import org .apache .parquet .io .api .Binary
2526
2627import org .apache .spark .sql .catalyst .util .DateTimeUtils
@@ -32,29 +33,7 @@ import org.apache.spark.sql.types._
3233 */
3334private [parquet] class ParquetFilters (pushDownDate : Boolean , int96AsTimestamp : Boolean ) {
3435
35- case class SetInFilter [T <: Comparable [T ]](valueSet : Set [T ])
36- extends UserDefinedPredicate [T ] with Serializable {
37-
38- override def keep (value : T ): Boolean = {
39- value != null && valueSet.contains(value)
40- }
41-
42- // Drop when no value in the set is within the statistics range.
43- override def canDrop (statistics : Statistics [T ]): Boolean = {
44- val statMax = statistics.getMax
45- val statMin = statistics.getMin
46- val statRange = com.google.common.collect.Range .closed(statMin, statMax)
47- ! valueSet.exists(value => statRange.contains(value))
48- }
49-
50- // Can only drop not(in(set)) when we are know that every element in the block is in valueSet.
51- // From the statistics, we can only be assured of this when min == max.
52- override def inverseCanDrop (statistics : Statistics [T ]): Boolean = {
53- val statMax = statistics.getMax
54- val statMin = statistics.getMin
55- statMin == statMax && valueSet.contains(statMin)
56- }
57- }
36+ import ParquetColumns ._
5837
5938 private val makeInSet : PartialFunction [DataType , (String , Set [Any ]) => FilterPredicate ] = {
6039 case IntegerType =>
@@ -338,3 +317,63 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, int96AsTimestamp: B
338317 }
339318 }
340319}
320+
/**
 * Parquet user-defined predicate that keeps a value only when it is a member of `valueSet`.
 *
 * Both block-level checks answer conservatively (`false`, i.e. "cannot drop") whenever the
 * statistics do not expose a usable min/max pair, rather than throwing.
 *
 * @param valueSet the values accepted by the predicate
 * @tparam T the comparable column value type
 */
private[parquet] case class SetInFilter[T <: Comparable[T]](valueSet: Set[T])
  extends UserDefinedPredicate[T] with Serializable {

  /** A record is kept when its value is non-null and contained in `valueSet`. */
  override def keep(value: T): Boolean = value != null && valueSet.contains(value)

  /**
   * Drop when no value in the set is within the closed statistics range `[min, max]`.
   *
   * Returns `false` when either bound is null or when `min` compares greater than `max`:
   * such statistics cannot prove anything about the block's contents.
   */
  override def canDrop(statistics: Statistics[T]): Boolean = {
    val statMin = statistics.getMin
    val statMax = statistics.getMax
    if (statMin == null || statMax == null || statMin.compareTo(statMax) > 0) {
      false
    } else {
      // A null element can never match a value (see `keep`), so it is ignored here instead of
      // being compared against the bounds.
      val anyCandidateInRange = valueSet.exists { candidate =>
        candidate != null &&
          candidate.compareTo(statMin) >= 0 &&
          candidate.compareTo(statMax) <= 0
      }
      !anyCandidateInRange
    }
  }

  /**
   * Can only drop not(in(set)) when we know that every element in the block is in `valueSet`.
   * From the statistics, we can only be assured of this when min == max, and that single value
   * is a member of the set. Null bounds never allow dropping.
   */
  override def inverseCanDrop(statistics: Statistics[T]): Boolean = {
    val statMin = statistics.getMin
    val statMax = statistics.getMax
    statMin != null && statMin == statMax && valueSet.contains(statMin)
  }
}
344+
/**
 * Column factories that work around Parquet's `FilterApi` column helpers, which only accept
 * dot-separated names. Each factory here passes the given name to `ColumnPath.get` as one
 * single path element, so a column whose name contains dots can still be referenced.
 *
 * This is a deliberate hack: only single-element column paths are produced, because filters on
 * nested fields are not currently pushed down.
 */
private[parquet] object ParquetColumns {

  /** Wraps the whole name as one path element. */
  private def singlePath(name: String): ColumnPath = ColumnPath.get(name)

  /** Integer column supporting equality and ordering comparisons. */
  def intColumn(columnPath: String): Column[Integer] with SupportsLtGt = {
    val path = singlePath(columnPath)
    new Column[Integer](path, classOf[Integer]) with SupportsLtGt
  }

  /** Long column supporting equality and ordering comparisons. */
  def longColumn(columnPath: String): Column[java.lang.Long] with SupportsLtGt = {
    val path = singlePath(columnPath)
    new Column[java.lang.Long](path, classOf[java.lang.Long]) with SupportsLtGt
  }

  /** Float column supporting equality and ordering comparisons. */
  def floatColumn(columnPath: String): Column[java.lang.Float] with SupportsLtGt = {
    val path = singlePath(columnPath)
    new Column[java.lang.Float](path, classOf[java.lang.Float]) with SupportsLtGt
  }

  /** Double column supporting equality and ordering comparisons. */
  def doubleColumn(columnPath: String): Column[java.lang.Double] with SupportsLtGt = {
    val path = singlePath(columnPath)
    new Column[java.lang.Double](path, classOf[java.lang.Double]) with SupportsLtGt
  }

  /** Boolean column; only equality / inequality comparisons are supported. */
  def booleanColumn(columnPath: String): Column[java.lang.Boolean] with SupportsEqNotEq = {
    val path = singlePath(columnPath)
    new Column[java.lang.Boolean](path, classOf[java.lang.Boolean]) with SupportsEqNotEq
  }

  /** Binary column supporting equality and ordering comparisons. */
  def binaryColumn(columnPath: String): Column[Binary] with SupportsLtGt = {
    val path = singlePath(columnPath)
    new Column[Binary](path, classOf[Binary]) with SupportsLtGt
  }
}
0 commit comments