Skip to content

Commit 62cc940

Browse files
committed
datadeps: Make round-robin always consider task scope
1 parent 0d95dba commit 62cc940

File tree

1 file changed

+12
-1
lines changed

1 file changed

+12
-1
lines changed

src/datadeps.jl

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,7 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
595595
# Populate all task dependencies
596596
populate_task_info!(state, spec, task)
597597

598+
task_scope = @something(spec.options.compute_scope, spec.options.scope, DefaultScope())
598599
scheduler = queue.scheduler
599600
if scheduler == :naive
600601
raw_args = map(arg->tochunk(value(arg)), spec.fargs)
@@ -719,14 +720,24 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
719720
sstate.task_completions[task] = our_space_completed + move_time + task_time
720721
elseif scheduler == :roundrobin
721722
our_proc = all_procs[proc_idx]
723+
if task_scope == scope
724+
# all_procs is already limited to scope
725+
else
726+
if isa(constrain(task_scope, scope), InvalidScope)
727+
throw(Sch.SchedulingException("Scopes are not compatible: $(scope), $(task_scope)"))
728+
end
729+
while !proc_in_scope(our_proc, task_scope)
730+
proc_idx = mod1(proc_idx + 1, length(all_procs))
731+
our_proc = all_procs[proc_idx]
732+
end
733+
end
722734
else
723735
error("Invalid scheduler: $sched")
724736
end
725737
@assert our_proc in all_procs
726738
our_space = only(memory_spaces(our_proc))
727739

728740
# Find the scope for this task (and its copies)
729-
task_scope = @something(spec.options.compute_scope, spec.options.scope, DefaultScope())
730741
if task_scope == scope
731742
# Optimize for the common case, cache the proc=>scope mapping
732743
our_scope = get!(proc_to_scope_lfu, our_proc) do

0 commit comments

Comments
 (0)