diff --git a/.github/workflows/integration-tests.yaml b/.github/workflows/integration-tests.yaml
index ad8a68159bc9..7297625c6681 100644
--- a/.github/workflows/integration-tests.yaml
+++ b/.github/workflows/integration-tests.yaml
@@ -161,7 +161,7 @@ jobs:
             pytest-xdist \
             pytest-timeout \
             pytest-env
-          pytest --ignore=integration-tests/test_task_worker.py --ignore=integration-tests/test_concurrency_leases.py integration-tests/ -n auto -vv
+          pytest --ignore=integration-tests/test_task_worker.py --ignore=integration-tests/test_concurrency_leases.py --ignore=integration-tests/test_deploy.py integration-tests/ -n auto -vv
 
   sqlite-3-24-0:
     name: Test SQLite 3.24.0 Compatibility
diff --git a/integration-tests/test_deploy.py b/integration-tests/test_deploy.py
index 56a11d424d0e..bf999e5362d6 100644
--- a/integration-tests/test_deploy.py
+++ b/integration-tests/test_deploy.py
@@ -19,6 +19,8 @@ async def read_flow_run(flow_run_id):
 
 def test_deploy():
     tmp_dir = Path(tempfile.mkdtemp())
+    runner_dir = tmp_dir / "runner"
+    runner_dir.mkdir()
 
     try:
         subprocess.check_call(
@@ -40,13 +42,13 @@ def test_deploy():
         # Create GitRepository and set base path to temp directory
         # to avoid race conditions with parallel tests
         git_repo = GitRepository(
-            url="https://github.com/PrefectHQ/prefect-recipes.git",
+            url="https://github.com/PrefectHQ/examples.git",
         )
         git_repo.set_base_path(tmp_dir)
 
         flow_instance = prefect.flow.from_source(
             source=git_repo,
-            entrypoint="flows-starter/hello.py:hello",
+            entrypoint="flows/hello_world.py:hello",
         )
 
         flow_instance.deploy(
@@ -69,6 +71,7 @@ def test_deploy():
             ],
             stdout=sys.stdout,
             stderr=sys.stderr,
+            cwd=runner_dir,
         )
 
         flow_run = anyio.run(read_flow_run, flow_run.id)
diff --git a/integration-tests/test_load_flows_concurrently.py b/integration-tests/test_load_flows_concurrently.py
index e1b1cca108f0..4893d9d7e363 100644
--- a/integration-tests/test_load_flows_concurrently.py
+++ b/integration-tests/test_load_flows_concurrently.py
@@ -1,33 +1,38 @@
 import asyncio
+import tempfile
+from pathlib import Path
 from typing import Any
 
 from prefect import Flow
 from prefect.runner.storage import GitRepository
 
 
-async def load_flow(entrypoint: str) -> Flow[..., Any]:
+async def load_flow(entrypoint: str, base_dir: Path) -> Flow[..., Any]:
+    source = GitRepository(
+        url="https://github.com/PrefectHQ/examples.git",
+    )
+    source.set_base_path(base_dir)
     return await Flow.from_source(  # type: ignore # sync_compatible causes issues
-        source=GitRepository(
-            url="https://github.com/PrefectHQ/examples.git",
-        ),
+        source=source,
         entrypoint=entrypoint,
     )
 
 
-async def test_iteration():
-    entrypoints = [
-        "flows/hello_world.py:hello",
-        "flows/whoami.py:whoami",
-    ] * 5  # Load each flow 5 times concurrently
-    futures = [load_flow(entrypoint) for entrypoint in entrypoints]
-    flows = await asyncio.gather(*futures)
-    return len(flows)
+async def run_iteration():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        entrypoints = [
+            "flows/hello_world.py:hello",
+            "flows/whoami.py:whoami",
+        ] * 5  # Load each flow 5 times concurrently
+        futures = [load_flow(entrypoint, Path(tmpdir)) for entrypoint in entrypoints]
+        flows = await asyncio.gather(*futures)
+        return len(flows)
 
 
 async def test_load_flows_concurrently():
    for i in range(10):  # Run 10 iterations
        try:
-            count = await test_iteration()
+            count = await run_iteration()
            print(f"Iteration {i + 1}: Successfully loaded {count} flows")
        except Exception as e:
            print(f"Iteration {i + 1}: Failed with error: {str(e)}")
diff --git a/src/prefect/blocks/system.py b/src/prefect/blocks/system.py
index 9ee12b8218a8..3a265568232f 100644
--- a/src/prefect/blocks/system.py
+++ b/src/prefect/blocks/system.py
@@ -1,8 +1,7 @@
 from __future__ import annotations
 
 import json
-from datetime import datetime
-from typing import Annotated, Any, Generic, TypeVar, Union
+from typing import Annotated, Generic, TypeVar, Union
 
 from pydantic import (
     Field,
@@ -14,9 +13,7 @@
 )
 from pydantic import Secret as PydanticSecret
 
-from prefect._internal.compatibility.deprecated import deprecated_class
 from prefect.blocks.core import Block
-from prefect.types import DateTime as PydanticDateTime
 
 _SecretValueType = Union[
     Annotated[StrictStr, Field(title="string")],
@@ -26,97 +23,6 @@
 T = TypeVar("T", bound=_SecretValueType)
 
 
-@deprecated_class(
-    start_date=datetime(2024, 6, 1),
-    end_date=datetime(2025, 6, 1),
-    help="Use Variables to store json data instead.",
-)
-class JSON(Block):
-    """
-    A block that represents JSON. Deprecated, please use Variables to store JSON data instead.
-
-    Attributes:
-        value: A JSON-compatible value.
-
-    Example:
-        Load a stored JSON value:
-        ```python
-        from prefect.blocks.system import JSON
-
-        json_block = JSON.load("BLOCK_NAME")
-        ```
-    """
-
-    _logo_url = HttpUrl(
-        "https://cdn.sanity.io/images/3ugk85nk/production/4fcef2294b6eeb423b1332d1ece5156bf296ff96-48x48.png"
-    )
-    _documentation_url = HttpUrl("https://docs.prefect.io/latest/develop/blocks")
-
-    value: Any = Field(default=..., description="A JSON-compatible value.")
-
-
-@deprecated_class(
-    start_date=datetime(2024, 6, 1),
-    end_date=datetime(2025, 6, 1),
-    help="Use Variables to store string data instead.",
-)
-class String(Block):
-    """
-    A block that represents a string. Deprecated, please use Variables to store string data instead.
-
-    Attributes:
-        value: A string value.
-
-    Example:
-        Load a stored string value:
-        ```python
-        from prefect.blocks.system import String
-
-        string_block = String.load("BLOCK_NAME")
-        ```
-    """
-
-    _logo_url = HttpUrl(
-        "https://cdn.sanity.io/images/3ugk85nk/production/c262ea2c80a2c043564e8763f3370c3db5a6b3e6-48x48.png"
-    )
-    _documentation_url = HttpUrl("https://docs.prefect.io/latest/develop/blocks")
-
-    value: str = Field(default=..., description="A string value.")
-
-
-@deprecated_class(
-    start_date=datetime(2024, 6, 1),
-    end_date=datetime(2025, 6, 1),
-    help="Use Variables to store datetime data instead.",
-)
-class DateTime(Block):
-    """
-    A block that represents a datetime. Deprecated, please use Variables to store datetime data instead.
-
-    Attributes:
-        value: An ISO 8601-compatible datetime value.
-
-    Example:
-        Load a stored JSON value:
-        ```python
-        from prefect.blocks.system import DateTime
-
-        data_time_block = DateTime.load("BLOCK_NAME")
-        ```
-    """
-
-    _block_type_name = "Date Time"
-    _logo_url = HttpUrl(
-        "https://cdn.sanity.io/images/3ugk85nk/production/8b3da9a6621e92108b8e6a75b82e15374e170ff7-48x48.png"
-    )
-    _documentation_url = HttpUrl("https://docs.prefect.io/latest/develop/blocks")
-
-    value: PydanticDateTime = Field(
-        default=...,
-        description="An ISO 8601-compatible datetime value.",
-    )
-
-
 class Secret(Block, Generic[T]):
     """
     A block that represents a secret value. The value stored in this block will be obfuscated when
diff --git a/src/prefect/cli/cloud/__init__.py b/src/prefect/cli/cloud/__init__.py
index 74b42a4f41dc..24155cf1af98 100644
--- a/src/prefect/cli/cloud/__init__.py
+++ b/src/prefect/cli/cloud/__init__.py
@@ -11,7 +11,6 @@
 import warnings
 import webbrowser
 from contextlib import asynccontextmanager
-from datetime import datetime
 from typing import (
     TYPE_CHECKING,
     Iterable,
@@ -615,34 +614,6 @@ async def logout():
     exit_with_success("Logged out from Prefect Cloud.")
 
 
-@cloud_app.command(
-    deprecated=True,
-    deprecated_name="prefect cloud open",
-    deprecated_start_date=datetime(2024, 10, 1),
-    deprecated_help="Use `prefect dashboard open` to open the Prefect UI.",
-)
-async def open():
-    """
-    Open the Prefect Cloud UI in the browser.
-    """
-    confirm_logged_in()
-
-    async with get_cloud_client() as client:
-        try:
-            current_workspace = await client.read_current_workspace()
-        except ValueError:
-            exit_with_error(
-                "There is no current workspace set - set one with `prefect cloud workspace"
-                " set --workspace `."
-            )
-
-        ui_url = current_workspace.ui_url()
-
-        await run_sync_in_worker_thread(webbrowser.open_new_tab, ui_url)
-
-        exit_with_success(f"Opened {current_workspace.handle!r} in browser.")
-
-
 @workspace_app.command()
 async def ls():
     """List available workspaces."""
diff --git a/src/prefect/results.py b/src/prefect/results.py
index b64aa28d8a0e..93435683763d 100644
--- a/src/prefect/results.py
+++ b/src/prefect/results.py
@@ -5,7 +5,6 @@
 import socket
 import threading
 import uuid
-from datetime import datetime
 from functools import partial
 from operator import methodcaller
 from pathlib import Path
@@ -35,7 +34,6 @@
 import prefect.types._datetime
 from prefect._internal.compatibility.async_dispatch import async_dispatch
 from prefect._internal.compatibility.blocks import call_explicitly_async_block_method
-from prefect._internal.compatibility.deprecated import deprecated_callable
 from prefect._internal.concurrency.event_loop import get_running_loop
 from prefect._result_records import R, ResultRecord, ResultRecordMetadata
 from prefect.blocks.core import Block
@@ -965,50 +963,6 @@ async def await_for_lock(self, key: str, timeout: float | None = None) -> bool:
         )
         return await self.lock_manager.await_for_lock(key, timeout)
 
-    # TODO: These two methods need to find a new home
-
-    @deprecated_callable(
-        start_date=datetime(2025, 5, 10),
-        end_date=datetime(2025, 11, 10),
-        help="Use `store_parameters` from `prefect.task_worker` instead.",
-    )
-    @sync_compatible
-    async def store_parameters(self, identifier: UUID, parameters: dict[str, Any]):
-        record = ResultRecord(
-            result=parameters,
-            metadata=ResultRecordMetadata(
-                serializer=self.serializer, storage_key=str(identifier)
-            ),
-        )
-
-        await call_explicitly_async_block_method(
-            self.result_storage,
-            "write_path",
-            (f"parameters/{identifier}",),
-            {"content": record.serialize()},
-        )
-
-    @deprecated_callable(
-        start_date=datetime(2025, 5, 10),
-        end_date=datetime(2025, 11, 10),
-        help="Use `read_parameters` from `prefect.task_worker` instead.",
-    )
-    @sync_compatible
-    async def read_parameters(self, identifier: UUID) -> dict[str, Any]:
-        if self.result_storage is None:
-            raise ValueError(
-                "Result store is not configured - must have a result storage block to read parameters"
-            )
-        record: ResultRecord[Any] = ResultRecord[Any].deserialize(
-            await call_explicitly_async_block_method(
-                self.result_storage,
-                "read_path",
-                (f"parameters/{identifier}",),
-                {},
-            )
-        )
-        return record.result
-
 
 def get_result_store() -> ResultStore:
     """
diff --git a/src/prefect/runner/__init__.py b/src/prefect/runner/__init__.py
index 0d059ee6528a..e4062f65dd44 100644
--- a/src/prefect/runner/__init__.py
+++ b/src/prefect/runner/__init__.py
@@ -1,4 +1,3 @@
 from .runner import Runner
-from .submit import submit_to_runner, wait_for_submitted_runs
 
-__all__ = ["Runner", "submit_to_runner", "wait_for_submitted_runs"]
+__all__ = ["Runner"]
diff --git a/src/prefect/runner/server.py b/src/prefect/runner/server.py
index 2648b6d8dbac..8f170dc6ff99 100644
--- a/src/prefect/runner/server.py
+++ b/src/prefect/runner/server.py
@@ -1,23 +1,14 @@
 from __future__ import annotations
 
 import uuid
-from datetime import datetime
-from typing import TYPE_CHECKING, Any, Callable, Coroutine, Hashable, Optional
+from typing import TYPE_CHECKING, Any, Callable, Optional
 
 import uvicorn
-from fastapi import APIRouter, FastAPI, HTTPException, status
+from fastapi import APIRouter, FastAPI, status
 from fastapi.responses import JSONResponse
 from typing_extensions import Literal
 
-from prefect._internal.compatibility.deprecated import deprecated_callable
-from prefect._internal.schemas.validators import validate_values_conform_to_schema
-from prefect.client.orchestration import get_client
-from prefect.exceptions import MissingFlowError, ScriptError
-from prefect.flows import Flow, load_flow_from_entrypoint
 from prefect.logging import get_logger
-from prefect.runner.utils import (
-    inject_schemas_into_openapi,
-)
 from prefect.settings import (
     PREFECT_RUNNER_POLL_FREQUENCY,
     PREFECT_RUNNER_SERVER_HOST,
@@ -27,12 +18,10 @@
 )
 from prefect.types._datetime import now as now_fn
 from prefect.utilities.asyncutils import run_coro_as_sync
-from prefect.utilities.importtools import load_script_as_module
 
 if TYPE_CHECKING:
     import logging
 
-    from prefect.client.schemas.responses import DeploymentResponse
     from prefect.runner import Runner
 
 from pydantic import BaseModel
@@ -90,182 +79,12 @@ def _shutdown():
     return _shutdown
 
 
-async def _build_endpoint_for_deployment(
-    deployment: "DeploymentResponse", runner: "Runner"
-) -> Callable[..., Coroutine[Any, Any, JSONResponse]]:
-    async def _create_flow_run_for_deployment(
-        body: Optional[dict[Any, Any]] = None,
-    ) -> JSONResponse:
-        body = body or {}
-        if deployment.enforce_parameter_schema and deployment.parameter_openapi_schema:
-            try:
-                validate_values_conform_to_schema(
-                    body, deployment.parameter_openapi_schema
-                )
-            except ValueError as exc:
-                raise HTTPException(
-                    status.HTTP_400_BAD_REQUEST,
-                    detail=f"Error creating flow run: {exc}",
-                )
-
-        async with get_client() as client:
-            flow_run = await client.create_flow_run_from_deployment(
-                deployment_id=deployment.id, parameters=body
-            )
-            logger.info(
-                f"Created flow run {flow_run.name!r} from deployment"
-                f" {deployment.name!r}"
-            )
-            runner.execute_in_background(runner.execute_flow_run, flow_run.id)
-            return JSONResponse(
-                status_code=status.HTTP_201_CREATED,
-                content={"flow_run_id": str(flow_run.id)},
-            )
-
-    return _create_flow_run_for_deployment
-
-
-async def get_deployment_router(
-    runner: "Runner",
-) -> tuple[APIRouter, dict[Hashable, Any]]:
-    router = APIRouter()
-    schemas: dict[Hashable, Any] = {}
-    async with get_client() as client:
-        for deployment_id in runner._deployment_ids:  # pyright: ignore[reportPrivateUsage]
-            deployment = await client.read_deployment(deployment_id)
-            router.add_api_route(
-                f"/deployment/{deployment.id}/run",
-                await _build_endpoint_for_deployment(deployment, runner),
methods=["POST"], - name=f"Create flow run for deployment {deployment.name}", - description=( - "Trigger a flow run for a deployment as a background task on the" - " runner." - ), - summary=f"Run {deployment.name}", - ) - - # Used for updating the route schemas later on - schemas[f"{deployment.name}-{deployment_id}"] = ( - deployment.parameter_openapi_schema - ) - schemas[deployment_id] = deployment.name - return router, schemas - - -async def get_subflow_schemas(runner: "Runner") -> dict[str, dict[str, Any]]: - """ - Load available subflow schemas by filtering for only those subflows in the - deployment entrypoint's import space. - """ - schemas: dict[str, dict[str, Any]] = {} - async with get_client() as client: - for deployment_id in runner._deployment_ids: # pyright: ignore[reportPrivateUsage] - deployment = await client.read_deployment(deployment_id) - if deployment.entrypoint is None: - continue - - script = deployment.entrypoint.split(":")[0] - module = load_script_as_module(script) - subflows: list[Flow[Any, Any]] = [ - obj for obj in module.__dict__.values() if isinstance(obj, Flow) - ] - for flow in subflows: - schemas[flow.name] = flow.parameters.model_dump() - - return schemas - - -def _flow_in_schemas(flow: Flow[Any, Any], schemas: dict[str, dict[str, Any]]) -> bool: - """ - Check if a flow is in the schemas dict, either by name or by name with - dashes replaced with underscores. - """ - flow_name_with_dashes = flow.name.replace("_", "-") - return flow.name in schemas or flow_name_with_dashes in schemas - - -def _flow_schema_changed( - flow: Flow[Any, Any], schemas: dict[str, dict[str, Any]] -) -> bool: - """ - Check if a flow's schemas have changed, either by bame of by name with - dashes replaced with underscores. - """ - flow_name_with_dashes = flow.name.replace("_", "-") - - schema = schemas.get(flow.name, None) or schemas.get(flow_name_with_dashes, None) - if schema is not None and flow.parameters.model_dump() != schema: - return True - return False - - -def _build_generic_endpoint_for_flows( - runner: "Runner", schemas: dict[str, dict[str, Any]] -) -> Callable[..., Coroutine[Any, Any, JSONResponse]]: - async def _create_flow_run_for_flow_from_fqn( - body: RunnerGenericFlowRunRequest, - ) -> JSONResponse: - if not runner.has_slots_available(): - return JSONResponse( - status_code=status.HTTP_429_TOO_MANY_REQUESTS, - content={"message": "Runner has no available slots"}, - ) - - try: - flow = load_flow_from_entrypoint(body.entrypoint) - except (FileNotFoundError, MissingFlowError, ScriptError, ModuleNotFoundError): - return JSONResponse( - status_code=status.HTTP_404_NOT_FOUND, - content={"message": "Flow not found"}, - ) - - # Verify that the flow we're loading is a subflow this runner is - # managing - if not _flow_in_schemas(flow, schemas): - logger.warning( - f"Flow {flow.name} is not directly managed by the runner. Please " - "include it in the runner's served flows' import namespace." - ) - # Verify that the flow we're loading hasn't changed since the webserver - # was started - if _flow_schema_changed(flow, schemas): - logger.warning( - "A change in flow parameters has been detected. Please " - "restart the runner." 
-            )
-
-        async with get_client() as client:
-            flow_run = await client.create_flow_run(
-                flow=flow,
-                parameters=body.parameters,
-                parent_task_run_id=body.parent_task_run_id,
-            )
-            logger.info(f"Created flow run {flow_run.name!r} from flow {flow.name!r}")
-            runner.execute_in_background(
-                runner.execute_flow_run, flow_run.id, body.entrypoint
-            )
-
-        return JSONResponse(
-            status_code=status.HTTP_201_CREATED,
-            content=flow_run.model_dump(mode="json"),
-        )
-
-    return _create_flow_run_for_flow_from_fqn
-
-
-@deprecated_callable(
-    start_date=datetime(2025, 4, 1),
-    end_date=datetime(2025, 10, 1),
-    help="Use background tasks (https://docs.prefect.io/v3/concepts/tasks#background-tasks) or `run_deployment` and `.serve` instead of submitting runs to the Runner webserver.",
-)
 async def build_server(runner: "Runner") -> FastAPI:
     """
     Build a FastAPI server for a runner.
 
     Args:
-        runner (Runner): the runner this server interacts with and monitors
-        log_level (str): the log level to use for the server
+        runner: the runner this server interacts with and monitors
     """
     webserver = FastAPI()
     router = APIRouter()
@@ -277,44 +96,16 @@ async def build_server(runner: "Runner") -> FastAPI:
     router.add_api_route("/shutdown", shutdown(runner=runner), methods=["POST"])
     webserver.include_router(router)
 
-    deployments_router, deployment_schemas = await get_deployment_router(runner)
-    webserver.include_router(deployments_router)
-
-    subflow_schemas = await get_subflow_schemas(runner)
-    webserver.add_api_route(
-        "/flow/run",
-        _build_generic_endpoint_for_flows(runner=runner, schemas=subflow_schemas),
-        methods=["POST"],
-        name="Run flow in background",
-        description="Trigger any flow run as a background task on the runner.",
-        summary="Run flow",
-    )
-
-    def customize_openapi():
-        if webserver.openapi_schema:
-            return webserver.openapi_schema
-
-        openapi_schema = inject_schemas_into_openapi(webserver, deployment_schemas)
-        webserver.openapi_schema = openapi_schema
-        return webserver.openapi_schema
-
-    webserver.openapi = customize_openapi
-
     return webserver
 
 
-@deprecated_callable(
-    start_date=datetime(2025, 4, 1),
-    end_date=datetime(2025, 10, 1),
-    help="Use background tasks (https://docs.prefect.io/v3/concepts/flows-and-tasks#background-tasks) or `run_deployment` and `.serve` instead of submitting runs to the Runner webserver.",
-)
 def start_webserver(runner: "Runner", log_level: str | None = None) -> None:
     """
     Run a FastAPI server for a runner.
 
     Args:
-        runner (Runner): the runner this server interacts with and monitors
-        log_level (str): the log level to use for the server
+        runner: the runner this server interacts with and monitors
+        log_level: the log level to use for the server
     """
     host = PREFECT_RUNNER_SERVER_HOST.value()
     port = PREFECT_RUNNER_SERVER_PORT.value()
diff --git a/src/prefect/runner/submit.py b/src/prefect/runner/submit.py
deleted file mode 100644
index 377775fd1ed4..000000000000
--- a/src/prefect/runner/submit.py
+++ /dev/null
@@ -1,263 +0,0 @@
-from __future__ import annotations
-
-import asyncio
-import inspect
-import uuid
-from datetime import datetime
-from typing import TYPE_CHECKING, Any, Union, overload
-
-import anyio
-import httpx
-from typing_extensions import Literal, TypeAlias
-
-from prefect._internal.compatibility.deprecated import deprecated_callable
-from prefect.client.orchestration import get_client
-from prefect.client.schemas.filters import (
-    FlowRunFilter,
-    FlowRunFilterParentFlowRunId,
-    TaskRunFilter,
-)
-from prefect.client.schemas.objects import (
-    Constant,
-    FlowRun,
-    FlowRunResult,
-    Parameter,
-    TaskRunResult,
-)
-from prefect.context import FlowRunContext
-from prefect.flows import Flow
-from prefect.logging import get_logger
-from prefect.settings import (
-    PREFECT_RUNNER_PROCESS_LIMIT,
-    PREFECT_RUNNER_SERVER_HOST,
-    PREFECT_RUNNER_SERVER_PORT,
-)
-from prefect.states import Pending
-from prefect.tasks import Task
-from prefect.utilities.asyncutils import sync_compatible
-
-if TYPE_CHECKING:
-    import logging
-
-logger: "logging.Logger" = get_logger("webserver")
-
-FlowOrTask: TypeAlias = Union[Flow[Any, Any], Task[Any, Any]]
-
-
-async def _submit_flow_to_runner(
-    flow: Flow[Any, Any],
-    parameters: dict[str, Any],
-    retry_failed_submissions: bool = True,
-) -> FlowRun:
-    """
-    Run a flow in the background via the runner webserver.
-
-    Args:
-        flow: the flow to create a run for and execute in the background
-        parameters: the keyword arguments to pass to the callable
-        timeout: the maximum time to wait for the callable to finish
-        poll_interval: the interval (in seconds) to wait between polling the callable
-
-    Returns:
-        A `FlowRun` object representing the flow run that was submitted.
- """ - from prefect.utilities._engine import dynamic_key_for_task_run - from prefect.utilities.engine import collect_task_run_inputs, resolve_inputs - - async with get_client() as client: - if not retry_failed_submissions: - # TODO - configure the client to not retry 429s coming from the - # webserver - pass - - parent_flow_run_context = FlowRunContext.get() - - task_inputs: dict[ - str, list[Union[TaskRunResult, FlowRunResult, Parameter, Constant]] - ] = {k: list(await collect_task_run_inputs(v)) for k, v in parameters.items()} - parameters = await resolve_inputs(parameters) - dummy_task = Task(name=flow.name, fn=flow.fn, version=flow.version) - parent_task_run = await client.create_task_run( - task=dummy_task, - flow_run_id=( - parent_flow_run_context.flow_run.id - if parent_flow_run_context and parent_flow_run_context.flow_run - else None - ), - dynamic_key=( - str(dynamic_key_for_task_run(parent_flow_run_context, dummy_task)) - if parent_flow_run_context - else str(uuid.uuid4()) - ), - task_inputs=task_inputs, - state=Pending(), - ) - - httpx_client = getattr(client, "_client") - response = await httpx_client.post( - ( - f"http://{PREFECT_RUNNER_SERVER_HOST.value()}" - f":{PREFECT_RUNNER_SERVER_PORT.value()}" - "/flow/run" - ), - json={ - "entrypoint": getattr(flow, "_entrypoint"), - "parameters": flow.serialize_parameters(parameters), - "parent_task_run_id": str(parent_task_run.id), - }, - ) - response.raise_for_status() - - return FlowRun.model_validate(response.json()) - - -@overload -def submit_to_runner( - prefect_callable: Flow[Any, Any] | Task[Any, Any], - parameters: dict[str, Any], - retry_failed_submissions: bool = True, -) -> FlowRun: ... - - -@overload -def submit_to_runner( - prefect_callable: Flow[Any, Any] | Task[Any, Any], - parameters: list[dict[str, Any]], - retry_failed_submissions: bool = True, -) -> list[FlowRun]: ... - - -@deprecated_callable( - start_date=datetime(2025, 4, 1), - end_date=datetime(2025, 10, 1), - help="Use background tasks (https://docs.prefect.io/v3/concepts/flows-and-tasks#background-tasks) or `run_deployment` and `.serve` instead of submitting runs to the Runner webserver.", -) -@sync_compatible -async def submit_to_runner( - prefect_callable: Flow[Any, Any], - parameters: dict[str, Any] | list[dict[str, Any]] | None = None, - retry_failed_submissions: bool = True, -) -> FlowRun | list[FlowRun]: - """ - Submit a callable in the background via the runner webserver one or more times. - - Args: - prefect_callable: the callable to run (only flows are supported for now, but eventually tasks) - parameters: keyword arguments to pass to the callable. May be a list of dictionaries where - each dictionary represents a discrete invocation of the callable - retry_failed_submissions: Whether to retry failed submissions to the runner webserver. - """ - if not isinstance(prefect_callable, Flow): # pyright: ignore[reportUnnecessaryIsInstance] - raise TypeError( - "The `submit_to_runner` utility only supports submitting flows and tasks." 
-        )
-
-    parameters = parameters or {}
-    if isinstance(parameters, list):
-        return_single = False
-    elif isinstance(parameters, dict):  # pyright: ignore[reportUnnecessaryIsInstance]
-        parameters = [parameters]
-        return_single = True
-    else:
-        raise TypeError("Parameters must be a dictionary or a list of dictionaries.")
-
-    submitted_runs: list[FlowRun] = []
-    unsubmitted_parameters: list[dict[str, Any]] = []
-
-    for p in parameters:
-        try:
-            flow_run = await _submit_flow_to_runner(
-                prefect_callable, p, retry_failed_submissions
-            )
-            if inspect.isawaitable(flow_run):
-                flow_run = await flow_run
-            submitted_runs.append(flow_run)
-        except httpx.ConnectError as exc:
-            raise RuntimeError(
-                "Failed to connect to the `Runner` webserver. Ensure that the server is"
-                " running and reachable. You can run the webserver either by starting"
-                " your `serve` process with `webserver=True`, or by setting"
-                " `PREFECT_RUNNER_SERVER_ENABLE=True`."
-            ) from exc
-        except httpx.HTTPStatusError as exc:
-            if exc.response.status_code == 429:
-                unsubmitted_parameters.append(p)
-            else:
-                raise exc
-
-    if unsubmitted_parameters:
-        logger.warning(
-            f"Failed to submit {len(unsubmitted_parameters)} runs to the runner, as all"
-            f" of the available {PREFECT_RUNNER_PROCESS_LIMIT.value()} slots were"
-            " occupied. To increase the number of available slots, configure"
-            " the`PREFECT_RUNNER_PROCESS_LIMIT` setting."
-        )
-
-    # If one run was submitted, return the corresponding FlowRun directly
-    if return_single:
-        return submitted_runs[0]
-    return submitted_runs
-
-
-@deprecated_callable(
-    start_date=datetime(2025, 4, 1),
-    end_date=datetime(2025, 10, 1),
-    help="Use background tasks (https://docs.prefect.io/v3/concepts/flows-and-tasks#background-tasks) or `run_deployment` and `.serve` instead of submitting runs to the Runner webserver.",
-)
-@sync_compatible
-async def wait_for_submitted_runs(
-    flow_run_filter: FlowRunFilter | None = None,
-    task_run_filter: TaskRunFilter | None = None,
-    timeout: float | None = None,
-    poll_interval: float = 3.0,
-) -> uuid.UUID | None:
-    """
-    Wait for completion of any provided flow runs (eventually task runs), as well as subflow runs
-    of the current flow run (if called from within a flow run and subflow runs exist).
-
-    Args:
-        flow_run_filter: A filter to apply to the flow runs to wait for.
-        task_run_filter: A filter to apply to the task runs to wait for. # TODO: /task/run
-        timeout: How long to wait for completion of all runs (seconds).
-        poll_interval: How long to wait between polling each run's state (seconds).
- """ - - parent_flow_run_id = ( - ctx.flow_run.id if ((ctx := FlowRunContext.get()) and ctx.flow_run) else None - ) - - if task_run_filter: - raise NotImplementedError("Waiting for task runs is not yet supported.") - - async def wait_for_final_state( - run_type: Literal["flow", "task"], run_id: uuid.UUID - ): - read_run_method = getattr(client, f"read_{run_type}_run") - while True: - run = await read_run_method(run_id) - if run.state and run.state.is_final(): - return run_id - await anyio.sleep(poll_interval) - - async with get_client() as client: - with anyio.move_on_after(timeout): - flow_runs_to_wait_for = ( - await client.read_flow_runs(flow_run_filter=flow_run_filter) - if flow_run_filter - else [] - ) - - if parent_flow_run_id is not None: - subflow_runs = await client.read_flow_runs( - flow_run_filter=FlowRunFilter( - parent_flow_run_id=FlowRunFilterParentFlowRunId( - any_=[parent_flow_run_id] - ) - ) - ) - - flow_runs_to_wait_for.extend(subflow_runs) - - await asyncio.gather( - *(wait_for_final_state("flow", run.id) for run in flow_runs_to_wait_for) - ) diff --git a/src/prefect/runner/utils.py b/src/prefect/runner/utils.py deleted file mode 100644 index 744b77cee5d6..000000000000 --- a/src/prefect/runner/utils.py +++ /dev/null @@ -1,96 +0,0 @@ -from __future__ import annotations - -from copy import deepcopy -from typing import Any, Hashable - -from fastapi import FastAPI -from fastapi.openapi.utils import get_openapi - -from prefect import __version__ as PREFECT_VERSION - - -def inject_schemas_into_openapi( - webserver: FastAPI, schemas_to_inject: dict[Hashable, Any] -) -> dict[str, Any]: - """ - Augments the webserver's OpenAPI schema with additional schemas from deployments / flows / tasks. - - Args: - webserver: The FastAPI instance representing the webserver. - schemas_to_inject: A dictionary of OpenAPI schemas to integrate. - - Returns: - The augmented OpenAPI schema dictionary. - """ - openapi_schema = get_openapi( - title="FastAPI Prefect Runner", version=PREFECT_VERSION, routes=webserver.routes - ) - - augmented_schema = merge_definitions(schemas_to_inject, openapi_schema) - return update_refs_to_components(augmented_schema) - - -def merge_definitions( - injected_schemas: dict[Hashable, Any], openapi_schema: dict[str, Any] -) -> dict[str, Any]: - """ - Integrates definitions from injected schemas into the OpenAPI components. - - Args: - injected_schemas: A dictionary of deployment-specific schemas. - openapi_schema: The base OpenAPI schema to update. - """ - openapi_schema_copy = deepcopy(openapi_schema) - components = openapi_schema_copy.setdefault("components", {}).setdefault( - "schemas", {} - ) - for definitions in injected_schemas.values(): - if "definitions" in definitions: - for def_name, def_schema in definitions["definitions"].items(): - def_schema_copy = deepcopy(def_schema) - update_refs_in_schema(def_schema_copy, "#/components/schemas/") - components[def_name] = def_schema_copy - return openapi_schema_copy - - -def update_refs_in_schema( - schema_item: dict[str, Any] | list[Any], new_ref: str -) -> None: - """ - Recursively replaces `$ref` with a new reference base in a schema item. - - Args: - schema_item: A schema or part of a schema to update references in. - new_ref: The new base string to replace in `$ref` values. 
- """ - if isinstance(schema_item, dict): - if "$ref" in schema_item: - schema_item["$ref"] = schema_item["$ref"].replace("#/definitions/", new_ref) - for value in schema_item.values(): - update_refs_in_schema(value, new_ref) - elif isinstance(schema_item, list): # pyright: ignore[reportUnnecessaryIsInstance] - for item in schema_item: - update_refs_in_schema(item, new_ref) - - -def update_refs_to_components(openapi_schema: dict[str, Any]) -> dict[str, Any]: - """ - Updates all `$ref` fields in the OpenAPI schema to reference the components section. - - Args: - openapi_schema: The OpenAPI schema to modify `$ref` fields in. - """ - for path_item in openapi_schema.get("paths", {}).values(): - for operation in path_item.values(): - schema = ( - operation.get("requestBody", {}) - .get("content", {}) - .get("application/json", {}) - .get("schema", {}) - ) - update_refs_in_schema(schema, "#/components/schemas/") - - for definition in openapi_schema.get("definitions", {}).values(): - update_refs_in_schema(definition, "#/components/schemas/") - - return openapi_schema diff --git a/src/prefect/server/models/block_registration.py b/src/prefect/server/models/block_registration.py index 98bf72692cbc..e7b1b9ab81eb 100644 --- a/src/prefect/server/models/block_registration.py +++ b/src/prefect/server/models/block_registration.py @@ -6,7 +6,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from prefect.blocks.core import Block -from prefect.blocks.system import JSON, DateTime, Secret +from prefect.blocks.system import Secret from prefect.blocks.webhook import Webhook from prefect.filesystems import LocalFileSystem from prefect.logging import get_logger @@ -27,9 +27,7 @@ async def _install_protected_system_blocks(session: AsyncSession) -> None: """Install block types that the system expects to be present""" - protected_system_blocks = cast( - List[Block], [Webhook, JSON, DateTime, Secret, LocalFileSystem] - ) + protected_system_blocks = cast(List[Block], [Webhook, Secret, LocalFileSystem]) for block in protected_system_blocks: async with session.begin(): diff --git a/tests/blocks/test_system.py b/tests/blocks/test_system.py index 9bf9cae8a66d..9301b61c9638 100644 --- a/tests/blocks/test_system.py +++ b/tests/blocks/test_system.py @@ -1,21 +1,10 @@ from typing import Any -from zoneinfo import ZoneInfo import pytest from pydantic import Secret as PydanticSecret from pydantic import SecretStr -from prefect.blocks.system import DateTime, Secret -from prefect.types._datetime import DateTime as PydanticDateTime - - -@pytest.mark.usefixtures("ignore_prefect_deprecation_warnings") -def test_datetime(): - DateTime(value=PydanticDateTime(2022, 1, 1, tzinfo=ZoneInfo("UTC"))).save( - name="test" - ) - api_block = DateTime.load("test") - assert api_block.value == PydanticDateTime(2022, 1, 1, tzinfo=ZoneInfo("UTC")) +from prefect.blocks.system import Secret @pytest.mark.parametrize( diff --git a/tests/cli/test_block.py b/tests/cli/test_block.py index c5c74e760a00..0876dfad7151 100644 --- a/tests/cli/test_block.py +++ b/tests/cli/test_block.py @@ -305,8 +305,6 @@ def test_listing_system_block_types(register_block_types): "Slug", "Description", "slack", - "date-time", - "json", "local-file-system", "remote-file-system", "secret", @@ -320,14 +318,14 @@ def test_listing_system_block_types(register_block_types): ) -async def test_inspecting_a_block(ignore_prefect_deprecation_warnings): - await system.JSON(value="a simple json blob").save("jsonblob") +async def test_inspecting_a_block(): + await 
 
-    expected_output = ("Block Type", "Block id", "value", "a simple json blob")
+    expected_output = ("Block Type", "Block id", "value", "********")
 
     await run_sync_in_worker_thread(
         invoke_and_assert,
-        ["block", "inspect", "json/jsonblob"],
+        ["block", "inspect", "secret/secretblob"],
         expected_code=0,
         expected_output_contains=expected_output,
     )
@@ -432,7 +430,7 @@ def test_deleting_a_protected_block_type(
     expected_output = "is a protected block"
 
     invoke_and_assert(
-        ["block", "type", "delete", "json"],
+        ["block", "type", "delete", "secret"],
         expected_code=1,
         user_input="y",
         expected_output_contains=expected_output,
diff --git a/tests/runner/test_runner.py b/tests/runner/test_runner.py
index a8dc01f1a71a..c8a9c29d6804 100644
--- a/tests/runner/test_runner.py
+++ b/tests/runner/test_runner.py
@@ -59,7 +59,7 @@
 from prefect.flows import Flow
 from prefect.logging.loggers import flow_run_logger
 from prefect.runner.runner import Runner
-from prefect.runner.server import perform_health_check, start_webserver
+from prefect.runner.server import perform_health_check
 from prefect.schedules import Cron, Interval
 from prefect.settings import (
     PREFECT_DEFAULT_DOCKER_BUILD_NAMESPACE,
@@ -428,28 +428,6 @@ def test_serve_starts_a_runner(
 
         mock_runner_start.assert_awaited_once()
 
-    def test_log_level_lowercasing(self, monkeypatch: pytest.MonkeyPatch):
-        runner_mock = mock.MagicMock()
-        log_level = "DEBUG"
-
-        # Mock build_server to return a webserver mock object
-        with mock.patch(
-            "prefect.runner.server.build_server", new_callable=mock.AsyncMock
-        ) as mock_build_server:
-            webserver_mock = mock.MagicMock()
-            mock_build_server.return_value = webserver_mock
-
-            # Patch uvicorn.run to verify it's called with the correct arguments
-            with mock.patch("uvicorn.run") as mock_uvicorn:
-                start_webserver(runner_mock, log_level=log_level)
-
-                # Assert build_server was called once with the runner
-                mock_build_server.assert_called_once_with(runner_mock)
-
-                # Assert uvicorn.run was called with the lowercase log_level and the webserver mock
-                mock_uvicorn.assert_called_once_with(
-                    webserver_mock, host=mock.ANY, port=mock.ANY, log_level="debug"
-                )
-
     def test_serve_in_async_context_raises_error(self, monkeypatch: pytest.MonkeyPatch):
         monkeypatch.setattr(
             "asyncio.get_running_loop", lambda: asyncio.get_event_loop()
diff --git a/tests/runner/test_submit.py b/tests/runner/test_submit.py
deleted file mode 100644
index f85ec30333f1..000000000000
--- a/tests/runner/test_submit.py
+++ /dev/null
@@ -1,191 +0,0 @@
-import uuid
-import warnings
-from typing import Any, Callable, Generator, Union
-from unittest import mock
-
-import httpx
-import pytest
-
-from prefect import flow
-from prefect._internal.compatibility.deprecated import PrefectDeprecationWarning
-from prefect.client.schemas.objects import FlowRun
-from prefect.runner import submit_to_runner
-from prefect.settings import (
-    PREFECT_RUNNER_SERVER_ENABLE,
-    temporary_settings,
-)
-from prefect.states import Running
-
-
-@flow
-def identity(whatever: Any):
-    return whatever
-
-
-@flow
-async def async_identity(whatever: Any):
-    return whatever
-
-
-@flow
-def super_identity(*args: Any, **kwargs: Any):
-    return args, kwargs
-
-
-@flow(log_prints=True)
-def independent():
-    print("i don't need no stinkin' parameters")
-
-
-@pytest.fixture
-def mock_webserver(monkeypatch: pytest.MonkeyPatch):
-    async def mock_submit_flow_to_runner(_: Any, parameters: Any, *__: Any) -> FlowRun:
-        return FlowRun(flow_id=uuid.uuid4(), state=Running(), parameters=parameters)
-
-    monkeypatch.setattr(
-        "prefect.runner.submit._submit_flow_to_runner", mock_submit_flow_to_runner
-    )
-
-
-@pytest.fixture
-def mock_webserver_not_running(monkeypatch: pytest.MonkeyPatch):
-    async def mock_submit_flow_to_runner(*_: Any, **__: Any):
-        raise httpx.ConnectError("Mocked connection error")
-
-    monkeypatch.setattr(
-        "prefect.runner.submit._submit_flow_to_runner", mock_submit_flow_to_runner
-    )
-
-
-@pytest.fixture(autouse=True)
-def runner_settings() -> Generator[None, None, None]:
-    with temporary_settings({PREFECT_RUNNER_SERVER_ENABLE: True}):
-        yield
-
-
-@pytest.fixture(autouse=True)
-def suppress_deprecation_warning() -> Generator[None, None, None]:
-    with warnings.catch_warnings():
-        warnings.filterwarnings("ignore", category=PrefectDeprecationWarning)
-        yield
-
-
-@pytest.mark.parametrize("prefect_callable", [identity, async_identity])
-@pytest.mark.usefixtures("mock_webserver")
-def test_submit_to_runner_happy_path_sync_context(prefect_callable: Callable[..., Any]):
-    @flow
-    def test_flow() -> FlowRun:
-        return submit_to_runner(prefect_callable, {"whatever": 42})
-
-    flow_run = test_flow()
-    assert flow_run.state is not None
-    assert flow_run.state.is_running()
-    assert flow_run.parameters == {"whatever": 42}
-
-
-@pytest.mark.parametrize("prefect_callable", [identity, async_identity])
-@pytest.mark.usefixtures("mock_webserver")
-async def test_submit_to_runner_happy_path_async_context(
-    prefect_callable: Callable[..., Any],
-):
-    flow_run = await submit_to_runner(prefect_callable, {"whatever": 42})
-
-    assert flow_run.state.is_running()
-    assert flow_run.parameters == {"whatever": 42}
-
-
-async def test_submit_to_runner_raises_if_not_prefect_callable():
-    with pytest.raises(
-        TypeError,
-        match=(
-            "The `submit_to_runner` utility only supports submitting flows and tasks."
-        ),
-    ):
-        await submit_to_runner(lambda: None)
-
-
-@pytest.mark.usefixtures("mock_webserver")
-async def test_submission_with_optional_parameters():
-    flow_run = await submit_to_runner(independent)
-
-    assert flow_run.state.is_running()
-    assert flow_run.parameters == {}
-
-
-@pytest.mark.usefixtures("mock_webserver_not_running")
-async def test_submission_raises_if_webserver_not_running():
-    with temporary_settings({PREFECT_RUNNER_SERVER_ENABLE: False}):
-        with pytest.raises(
-            (httpx.ConnectTimeout, RuntimeError),
-            match="Ensure that the server is running",
-        ):
-            await submit_to_runner(identity, {"d": {"input": 9001}})
-
-
-@pytest.mark.parametrize("input_", [[{"input": 1}, {"input": 2}], {"input": 3}])
-@pytest.mark.usefixtures("mock_webserver")
-async def test_return_for_submissions_matches_input(
-    input_: Union[list[dict[str, Any]], dict[str, Any]],
-):
-    def _flow_run_generator(*_: Any, **__: Any) -> FlowRun:
-        return FlowRun(flow_id=uuid.uuid4())
-
-    with mock.patch(
-        "prefect.runner.submit._submit_flow_to_runner",
-        side_effect=_flow_run_generator,
-    ):
-        results = await submit_to_runner(identity, input_)
-
-        if isinstance(input_, dict):
-            assert isinstance(results, FlowRun)
-        else:
-            assert len(results) == len(input_)
-            assert all(isinstance(r, FlowRun) for r in results)
-
-
-@pytest.mark.parametrize(
-    "input_",
-    [
-        {
-            "name": "Schleeb",
-            "age": 99,
-            "young": True,
-            "metadata": [{"nested": "info"}],
-            "data": [True, False, True],
-            "info": {"nested": "info"},
-        },
-        [
-            {
-                "name": "Schleeb",
-                "age": 99,
-                "young": True,
-                "metadata": [{"nested": "info"}],
-                "data": [True, False, True],
-                "info": {"nested": "info"},
-            }
-        ],
-        [
-            {
-                "name": "Schleeb",
-                "age": 99,
-                "young": True,
-                "metadata": [{"nested": "info"}],
-                "data": [True, False, True],
-                "info": {"nested": "info"},
-            }
-        ],
-        [{"1": {2: {3: {4: None}}}}],
-    ],
-)
-@pytest.mark.usefixtures("mock_webserver")
-async def test_types_in_submission(
-    input_: Union[list[dict[str, Any]], dict[str, Any]],
-):
-    results = await submit_to_runner(super_identity, input_)
-
-    if isinstance(input_, list):
-        assert len(results) == len(input_)
-        for r in results:
-            assert isinstance(r, FlowRun)
-    else:
-        assert isinstance(results, FlowRun)
diff --git a/tests/runner/test_utils.py b/tests/runner/test_utils.py
deleted file mode 100644
index d27c846591b1..000000000000
--- a/tests/runner/test_utils.py
+++ /dev/null
@@ -1,204 +0,0 @@
-from typing import Any, Callable
-from unittest.mock import create_autospec
-
-import pytest
-from fastapi import FastAPI
-from fastapi.routing import APIRoute
-
-from prefect import __version__ as PREFECT_VERSION
-from prefect.runner.utils import (
-    inject_schemas_into_openapi,
-    merge_definitions,
-    update_refs_to_components,
-)
-
-
-class MockRoute(APIRoute):
-    def __init__(self, path: str, endpoint: Callable[..., Any]):
-        super().__init__(path, endpoint)
-
-
-def mock_endpoint():
-    pass
-
-
-@pytest.fixture
-def mock_app():
-    app = create_autospec(FastAPI, instance=True)
-    mock_route = MockRoute("/dummy", mock_endpoint)
-    app.routes = [mock_route]
-    return app
-
-
-@pytest.fixture
-def deployment_schemas():
-    return {
-        "deployment1": {
-            "definitions": {
-                "Model1": {
-                    "type": "object",
-                    "properties": {"field1": {"type": "string"}},
-                }
-            }
-        },
-        "deployment2": {
-            "definitions": {
-                "Model2": {
-                    "type": "object",
-                    "properties": {"field2": {"type": "integer"}},
-                }
-            }
-        },
-    }
-
-
-@pytest.fixture
-def openapi_schema():
-    return {
-        "openapi": "3.1.0",
-        "info": {"title": "FastAPI Prefect Runner", "version": PREFECT_VERSION},
Prefect Runner", "version": PREFECT_VERSION}, - "components": {"schemas": {}}, - "paths": {}, - } - - -@pytest.fixture -def schema_with_refs(): - return { - "paths": { - "/path": { - "get": { - "requestBody": { - "content": { - "application/json": { - "schema": {"$ref": "#/definitions/Model1"} - } - } - } - } - } - } - } - - -@pytest.fixture -def nested_schema_with_refs(): - return { - "definitions": { - "NestedModel": { - "type": "object", - "properties": {"nestedField": {"$ref": "#/definitions/Model1"}}, - } - }, - "paths": { - "/nested": { - "get": { - "requestBody": { - "content": { - "application/json": { - "schema": {"$ref": "#/definitions/NestedModel"} - } - } - } - } - } - }, - } - - -@pytest.fixture -def augmented_openapi_schema(): - return { - "openapi": "3.1.0", - "info": {"title": "FastAPI Prefect Runner", "version": PREFECT_VERSION}, - "paths": { - "/dummy": { - "get": { - "summary": "Mock Endpoint", - "operationId": "mock_endpoint_dummy_get", - "responses": { - "200": { - "description": "Successful Response", - "content": {"application/json": {"schema": {}}}, - } - }, - } - } - }, - "components": { - "schemas": { - "Model1": { - "type": "object", - "properties": {"field1": {"type": "string"}}, - }, - "Model2": { - "type": "object", - "properties": {"field2": {"type": "integer"}}, - }, - } - }, - } - - -def test_inject_schemas_into_openapi( - mock_app, deployment_schemas, augmented_openapi_schema -): - result_schema = inject_schemas_into_openapi(mock_app, deployment_schemas) - assert result_schema == augmented_openapi_schema - - -class TestMergeDefinitions: - def test_merge_definitions(self, deployment_schemas, openapi_schema): - result_schema = merge_definitions(deployment_schemas, openapi_schema) - - expected_models = {} - for definitions in deployment_schemas.values(): - if "definitions" in definitions: - expected_models.update(definitions["definitions"]) - - assert result_schema["components"]["schemas"] == expected_models - - def test_merge_definitions_empty_schemas(self, openapi_schema): - result_schema = merge_definitions({}, openapi_schema) - assert result_schema["components"]["schemas"] == {} - - def test_merge_definitions_preserves_unrelated_schema_parts( - self, deployment_schemas, openapi_schema - ): - original_paths = {"dummy_path": "dummy_value"} - openapi_schema["paths"] = original_paths - result_schema = merge_definitions(deployment_schemas, openapi_schema) - assert result_schema["paths"] == original_paths - - -class TestUpdateRefsToComponents: - def test_update_refs_to_components( - self, openapi_schema, deployment_schemas, schema_with_refs - ): - result_schema = update_refs_to_components(schema_with_refs) - assert ( - result_schema["paths"]["/path"]["get"]["requestBody"]["content"][ - "application/json" - ]["schema"]["$ref"] - == "#/components/schemas/Model1" - ) - - def test_update_refs_to_components_empty_schema(self): - empty_schema = {"paths": {}} - result_schema = update_refs_to_components(empty_schema) - assert result_schema == empty_schema - - def test_update_refs_to_components_nested_schema(self, nested_schema_with_refs): - result_schema = update_refs_to_components(nested_schema_with_refs) - assert ( - result_schema["paths"]["/nested"]["get"]["requestBody"]["content"][ - "application/json" - ]["schema"]["$ref"] - == "#/components/schemas/NestedModel" - ) - assert ( - result_schema["definitions"]["NestedModel"]["properties"]["nestedField"][ - "$ref" - ] - == "#/components/schemas/Model1" - ) diff --git a/tests/runner/test_webserver.py 
deleted file mode 100644
index eca17d1b3370..000000000000
--- a/tests/runner/test_webserver.py
+++ /dev/null
@@ -1,281 +0,0 @@
-import uuid
-import warnings
-from typing import Callable, Generator, List
-from unittest import mock
-
-import pydantic
-import pytest
-from fastapi.testclient import TestClient
-
-from prefect import flow
-from prefect._internal.compatibility.deprecated import PrefectDeprecationWarning
-from prefect.client.orchestration import PrefectClient, get_client
-from prefect.client.schemas.objects import FlowRun
-from prefect.runner import Runner
-from prefect.runner.server import build_server
-from prefect.settings import (
-    PREFECT_RUNNER_SERVER_HOST,
-    PREFECT_RUNNER_SERVER_PORT,
-    temporary_settings,
-)
-
-
-class A(pydantic.BaseModel):
-    a: int = 0
-
-
-class B(pydantic.BaseModel):
-    a: A = A()
-    b: bool = False
-
-
-@flow(version="test")
-def simple_flow(verb: str = "party"):
-    print(f"I'm just here to {verb}")
-
-
-@flow
-def complex_flow(
-    x: int, y: str = "hello", z: List[bool] = [True], a: A = A(), b: B = B()
-):
-    print(x, y, z, a, b)
-
-
-def a_non_flow_function():
-    print("This is not a flow!")
-
-
-@pytest.fixture(autouse=True)
-def tmp_runner_settings():
-    with temporary_settings(
-        updates={
-            PREFECT_RUNNER_SERVER_HOST: "0.0.0.0",
-            PREFECT_RUNNER_SERVER_PORT: 0,
-        }
-    ):
-        yield
-
-
-@pytest.fixture(autouse=True)
-def suppress_deprecation_warnings() -> Generator[None, None, None]:
-    with warnings.catch_warnings():
-        warnings.filterwarnings("ignore", category=PrefectDeprecationWarning)
-        yield
-
-
-@pytest.fixture
-async def runner() -> Runner:
-    return Runner()
-
-
-async def create_deployment(runner: Runner, func: Callable):
-    # Use unique names to force multiple deployments to be created
-    deployment_id = await runner.add_flow(
-        func, f"{uuid.uuid4()}", enforce_parameter_schema=True
-    )
-    return str(deployment_id)
-
-
-class TestWebserverSettings:
-    async def test_webserver_settings_are_respected(self, runner: Runner):
-        with temporary_settings(
-            updates={
-                PREFECT_RUNNER_SERVER_HOST: "127.0.0.1",
-                PREFECT_RUNNER_SERVER_PORT: 4200,
-            }
-        ):
-            assert PREFECT_RUNNER_SERVER_HOST.value() == "127.0.0.1"
-            assert PREFECT_RUNNER_SERVER_PORT.value() == 4200
-
-
-class TestWebserverDeploymentRoutes:
-    async def test_runners_deployment_run_routes_exist(self, runner: Runner):
-        deployment_ids = [
-            await create_deployment(runner, simple_flow) for _ in range(3)
-        ]
-        webserver = await build_server(runner)
-
-        deployment_run_routes = [
-            r
-            for r in webserver.routes
-            if r.path.startswith("/deployment") and r.path.endswith("/run")
-        ]
-        deployment_run_paths = {r.path for r in deployment_run_routes}
-
-        # verify that all deployment routes correspond to one of the deployments
-        for route in deployment_run_routes:
-            id_ = route.path.split("/")[2]
-            assert id_ in deployment_ids
-
-        # verify that all deployments have a route
-        for id_ in deployment_ids:
-            route = f"/deployment/{id_}/run"
-            assert route in deployment_run_paths
-
-    @pytest.mark.skip(reason="This test is flaky and needs to be fixed")
-    async def test_runners_deployment_run_route_does_input_validation(
-        self, runner: Runner
-    ):
-        async with runner:
-            deployment_id = await create_deployment(runner, simple_flow)
-            webserver = await build_server(runner)
-
-            client = TestClient(webserver)
-            response = client.post(
-                f"/deployment/{deployment_id}/run", json={"verb": False}
-            )
-            assert response.status_code == 400
-
-            response = client.post(
f"/deployment/{deployment_id}/run", json={"verb": "clobber"} - ) - assert response.status_code == 201 - flow_run_id = response.json()["flow_run_id"] - assert isinstance(uuid.UUID(flow_run_id), uuid.UUID) - - @pytest.mark.skip(reason="This test is flaky and needs to be fixed") - async def test_runners_deployment_run_route_with_complex_args(self, runner: Runner): - async with runner: - deployment_id = await runner.add_flow( - complex_flow, f"{uuid.uuid4()}", enforce_parameter_schema=True - ) - webserver = await build_server(runner) - client = TestClient(webserver) - response = client.post(f"/deployment/{deployment_id}/run", json={"x": 100}) - assert response.status_code == 201, response.json() - flow_run_id = response.json()["flow_run_id"] - assert isinstance(uuid.UUID(flow_run_id), uuid.UUID) - - async def test_runners_deployment_run_route_execs_flow_run(self, runner: Runner): - mock_flow_run_id = str(uuid.uuid4()) - - mock_client = mock.create_autospec(PrefectClient, spec_set=True) - mock_client.create_flow_run_from_deployment.return_value.id = mock_flow_run_id - mock_get_client = mock.create_autospec(get_client, spec_set=True) - mock_get_client.return_value.__aenter__.return_value = mock_client - mock_get_client.return_value.__aexit__.return_value = None - - async with runner: - deployment_id = await create_deployment(runner, simple_flow) - webserver = await build_server(runner) - client = TestClient(webserver) - - with ( - mock.patch("prefect.runner.server.get_client", new=mock_get_client), - mock.patch.object(runner, "execute_in_background"), - ): - with client: - response = client.post(f"/deployment/{deployment_id}/run") - assert response.status_code == 201, response.json() - flow_run_id = response.json()["flow_run_id"] - assert flow_run_id == mock_flow_run_id - assert isinstance(uuid.UUID(flow_run_id), uuid.UUID) - mock_client.create_flow_run_from_deployment.assert_called_once_with( - deployment_id=uuid.UUID(deployment_id), parameters={} - ) - - -class TestWebserverFlowRoutes: - async def test_flow_router_runs_managed_flow(self, runner: Runner): - async with runner: - await create_deployment(runner, simple_flow) - webserver = await build_server(runner) - client = TestClient(webserver) - - with mock.patch.object( - runner, "execute_flow_run", new_callable=mock.AsyncMock - ) as mock_run: - response = client.post( - "/flow/run", - json={"entrypoint": f"{__file__}:simple_flow", "parameters": {}}, - ) - assert response.status_code == 201, response.status_code - assert isinstance(FlowRun.model_validate(response.json()), FlowRun) - mock_run.assert_called() - - @pytest.mark.parametrize("flow_name", ["a_missing_flow"]) - @pytest.mark.parametrize( - "flow_file", [__file__, "/not/a/path.py", "not/a/python/file.txt"] - ) - async def test_missing_flow_raises_a_404( - self, - runner: Runner, - flow_file: str, - flow_name: str, - ): - async with runner: - await create_deployment(runner, simple_flow) - webserver = await build_server(runner) - client = TestClient(webserver) - - response = client.post( - "/flow/run", - json={"entrypoint": f"{flow_file}:{flow_name}", "parameters": {}}, - ) - assert response.status_code == 404, response.status_code - - @mock.patch("prefect.runner.server.load_flow_from_entrypoint") - async def test_flow_router_complains_about_running_unmanaged_flow( - self, mocked_load: mock.MagicMock, runner: Runner, caplog - ): - async with runner: - await create_deployment(runner, simple_flow) - webserver = await build_server(runner) - client = TestClient(webserver) - - @flow - def 
-            def new_flow():
-                pass
-
-            # force load_flow_from_entrypoint to return a different flow
-            mocked_load.return_value = new_flow
-            with mock.patch.object(
-                runner, "execute_flow_run", new_callable=mock.AsyncMock
-            ) as mock_run:
-                response = client.post(
-                    "/flow/run", json={"entrypoint": "doesnt_matter", "parameters": {}}
-                )
-                # the flow should still be run even though it's not managed
-                assert response.status_code == 201, response.status_code
-                assert isinstance(FlowRun.model_validate(response.json()), FlowRun)
-                mock_run.assert_called()
-
-            # we should have logged a warning
-            assert (
-                "Flow new-flow is not directly managed by the runner. Please "
-                "include it in the runner's served flows' import namespace."
-                in caplog.text
-            )
-
-    @mock.patch("prefect.runner.server.load_flow_from_entrypoint")
-    async def test_flow_router_complains_about_flow_with_different_schema(
-        self, mocked_load: mock.MagicMock, runner: Runner, caplog
-    ):
-        async with runner:
-            await create_deployment(runner, simple_flow)
-            webserver = await build_server(runner)
-            client = TestClient(webserver)
-
-            @flow
-            def simple_flow2(age: int = 99):
-                pass
-
-            # force load_flow_from_entrypoint to return the updated flow
-            simple_flow2.name = "simple_flow"
-            mocked_load.return_value = simple_flow2
-            with mock.patch.object(
-                runner, "execute_flow_run", new_callable=mock.AsyncMock
-            ) as mock_run:
-                response = client.post(
-                    "/flow/run", json={"entrypoint": "doesnt_matter", "parameters": {}}
-                )
-                # we'll still attempt to run the changed flow
-                assert response.status_code == 201, response.status_code
-                assert isinstance(FlowRun.model_validate(response.json()), FlowRun)
-                mock_run.assert_called()
-
-            # we should have logged a warning
-            assert (
-                "A change in flow parameters has been detected. Please restart the runner."
-                in caplog.text
-            )
diff --git a/tests/server/models/test_block_registration.py b/tests/server/models/test_block_registration.py
index 5a017e6b6c45..a1792b807977 100644
--- a/tests/server/models/test_block_registration.py
+++ b/tests/server/models/test_block_registration.py
@@ -32,8 +32,6 @@ async def test_full_registration_with_empty_database(
         self, session, expected_number_of_registered_block_types
     ):
         PROTECTED_BLOCKS = {
-            "json",
-            "date-time",
             "secret",
             "local-file-system",
             "webhook",
diff --git a/tests/server/orchestration/api/test_block_types.py b/tests/server/orchestration/api/test_block_types.py
index 2174e1b0fa87..87ff66c90e37 100644
--- a/tests/server/orchestration/api/test_block_types.py
+++ b/tests/server/orchestration/api/test_block_types.py
@@ -1,4 +1,3 @@
-from datetime import datetime, timezone
 from textwrap import dedent
 from typing import List
 from unittest.mock import AsyncMock
@@ -498,38 +497,39 @@ async def test_create_system_block_type(
         await hosted_api_client.post("/block_types/install_system_block_types")
 
-        # create a datetime block
-        datetime_block_type = await hosted_api_client.get("/block_types/slug/date-time")
-        datetime_block_schema = await hosted_api_client.post(
+        # create a secret block
+        secret_block_type = await hosted_api_client.get("/block_types/slug/secret")
+        secret_block_schema = await hosted_api_client.post(
             "/block_schemas/filter",
             json=dict(
                 block_schemas=dict(
-                    block_type_id=dict(any_=[datetime_block_type.json()["id"]])
+                    block_type_id=dict(any_=[secret_block_type.json()["id"]])
                 ),
                 limit=1,
             ),
         )
-        block = prefect.blocks.system.DateTime(value="2022-01-01T00:00:00+00:00")
+        block = prefect.blocks.system.Secret(value="sk-1234567890")
         response = await hosted_api_client.post(
             "/block_documents/",
             json=block._to_block_document(
-                name="my-test-date-time",
-                block_type_id=datetime_block_type.json()["id"],
-                block_schema_id=datetime_block_schema.json()[0]["id"],
+                name="my-test-secret",
+                block_type_id=secret_block_type.json()["id"],
+                block_schema_id=secret_block_schema.json()[0]["id"],
             ).model_dump(
                 mode="json",
                 exclude_unset=True,
                 exclude={"id", "block_schema", "block_type", "block_type_name"},
+                context={"include_secrets": True},
             ),
         )
        assert response.status_code == status.HTTP_201_CREATED, response.text
 
-        # load the datetime block
-        api_block = await prefect.blocks.system.DateTime.load("my-test-date-time")
-        assert api_block.value == datetime(2022, 1, 1, tzinfo=timezone.utc)
+        # load the secret block
+        api_block = await prefect.blocks.system.Secret.load("my-test-secret")
+        assert api_block.get() == "sk-1234567890"
 
     async def test_system_block_types_are_protected(self, client, session):
         # install system blocks
         await client.post("/block_types/install_system_block_types")
 
-        # read date-time system block
-        response = await client.get("/block_types/slug/date-time")
+        # read secret system block
+        response = await client.get("/block_types/slug/secret")
         assert response.json()["is_protected"]
diff --git a/tests/utilities/test_templating.py b/tests/utilities/test_templating.py
index 567ec885ee37..7a74bb8f2748 100644
--- a/tests/utilities/test_templating.py
+++ b/tests/utilities/test_templating.py
@@ -4,7 +4,6 @@
 import pytest
 
 from prefect.blocks.core import Block
-from prefect.blocks.system import JSON, DateTime, Secret, String
 from prefect.blocks.webhook import Webhook
 from prefect.client.orchestration import PrefectClient
 from prefect.utilities.annotations import NotSet
@@ -392,110 +391,6 @@ async def test_resolve_block_document_references_does_not_change_standard_placeholder
 
         assert result == template
 
-    async def test_resolve_block_document_unpacks_system_blocks(self):
-        await JSON(value={"key": "value"}).save(name="json-block")
-        await Secret(value="N1nj4C0d3rP@ssw0rd!").save(name="secret-block")
-        await DateTime(value="2020-01-01T00:00:00Z").save(name="datetime-block")
-        await String(value="hello").save(name="string-block")
-
-        template = {
-            "json": "{{ prefect.blocks.json.json-block }}",
-            "secret": "{{ prefect.blocks.secret.secret-block }}",
-            "datetime": "{{ prefect.blocks.date-time.datetime-block }}",
-            "string": "{{ prefect.blocks.string.string-block }}",
-        }
-
-        result = await resolve_block_document_references(template)
-        assert result == {
-            "json": {"key": "value"},
-            "secret": "N1nj4C0d3rP@ssw0rd!",
-            "datetime": "2020-01-01T00:00:00Z",
-            "string": "hello",
-        }
-
-    async def test_resolve_block_document_system_block_resolves_dict_keypath(self):
-        # for backwards compatibility system blocks can be referenced directly
-        # they should still be able to access nested keys
-        await JSON(value={"key": {"nested-key": "nested_value"}}).save(
-            name="nested-json-block"
-        )
-        template = {
-            "value": "{{ prefect.blocks.json.nested-json-block}}",
-            "keypath": "{{ prefect.blocks.json.nested-json-block.key }}",
-            "nested_keypath": "{{ prefect.blocks.json.nested-json-block.key.nested-key }}",
-        }
-
-        result = await resolve_block_document_references(template)
-        assert result == {
-            "value": {"key": {"nested-key": "nested_value"}},
-            "keypath": {"nested-key": "nested_value"},
-            "nested_keypath": "nested_value",
-        }
-
-    async def test_resolve_block_document_resolves_dict_keypath(self):
-        await JSON(value={"key": {"nested-key": "nested_value"}}).save(
-            name="nested-json-block-2"
-        )
-        template = {
-            "value": "{{ prefect.blocks.json.nested-json-block-2.value }}",
-            "keypath": "{{ prefect.blocks.json.nested-json-block-2.value.key }}",
-            "nested_keypath": (
-                "{{ prefect.blocks.json.nested-json-block-2.value.key.nested-key }}"
-            ),
-        }
-
-        result = await resolve_block_document_references(template)
-        assert result == {
-            "value": {"key": {"nested-key": "nested_value"}},
-            "keypath": {"nested-key": "nested_value"},
-            "nested_keypath": "nested_value",
-        }
-
-    async def test_resolve_block_document_resolves_list_keypath(self):
-        await JSON(value={"key": ["value1", "value2"]}).save(name="json-list-block")
-        await JSON(value=["value1", "value2"]).save(name="list-block")
-        await JSON(
-            value={"key": ["value1", {"nested": ["value2", "value3"]}, "value4"]}
-        ).save(name="nested-json-list-block")
-        template = {
-            "json_list": "{{ prefect.blocks.json.json-list-block.value.key[0] }}",
-            "list": "{{ prefect.blocks.json.list-block.value[1] }}",
-            "nested_json_list": (
-                "{{ prefect.blocks.json.nested-json-list-block.value.key[1].nested[1] }}"
-            ),
-        }
-
-        result = await resolve_block_document_references(template)
-        assert result == {
-            "json_list": "value1",
-            "list": "value2",
-            "nested_json_list": "value3",
-        }
-
-    async def test_resolve_block_document_raises_on_invalid_keypath(self):
-        await JSON(value={"key": {"nested_key": "value"}}).save(
-            name="nested-json-block-3"
-        )
-        json_template = {
-            "json": "{{ prefect.blocks.json.nested-json-block-3.value.key.does_not_exist }}",
-        }
-        with pytest.raises(ValueError, match="Could not resolve the keypath"):
-            await resolve_block_document_references(json_template)
-
-        await JSON(value=["value1", "value2"]).save(name="index-error-block")
-        index_error_template = {
-            "index_error": "{{ prefect.blocks.json.index-error-block.value[3] }}",
-        }
-        with pytest.raises(ValueError, match="Could not resolve the keypath"):
match="Could not resolve the keypath"): - await resolve_block_document_references(index_error_template) - - await Webhook(url="https://example.com").save(name="webhook-block") - webhook_template = { - "webhook": "{{ prefect.blocks.webhook.webhook-block.value }}", - } - with pytest.raises(ValueError, match="Could not resolve the keypath"): - await resolve_block_document_references(webhook_template) - async def test_resolve_block_document_resolves_block_attribute(self): await Webhook(url="https://example.com").save(name="webhook-block-2")