diff --git a/pkg/controller/mpi_job_controller.go b/pkg/controller/mpi_job_controller.go
index dc199127..5af05c3b 100644
--- a/pkg/controller/mpi_job_controller.go
+++ b/pkg/controller/mpi_job_controller.go
@@ -851,7 +851,7 @@ func (c *MPIJobController) getOrCreateConfigMap(mpiJob *kubeflow.MPIJob) (*corev
 	if err != nil {
 		return nil, err
 	}
-	updateDiscoverHostsInConfigMap(newCM, mpiJob, podList)
+	updateDiscoverHostsInConfigMap(newCM, mpiJob, podList, c.clusterDomain)
 
 	cm, err := c.configMapLister.ConfigMaps(mpiJob.Namespace).Get(mpiJob.Name + configSuffix)
 	// If the ConfigMap doesn't exist, we'll create it.
@@ -1333,7 +1333,7 @@ func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32, clusterDomain s
 }
 
 // updateDiscoverHostsInConfigMap updates the ConfigMap if the content of `discover_hosts.sh` changes.
-func updateDiscoverHostsInConfigMap(configMap *corev1.ConfigMap, mpiJob *kubeflow.MPIJob, runningPods []*corev1.Pod) {
+func updateDiscoverHostsInConfigMap(configMap *corev1.ConfigMap, mpiJob *kubeflow.MPIJob, runningPods []*corev1.Pod, clusterDomain string) {
 	// Sort the slice of Pods to make sure the order of entries in `discover_hosts.sh` is maintained.
 	sort.Slice(runningPods, func(i, j int) bool {
 		return runningPods[i].Name < runningPods[j].Name
@@ -1341,15 +1341,19 @@ func updateDiscoverHostsInConfigMap(configMap *corev1.ConfigMap, mpiJob *kubeflo
 
 	var buffer bytes.Buffer
 	buffer.WriteString("#!/bin/sh\n")
+	domainFormat := "%s.%s.%s.svc"
+	if len(clusterDomain) > 0 {
+		domainFormat += fmt.Sprintf(".%s", clusterDomain)
+	}
 
 	// We don't check if launcher is running here, launcher should always be there or the job failed
 	if runLauncherAsWorker(mpiJob) {
 		name := mpiJob.Name + launcherSuffix
-		buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", name, mpiJob.Name, mpiJob.Namespace))
+		buffer.WriteString(fmt.Sprintf("echo %s\n", fmt.Sprintf(domainFormat, name, mpiJob.Name, mpiJob.Namespace)))
 	}
 
 	for _, p := range runningPods {
-		buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", p.Name, mpiJob.Name, p.Namespace))
+		buffer.WriteString(fmt.Sprintf("echo %s\n", fmt.Sprintf(domainFormat, p.Name, mpiJob.Name, p.Namespace)))
 	}
 
 	configMap.Data[discoverHostsScriptName] = buffer.String()
diff --git a/pkg/controller/mpi_job_controller_test.go b/pkg/controller/mpi_job_controller_test.go
index c7724199..ea39f21c 100644
--- a/pkg/controller/mpi_job_controller_test.go
+++ b/pkg/controller/mpi_job_controller_test.go
@@ -546,7 +546,7 @@ func TestAllResourcesCreated(t *testing.T) {
 	scheme.Scheme.Default(mpiJobCopy)
 	f.expectCreateServiceAction(newJobService(mpiJobCopy))
 	cfgMap := newConfigMap(mpiJobCopy, 5, "")
-	updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil)
+	updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil, "")
 	f.expectCreateConfigMapAction(cfgMap)
 	secret, err := newSSHAuthSecret(mpiJobCopy)
 	if err != nil {
@@ -709,7 +709,7 @@ func TestConfigMapNotControlledByUs(t *testing.T) {
 	f.setUpService(newJobService(mpiJob))
 
 	configMap := newConfigMap(mpiJob, replicas, "")
-	updateDiscoverHostsInConfigMap(configMap, mpiJob, nil)
+	updateDiscoverHostsInConfigMap(configMap, mpiJob, nil, "")
 	configMap.OwnerReferences = nil
 	f.setUpConfigMap(configMap)
 
@@ -755,7 +755,7 @@ func TestLauncherServiceNotControlledByUs(t *testing.T) {
 		t.Fatalf("Creating SSH auth Secret: %v", err)
 	}
 	f.setUpSecret(secret)
-	updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil)
+	updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil, "")
 	f.setUpConfigMap(configMap)
 	fmjc := f.newFakeMPIJobController()
 	for i := 0; i < int(replicas); i++ {
@@ -778,7 +778,7 @@ func TestSecretNotControlledByUs(t *testing.T) {
 	mpiJobCopy := mpiJob.DeepCopy()
 	scheme.Scheme.Default(mpiJobCopy)
 	configMap := newConfigMap(mpiJobCopy, replicas, "")
-	updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil)
+	updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil, "")
 	f.setUpConfigMap(configMap)
 	f.setUpService(newJobService(mpiJobCopy))
 
@@ -861,7 +861,7 @@ func TestCreateSuspendedMPIJob(t *testing.T) {
 	scheme.Scheme.Default(mpiJob)
 	f.expectCreateServiceAction(newJobService(mpiJob))
 	cfgMap := newConfigMap(mpiJob, replicas, "")
-	updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil)
+	updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil, "")
 	f.expectCreateConfigMapAction(cfgMap)
 	secret, err := newSSHAuthSecret(mpiJob)
 	if err != nil {
@@ -932,7 +932,7 @@ func TestSuspendedRunningMPIJob(t *testing.T) {
 	f.setUpService(newJobService(mpiJob))
 
 	cfgMap := newConfigMap(mpiJob, replicas, "")
-	updateDiscoverHostsInConfigMap(cfgMap, mpiJob, runningPodList)
+	updateDiscoverHostsInConfigMap(cfgMap, mpiJob, runningPodList, "")
 	f.setUpConfigMap(cfgMap)
 	secret, err := newSSHAuthSecret(mpiJob)
 	if err != nil {
@@ -1004,7 +1004,7 @@ func TestResumeMPIJob(t *testing.T) {
 	scheme.Scheme.Default(mpiJob)
 	f.expectCreateServiceAction(newJobService(mpiJob))
 	cfgMap := newConfigMap(mpiJob, replicas, "")
-	updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil)
+	updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil, "")
 	f.setUpConfigMap(cfgMap)
 	secret, err := newSSHAuthSecret(mpiJob)
 	if err != nil {
@@ -1056,7 +1056,7 @@ func TestWorkerNotControlledByUs(t *testing.T) {
 	mpiJobCopy := mpiJob.DeepCopy()
 	scheme.Scheme.Default(mpiJobCopy)
 	configMap := newConfigMap(mpiJobCopy, replicas, "")
-	updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil)
+	updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil, "")
 	f.setUpConfigMap(configMap)
 	f.setUpService(newJobService(mpiJobCopy))
 	secret, err := newSSHAuthSecret(mpiJobCopy)
@@ -1087,7 +1087,7 @@ func TestLauncherActiveWorkerNotReady(t *testing.T) {
 	mpiJobCopy := mpiJob.DeepCopy()
 	scheme.Scheme.Default(mpiJobCopy)
 	configMap := newConfigMap(mpiJobCopy, replicas, "")
-	updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil)
+	updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil, "")
 	f.setUpConfigMap(configMap)
 	f.setUpService(newJobService(mpiJobCopy))
 	secret, err := newSSHAuthSecret(mpiJobCopy)
@@ -1162,7 +1162,7 @@ func TestLauncherActiveWorkerReady(t *testing.T) {
 	}
 
 	configMap := newConfigMap(mpiJobCopy, replicas, "")
-	updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, runningPodList)
+	updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, runningPodList, "")
 	f.setUpConfigMap(configMap)
 
 	mpiJobCopy.Status.ReplicaStatuses = map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{
@@ -1216,7 +1216,7 @@ func TestWorkerReady(t *testing.T) {
 	}
 
 	configMap := newConfigMap(mpiJobCopy, replicas, "")
-	updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, runningPodList)
+	updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, runningPodList, "")
 	f.setUpConfigMap(configMap)
 
 	expLauncher := fmjc.newLauncherJob(mpiJobCopy)
@@ -1981,6 +1981,247 @@ func TestNewConfigMap(t *testing.T) {
 	}
 }
 
+func TestUpdateDiscoverHostsInConfigMap(t *testing.T) {
+	testCases := map[string]struct {
+		mpiJob        *kubeflow.MPIJob
+		runningPods   []*corev1.Pod
+		clusterDomain string
+		wantConfigMap *corev1.ConfigMap
+	}{
+		"no cluster domain, launcher as worker disabled, no running pods": {
+			mpiJob: &kubeflow.MPIJob{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "test-job",
+					Namespace: "default",
+				},
+				Spec: kubeflow.MPIJobSpec{
+					MPIImplementation: kubeflow.MPIImplementationOpenMPI,
+				},
+			},
+			runningPods:   []*corev1.Pod{},
+			clusterDomain: "",
+			wantConfigMap: &corev1.ConfigMap{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "test-job-config",
+					Namespace: "default",
+				},
+				Data: map[string]string{
+					"discover_hosts.sh": "#!/bin/sh\n",
+				},
+			},
+		},
+		"no cluster domain, launcher as worker disabled, with running pods": {
+			mpiJob: &kubeflow.MPIJob{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "test-job",
+					Namespace: "default",
+				},
+				Spec: kubeflow.MPIJobSpec{
+					MPIImplementation: kubeflow.MPIImplementationOpenMPI,
+				},
+			},
+			runningPods: []*corev1.Pod{
+				{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      "test-job-worker-0",
+						Namespace: "default",
+					},
+				},
+				{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      "test-job-worker-1",
+						Namespace: "default",
+					},
+				},
+			},
+			clusterDomain: "",
+			wantConfigMap: &corev1.ConfigMap{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "test-job-config",
+					Namespace: "default",
+				},
+				Data: map[string]string{
+					"discover_hosts.sh": "#!/bin/sh\necho test-job-worker-0.test-job.default.svc\necho test-job-worker-1.test-job.default.svc\n",
+				},
+			},
+		},
+		"no cluster domain, launcher as worker enabled, no running pods": {
+			mpiJob: &kubeflow.MPIJob{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "test-job",
+					Namespace: "default",
+				},
+				Spec: kubeflow.MPIJobSpec{
+					MPIImplementation:   kubeflow.MPIImplementationOpenMPI,
+					RunLauncherAsWorker: ptr.To(true),
+				},
+			},
+			runningPods:   []*corev1.Pod{},
+			clusterDomain: "",
+			wantConfigMap: &corev1.ConfigMap{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "test-job-config",
+					Namespace: "default",
+				},
+				Data: map[string]string{
+					"discover_hosts.sh": "#!/bin/sh\necho test-job-launcher.test-job.default.svc\n",
+				},
+			},
+		},
+		"no cluster domain, launcher as worker enabled, with running pods": {
+			mpiJob: &kubeflow.MPIJob{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "test-job",
+					Namespace: "default",
+				},
+				Spec: kubeflow.MPIJobSpec{
+					MPIImplementation:   kubeflow.MPIImplementationOpenMPI,
+					RunLauncherAsWorker: ptr.To(true),
+				},
+			},
+			runningPods: []*corev1.Pod{
+				{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      "test-job-worker-0",
+						Namespace: "default",
+					},
+				},
+				{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      "test-job-worker-1",
+						Namespace: "default",
+					},
+				},
+			},
+			clusterDomain: "",
+			wantConfigMap: &corev1.ConfigMap{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "test-job-config",
+					Namespace: "default",
+				},
+				Data: map[string]string{
+					"discover_hosts.sh": "#!/bin/sh\necho test-job-launcher.test-job.default.svc\necho test-job-worker-0.test-job.default.svc\necho test-job-worker-1.test-job.default.svc\n",
+				},
+			},
+		},
+		"with cluster domain, launcher as worker disabled, with running pods": {
+			mpiJob: &kubeflow.MPIJob{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "test-job",
+					Namespace: "tenant-a",
+				},
+				Spec: kubeflow.MPIJobSpec{
+					MPIImplementation: kubeflow.MPIImplementationOpenMPI,
+				},
+			},
+			runningPods: []*corev1.Pod{
+				{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      "test-job-worker-0",
+						Namespace: "tenant-a",
+					},
+				},
+			},
+			clusterDomain: "cluster.local",
+			wantConfigMap: &corev1.ConfigMap{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "test-job-config",
+					Namespace: "tenant-a",
+				},
+				Data: map[string]string{
+					"discover_hosts.sh": "#!/bin/sh\necho test-job-worker-0.test-job.tenant-a.svc.cluster.local\n",
+				},
+			},
+		},
+		"with cluster domain, launcher as worker enabled, with running pods": {
+			mpiJob: &kubeflow.MPIJob{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "test-job",
+					Namespace: "tenant-a",
+				},
+				Spec: kubeflow.MPIJobSpec{
+					MPIImplementation:   kubeflow.MPIImplementationOpenMPI,
+					RunLauncherAsWorker: ptr.To(true),
+				},
+			},
+			runningPods: []*corev1.Pod{
+				{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      "test-job-worker-0",
+						Namespace: "tenant-a",
+					},
+				},
+			},
+			clusterDomain: "cluster.local",
+			wantConfigMap: &corev1.ConfigMap{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "test-job-config",
+					Namespace: "tenant-a",
+				},
+				Data: map[string]string{
+					"discover_hosts.sh": "#!/bin/sh\necho test-job-launcher.test-job.tenant-a.svc.cluster.local\necho test-job-worker-0.test-job.tenant-a.svc.cluster.local\n",
+				},
+			},
+		},
+		"pods are sorted by name": {
+			mpiJob: &kubeflow.MPIJob{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "test-job",
+					Namespace: "default",
+				},
+				Spec: kubeflow.MPIJobSpec{
+					MPIImplementation: kubeflow.MPIImplementationOpenMPI,
+				},
+			},
+			runningPods: []*corev1.Pod{
+				{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      "test-job-worker-2",
+						Namespace: "default",
+					},
+				},
+				{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      "test-job-worker-0",
+						Namespace: "default",
+					},
+				},
+				{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      "test-job-worker-1",
+						Namespace: "default",
+					},
+				},
+			},
+			clusterDomain: "",
+			wantConfigMap: &corev1.ConfigMap{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "test-job-config",
+					Namespace: "default",
+				},
+				Data: map[string]string{
+					"discover_hosts.sh": "#!/bin/sh\necho test-job-worker-0.test-job.default.svc\necho test-job-worker-1.test-job.default.svc\necho test-job-worker-2.test-job.default.svc\n",
+				},
+			},
+		},
+	}
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			configMap := &corev1.ConfigMap{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      tc.mpiJob.Name + "-config",
+					Namespace: tc.mpiJob.Namespace,
+				},
+				Data: make(map[string]string),
+			}
+			updateDiscoverHostsInConfigMap(configMap, tc.mpiJob, tc.runningPods, tc.clusterDomain)
+			if diff := cmp.Diff(tc.wantConfigMap, configMap); len(diff) != 0 {
+				t.Errorf("Unexpected ConfigMap (-want,+got):\n%s", diff)
+			}
+		})
+	}
+}
+
 func joinEnvVars(evs ...interface{}) []corev1.EnvVar {
 	var result []corev1.EnvVar
 	for _, ev := range evs {
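Reviewer note, not part of the diff: the host-naming change above is easy to sanity-check in isolation. The sketch below mirrors the domainFormat construction from updateDiscoverHostsInConfigMap, assuming the <pod>.<service>.<namespace>.svc[.<cluster-domain>] convention shown in the diff; the hostFormat helper and the main harness are hypothetical and do not exist in the repository.

package main

import "fmt"

// hostFormat mirrors the domainFormat construction in the patched
// controller: hosts resolve as <pod>.<service>.<namespace>.svc, with
// the cluster domain appended only when one is configured.
func hostFormat(clusterDomain string) string {
	format := "%s.%s.%s.svc"
	if len(clusterDomain) > 0 {
		format += fmt.Sprintf(".%s", clusterDomain)
	}
	return format
}

func main() {
	// Empty cluster domain: matches the previously hard-coded behavior.
	fmt.Printf(hostFormat("")+"\n", "test-job-worker-0", "test-job", "default")
	// Output: test-job-worker-0.test-job.default.svc

	// With a cluster domain, hosts become fully qualified.
	fmt.Printf(hostFormat("cluster.local")+"\n", "test-job-worker-0", "test-job", "default")
	// Output: test-job-worker-0.test-job.default.svc.cluster.local
}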