Skip to content

Commit 6721181

Browse files
authored
Merge pull request #660 from JuliaParallel/jps/darray-undef
DArray: Add undef constructor, and some adjustments
2 parents b077c84 + 62cc940 commit 6721181

File tree

7 files changed

+162
-70
lines changed

7 files changed

+162
-70
lines changed

docs/src/darray.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -672,6 +672,12 @@ it's missing an operation that you need, please file an issue!
672672
673673
This list is not exhaustive, but documents operations which are known to work well with the `DArray`:
674674
675+
Allocation with:
676+
- `undef`
677+
- `rand`/`randn`
678+
- `sprand`
679+
- `ones`/`zeros`
680+
675681
From `Base`:
676682
- `getindex`/`setindex!`
677683
- Broadcasting

src/array/alloc.jl

Lines changed: 69 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import Base: cat
2-
import Random: MersenneTwister
2+
import Random: MersenneTwister, rand!, randn!
33
export partition
44

55
mutable struct AllocateArray{T,N} <: ArrayOp{T,N}
@@ -79,53 +79,64 @@ function allocate_array(f, T, sz)
7979
return new_f(T, sz)
8080
end
8181
allocate_array_func(::Processor, f) = f
82-
function stage(ctx, a::AllocateArray)
83-
chunks = map(CartesianIndices(a.domainchunks)) do I
84-
x = a.domainchunks[I]
85-
i = LinearIndices(a.domainchunks)[I]
86-
args = a.want_index ? (i, size(x)) : (size(x),)
87-
88-
if isnothing(a.procgrid)
89-
scope = get_compute_scope()
90-
else
91-
scope = ExactScope(a.procgrid[CartesianIndex(mod1.(Tuple(I), size(a.procgrid))...)])
92-
end
93-
if a.want_index
94-
Dagger.@spawn compute_scope=scope allocate_array(a.f, a.eltype, i, args...)
95-
else
96-
Dagger.@spawn compute_scope=scope allocate_array(a.f, a.eltype, args...)
82+
function stage(ctx, A::AllocateArray)
83+
tasks = Array{DTask,ndims(A.domainchunks)}(undef, size(A.domainchunks)...)
84+
Dagger.spawn_datadeps() do
85+
default_scope = get_compute_scope()
86+
for I in CartesianIndices(A.domainchunks)
87+
x = A.domainchunks[I]
88+
i = LinearIndices(A.domainchunks)[I]
89+
90+
if isnothing(A.procgrid)
91+
scope = default_scope
92+
else
93+
scope = ExactScope(A.procgrid[CartesianIndex(mod1.(Tuple(I), size(A.procgrid))...)])
94+
end
95+
96+
if A.want_index
97+
task = Dagger.@spawn compute_scope=scope allocate_array(A.f, A.eltype, i, size(x))
98+
else
99+
task = Dagger.@spawn compute_scope=scope allocate_array(A.f, A.eltype, size(x))
100+
end
101+
tasks[i] = task
97102
end
98103
end
99-
return DArray(a.eltype, a.domain, a.domainchunks, chunks, a.partitioning)
104+
return DArray(A.eltype, A.domain, A.domainchunks, tasks, A.partitioning)
100105
end
101106

102107
const BlocksOrAuto = Union{Blocks{N} where N, AutoBlocks}
103108

104-
function Base.rand(p::Blocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary)
109+
function Base.rand(p::Blocks, T::Type, dims::Dims; assignment::AssignmentType = :arbitrary)
105110
d = ArrayDomain(map(x->1:x, dims))
106-
a = AllocateArray(eltype, rand, false, d, partition(p, d), p, assignment)
111+
a = AllocateArray(T, rand, false, d, partition(p, d), p, assignment)
107112
return _to_darray(a)
108113
end
109-
Base.rand(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) = rand(p, T, dims; assignment)
110-
Base.rand(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) = rand(p, Float64, dims; assignment)
111-
Base.rand(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) = rand(p, Float64, dims; assignment)
112-
Base.rand(::AutoBlocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) =
113-
rand(auto_blocks(dims), eltype, dims; assignment)
114-
115-
function Base.randn(p::Blocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary)
114+
Base.rand(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) =
115+
rand(p, T, dims; assignment)
116+
Base.rand(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) =
117+
rand(p, Float64, dims; assignment)
118+
Base.rand(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) =
119+
rand(p, Float64, dims; assignment)
120+
Base.rand(::AutoBlocks, T::Type, dims::Dims; assignment::AssignmentType = :arbitrary) =
121+
rand(auto_blocks(dims), T, dims; assignment)
122+
123+
function Base.randn(p::Blocks, T::Type, dims::Dims; assignment::AssignmentType = :arbitrary)
116124
d = ArrayDomain(map(x->1:x, dims))
117-
a = AllocateArray(eltype, randn, false, d, partition(p, d), p, assignment)
125+
a = AllocateArray(T, randn, false, d, partition(p, d), p, assignment)
118126
return _to_darray(a)
119127
end
120-
Base.randn(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) = randn(p, T, dims; assignment)
121-
Base.randn(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) = randn(p, Float64, dims; assignment)
122-
Base.randn(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) = randn(p, Float64, dims; assignment)
123-
Base.randn(::AutoBlocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) =
124-
randn(auto_blocks(dims), eltype, dims; assignment)
125-
126-
function sprand(p::Blocks, eltype::Type, dims::Dims, sparsity::AbstractFloat; assignment::AssignmentType = :arbitrary)
128+
Base.randn(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) =
129+
randn(p, T, dims; assignment)
130+
Base.randn(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) =
131+
randn(p, Float64, dims; assignment)
132+
Base.randn(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) =
133+
randn(p, Float64, dims; assignment)
134+
Base.randn(::AutoBlocks, T::Type, dims::Dims; assignment::AssignmentType = :arbitrary) =
135+
randn(auto_blocks(dims), T, dims; assignment)
136+
137+
function sprand(p::Blocks, T::Type, dims::Dims, sparsity::AbstractFloat; assignment::AssignmentType = :arbitrary)
127138
d = ArrayDomain(map(x->1:x, dims))
128-
a = AllocateArray(eltype, (T, _dims) -> sprand(T, _dims..., sparsity), false, d, partition(p, d), p, assignment)
139+
a = AllocateArray(T, (T, _dims) -> sprand(T, _dims..., sparsity), false, d, partition(p, d), p, assignment)
129140
return _to_darray(a)
130141
end
131142
sprand(p::BlocksOrAuto, T::Type, dims_and_sparsity::Real...; assignment::AssignmentType = :arbitrary) =
@@ -134,30 +145,36 @@ sprand(p::BlocksOrAuto, dims_and_sparsity::Real...; assignment::AssignmentType =
134145
sprand(p, Float64, dims_and_sparsity[1:end-1], dims_and_sparsity[end]; assignment)
135146
sprand(p::BlocksOrAuto, dims::Dims, sparsity::AbstractFloat; assignment::AssignmentType = :arbitrary) =
136147
sprand(p, Float64, dims, sparsity; assignment)
137-
sprand(::AutoBlocks, eltype::Type, dims::Dims, sparsity::AbstractFloat; assignment::AssignmentType = :arbitrary) =
138-
sprand(auto_blocks(dims), eltype, dims, sparsity; assignment)
148+
sprand(::AutoBlocks, T::Type, dims::Dims, sparsity::AbstractFloat; assignment::AssignmentType = :arbitrary) =
149+
sprand(auto_blocks(dims), T, dims, sparsity; assignment)
139150

140-
function Base.ones(p::Blocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary)
151+
function Base.ones(p::Blocks, T::Type, dims::Dims; assignment::AssignmentType = :arbitrary)
141152
d = ArrayDomain(map(x->1:x, dims))
142-
a = AllocateArray(eltype, ones, false, d, partition(p, d), p, assignment)
153+
a = AllocateArray(T, ones, false, d, partition(p, d), p, assignment)
143154
return _to_darray(a)
144155
end
145-
Base.ones(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) = ones(p, T, dims; assignment)
146-
Base.ones(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) = ones(p, Float64, dims; assignment)
147-
Base.ones(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) = ones(p, Float64, dims; assignment)
148-
Base.ones(::AutoBlocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) =
149-
ones(auto_blocks(dims), eltype, dims; assignment)
150-
151-
function Base.zeros(p::Blocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary)
156+
Base.ones(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) =
157+
ones(p, T, dims; assignment)
158+
Base.ones(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) =
159+
ones(p, Float64, dims; assignment)
160+
Base.ones(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) =
161+
ones(p, Float64, dims; assignment)
162+
Base.ones(::AutoBlocks, T::Type, dims::Dims; assignment::AssignmentType = :arbitrary) =
163+
ones(auto_blocks(dims), T, dims; assignment)
164+
165+
function Base.zeros(p::Blocks, T::Type, dims::Dims; assignment::AssignmentType = :arbitrary)
152166
d = ArrayDomain(map(x->1:x, dims))
153-
a = AllocateArray(eltype, zeros, false, d, partition(p, d), p, assignment)
167+
a = AllocateArray(T, zeros, false, d, partition(p, d), p, assignment)
154168
return _to_darray(a)
155169
end
156-
Base.zeros(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) = zeros(p, T, dims; assignment)
157-
Base.zeros(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) = zeros(p, Float64, dims; assignment)
158-
Base.zeros(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) = zeros(p, Float64, dims; assignment)
159-
Base.zeros(::AutoBlocks, eltype::Type, dims::Dims; assignment::AssignmentType = :arbitrary) =
160-
zeros(auto_blocks(dims), eltype, dims; assignment)
170+
Base.zeros(p::BlocksOrAuto, T::Type, dims::Integer...; assignment::AssignmentType = :arbitrary) =
171+
zeros(p, T, dims; assignment)
172+
Base.zeros(p::BlocksOrAuto, dims::Integer...; assignment::AssignmentType = :arbitrary) =
173+
zeros(p, Float64, dims; assignment)
174+
Base.zeros(p::BlocksOrAuto, dims::Dims; assignment::AssignmentType = :arbitrary) =
175+
zeros(p, Float64, dims; assignment)
176+
Base.zeros(::AutoBlocks, T::Type, dims::Dims; assignment::AssignmentType = :arbitrary) =
177+
zeros(auto_blocks(dims), T, dims; assignment)
161178

162179
function Base.zero(x::DArray{T,N}) where {T,N}
163180
dims = ntuple(i->x.domain.indexes[i].stop, N)

src/array/darray.jl

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ function Base.collect(d::DArray{T,N}; tree=false, copyto=false) where {T,N}
194194
if tree
195195
collect(fetch(treereduce_nd(map(x -> ((args...,) -> Dagger.@spawn x(args...)) , dimcatfuncs), a.chunks)))
196196
else
197-
treereduce_nd(dimcatfuncs, asyncmap(fetch, a.chunks))
197+
collect(treereduce_nd(dimcatfuncs, asyncmap(fetch, a.chunks)))
198198
end
199199
end
200200
Array{T,N}(A::DArray{S,N}) where {T,N,S} = convert(Array{T,N}, collect(A))
@@ -316,14 +316,8 @@ function Base.isequal(x::ArrayOp, y::ArrayOp)
316316
x === y
317317
end
318318

319-
struct AllocateUndef{S} end
320-
(::AllocateUndef{S})(T, dims::Dims{N}) where {S,N} = Array{S,N}(undef, dims)
321-
function Base.similar(A::DArray{T,N} where T, ::Type{S}, dims::Dims{N}) where {S,N}
322-
d = ArrayDomain(map(x->1:x, dims))
323-
p = A.partitioning
324-
a = AllocateArray(S, AllocateUndef{S}(), false, d, partition(p, d), p)
325-
return _to_darray(a)
326-
end
319+
Base.similar(D::DArray{T,N} where T, ::Type{S}, dims::Dims{N}) where {S,N} =
320+
DArray{S,N}(undef, D.partitioning, dims)
327321

328322
Base.copy(x::DArray{T,N,B,F}) where {T,N,B,F} =
329323
map(identity, x)::DArray{T,N,B,F}
@@ -570,6 +564,30 @@ DVector(A::AbstractVector{T}, ::AutoBlocks, assignment::AssignmentType{1} = :arb
570564
DMatrix(A::AbstractMatrix{T}, ::AutoBlocks, assignment::AssignmentType{2} = :arbitrary) where T = DMatrix(A, auto_blocks(A), assignment)
571565
DArray(A::AbstractArray, ::AutoBlocks, assignment::AssignmentType = :arbitrary) = DArray(A, auto_blocks(A), assignment)
572566

567+
struct AllocateUndef{S} end
568+
(::AllocateUndef{S})(T, dims::Dims{N}) where {S,N} = Array{S,N}(undef, dims)
569+
function DArray{T,N}(::UndefInitializer, dist::Blocks{N}, dims::NTuple{N,Int}; assignment::AssignmentType{N} = :arbitrary) where {T,N}
570+
domain = ArrayDomain(map(x->1:x, dims))
571+
subdomains = partition(dist, domain)
572+
a = AllocateArray(T, AllocateUndef{T}(), false, domain, subdomains, dist, assignment)
573+
return _to_darray(a)
574+
end
575+
DArray{T,N}(::UndefInitializer, dist::Blocks{N}, dims::Vararg{Int,N}; assignment::AssignmentType{N} = :arbitrary) where {T,N} =
576+
DArray{T,N}(undef, dist, (dims...,); assignment)
577+
DArray{T,N}(::UndefInitializer, dims::NTuple{N,Int}; assignment::AssignmentType{N} = :arbitrary) where {T,N} =
578+
DArray{T,N}(undef, auto_blocks(dims), dims; assignment)
579+
DArray{T,N}(::UndefInitializer, dims::Vararg{Int,N}; assignment::AssignmentType{N} = :arbitrary) where {T,N} =
580+
DArray{T,N}(undef, auto_blocks((dims...,)), (dims...,); assignment)
581+
582+
DArray{T}(::UndefInitializer, dist::Blocks{N}, dims::NTuple{N,Int}; assignment::AssignmentType{N} = :arbitrary) where {T,N} =
583+
DArray{T,N}(undef, dist, dims; assignment)
584+
DArray{T}(::UndefInitializer, dist::Blocks{N}, dims::Vararg{Int,N}; assignment::AssignmentType{N} = :arbitrary) where {T,N} =
585+
DArray{T,N}(undef, dist, (dims...,); assignment)
586+
DArray{T}(::UndefInitializer, dims::NTuple{N,Int}; assignment::AssignmentType{N} = :arbitrary) where {T,N} =
587+
DArray{T,N}(undef, auto_blocks(dims), dims; assignment)
588+
DArray{T}(::UndefInitializer, dims::Vararg{Int,N}; assignment::AssignmentType{N} = :arbitrary) where {T,N} =
589+
DArray{T,N}(undef, auto_blocks((dims...,)), (dims...,); assignment)
590+
573591
function Base.:(==)(x::ArrayOp{T,N}, y::AbstractArray{S,N}) where {T,S,N}
574592
collect(x) == y
575593
end

src/array/map-reduce.jl

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,18 @@ function stage(ctx::Context, r::MapReduce{T,N}) where {T,N}
6161

6262
# Tree-reduce intermediate reductions
6363
dims_materialized = dims === Colon() ? ntuple(identity, ndims(inp)) : dims
64-
treered_f(op, x, y) = op.(x, y)
64+
function to_array(x, N)
65+
A = Array{typeof(x),N}(undef, ntuple(i->1, N))
66+
A[1] = x
67+
return A
68+
end
69+
to_array(x::Array, N) = x
70+
function treered_f(op, x, y, N)
71+
value = op.(x, y)
72+
return to_array(value, N)
73+
end
6574
thunks = treereducedim(reduced_parts, dims_materialized) do x, y
66-
Dagger.@spawn treered_f(r.op_outer, x, y)
75+
Dagger.@spawn treered_f(r.op_outer, x, y, length(dims_materialized))
6776
end
6877

6978
c = domainchunks(inp)
@@ -86,7 +95,7 @@ _mapreduce_maybesync(f, op_inner, op_outer, x, ::Colon, init) =
8695
_mapreduce_maybesync(f, op_inner, op_outer, x, nothing, init)
8796
function _mapreduce_maybesync(f, op_inner, op_outer, x::DArray{T,N}, dims::Nothing, init) where {T,N}
8897
Dx = _to_darray(MapReduce(f, op_inner, op_outer, x, dims, init))
89-
return collect(Dx)
98+
return only(collect(Dx))
9099
end
91100

92101
function Base.size(r::MapReduce)

src/datadeps.jl

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,7 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
595595
# Populate all task dependencies
596596
populate_task_info!(state, spec, task)
597597

598+
task_scope = @something(spec.options.compute_scope, spec.options.scope, DefaultScope())
598599
scheduler = queue.scheduler
599600
if scheduler == :naive
600601
raw_args = map(arg->tochunk(value(arg)), spec.fargs)
@@ -719,14 +720,24 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
719720
sstate.task_completions[task] = our_space_completed + move_time + task_time
720721
elseif scheduler == :roundrobin
721722
our_proc = all_procs[proc_idx]
723+
if task_scope == scope
724+
# all_procs is already limited to scope
725+
else
726+
if isa(constrain(task_scope, scope), InvalidScope)
727+
throw(Sch.SchedulingException("Scopes are not compatible: $(scope), $(task_scope)"))
728+
end
729+
while !proc_in_scope(our_proc, task_scope)
730+
proc_idx = mod1(proc_idx + 1, length(all_procs))
731+
our_proc = all_procs[proc_idx]
732+
end
733+
end
722734
else
723735
error("Invalid scheduler: $sched")
724736
end
725737
@assert our_proc in all_procs
726738
our_space = only(memory_spaces(our_proc))
727739

728740
# Find the scope for this task (and its copies)
729-
task_scope = @something(spec.options.compute_scope, spec.options.scope, DefaultScope())
730741
if task_scope == scope
731742
# Optimize for the common case, cache the proc=>scope mapping
732743
our_scope = get!(proc_to_scope_lfu, our_proc) do

src/thunk.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ wrap_weak(t::WeakThunk) = t
247247
wrap_weak(t) = t
248248
isweak(t::WeakThunk) = true
249249
isweak(t::Thunk) = false
250-
isweak(t) = true
250+
isweak(t) = false
251251
Base.show(io::IO, t::WeakThunk) = (print(io, "~"); Base.show(io, t.x.value))
252252
Base.convert(::Type{WeakThunk}, t::Thunk) = WeakThunk(t)
253253
chunktype(t::WeakThunk) = chunktype(unwrap_weak_checked(t))

test/array/allocation.jl

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,35 @@
3131
end
3232
end
3333

34+
@testset "undef" begin
35+
for T in [Float32, Float64, Int32, Int64]
36+
for dims in [(),
37+
(100,),
38+
(100, 100),
39+
(100, 100, 100)]
40+
# Automatic blocksize
41+
for DA in [DArray{T}(undef, dims; assignment=:arbitrary),
42+
DArray{T}(undef, dims...; assignment=:arbitrary)]
43+
@test DA isa DArray{T,length(dims)}
44+
A = collect(DA)
45+
@test eltype(DA) == eltype(A) == T
46+
@test size(DA) == size(A) == dims
47+
end
48+
49+
# Manual blocksize
50+
dist = Blocks(ntuple(i->10, length(dims))...)
51+
for DA in [DArray{T}(undef, dist, dims; assignment=:arbitrary),
52+
DArray{T}(undef, dist, dims...; assignment=:arbitrary)]
53+
@test DA isa DArray{T,length(dims)}
54+
A = collect(DA)
55+
@test eltype(DA) == eltype(A) == T
56+
@test size(DA) == size(A) == dims
57+
@test DA.partitioning == dist
58+
end
59+
end
60+
end
61+
end
62+
3463
@testset "random" begin
3564
for T in [Float32, Float64, Int32, Int64]
3665
for dims in [(),
@@ -68,9 +97,11 @@ end
6897
Xsp = sprand(dist, T, dims..., 0.1)
6998
@test Xsp isa DArray{T,length(dims)}
7099
@test size(Xsp) == dims
71-
AXsp = collect(Xsp)
72100
AT = length(dims) == 2 ? SparseMatrixCSC : SparseVector
73-
@test AXsp isa AT{T}
101+
Ach = fetch(Xsp.chunks[1])
102+
@test Ach isa AT{T}
103+
AXsp = collect(Xsp)
104+
@test AXsp isa Array{T,length(dims)}
74105
@test AXsp == collect(Xsp)
75106
@test AXsp != collect(sprand(dist, T, dims..., 0.1))
76107
@test !allunique(AXsp)
@@ -183,7 +214,7 @@ end
183214
for dist in [Blocks(ntuple(i->10, length(dims))...),
184215
AutoBlocks()]
185216
if fn === sprand
186-
if length(dims) > 2
217+
if length(dims) > 2 || length(dims) == 0
187218
continue
188219
end
189220
@test fn(dist, dims..., 0.1) isa DArray{Float64,length(dims)}
@@ -651,4 +682,4 @@ end
651682
@test typeof(X1) === typeof(X2) === typeof(X3)
652683
@test collect(X1) == collect(X2)
653684
@test collect(X1) != collect(X3)
654-
end
685+
end

0 commit comments

Comments
 (0)