Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions src/zenml/cli/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -678,15 +678,20 @@ def list_schedules(
help="The cron expression to update the schedule with.",
)
def update_schedule(
schedule_name_or_id: str, cron_expression: Optional[str] = None
schedule_name_or_id: str,
cron_expression: str | None = None,
) -> None:
"""Update a pipeline schedule.

Args:
schedule_name_or_id: The name or ID of the schedule to update.
cron_expression: The cron expression to update the schedule with.
"""
if not cron_expression:
options = [
cron_expression,
]

if not any(options):
cli_utils.declare("No schedule update requested.")
return

Expand All @@ -701,6 +706,42 @@ def update_schedule(
cli_utils.declare(f"Updated schedule '{schedule_name_or_id}'.")


@schedule.command("activate", help="Activate a pipeline schedule.")
@click.argument("schedule_name_or_id", type=str, required=True)
def activate_schedule(schedule_name_or_id: str) -> None:
"""Activate a pipeline schedule.

Args:
schedule_name_or_id: The name or ID of the schedule to delete.
"""
try:
Client().update_schedule(
name_id_or_prefix=schedule_name_or_id, active=True
)
except KeyError as e:
cli_utils.exception(e)
else:
cli_utils.declare(f"Activated schedule '{schedule_name_or_id}'.")


@schedule.command("deactivate", help="Deactivate a pipeline schedule.")
@click.argument("schedule_name_or_id", type=str, required=True)
def deactivate_schedule(schedule_name_or_id: str) -> None:
"""Activate a pipeline schedule.

Args:
schedule_name_or_id: The name or ID of the schedule to delete.
"""
try:
Client().update_schedule(
name_id_or_prefix=schedule_name_or_id, active=False
)
except KeyError as e:
cli_utils.exception(e)
else:
cli_utils.declare(f"Deactivated schedule '{schedule_name_or_id}'.")


@schedule.command("delete", help="Delete a pipeline schedule.")
@click.argument("schedule_name_or_id", type=str, required=True)
@click.option(
Expand Down
10 changes: 9 additions & 1 deletion src/zenml/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4615,12 +4615,14 @@ def update_schedule(
self,
name_id_or_prefix: Union[str, UUID],
cron_expression: Optional[str] = None,
active: bool | None = None,
) -> ScheduleResponse:
"""Update a schedule.

Args:
name_id_or_prefix: The name, id or prefix of the schedule to update.
cron_expression: The new cron expression for the schedule.
active: Active status flag for the schedule.

Returns:
The updated schedule.
Expand All @@ -4644,7 +4646,13 @@ def update_schedule(
)
return schedule

update = ScheduleUpdate(cron_expression=cron_expression)
if schedule.active == active:
logger.warning(
f"Schedule active value is already {active}, skipping update."
)
return schedule

update = ScheduleUpdate(cron_expression=cron_expression, active=active)
orchestrator.update_schedule(schedule, update)
return self.zen_store.update_schedule(
schedule_id=schedule.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1217,11 +1217,19 @@ def update_schedule(
if not cron_job_name:
raise RuntimeError("Unable to find cron job name for schedule.")

spec = {}

if update.cron_expression:
spec["schedule"] = update.cron_expression

if update.active is not None:
spec["suspend"] = not update.active # type: ignore[assignment]

if spec:
self._k8s_batch_api.patch_namespaced_cron_job(
name=cron_job_name,
namespace=self.config.kubernetes_namespace,
body={"spec": {"schedule": update.cron_expression}},
body={"spec": spec},
)

def delete_schedule(self, schedule: "ScheduleResponse") -> None:
Expand Down
1 change: 1 addition & 0 deletions src/zenml/models/v2/core/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class ScheduleUpdate(BaseUpdate):

name: Optional[str] = None
cron_expression: Optional[str] = None
active: bool | None = None


# ------------------ Response Model ------------------
Expand Down
1 change: 1 addition & 0 deletions src/zenml/steps/step_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def step(
substitutions: Optional[Dict[str, str]] = None,
cache_policy: Optional["CachePolicyOrString"] = None,
runtime: Optional[StepRuntime] = None,
heartbeat_healthy_threshold: Optional[int] = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem to be related to the rest of the PR, no ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, just a quick UX fix I noticed.

) -> Callable[["F"], "BaseStep"]: ...


Expand Down
3 changes: 3 additions & 0 deletions src/zenml/zen_stores/schemas/schedule_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ def update(self, schedule_update: ScheduleUpdate) -> "ScheduleSchema":
if schedule_update.cron_expression:
self.cron_expression = schedule_update.cron_expression

if schedule_update.active is not None:
self.active = schedule_update.active

self.updated = utc_now()
return self

Expand Down
Loading