1111import kubernetes_asyncio as kubernetes
1212from importlib_metadata import entry_points
1313from kubernetes_asyncio .client import ApiException
14+ from kr8s .asyncio .objects import APIObject
1415
1516from dask_kubernetes .common .auth import ClusterAuth
1617from dask_kubernetes .common .networking import get_scheduler_address
17- from dask_kubernetes .aiopykube import HTTPClient , KubeConfig
18- from dask_kubernetes .aiopykube .dask import DaskCluster
1918from distributed .core import rpc , clean_exception
2019from distributed .protocol .pickle import dumps
2120
@@ -40,6 +39,45 @@ class SchedulerCommError(Exception):
4039 """Raised when unable to communicate with a scheduler."""
4140
4241
42+ class DaskCluster (APIObject ):
43+ version = "kubernetes.dask.org/v1"
44+ endpoint = "daskclusters"
45+ kind = "DaskCluster"
46+ plural = "daskclusters"
47+ singular = "daskcluster"
48+ namespaced = True
49+ scalable = True
50+ scalable_spec = "worker.replicas"
51+
52+
53+ class DaskWorkerGroup (APIObject ):
54+ version = "kubernetes.dask.org/v1"
55+ endpoint = "daskworkergroups"
56+ kind = "DaskWorkerGroup"
57+ plural = "daskworkergroups"
58+ singular = "daskworkergroup"
59+ namespaced = True
60+ scalable = True
61+
62+
63+ class DaskAutoscaler (APIObject ):
64+ version = "kubernetes.dask.org/v1"
65+ endpoint = "daskautoscalers"
66+ kind = "DaskAutoscaler"
67+ plural = "daskautoscalers"
68+ singular = "daskautoscaler"
69+ namespaced = True
70+
71+
72+ class DaskJob (APIObject ):
73+ version = "kubernetes.dask.org/v1"
74+ endpoint = "daskjobs"
75+ kind = "DaskJob"
76+ plural = "daskjobs"
77+ singular = "daskjob"
78+ namespaced = True
79+
80+
4381def _get_annotations (meta ):
4482 return {
4583 annotation_key : annotation_value
@@ -347,10 +385,8 @@ async def handle_scheduler_service_status(
347385 # Otherwise mark it as Running
348386 else :
349387 phase = "Running"
350-
351- api = HTTPClient (KubeConfig .from_env ())
352- cluster = await DaskCluster .objects (api , namespace = namespace ).get_by_name (
353- labels ["dask.org/cluster-name" ]
388+ cluster = await DaskCluster .get (
389+ labels ["dask.org/cluster-name" ], namespace = namespace
354390 )
355391 await cluster .patch ({"status" : {"phase" : phase }})
356392
@@ -986,8 +1022,5 @@ async def daskcluster_autoshutdown(spec, name, namespace, logger, **kwargs):
9861022 logger .warn ("Unable to connect to scheduler, skipping autoshutdown check." )
9871023 return
9881024 if idle_since and time .time () > idle_since + spec ["idleTimeout" ]:
989- api = HTTPClient (KubeConfig .from_env ())
990- cluster = await DaskCluster .objects (api , namespace = namespace ).get_by_name (
991- name
992- )
1025+ cluster = await DaskCluster .get (name , namespace = namespace )
9931026 await cluster .delete ()
0 commit comments