@@ -157,24 +157,72 @@ function copyto_widen!(res::AbstractVector{T}, x::AbstractVector) where T
157157end
158158
159159function groupreduce! (res:: AbstractVector , f, op, condf, adjust, checkempty:: Bool ,
160- incol:: AbstractVector , gd:: GroupedDataFrame )
160+ incol:: AbstractVector , gd:: GroupedDataFrame , nthreads :: Integer )
161161 n = length (gd)
162+ groups = gd. groups
162163 if adjust != = nothing || checkempty
163164 counts = zeros (Int, n)
164165 end
165- groups = gd. groups
166- @inbounds for i in eachindex (incol, groups)
167- gix = groups[i]
168- x = incol[i]
169- if gix > 0 && (condf === nothing || condf (x))
170- # this check should be optimized out if U is not Any
171- if eltype (res) === Any && ! isassigned (res, gix)
172- res[gix] = f (x, gix)
173- else
174- res[gix] = op (res[gix], f (x, gix))
166+ nt = min (nthreads, Threads. nthreads ())
167+ if nt <= 1 || axes (incol) != axes (groups)
168+ @inbounds for i in eachindex (incol, groups)
169+ gix = groups[i]
170+ x = incol[i]
171+ if gix > 0 && (condf === nothing || condf (x))
172+ # this check should be optimized out if U is not Any
173+ if eltype (res) === Any && ! isassigned (res, gix)
174+ res[gix] = f (x, gix)
175+ else
176+ res[gix] = op (res[gix], f (x, gix))
177+ end
178+ if adjust != = nothing || checkempty
179+ counts[gix] += 1
180+ end
175181 end
182+ end
183+ else
184+ res_vec = Vector {typeof(res)} (undef, nt)
185+ # needs to be always allocated to fix type instability with @threads
186+ counts_vec = Vector {Vector{Int}} (undef, nt)
187+ res_vec[1 ] = res
188+ if adjust != = nothing || checkempty
189+ counts_vec[1 ] = counts
190+ end
191+ for i in 2 : nt
192+ res_vec[i] = copy (res)
193+ if adjust != = nothing || checkempty
194+ counts_vec[i] = zeros (Int, n)
195+ end
196+ end
197+ Threads. @threads for tid in 1 : nt
198+ res′ = res_vec[tid]
176199 if adjust != = nothing || checkempty
177- counts[gix] += 1
200+ counts′ = counts_vec[tid]
201+ end
202+ start = 1 + ((tid - 1 ) * length (groups)) ÷ nt
203+ stop = (tid * length (groups)) ÷ nt
204+ @inbounds for i in start: stop
205+ gix = groups[i]
206+ x = incol[i]
207+ if gix > 0 && (condf === nothing || condf (x))
208+ # this check should be optimized out if U is not Any
209+ if eltype (res′) === Any && ! isassigned (res′, gix)
210+ res′[gix] = f (x, gix)
211+ else
212+ res′[gix] = op (res′[gix], f (x, gix))
213+ end
214+ if adjust != = nothing || checkempty
215+ counts′[gix] += 1
216+ end
217+ end
218+ end
219+ end
220+ for i in 2 : length (res_vec)
221+ res .= op .(res, res_vec[i])
222+ end
223+ if adjust != = nothing || checkempty
224+ for i in 2 : length (counts_vec)
225+ counts .+ = counts_vec[i]
178226 end
179227 end
180228 end
@@ -218,26 +266,31 @@ end
218266
# Function barrier: works around the type instability of `groupreduce_init`
# (caused by its use of `applicable`). The freshly initialized result vector is
# passed straight to `groupreduce!`, which is called with the concrete type of
# that vector.
#
# All arguments are forwarded unchanged to `groupreduce!`; `nthreads` is passed
# through as the last positional argument.
groupreduce(f, op, condf, adjust, checkempty::Bool,
            incol::AbstractVector, gd::GroupedDataFrame,
            nthreads::Integer) =
    groupreduce!(groupreduce_init(op, condf, adjust, incol, gd),
                 f, op, condf, adjust, checkempty, incol, gd, nthreads)
# Avoids the overhead due to Missing when computing reduction.
#
# Specialization for `condf === !ismissing`: the vector returned by
# `groupreduce_init` is passed through `disallowmissing` before being handed to
# `groupreduce!`. Every other argument, including `nthreads`, is forwarded
# unchanged.
groupreduce(f, op, condf::typeof(!ismissing), adjust, checkempty::Bool,
            incol::AbstractVector, gd::GroupedDataFrame,
            nthreads::Integer) =
    groupreduce!(disallowmissing(groupreduce_init(op, condf, adjust, incol, gd)),
                 f, op, condf, adjust, checkempty, incol, gd, nthreads)
229279
# Apply the reduction described by `r` to `incol`, grouped according to `gd`.
#
# Values are passed to the reduction untransformed (`(x, i) -> x`); the operator,
# filtering predicate, adjustment and empty-group check all come from the fields
# of `r`. `nthreads` defaults to 1 and is forwarded positionally to `groupreduce`.
(r::Reduce)(incol::AbstractVector, gd::GroupedDataFrame;
            nthreads::Integer=1) =
    groupreduce((x, i) -> x, r.op, r.condf, r.adjust, r.checkempty, incol, gd, nthreads)
232283
# This definition is missing in Julia 1.0 LTS and is required by aggregation for var.
# It is only defined on versions older than 1.1.
# TODO: remove this when we drop 1.0 support
if VERSION < v"1.1"
    Base.zero(::Type{Missing}) = missing
end
238289
239- function (agg:: Aggregate{typeof(var)} )(incol:: AbstractVector , gd:: GroupedDataFrame )
240- means = groupreduce ((x, i) -> x, Base. add_sum, agg. condf, / , false , incol, gd)
290+ function (agg:: Aggregate{typeof(var)} )(incol:: AbstractVector , gd:: GroupedDataFrame ;
291+ nthreads:: Integer = 1 )
292+ means = groupreduce ((x, i) -> x, Base. add_sum, agg. condf, / , false ,
293+ incol, gd, nthreads)
241294 # !ismissing check is purely an optimization to avoid a copy later
242295 if eltype (means) >: Missing && agg. condf != = ! ismissing
243296 T = Union{Missing, real (eltype (means))}
@@ -247,32 +300,38 @@ function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFra
247300 res = zeros (T, length (gd))
248301 return groupreduce! (res, (x, i) -> @inbounds (abs2 (x - means[i])), + , agg. condf,
249302 (x, l) -> l <= 1 ? oftype (x / (l- 1 ), NaN ) : x / (l- 1 ),
250- false , incol, gd)
303+ false , incol, gd, nthreads )
251304end
252305
# Per-group standard deviation, computed as the square root of the per-group
# variance produced by the `var` aggregate built with the same `condf`.
#
# `nthreads` (default 1) is forwarded to the `var` aggregate as a keyword argument.
function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFrame;
                                       nthreads::Integer=1)
    outcol = Aggregate(var, agg.condf)(incol, gd; nthreads=nthreads)
    if eltype(outcol) <: Union{Missing, Rational}
        # Broadcasting allocates a new vector whose element type is derived from
        # the results (the square root of a `Rational` is not a `Rational`, so it
        # could not be written back into `outcol`).
        return sqrt.(outcol)
    else
        # The variance vector is overwritten in place and returned.
        return map!(sqrt, outcol, outcol)
    end
end
261315
# Define the `first` and `last` aggregates. Both share one body; the direction is
# selected at call time through `rev=agg.f === last`.
for f in (:first, :last)
    # Without using @eval the presence of a keyword argument triggers a Julia bug
    @eval begin
        # `nthreads` is accepted but not used in this body
        # (presumably kept so all aggregates share the same call signature).
        function (agg::Aggregate{typeof($f)})(incol::AbstractVector, gd::GroupedDataFrame;
                                              nthreads::Integer=1)
            n = length(gd)
            outcol = similar(incol, n)
            fillfirst!(agg.condf, outcol, incol, gd, rev=agg.f === last)
            if isconcretetype(eltype(outcol))
                return outcol
            else
                # Non-concrete element type: allocate a column typed after the
                # first value and let `copyto_widen!` copy the values into it.
                # NOTE(review): `first(outcol)` throws when `gd` has no groups;
                # confirm this branch is never reached with an empty `gd`.
                return copyto_widen!(Tables.allocatecolumn(typeof(first(outcol)), n),
                                     outcol)
            end
        end
    end
end
274332
275- function (agg:: Aggregate{typeof(length)} )(incol:: AbstractVector , gd:: GroupedDataFrame )
333+ function (agg:: Aggregate{typeof(length)} )(incol:: AbstractVector , gd:: GroupedDataFrame ;
334+ nthreads:: Integer = 1 )
276335 if getfield (gd, :idx ) === nothing
277336 lens = zeros (Int, length (gd))
278337 @inbounds for gix in gd. groups
0 commit comments