
Commit df00904

Filter nulls from joins where possible to improve performance. (#594)
* Filter nulls from joins where possible to improve performance.
  Signed-off-by: Robert (Bobby) Evans <[email protected]>
* Addressed review comments
  Signed-off-by: Robert (Bobby) Evans <[email protected]>
* Updated patch for other shims
1 parent ce1f9b8 commit df00904

File tree

11 files changed: +292 -72 lines changed
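Why the filtering is safe: under SQL equality semantics a NULL join key never matches anything, so for the join shapes targeted in the diffs below, rows with null keys on the filtered side can never contribute to the output. A minimal sketch of that equivalence in plain Spark (the local session and the column name id are illustrative assumptions, not part of this patch):

import org.apache.spark.sql.SparkSession

// Sketch: pre-filtering null keys does not change an inner equi-join's
// result; it only shrinks the inputs the join has to process.
object NullFilterEquivalence {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]")
      .appName("null-filter-sketch").getOrCreate()
    import spark.implicits._

    val left = Seq(Option(1), None, Option(2)).toDF("id")
    val right = Seq(Option(1), None).toDF("id")

    val plain = left.join(right, "id")           // NULL never matches NULL
    val prefiltered = left.filter($"id".isNotNull)
      .join(right.filter($"id".isNotNull), "id") // same rows, less work

    // Both yield exactly one row: id = 1
    assert(plain.collect().sameElements(prefiltered.collect()))
    spark.stop()
  }
}

The diffs below apply the same idea inside the GPU join operators, choosing which side to filter based on the join type and build side.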

integration_tests/src/main/python/tpcds_test.py

Lines changed: 14 additions & 2 deletions

@@ -23,8 +23,8 @@
         'q30', 'q31', 'q32', 'q33', 'q34', 'q35', 'q36', 'q37', 'q38', 'q39a', 'q39b',
         'q40', 'q41', 'q42', 'q43', 'q44', 'q45', 'q46', 'q47', 'q48', 'q49',
         'q50', 'q51', 'q52', 'q53', 'q54', 'q55', 'q56', 'q57', 'q58', 'q59',
-        'q60', 'q61', 'q62', 'q63', 'q64', 'q65', 'q66', 'q67', 'q68', 'q69',
-        'q70', 'q71', 'q72', 'q73', 'q74', 'q75', 'q76', 'q77', 'q78', 'q79',
+        'q60', 'q61', 'q62', 'q63', 'q64', 'q65', 'q66', 'q68', 'q69',
+        'q71', 'q72', 'q73', 'q74', 'q75', 'q76', 'q77', 'q78', 'q79',
         'q80', 'q81', 'q82', 'q83', 'q84', 'q85', 'q86', 'q87', 'q88', 'q89',
         'q90', 'q91', 'q92', 'q93', 'q94', 'q95', 'q96', 'q97', 'q98', 'q99',
         'ss_max', 'ss_maxb']
@@ -35,5 +35,17 @@
 @allow_non_gpu(any=True)
 @pytest.mark.parametrize('query', queries)
 def test_tpcds(tpcds, query):
+    assert_gpu_and_cpu_are_equal_collect(
+            lambda spark : tpcds.do_test_query(query),
+            conf={'spark.rapids.sql.variableFloatAgg.enabled': 'true'})
+
+no_var_agg_queries = ['q67', 'q70']
+
+@incompat
+@ignore_order
+@approximate_float
+@allow_non_gpu(any=True)
+@pytest.mark.parametrize('query', no_var_agg_queries)
+def test_tpcds_no_var_agg(tpcds, query):
     assert_gpu_and_cpu_are_equal_collect(
             lambda spark : tpcds.do_test_query(query))

shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala

Lines changed: 9 additions & 4 deletions

@@ -137,10 +137,15 @@ case class GpuBroadcastHashJoinExec(
     val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))
 
     lazy val builtTable = {
-      // TODO clean up intermediate results...
-      val keys = GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)
-      val combined = combine(keys, broadcastRelation.value.batch)
-      val ret = GpuColumnVector.from(combined)
+      val ret = withResource(
+          GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
+        val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
+        val filtered = filterBuiltTableIfNeeded(combined)
+        withResource(filtered) { filtered =>
+          GpuColumnVector.from(filtered)
+        }
+      }
+
       // Don't warn for a leak, because we cannot control when we are done with this
       (0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
       ret
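The rewrite above leans on a withResource loan helper to close intermediate batches deterministically, replacing the old "TODO clean up intermediate results" leak. The helper is provided elsewhere in the plugin (its Arm trait); the signature below is an assumed minimal sketch for illustration, not the plugin's exact code:

// Assumed shape of the loan pattern: the resource is always closed,
// even when the body throws, which is what the removed try/finally
// blocks did by hand.
def withResource[R <: AutoCloseable, T](resource: R)(body: R => T): T =
  try {
    body(resource)
  } finally {
    resource.close()
  }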

shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuHashJoin.scala

Lines changed: 68 additions & 1 deletion

@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter}
 import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, HashJoin}
 import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.rapids.GpuAnd
 import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 
 object GpuHashJoin {
@@ -39,6 +40,11 @@ object GpuHashJoin {
     }
     case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported")
   }
+
+  def incRefCount(cb: ColumnarBatch): ColumnarBatch = {
+    GpuColumnVector.extractBases(cb).foreach(_.incRefCount())
+    cb
+  }
 }
 
 trait GpuHashJoin extends GpuExec with HashJoin {
@@ -110,6 +116,63 @@ trait GpuHashJoin extends GpuExec with HashJoin {
     output.indices.map (v => v + joinLength)
   }
 
+  // Spark adds in rules to filter out nulls for some types of joins, but it does not
+  // guarantee 100% that all nulls will be filtered out by the time they get to
+  // this point, but because of https://github.com/rapidsai/cudf/issues/6052
+  // we need to filter out the nulls ourselves until it is fixed.
+  // InnerLike | LeftSemi =>
+  //   filter left and right keys
+  // RightOuter =>
+  //   filter left keys
+  // LeftOuter | LeftAnti =>
+  //   filter right keys
+
+  private[this] lazy val shouldFilterBuiltTableForNulls: Boolean = {
+    val builtAnyNullable = gpuBuildKeys.exists(_.nullable)
+    (joinType, buildSide) match {
+      case (_: InnerLike | LeftSemi, _) => builtAnyNullable
+      case (RightOuter, BuildLeft) => builtAnyNullable
+      case (LeftOuter | LeftAnti, BuildRight) => builtAnyNullable
+      case _ => false
+    }
+  }
+
+  private[this] lazy val shouldFilterStreamTableForNulls: Boolean = {
+    val streamedAnyNullable = gpuStreamedKeys.exists(_.nullable)
+    (joinType, buildSide) match {
+      case (_: InnerLike | LeftSemi, _) => streamedAnyNullable
+      case (RightOuter, BuildRight) => streamedAnyNullable
+      case (LeftOuter | LeftAnti, BuildLeft) => streamedAnyNullable
+      case _ => false
+    }
+  }
+
+  private[this] def mkNullFilterExpr(exprs: Seq[GpuExpression]): GpuExpression =
+    exprs.zipWithIndex.map { kv =>
+      GpuIsNotNull(GpuBoundReference(kv._2, kv._1.dataType, kv._1.nullable))
+    }.reduce(GpuAnd)
+
+  private[this] lazy val builtTableNullFilterExpression: GpuExpression =
+    mkNullFilterExpr(gpuBuildKeys)
+
+  private[this] lazy val streamedTableNullFilterExpression: GpuExpression =
+    mkNullFilterExpr(gpuStreamedKeys)
+
+  /**
+   * Filter the builtBatch if needed. builtBatch will be closed.
+   * @param builtBatch
+   * @return
+   */
+  def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch =
+    if (shouldFilterBuiltTableForNulls) {
+      GpuFilter(builtBatch, builtTableNullFilterExpression)
+    } else {
+      builtBatch
+    }
+
+  private[this] def filterStreamedTable(streamedBatch:ColumnarBatch): ColumnarBatch =
+    GpuFilter(streamedBatch, streamedTableNullFilterExpression)
+
   def doJoin(builtTable: Table,
       stream: Iterator[ColumnarBatch],
       boundCondition: Option[Expression],
@@ -134,7 +197,11 @@ trait GpuHashJoin extends GpuExec with HashJoin {
       override def hasNext: Boolean = {
         while (nextCb.isEmpty && (first || stream.hasNext)) {
           if (stream.hasNext) {
-            val cb = stream.next()
+            val cb = if (shouldFilterStreamTableForNulls) {
+              filterStreamedTable(stream.next())
+            } else {
+              stream.next()
+            }
             val startTime = System.nanoTime()
             nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows,
               numOutputBatches, joinTime, filterTime)
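For intuition, mkNullFilterExpr above emits one IsNotNull check per bound key column and folds them together with AND. A self-contained toy analogue (a stand-in expression tree, not the real GpuExpression classes) showing the zipWithIndex + reduce shape:

// Toy expression ADT standing in for GpuBoundReference/GpuIsNotNull/GpuAnd.
sealed trait Expr
case class BoundRef(ordinal: Int) extends Expr
case class IsNotNull(child: Expr) extends Expr
case class And(left: Expr, right: Expr) extends Expr

object NullFilterShape {
  // Mirrors mkNullFilterExpr: one IsNotNull per key, reduced with And.
  def mkNullFilterExpr(keyCount: Int): Expr =
    (0 until keyCount)
      .map(i => IsNotNull(BoundRef(i)): Expr)
      .reduce(And(_, _))

  def main(args: Array[String]): Unit =
    // Two keys => And(IsNotNull(BoundRef(0)),IsNotNull(BoundRef(1)))
    println(mkNullFilterExpr(2))
}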

shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala

Lines changed: 13 additions & 13 deletions

@@ -117,20 +117,20 @@ case class GpuShuffledHashJoinExec(
     streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) {
       (streamIter, buildIter) => {
         var combinedSize = 0
+
         val startTime = System.nanoTime()
-        val buildBatch =
-          ConcatAndConsumeAll.getSingleBatchWithVerification(buildIter, localBuildOutput)
-        val keys = GpuProjectExec.project(buildBatch, gpuBuildKeys)
-        val builtTable = try {
-          // Combine does not inc any reference counting
-          val combined = combine(keys, buildBatch)
-          combinedSize =
-            GpuColumnVector.extractColumns(combined)
-              .map(_.getBase.getDeviceMemorySize).sum.toInt
-          GpuColumnVector.from(combined)
-        } finally {
-          keys.close()
-          buildBatch.close()
+        val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification(
+          buildIter, localBuildOutput)) { buildBatch: ColumnarBatch =>
+          withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys =>
+            val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch))
+            val filtered = filterBuiltTableIfNeeded(combined)
+            combinedSize =
+              GpuColumnVector.extractColumns(filtered)
+                .map(_.getBase.getDeviceMemorySize).sum.toInt
+            withResource(filtered) { filtered =>
+              GpuColumnVector.from(filtered)
+            }
+          }
         }
 
         val delta = System.nanoTime() - startTime
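Note the GpuHashJoin.incRefCount call in this rewrite: combine reuses columns still owned by keys and buildBatch, both of which the surrounding withResource blocks close. Taking one extra reference per column keeps the combined batch valid until the inner withResource(filtered) releases it. A toy ref-count sketch (plain Scala, not the real cudf ColumnVector) of that handoff:

// Toy ref-counted resource: freed only when every owner has closed it.
final class RefCounted(name: String) {
  private var refs = 1
  def incRefCount(): this.type = { refs += 1; this }
  def close(): Unit = {
    refs -= 1
    if (refs == 0) println(s"$name freed")
  }
}

object RefCountSketch {
  def main(args: Array[String]): Unit = {
    val col = new RefCounted("column")
    val shared = col.incRefCount() // extra ref, as GpuHashJoin.incRefCount takes
    col.close()                    // original owner (buildBatch/keys) closed
    shared.close()                 // only now prints "column freed"
  }
}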

shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastHashJoinExec.scala

Lines changed: 9 additions & 4 deletions

@@ -138,10 +138,15 @@ case class GpuBroadcastHashJoinExec(
     val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))
 
     lazy val builtTable = {
-      // TODO clean up intermediate results...
-      val keys = GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)
-      val combined = combine(keys, broadcastRelation.value.batch)
-      val ret = GpuColumnVector.from(combined)
+      val ret = withResource(
+          GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
+        val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
+        val filtered = filterBuiltTableIfNeeded(combined)
+        withResource(filtered) { filtered =>
+          GpuColumnVector.from(filtered)
+        }
+      }
+
       // Don't warn for a leak, because we cannot control when we are done with this
       (0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
       ret

shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuHashJoin.scala

Lines changed: 63 additions & 1 deletion

@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter}
 import org.apache.spark.sql.execution.joins.HashJoin
 import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.rapids.GpuAnd
 import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 
 object GpuHashJoin {
@@ -40,6 +41,11 @@ object GpuHashJoin {
     }
     case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported")
   }
+
+  def incRefCount(cb: ColumnarBatch): ColumnarBatch = {
+    GpuColumnVector.extractBases(cb).foreach(_.incRefCount())
+    cb
+  }
 }
 
 trait GpuHashJoin extends GpuExec with HashJoin {
@@ -111,6 +117,58 @@ trait GpuHashJoin extends GpuExec with HashJoin {
     output.indices.map (v => v + joinLength)
   }
 
+  // Spark adds in rules to filter out nulls for some types of joins, but it does not
+  // guarantee 100% that all nulls will be filtered out by the time they get to
+  // this point, but because of https://github.com/rapidsai/cudf/issues/6052
+  // we need to filter out the nulls ourselves until it is fixed.
+  // InnerLike | LeftSemi =>
+  //   filter left and right keys
+  // RightOuter =>
+  //   filter left keys
+  // LeftOuter | LeftAnti =>
+  //   filter right keys
+
+  private[this] lazy val shouldFilterBuiltTableForNulls: Boolean = {
+    val builtAnyNullable = gpuBuildKeys.exists(_.nullable)
+    (joinType, buildSide) match {
+      case (_: InnerLike | LeftSemi, _) => builtAnyNullable
+      case (RightOuter, BuildLeft) => builtAnyNullable
+      case (LeftOuter | LeftAnti, BuildRight) => builtAnyNullable
+      case _ => false
+    }
+  }
+
+  private[this] lazy val shouldFilterStreamTableForNulls: Boolean = {
+    val streamedAnyNullable = gpuStreamedKeys.exists(_.nullable)
+    (joinType, buildSide) match {
+      case (_: InnerLike | LeftSemi, _) => streamedAnyNullable
+      case (RightOuter, BuildRight) => streamedAnyNullable
+      case (LeftOuter | LeftAnti, BuildLeft) => streamedAnyNullable
+      case _ => false
+    }
+  }
+
+  private[this] def mkNullFilterExpr(exprs: Seq[GpuExpression]): GpuExpression =
+    exprs.zipWithIndex.map { kv =>
+      GpuIsNotNull(GpuBoundReference(kv._2, kv._1.dataType, kv._1.nullable))
+    }.reduce(GpuAnd)
+
+  private[this] lazy val builtTableNullFilterExpression: GpuExpression =
+    mkNullFilterExpr(gpuBuildKeys)
+
+  private[this] lazy val streamedTableNullFilterExpression: GpuExpression =
+    mkNullFilterExpr(gpuStreamedKeys)
+
+  def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch =
+    if (shouldFilterBuiltTableForNulls) {
+      GpuFilter(builtBatch, builtTableNullFilterExpression)
+    } else {
+      builtBatch
+    }
+
+  private[this] def filterStreamedTable(streamedBatch:ColumnarBatch): ColumnarBatch =
+    GpuFilter(streamedBatch, streamedTableNullFilterExpression)
+
   def doJoin(builtTable: Table,
       stream: Iterator[ColumnarBatch],
       boundCondition: Option[Expression],
@@ -135,7 +193,11 @@ trait GpuHashJoin extends GpuExec with HashJoin {
       override def hasNext: Boolean = {
         while (nextCb.isEmpty && (first || stream.hasNext)) {
           if (stream.hasNext) {
-            val cb = stream.next()
+            val cb = if (shouldFilterStreamTableForNulls) {
+              filterStreamedTable(stream.next())
+            } else {
+              stream.next()
+            }
             val startTime = System.nanoTime()
             nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows,
               numOutputBatches, joinTime, filterTime)

shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuShuffledHashJoinExec.scala

Lines changed: 13 additions & 13 deletions

@@ -118,20 +118,20 @@ case class GpuShuffledHashJoinExec(
     streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) {
       (streamIter, buildIter) => {
         var combinedSize = 0
+
         val startTime = System.nanoTime()
-        val buildBatch =
-          ConcatAndConsumeAll.getSingleBatchWithVerification(buildIter, localBuildOutput)
-        val keys = GpuProjectExec.project(buildBatch, gpuBuildKeys)
-        val builtTable = try {
-          // Combine does not inc any reference counting
-          val combined = combine(keys, buildBatch)
-          combinedSize =
-            GpuColumnVector.extractColumns(combined)
-              .map(_.getBase.getDeviceMemorySize).sum.toInt
-          GpuColumnVector.from(combined)
-        } finally {
-          keys.close()
-          buildBatch.close()
+        val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification(
+          buildIter, localBuildOutput)) { buildBatch: ColumnarBatch =>
+          withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys =>
+            val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch))
+            val filtered = filterBuiltTableIfNeeded(combined)
+            combinedSize =
+              GpuColumnVector.extractColumns(filtered)
+                .map(_.getBase.getDeviceMemorySize).sum.toInt
+            withResource(filtered) { filtered =>
+              GpuColumnVector.from(filtered)
+            }
+          }
         }
 
         val delta = System.nanoTime() - startTime

shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala

Lines changed: 9 additions & 4 deletions

@@ -143,10 +143,15 @@ case class GpuBroadcastHashJoinExec(
     val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))
 
     lazy val builtTable = {
-      // TODO clean up intermediate results...
-      val keys = GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)
-      val combined = combine(keys, broadcastRelation.value.batch)
-      val ret = GpuColumnVector.from(combined)
+      val ret = withResource(
+          GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
+        val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
+        val filtered = filterBuiltTableIfNeeded(combined)
+        withResource(filtered) { filtered =>
+          GpuColumnVector.from(filtered)
+        }
+      }
+
       // Don't warn for a leak, because we cannot control when we are done with this
      (0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
       ret
