@@ -157,24 +157,84 @@ function copyto_widen!(res::AbstractVector{T}, x::AbstractVector) where T
157157end
158158
159159function groupreduce! (res:: AbstractVector , f, op, condf, adjust, checkempty:: Bool ,
160- incol:: AbstractVector , gd:: GroupedDataFrame )
160+ incol:: AbstractVector , gd:: GroupedDataFrame , nthreads :: Integer )
161161 n = length (gd)
162+ groups = gd. groups
162163 if adjust != = nothing || checkempty
163164 counts = zeros (Int, n)
164165 end
165- groups = gd. groups
166- @inbounds for i in eachindex (incol, groups)
167- gix = groups[i]
168- x = incol[i]
169- if gix > 0 && (condf === nothing || condf (x))
170- # this check should be optimized out if U is not Any
171- if eltype (res) === Any && ! isassigned (res, gix)
172- res[gix] = f (x, gix)
173- else
174- res[gix] = op (res[gix], f (x, gix))
166+ nt = min (nthreads, Threads. nthreads ())
167+ if nt <= 1 || axes (incol) != axes (groups)
168+ @inbounds for i in eachindex (incol, groups)
169+ gix = groups[i]
170+ x = incol[i]
171+ if gix > 0 && (condf === nothing || condf (x))
172+ # this check should be optimized out if eltype is not Any
173+ if eltype (res) === Any && ! isassigned (res, gix)
174+ res[gix] = f (x, gix)
175+ else
176+ res[gix] = op (res[gix], f (x, gix))
177+ end
178+ if adjust != = nothing || checkempty
179+ counts[gix] += 1
180+ end
181+ end
182+ end
183+ else
184+ res_vec = Vector {typeof(res)} (undef, nt)
185+ # needs to be always allocated to fix type instability with @threads
186+ counts_vec = Vector {Vector{Int}} (undef, nt)
187+ res_vec[1 ] = res
188+ if adjust != = nothing || checkempty
189+ counts_vec[1 ] = counts
190+ end
191+ for i in 2 : nt
192+ res_vec[i] = copy (res)
193+ if adjust != = nothing || checkempty
194+ counts_vec[i] = zeros (Int, n)
175195 end
196+ end
197+ Threads. @threads for tid in 1 : nt
198+ res′ = res_vec[tid]
176199 if adjust != = nothing || checkempty
177- counts[gix] += 1
200+ counts′ = counts_vec[tid]
201+ end
202+ start = 1 + ((tid - 1 ) * length (groups)) ÷ nt
203+ stop = (tid * length (groups)) ÷ nt
204+ @inbounds for i in start: stop
205+ gix = groups[i]
206+ x = incol[i]
207+ if gix > 0 && (condf === nothing || condf (x))
208+ # this check should be optimized out if eltype is not Any
209+ if eltype (res′) === Any && ! isassigned (res′, gix)
210+ res′[gix] = f (x, gix)
211+ else
212+ res′[gix] = op (res′[gix], f (x, gix))
213+ end
214+ if adjust != = nothing || checkempty
215+ counts′[gix] += 1
216+ end
217+ end
218+ end
219+ end
220+ for i in 2 : length (res_vec)
221+ resi = res_vec[i]
222+ @inbounds @simd for j in eachindex (res)
223+ # this check should be optimized out if eltype is not Any
224+ if eltype (res) === Any
225+ if isassigned (resi, j) && isassigned (res, j)
226+ res[j] = op (res[j], resi[j])
227+ elseif isassigned (resi, j)
228+ res[j] = resi[j]
229+ end
230+ else
231+ res[j] = op (res[j], resi[j])
232+ end
233+ end
234+ end
235+ if adjust != = nothing || checkempty
236+ for i in 2 : length (counts_vec)
237+ counts .+ = counts_vec[i]
178238 end
179239 end
180240 end
@@ -218,26 +278,31 @@ end
218278
219279# function barrier works around type instability of groupreduce_init due to applicable
220280groupreduce (f, op, condf, adjust, checkempty:: Bool ,
221- incol:: AbstractVector , gd:: GroupedDataFrame ) =
281+ incol:: AbstractVector , gd:: GroupedDataFrame ,
282+ nthreads:: Integer ) =
222283 groupreduce! (groupreduce_init (op, condf, adjust, incol, gd),
223- f, op, condf, adjust, checkempty, incol, gd)
284+ f, op, condf, adjust, checkempty, incol, gd, nthreads )
224285# Avoids the overhead due to Missing when computing reduction
225286groupreduce (f, op, condf:: typeof (! ismissing), adjust, checkempty:: Bool ,
226- incol:: AbstractVector , gd:: GroupedDataFrame ) =
287+ incol:: AbstractVector , gd:: GroupedDataFrame ,
288+ nthreads:: Integer ) =
227289 groupreduce! (disallowmissing (groupreduce_init (op, condf, adjust, incol, gd)),
228- f, op, condf, adjust, checkempty, incol, gd)
290+ f, op, condf, adjust, checkempty, incol, gd, nthreads )
229291
230- (r:: Reduce )(incol:: AbstractVector , gd:: GroupedDataFrame ) =
231- groupreduce ((x, i) -> x, r. op, r. condf, r. adjust, r. checkempty, incol, gd)
292+ (r:: Reduce )(incol:: AbstractVector , gd:: GroupedDataFrame ;
293+ nthreads:: Integer = 1 ) =
294+ groupreduce ((x, i) -> x, r. op, r. condf, r. adjust, r. checkempty, incol, gd, nthreads)
232295
233296# this definition is missing in Julia 1.0 LTS and is required by aggregation for var
234297# TODO : remove this when we drop 1.0 support
235298if VERSION < v " 1.1"
236299 Base. zero (:: Type{Missing} ) = missing
237300end
238301
239- function (agg:: Aggregate{typeof(var)} )(incol:: AbstractVector , gd:: GroupedDataFrame )
240- means = groupreduce ((x, i) -> x, Base. add_sum, agg. condf, / , false , incol, gd)
302+ function (agg:: Aggregate{typeof(var)} )(incol:: AbstractVector , gd:: GroupedDataFrame ;
303+ nthreads:: Integer = 1 )
304+ means = groupreduce ((x, i) -> x, Base. add_sum, agg. condf, / , false ,
305+ incol, gd, nthreads)
241306 # !ismissing check is purely an optimization to avoid a copy later
242307 if eltype (means) >: Missing && agg. condf != = ! ismissing
243308 T = Union{Missing, real (eltype (means))}
@@ -247,32 +312,38 @@ function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFra
247312 res = zeros (T, length (gd))
248313 return groupreduce! (res, (x, i) -> @inbounds (abs2 (x - means[i])), + , agg. condf,
249314 (x, l) -> l <= 1 ? oftype (x / (l- 1 ), NaN ) : x / (l- 1 ),
250- false , incol, gd)
315+ false , incol, gd, nthreads )
251316end
252317
253- function (agg:: Aggregate{typeof(std)} )(incol:: AbstractVector , gd:: GroupedDataFrame )
254- outcol = Aggregate (var, agg. condf)(incol, gd)
318+ function (agg:: Aggregate{typeof(std)} )(incol:: AbstractVector , gd:: GroupedDataFrame ;
319+ nthreads:: Integer = 1 )
320+ outcol = Aggregate (var, agg. condf)(incol, gd; nthreads= nthreads)
255321 if eltype (outcol) <: Union{Missing, Rational}
256322 return sqrt .(outcol)
257323 else
258324 return map! (sqrt, outcol, outcol)
259325 end
260326end
261327
262- for f in (first, last)
263- function (agg:: Aggregate{typeof(f)} )(incol:: AbstractVector , gd:: GroupedDataFrame )
264- n = length (gd)
265- outcol = similar (incol, n)
266- fillfirst! (agg. condf, outcol, incol, gd, rev= agg. f === last)
267- if isconcretetype (eltype (outcol))
268- return outcol
269- else
270- return copyto_widen! (Tables. allocatecolumn (typeof (first (outcol)), n), outcol)
328+ for f in (:first , :last )
329+ # Without using @eval the presence of a keyword argument triggers a Julia bug
330+ @eval begin
331+ function (agg:: Aggregate{typeof($f)} )(incol:: AbstractVector , gd:: GroupedDataFrame ;
332+ nthreads:: Integer = 1 )
333+ n = length (gd)
334+ outcol = similar (incol, n)
335+ fillfirst! (agg. condf, outcol, incol, gd, rev= agg. f === last)
336+ if isconcretetype (eltype (outcol))
337+ return outcol
338+ else
339+ return copyto_widen! (Tables. allocatecolumn (typeof (first (outcol)), n), outcol)
340+ end
271341 end
272342 end
273343end
274344
275- function (agg:: Aggregate{typeof(length)} )(incol:: AbstractVector , gd:: GroupedDataFrame )
345+ function (agg:: Aggregate{typeof(length)} )(incol:: AbstractVector , gd:: GroupedDataFrame ;
346+ nthreads:: Integer = 1 )
276347 if getfield (gd, :idx ) === nothing
277348 lens = zeros (Int, length (gd))
278349 @inbounds for gix in gd. groups
0 commit comments