18 changes: 14 additions & 4 deletions docs/running-on-kubernetes.md
@@ -1485,6 +1485,16 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
<td>3.3.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.podDeletionCost</code></td>
<td>(none)</td>
<td>
Value to apply to the <code>controller.kubernetes.io/pod-deletion-cost</code> annotation
when Spark tells a deployment-based allocator to remove executor pods. Set this to steer
Kubernetes to remove the same pods that Spark selected when the deployment scales down.
Required when dynamic allocation is enabled with the <code>deployment</code> allocator.
</td>
<td>4.2.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.scheduler.name</code></td>
<td>(none)</td>
@@ -1665,10 +1675,10 @@ See the [configuration page](configuration.html) for information on Spark config
<td><code>spark.kubernetes.allocation.pods.allocator</code></td>
<td><code>direct</code></td>
<td>
Allocator to use for pods. Possible values are <code>direct</code> (the default),
<code>statefulset</code>, <code>deployment</code>, or a full class name of a class
implementing `AbstractPodsAllocator`. Future versions may add Job or ReplicaSet.
This is a developer API and may change or be removed at any time.
</td>
<td>3.3.0</td>
</tr>
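As a quick usage sketch (not part of this diff): the two options above combine from application code as follows. The cost of -100 is an assumption: the scheduler-backend comment later in this diff notes that pods are deleted in ascending order of the annotation value, and the allocator stamps the pod template with a default cost of 0, so a negative cost should make Spark's chosen executors the first ones the Deployment controller removes. The cluster-manager change below also makes this setting mandatory when dynamic allocation is enabled with the deployment allocator.

```scala
// Hedged sketch: enable the deployment-based allocator with dynamic allocation.
// The cost value below is an assumption, chosen to sit under the template default of 0.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("k8s://https://kubernetes.default.svc")
  .config("spark.kubernetes.allocation.pods.allocator", "deployment")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  .config("spark.kubernetes.executor.podDeletionCost", "-100")
  .getOrCreate()
```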
@@ -460,10 +460,19 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional

val KUBERNETES_EXECUTOR_POD_DELETION_COST =
ConfigBuilder("spark.kubernetes.executor.podDeletionCost")
.doc("Value to set for the controller.kubernetes.io/pod-deletion-cost" +
" annotation when Spark asks a deployment-based allocator to remove executor pods. This " +
"helps Kubernetes pick the same pods Spark selected when the deployment scales down.")
.version("4.2.0")
.intConf
.createOptional

Review comment (Member):

nit: we can emphasize that this must be set when dynamic allocation is enabled and the deployment-based allocator is enabled.

Also, do we need this configuration at all? Can we just set it to some large number, such as 100, when the deployment-based allocator and dynamic allocation are enabled?

val KUBERNETES_ALLOCATION_PODS_ALLOCATOR =
ConfigBuilder("spark.kubernetes.allocation.pods.allocator")
.doc("Allocator to use for pods. Possible values are direct (the default) and statefulset " +
", or a full class name of a class implementing AbstractPodsAllocator. " +
.doc("Allocator to use for pods. Possible values are direct (the default), statefulset," +
" deployment, or a full class name of a class implementing AbstractPodsAllocator. " +
"Future version may add Job or replicaset. This is a developer API and may change " +
"or be removed at anytime.")
.version("3.3.0")
@@ -16,6 +16,8 @@
*/
package org.apache.spark.deploy.k8s.features

import java.util.Locale

import scala.jdk.CollectionConverters._

import io.fabric8.kubernetes.api.model._
@@ -115,12 +117,17 @@ private[spark] class BasicExecutorFeatureStep(
// hostname must be no longer than `KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH`(63) characters,
// so take the last 63 characters of the pod name as the hostname.
// This preserves uniqueness since the end of name contains executorId
var hostname = name.substring(Math.max(0, name.length - KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH))
// Remove non-word characters from the start of the hostname
.replaceAll("^[^\\w]+", "")
// Replace dangerous characters in the remaining string with a safe alternative.
.replaceAll("[^\\w-]+", "_")

// The Deployment resource does not support uppercase characters in the hostname
if (kubernetesConf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR).equals("deployment")) {
hostname = hostname.toLowerCase(Locale.ROOT)
}

val executorMemoryQuantity = new Quantity(s"${execResources.totalMemMiB}Mi")
val executorCpuQuantity = new Quantity(executorCoresRequest)
val executorResourceQuantities =
@@ -270,7 +277,7 @@
}

val policy = kubernetesConf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) match {
case "statefulset" => "Always"
case "statefulset" | "deployment" => "Always"
case _ => "Never"
}
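The hostname handling above can be exercised in isolation. Below is a hedged standalone sketch; `HostnameSanitizer` and `sanitize` are hypothetical names, not part of this diff:

```scala
// Hypothetical helper mirroring the sanitization in BasicExecutorFeatureStep:
// keep the last 63 characters, strip non-word characters from the start, replace
// remaining unsafe characters with '_', and lowercase for the deployment allocator.
import java.util.Locale

object HostnameSanitizer {
  private val KubernetesDnsLabelNameMaxLength = 63 // DNS label length limit

  def sanitize(podName: String, allocator: String): String = {
    val cleaned = podName
      .substring(math.max(0, podName.length - KubernetesDnsLabelNameMaxLength))
      .replaceAll("^[^\\w]+", "")  // drop non-word characters from the start
      .replaceAll("[^\\w-]+", "_") // replace anything else unsafe
    // The Deployment resource rejects uppercase characters in hostnames.
    if (allocator == "deployment") cleaned.toLowerCase(Locale.ROOT) else cleaned
  }
}
```

For example, `sanitize("-Spark.Pi-exec-1", "deployment")` yields `"spark_pi-exec-1"`.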

@@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.k8s

import java.util.concurrent.TimeUnit

import scala.collection.mutable
import scala.jdk.CollectionConverters._

import io.fabric8.kubernetes.api.model.{Pod, PodSpec, PodSpecBuilder, PodTemplateSpec}
import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder
import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.KubernetesConf
import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
import org.apache.spark.internal.Logging
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.util.{Clock, Utils}

/**
* A pods allocator backed by Kubernetes Deployments.
*
* The Deployment controller honours the `controller.kubernetes.io/pod-deletion-cost`
* annotation, so executors selected by Spark for removal can be prioritised when the
* deployment scales down. This provides predictable downscale behaviour for dynamic
* allocation that is not possible with StatefulSets, which always remove the pods with
* the highest ordinals first.
*/
class DeploymentPodsAllocator(
conf: SparkConf,
secMgr: SecurityManager,
executorBuilder: KubernetesExecutorBuilder,
kubernetesClient: KubernetesClient,
snapshotsStore: ExecutorPodsSnapshotsStore,
clock: Clock) extends AbstractPodsAllocator() with Logging {

private val rpIdToResourceProfile = new mutable.HashMap[Int, ResourceProfile]

private val driverPodReadinessTimeout = conf.get(KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT)

private val namespace = conf.get(KUBERNETES_NAMESPACE)

private val kubernetesDriverPodName = conf.get(KUBERNETES_DRIVER_POD_NAME)

val driverPod: Option[Pod] = kubernetesDriverPodName
.map(name => Option(kubernetesClient.pods()
.inNamespace(namespace)
.withName(name)
.get())
.getOrElse(throw new SparkException(
s"No pod was found named $name in the cluster in the " +
s"namespace $namespace (this was supposed to be the driver pod.).")))

private var appId: String = _

private val deploymentsCreated = new mutable.HashSet[Int]()

private val podDeletionCostAnnotation = "controller.kubernetes.io/pod-deletion-cost"

override def start(
applicationId: String,
schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
appId = applicationId
driverPod.foreach { pod =>
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.inNamespace(namespace)
.withName(pod.getMetadata.getName)
.waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
}
}
}

override def setTotalExpectedExecutors(
resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit = {
if (appId == null) {
throw new SparkException("setTotalExpectedExecutors called before start of allocator.")
}
resourceProfileToTotalExecs.foreach { case (rp, numExecs) =>
rpIdToResourceProfile.getOrElseUpdate(rp.id, rp)
setTargetExecutorsDeployment(numExecs, appId, rp.id)
}
}

override def isDeleted(executorId: String): Boolean = false

private def setName(applicationId: String, resourceProfileId: Int): String = {
s"spark-d-$applicationId-$resourceProfileId"
}

private def setTargetExecutorsDeployment(
expected: Int,
applicationId: String,
resourceProfileId: Int): Unit = {
if (deploymentsCreated.contains(resourceProfileId)) {
kubernetesClient
.apps()
.deployments()
.inNamespace(namespace)
.withName(setName(applicationId, resourceProfileId))
.scale(expected)
} else {
val executorConf = KubernetesConf.createExecutorConf(
conf,
"EXECID",
applicationId,
driverPod,
resourceProfileId)
val resolvedExecutorSpec = executorBuilder.buildFromFeatures(
executorConf,
secMgr,
kubernetesClient,
rpIdToResourceProfile(resourceProfileId))
val executorPod = resolvedExecutorSpec.pod

val podSpecBuilder = executorPod.pod.getSpec match {
case null => new PodSpecBuilder()
case s => new PodSpecBuilder(s)
}
val podWithAttachedContainer: PodSpec = podSpecBuilder
.addToContainers(executorPod.container)
.build()

val meta = executorPod.pod.getMetadata
val resources = resolvedExecutorSpec.executorKubernetesResources
val failureMessage =
"PersistentVolumeClaims are not supported with the deployment allocator. " +
"Please remove PVC requirements or choose a different pods allocator."
val dynamicVolumeClaims = resources.filter(_.getKind == "PersistentVolumeClaim")
if (dynamicVolumeClaims.nonEmpty) {
throw new SparkException(failureMessage)
}
val staticVolumeClaims = Option(podWithAttachedContainer.getVolumes)
.map(_.asScala.filter(_.getPersistentVolumeClaim != null))
.getOrElse(Seq.empty)
if (staticVolumeClaims.nonEmpty) {
throw new SparkException(failureMessage)
}

val currentAnnotations = Option(meta.getAnnotations)
.map(_.asScala).getOrElse(Map.empty[String, String])
if (!currentAnnotations.contains(podDeletionCostAnnotation)) {
val newAnnotations = currentAnnotations.concat(Seq(podDeletionCostAnnotation -> "0"))
meta.setAnnotations(newAnnotations.asJava)
}

val podTemplateSpec = new PodTemplateSpec(meta, podWithAttachedContainer)

val deployment = new DeploymentBuilder()
.withNewMetadata()
.withName(setName(applicationId, resourceProfileId))
.withNamespace(namespace)
.endMetadata()
.withNewSpec()
.withReplicas(expected)
.withNewSelector()
.addToMatchLabels(SPARK_APP_ID_LABEL, applicationId)
.addToMatchLabels(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.addToMatchLabels(SPARK_RESOURCE_PROFILE_ID_LABEL, resourceProfileId.toString)
.endSelector()
.withTemplate(podTemplateSpec)
.endSpec()
.build()

addOwnerReference(driverPod.get, Seq(deployment))
kubernetesClient.apps().deployments().inNamespace(namespace).resource(deployment).create()
deploymentsCreated += resourceProfileId
}
}

override def stop(applicationId: String): Unit = {
deploymentsCreated.foreach { rpid =>
Utils.tryLogNonFatalError {
kubernetesClient
.apps()
.deployments()
.inNamespace(namespace)
.withName(setName(applicationId, rpid))
.delete()
}
}
}
}
@@ -21,7 +21,7 @@ import java.io.File
import io.fabric8.kubernetes.client.Config
import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.{SparkConf, SparkContext, SparkMasterRegex}
import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkMasterRegex}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkKubernetesClientFactory}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants.DEFAULT_EXECUTOR_CONTAINER_NAME
@@ -160,9 +160,19 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit

private[k8s] def makeExecutorPodsAllocator(sc: SparkContext, kubernetesClient: KubernetesClient,
snapshotsStore: ExecutorPodsSnapshotsStore) = {
val allocator = sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR)
if (allocator == "deployment" && Utils.isDynamicAllocationEnabled(sc.conf) &&
sc.conf.get(KUBERNETES_EXECUTOR_POD_DELETION_COST).isEmpty) {
throw new SparkException(
s"Dynamic allocation with the deployment pods allocator requires " +
s"'${KUBERNETES_EXECUTOR_POD_DELETION_COST.key}' to be configured.")
}

val executorPodsAllocatorName = allocator match {
case "statefulset" =>
classOf[StatefulSetPodsAllocator].getName
case "deployment" =>
classOf[DeploymentPodsAllocator].getName
case "direct" =>
classOf[ExecutorPodsAllocator].getName
case fullClass =>
@@ -73,6 +73,10 @@ private[spark] class KubernetesClusterSchedulerBackend(

private val namespace = conf.get(KUBERNETES_NAMESPACE)

// KEP 2255: When a Deployment or Replicaset is scaled down, the pods will be deleted in the
// order of the value of this annotation, ascending.
private val podDeletionCostAnnotation = "controller.kubernetes.io/pod-deletion-cost"

// Allow removeExecutor to be accessible by ExecutorPodsLifecycleEventHandler
private[k8s] def doRemoveExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
removeExecutor(executorId, reason)
@@ -195,6 +199,31 @@
super.getExecutorIds()
}

private def annotateExecutorDeletionCost(execIds: Seq[String]): Unit = {
conf.get(KUBERNETES_EXECUTOR_POD_DELETION_COST).foreach { cost =>
logInfo(s"Annotating executor pod(s) ${execIds.mkString(",")} with deletion cost $cost")
val annotateTask = new Runnable() {
override def run(): Unit = Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.inNamespace(namespace)
.withLabel(SPARK_APP_ID_LABEL, applicationId())
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, execIds: _*)
.resources()
.forEach { podResource =>
podResource.edit({ p: Pod =>
new PodBuilder(p).editOrNewMetadata()
.addToAnnotations(podDeletionCostAnnotation, cost.toString)
.endMetadata()
.build()})
}
}
}
executorService.execute(annotateTask)
}
}

private def labelDecommissioningExecs(execIds: Seq[String]) = {
// Only kick off the labeling task if we have a label.
conf.get(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL).foreach { label =>
@@ -228,13 +257,19 @@
// picked the pod to evict so we don't need to update the labels.
if (!triggeredByExecutor) {
labelDecommissioningExecs(executorsAndDecomInfo.map(_._1).toImmutableArraySeq)
if (conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR).equals("deployment")) {
annotateExecutorDeletionCost(executorsAndDecomInfo.map(_._1).toImmutableArraySeq)
}
}
super.decommissionExecutors(executorsAndDecomInfo, adjustTargetNumExecutors,
triggeredByExecutor)
}

override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
// If we've decided to remove some executors we should tell Kubernetes that we don't care.
if (conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR).equals("deployment")) {
annotateExecutorDeletionCost(executorIds)
}
labelDecommissioningExecs(executorIds)

// Tell the executors to exit themselves.
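To confirm the annotation lands on the pods Spark picked, here is a hedged verification sketch (not part of this diff; the label keys mirror Spark's usual constants and the namespace is an assumption) that lists an application's executor pods and prints each deletion cost:

```scala
// Hedged sketch: print the pod-deletion-cost annotation of every executor pod
// of a given Spark application. Label values are assumed from Spark's constants
// (SPARK_APP_ID_LABEL = "spark-app-selector", SPARK_ROLE_LABEL = "spark-role").
import scala.jdk.CollectionConverters._
import io.fabric8.kubernetes.client.KubernetesClientBuilder

object InspectDeletionCost {
  def main(args: Array[String]): Unit = {
    val appId = args(0)
    val client = new KubernetesClientBuilder().build()
    try {
      client.pods()
        .inNamespace("default") // assumed namespace
        .withLabel("spark-app-selector", appId)
        .withLabel("spark-role", "executor")
        .list().getItems.asScala.foreach { pod =>
          val cost = Option(pod.getMetadata.getAnnotations)
            .flatMap(a => Option(a.get("controller.kubernetes.io/pod-deletion-cost")))
            .getOrElse("(unset)")
          println(s"${pod.getMetadata.getName} -> $cost")
        }
    } finally {
      client.close()
    }
  }
}
```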