Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions pkg/controller/mpi_job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,9 +885,10 @@ func (c *MPIJobController) getOrCreateService(job *kubeflow.MPIJob, newSvc *core
}

// If the Service selector or publishNotReadyAddresses is changed, update it.
if !equality.Semantic.DeepEqual(svc.Spec.Selector, newSvc.Spec.Selector) {
if !equality.Semantic.DeepEqual(svc.Spec.Selector, newSvc.Spec.Selector) || svc.Spec.PublishNotReadyAddresses != newSvc.Spec.PublishNotReadyAddresses {
svc = svc.DeepCopy()
svc.Spec.Selector = newSvc.Spec.Selector
svc.Spec.PublishNotReadyAddresses = newSvc.Spec.PublishNotReadyAddresses
return c.kubeClient.CoreV1().Services(svc.Namespace).Update(context.TODO(), svc, metav1.UpdateOptions{})
}

Expand Down Expand Up @@ -1343,10 +1344,10 @@ func newJobService(job *kubeflow.MPIJob) *corev1.Service {
kubeflow.OperatorNameLabel: kubeflow.OperatorName,
kubeflow.JobNameLabel: job.Name,
}
return newService(job, job.Name, labels)
return newService(job, job.Name, labels, ptr.Deref(job.Spec.RunLauncherAsWorker, false))
}

func newService(job *kubeflow.MPIJob, name string, selector map[string]string) *corev1.Service {
func newService(job *kubeflow.MPIJob, name string, selector map[string]string, isRunLauncherAsWorker bool) *corev1.Service {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Expand All @@ -1361,6 +1362,9 @@ func newService(job *kubeflow.MPIJob, name string, selector map[string]string) *
Spec: corev1.ServiceSpec{
ClusterIP: corev1.ClusterIPNone,
Selector: selector,
// publishNotReadyAddresses must be true only for an MPIJob with runLauncherAsWorker enabled,
// to avoid a deadlock while waiting for the Launcher to become ready.
PublishNotReadyAddresses: isRunLauncherAsWorker,
},
}
}
Expand Down
Loading