diff --git a/src/zenml/cli/pipeline.py b/src/zenml/cli/pipeline.py index 65bc3f01f78..048139db66d 100644 --- a/src/zenml/cli/pipeline.py +++ b/src/zenml/cli/pipeline.py @@ -678,7 +678,8 @@ def list_schedules( help="The cron expression to update the schedule with.", ) def update_schedule( - schedule_name_or_id: str, cron_expression: Optional[str] = None + schedule_name_or_id: str, + cron_expression: str | None = None, ) -> None: """Update a pipeline schedule. @@ -686,7 +687,11 @@ def update_schedule( schedule_name_or_id: The name or ID of the schedule to update. cron_expression: The cron expression to update the schedule with. """ - if not cron_expression: + options = [ + cron_expression, + ] + + if not any(options): cli_utils.declare("No schedule update requested.") return @@ -701,6 +706,42 @@ def update_schedule( cli_utils.declare(f"Updated schedule '{schedule_name_or_id}'.") +@schedule.command("activate", help="Activate a pipeline schedule.") +@click.argument("schedule_name_or_id", type=str, required=True) +def activate_schedule(schedule_name_or_id: str) -> None: + """Activate a pipeline schedule. + + Args: + schedule_name_or_id: The name or ID of the schedule to delete. + """ + try: + Client().update_schedule( + name_id_or_prefix=schedule_name_or_id, active=True + ) + except KeyError as e: + cli_utils.exception(e) + else: + cli_utils.declare(f"Activated schedule '{schedule_name_or_id}'.") + + +@schedule.command("deactivate", help="Deactivate a pipeline schedule.") +@click.argument("schedule_name_or_id", type=str, required=True) +def deactivate_schedule(schedule_name_or_id: str) -> None: + """Activate a pipeline schedule. + + Args: + schedule_name_or_id: The name or ID of the schedule to delete. + """ + try: + Client().update_schedule( + name_id_or_prefix=schedule_name_or_id, active=False + ) + except KeyError as e: + cli_utils.exception(e) + else: + cli_utils.declare(f"Deactivated schedule '{schedule_name_or_id}'.") + + @schedule.command("delete", help="Delete a pipeline schedule.") @click.argument("schedule_name_or_id", type=str, required=True) @click.option( diff --git a/src/zenml/client.py b/src/zenml/client.py index 8936fa7c9d2..426988414df 100644 --- a/src/zenml/client.py +++ b/src/zenml/client.py @@ -4615,12 +4615,14 @@ def update_schedule( self, name_id_or_prefix: Union[str, UUID], cron_expression: Optional[str] = None, + active: bool | None = None, ) -> ScheduleResponse: """Update a schedule. Args: name_id_or_prefix: The name, id or prefix of the schedule to update. cron_expression: The new cron expression for the schedule. + active: Active status flag for the schedule. Returns: The updated schedule. @@ -4644,7 +4646,13 @@ def update_schedule( ) return schedule - update = ScheduleUpdate(cron_expression=cron_expression) + if schedule.active == active: + logger.warning( + f"Schedule active value is already {active}, skipping update." + ) + return schedule + + update = ScheduleUpdate(cron_expression=cron_expression, active=active) orchestrator.update_schedule(schedule, update) return self.zen_store.update_schedule( schedule_id=schedule.id, diff --git a/src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py b/src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py index 354cb6b53ad..7d740c4da67 100644 --- a/src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py +++ b/src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py @@ -1217,11 +1217,19 @@ def update_schedule( if not cron_job_name: raise RuntimeError("Unable to find cron job name for schedule.") + spec = {} + if update.cron_expression: + spec["schedule"] = update.cron_expression + + if update.active is not None: + spec["suspend"] = not update.active # type: ignore[assignment] + + if spec: self._k8s_batch_api.patch_namespaced_cron_job( name=cron_job_name, namespace=self.config.kubernetes_namespace, - body={"spec": {"schedule": update.cron_expression}}, + body={"spec": spec}, ) def delete_schedule(self, schedule: "ScheduleResponse") -> None: diff --git a/src/zenml/models/v2/core/schedule.py b/src/zenml/models/v2/core/schedule.py index 4870938460a..00d3ca1d4eb 100644 --- a/src/zenml/models/v2/core/schedule.py +++ b/src/zenml/models/v2/core/schedule.py @@ -128,6 +128,7 @@ class ScheduleUpdate(BaseUpdate): name: Optional[str] = None cron_expression: Optional[str] = None + active: bool | None = None # ------------------ Response Model ------------------ diff --git a/src/zenml/steps/step_decorator.py b/src/zenml/steps/step_decorator.py index 81d370ba752..4f2986ddec1 100644 --- a/src/zenml/steps/step_decorator.py +++ b/src/zenml/steps/step_decorator.py @@ -82,6 +82,7 @@ def step( substitutions: Optional[Dict[str, str]] = None, cache_policy: Optional["CachePolicyOrString"] = None, runtime: Optional[StepRuntime] = None, + heartbeat_healthy_threshold: Optional[int] = None, ) -> Callable[["F"], "BaseStep"]: ... diff --git a/src/zenml/zen_stores/schemas/schedule_schema.py b/src/zenml/zen_stores/schemas/schedule_schema.py index 5c1c8cbfedd..0310b561e91 100644 --- a/src/zenml/zen_stores/schemas/schedule_schema.py +++ b/src/zenml/zen_stores/schemas/schedule_schema.py @@ -205,6 +205,9 @@ def update(self, schedule_update: ScheduleUpdate) -> "ScheduleSchema": if schedule_update.cron_expression: self.cron_expression = schedule_update.cron_expression + if schedule_update.active is not None: + self.active = schedule_update.active + self.updated = utc_now() return self