Skip to content

Commit 4c74758

Browse files
Stop & Resume schedule on Kubernetes
1 parent b0c40ad commit 4c74758

File tree

5 files changed

+65
-4
lines changed

5 files changed

+65
-4
lines changed

src/zenml/cli/pipeline.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -677,23 +677,64 @@ def list_schedules(
677677
required=False,
678678
help="The cron expression to update the schedule with.",
679679
)
680+
@click.option(
681+
"--activate",
682+
"-a",
683+
type=bool,
684+
required=False,
685+
is_flag=True,
686+
help="Activate a pipeline schedule.",
687+
)
688+
@click.option(
689+
"--deactivate",
690+
"-d",
691+
type=bool,
692+
required=False,
693+
is_flag=True,
694+
help="Deactivate a pipeline schedule.",
695+
)
680696
def update_schedule(
681-
schedule_name_or_id: str, cron_expression: Optional[str] = None
697+
schedule_name_or_id: str,
698+
cron_expression: str | None = None,
699+
activate: bool | None = None,
700+
deactivate: bool | None = None,
682701
) -> None:
683702
"""Update a pipeline schedule.
684703
685704
Args:
686705
schedule_name_or_id: The name or ID of the schedule to update.
687706
cron_expression: The cron expression to update the schedule with.
707+
activate: Activate a pipeline schedule.
708+
deactivate: Deactivate a pipeline schedule.
688709
"""
689-
if not cron_expression:
710+
options = [
711+
cron_expression,
712+
activate,
713+
deactivate,
714+
]
715+
716+
if not any(options):
690717
cli_utils.declare("No schedule update requested.")
691718
return
692719

720+
if activate and deactivate:
721+
cli_utils.declare(
722+
"You can not both activate and deactivate at the same time."
723+
)
724+
return
725+
726+
if activate:
727+
is_active = True
728+
elif deactivate:
729+
is_active = False
730+
else:
731+
is_active = None
732+
693733
try:
694734
Client().update_schedule(
695735
name_id_or_prefix=schedule_name_or_id,
696736
cron_expression=cron_expression,
737+
active=is_active,
697738
)
698739
except Exception as e:
699740
cli_utils.exception(e)

src/zenml/client.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4613,12 +4613,14 @@ def update_schedule(
46134613
self,
46144614
name_id_or_prefix: Union[str, UUID],
46154615
cron_expression: Optional[str] = None,
4616+
active: bool | None = None,
46164617
) -> ScheduleResponse:
46174618
"""Update a schedule.
46184619
46194620
Args:
46204621
name_id_or_prefix: The name, id or prefix of the schedule to update.
46214622
cron_expression: The new cron expression for the schedule.
4623+
active: Active status flag for the schedule.
46224624
46234625
Returns:
46244626
The updated schedule.
@@ -4642,7 +4644,13 @@ def update_schedule(
46424644
)
46434645
return schedule
46444646

4645-
update = ScheduleUpdate(cron_expression=cron_expression)
4647+
if schedule.active == active:
4648+
logger.warning(
4649+
f"Schedule active value is already {active}, skipping update."
4650+
)
4651+
return schedule
4652+
4653+
update = ScheduleUpdate(cron_expression=cron_expression, active=active)
46464654
orchestrator.update_schedule(schedule, update)
46474655
return self.zen_store.update_schedule(
46484656
schedule_id=schedule.id,

src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1217,11 +1217,19 @@ def update_schedule(
12171217
if not cron_job_name:
12181218
raise RuntimeError("Unable to find cron job name for schedule.")
12191219

1220+
spec = {}
1221+
1222+
if update.cron_expression:
1223+
spec["schedule"] = update.cron_expression
1224+
1225+
if update.active is not None:
1226+
spec["suspend"] = not update.active
1227+
12201228
if update.cron_expression:
12211229
self._k8s_batch_api.patch_namespaced_cron_job(
12221230
name=cron_job_name,
12231231
namespace=self.config.kubernetes_namespace,
1224-
body={"spec": {"schedule": update.cron_expression}},
1232+
body={"spec": spec},
12251233
)
12261234

12271235
def delete_schedule(self, schedule: "ScheduleResponse") -> None:

src/zenml/models/v2/core/schedule.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ class ScheduleUpdate(BaseUpdate):
128128

129129
name: Optional[str] = None
130130
cron_expression: Optional[str] = None
131+
active: bool | None = None
131132

132133

133134
# ------------------ Response Model ------------------

src/zenml/zen_stores/schemas/schedule_schema.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,9 @@ def update(self, schedule_update: ScheduleUpdate) -> "ScheduleSchema":
205205
if schedule_update.cron_expression:
206206
self.cron_expression = schedule_update.cron_expression
207207

208+
if schedule_update.active is not None:
209+
self.active = schedule_update.active
210+
208211
self.updated = utc_now()
209212
return self
210213

0 commit comments

Comments
 (0)