@@ -32,6 +32,7 @@ import (
3232 "github.com/prometheus/client_golang/prometheus"
3333 "github.com/prometheus/client_golang/prometheus/promauto"
3434 "golang.org/x/crypto/ssh"
35+ "golang.org/x/time/rate"
3536 batchv1 "k8s.io/api/batch/v1"
3637 corev1 "k8s.io/api/core/v1"
3738 "k8s.io/apimachinery/pkg/api/equality"
@@ -117,7 +118,10 @@ const (
 )
 
 var (
-	mpiJobsCreatedCount = promauto.NewCounter(prometheus.CounterOpts{
+	// exponential workqueue rate limiting config
+	workqueueExponentialBaseDelay = 5 * time.Millisecond
+	workqueueExponentialMaxDelay  = 1000 * time.Second
+	mpiJobsCreatedCount           = promauto.NewCounter(prometheus.CounterOpts{
 		Name: "mpi_operator_jobs_created_total",
 		Help: "Counts number of MPI jobs created",
 	})
@@ -252,6 +256,9 @@ type MPIJobController struct {
 	// To allow injection of updateStatus for testing.
 	updateStatusHandler func(mpijob *kubeflow.MPIJob) error
 
+	// clusterDomain is the cluster domain suffix used to build FQDNs in the hostfile.
+	clusterDomain string
+
 	// Clock for internal use of unit-testing
 	clock clock.WithTicker
 }
@@ -269,11 +276,10 @@ func NewMPIJobController(
 	podInformer coreinformers.PodInformer,
 	priorityClassInformer schedulinginformers.PriorityClassInformer,
 	mpiJobInformer informers.MPIJobInformer,
-	namespace, gangSchedulingName string,
-	workqueueRateLimiter workqueue.TypedRateLimiter[any]) (*MPIJobController, error) {
+	opt *options.ServerOption) (*MPIJobController, error) {
 	return NewMPIJobControllerWithClock(kubeClient, kubeflowClient, volcanoClient, schedClient,
 		configMapInformer, secretInformer, serviceInformer, jobInformer, podInformer,
-		priorityClassInformer, mpiJobInformer, &clock.RealClock{}, namespace, gangSchedulingName, workqueueRateLimiter)
+		priorityClassInformer, mpiJobInformer, &clock.RealClock{}, opt)
 }
 
 // NewMPIJobControllerWithClock returns a new MPIJob controller.
@@ -289,9 +295,7 @@ func NewMPIJobControllerWithClock(
 	podInformer coreinformers.PodInformer,
 	priorityClassInformer schedulinginformers.PriorityClassInformer,
 	mpiJobInformer informers.MPIJobInformer,
-	clock clock.WithTicker,
-	namespace, gangSchedulingName string,
-	workqueueRateLimiter workqueue.TypedRateLimiter[any]) (*MPIJobController, error) {
+	clock clock.WithTicker, opt *options.ServerOption) (*MPIJobController, error) {
 
 	// Create event broadcaster.
 	klog.V(4).Info("Creating event broadcaster")
@@ -309,11 +313,11 @@ func NewMPIJobControllerWithClock(
 	)
 	priorityClassLister = priorityClassInformer.Lister()
 	priorityClassSynced = priorityClassInformer.Informer().HasSynced
-	if gangSchedulingName == options.GangSchedulerVolcano {
-		podGroupCtrl = NewVolcanoCtrl(volcanoClient, namespace, priorityClassLister)
-	} else if len(gangSchedulingName) != 0 {
+	if opt.GangSchedulingName == options.GangSchedulerVolcano {
+		podGroupCtrl = NewVolcanoCtrl(volcanoClient, opt.Namespace, priorityClassLister)
+	} else if len(opt.GangSchedulingName) != 0 {
 		// Use scheduler-plugins as a default gang-scheduler.
-		podGroupCtrl = NewSchedulerPluginsCtrl(schedClient, namespace, gangSchedulingName, priorityClassLister)
+		podGroupCtrl = NewSchedulerPluginsCtrl(schedClient, opt.Namespace, opt.GangSchedulingName, priorityClassLister)
 	}
 	if podGroupCtrl != nil {
 		podGroupSynced = podGroupCtrl.PodGroupSharedIndexInformer().HasSynced
@@ -338,9 +342,15 @@ func NewMPIJobControllerWithClock(
 		priorityClassSynced: priorityClassSynced,
 		mpiJobLister:        mpiJobInformer.Lister(),
 		mpiJobSynced:        mpiJobInformer.Informer().HasSynced,
-		queue:               workqueue.NewTypedRateLimitingQueueWithConfig(workqueueRateLimiter, workqueue.TypedRateLimitingQueueConfig[any]{Name: "MPIJob"}),
-		recorder:            recorder,
-		clock:               clock,
+		queue: workqueue.NewTypedRateLimitingQueueWithConfig(
+			workqueue.NewTypedMaxOfRateLimiter(
+				workqueue.NewTypedItemExponentialFailureRateLimiter[any](workqueueExponentialBaseDelay, workqueueExponentialMaxDelay),
+				&workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(opt.ControllerRateLimit), opt.ControllerBurst)},
+			),
+			workqueue.TypedRateLimitingQueueConfig[any]{Name: "MPIJob"},
+		),
+		recorder: recorder,
+		clock:    clock,
 	}
 
 	controller.updateStatusHandler = controller.doUpdateJobStatus
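
For reference, a minimal standalone sketch of the combined limiter the controller now builds: a per-item exponential backoff is composed with a global token bucket via NewTypedMaxOfRateLimiter, and the larger of the two delays wins. The 10 qps / 100 burst values below are placeholders for opt.ControllerRateLimit and opt.ControllerBurst, not the operator's defaults.

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Max of: per-item exponential backoff (5ms base, 1000s cap) and a
	// global token bucket. 10 qps / 100 burst are illustrative stand-ins
	// for opt.ControllerRateLimit / opt.ControllerBurst.
	limiter := workqueue.NewTypedMaxOfRateLimiter(
		workqueue.NewTypedItemExponentialFailureRateLimiter[any](5*time.Millisecond, 1000*time.Second),
		&workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)

	// Repeated requeues of the same key roughly double the backoff:
	// 5ms, 10ms, 20ms, 40ms, ... up to the 1000s ceiling.
	for i := 0; i < 5; i++ {
		fmt.Println(limiter.When("default/my-mpijob"))
	}
}
```

The exponential part keeps a single hot-failing MPIJob from spinning, while the bucket caps overall requeue throughput across all keys.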
@@ -833,7 +843,7 @@ func (c *MPIJobController) countReadyWorkerPods(workers []*corev1.Pod) int {
 // getOrCreateConfigMap gets the ConfigMap controlled by this MPIJob, or creates
 // one if it doesn't exist.
 func (c *MPIJobController) getOrCreateConfigMap(mpiJob *kubeflow.MPIJob) (*corev1.ConfigMap, error) {
-	newCM := newConfigMap(mpiJob, workerReplicas(mpiJob))
+	newCM := newConfigMap(mpiJob, workerReplicas(mpiJob), c.clusterDomain)
 	podList, err := c.getRunningWorkerPods(mpiJob)
 	if err != nil {
 		return nil, err
@@ -1272,29 +1282,33 @@ func (c *MPIJobController) doUpdateJobStatus(mpiJob *kubeflow.MPIJob) error {
 // newConfigMap creates a new ConfigMap containing configurations for an MPIJob
 // resource. It also sets the appropriate OwnerReferences on the resource so
 // handleObject can discover the MPIJob resource that 'owns' it.
-func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32) *corev1.ConfigMap {
+func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32, clusterDomain string) *corev1.ConfigMap {
 	var buffer bytes.Buffer
 	slots := ptr.Deref(mpiJob.Spec.SlotsPerWorker, 1)
+	domainFormat := "%s.%s.%s.svc"
+	if len(clusterDomain) > 0 {
+		domainFormat += fmt.Sprintf(".%s", clusterDomain)
+	}
 	// note that pod.spec.dnsConfig also affects the svc resolution
 	// ref: https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/
 	// the launcher can be reached via hostname or service name
 	if runLauncherAsWorker(mpiJob) {
 		name := mpiJob.Name + launcherSuffix
 		switch mpiJob.Spec.MPIImplementation {
 		case kubeflow.MPIImplementationOpenMPI:
-			buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc slots=%d\n", name, mpiJob.Name, mpiJob.Namespace, slots))
+			buffer.WriteString(fmt.Sprintf("%s slots=%d\n", fmt.Sprintf(domainFormat, name, mpiJob.Name, mpiJob.Namespace), slots))
 		case kubeflow.MPIImplementationIntel, kubeflow.MPIImplementationMPICH:
-			buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc:%d\n", name, mpiJob.Name, mpiJob.Namespace, slots))
+			buffer.WriteString(fmt.Sprintf("%s:%d\n", fmt.Sprintf(domainFormat, name, mpiJob.Name, mpiJob.Namespace), slots))
 		}
 	}
 
 	for i := 0; i < int(workerReplicas); i++ {
 		name := workerName(mpiJob, i)
 		switch mpiJob.Spec.MPIImplementation {
 		case kubeflow.MPIImplementationOpenMPI:
-			buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc slots=%d\n", name, mpiJob.Name, mpiJob.Namespace, slots))
+			buffer.WriteString(fmt.Sprintf("%s slots=%d\n", fmt.Sprintf(domainFormat, name, mpiJob.Name, mpiJob.Namespace), slots))
 		case kubeflow.MPIImplementationIntel, kubeflow.MPIImplementationMPICH:
-			buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc:%d\n", name, mpiJob.Name, mpiJob.Namespace, slots))
+			buffer.WriteString(fmt.Sprintf("%s:%d\n", fmt.Sprintf(domainFormat, name, mpiJob.Name, mpiJob.Namespace), slots))
 		}
 	}
 
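
As an illustration of what the domainFormat change produces, here is a small standalone sketch that builds hostfile lines for a hypothetical OpenMPI MPIJob named "pi" in namespace "default" with two workers and one slot each; the job, namespace, and worker names are invented for the example, and the worker-name pattern simply mirrors the <job>-worker-<index> convention.

```go
package main

import "fmt"

func main() {
	const job, namespace, slots = "pi", "default", 1
	for _, clusterDomain := range []string{"", "cluster.local"} {
		domainFormat := "%s.%s.%s.svc"
		if len(clusterDomain) > 0 {
			domainFormat += fmt.Sprintf(".%s", clusterDomain)
		}
		for i := 0; i < 2; i++ {
			name := fmt.Sprintf("%s-worker-%d", job, i) // hypothetical worker pod name
			// Without a cluster domain: "pi-worker-0.pi.default.svc slots=1"
			// With "cluster.local":     "pi-worker-0.pi.default.svc.cluster.local slots=1"
			fmt.Printf("%s slots=%d\n", fmt.Sprintf(domainFormat, name, job, namespace), slots)
		}
	}
}
```

When clusterDomain is left empty, the format stays at the previous "%s.%s.%s.svc" short form, so existing hostfile output is unchanged.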