Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/integration-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ jobs:
pytest-xdist \
pytest-timeout \
pytest-env
pytest --ignore=integration-tests/test_task_worker.py --ignore=integration-tests/test_concurrency_leases.py integration-tests/ -n auto -vv
pytest --ignore=integration-tests/test_task_worker.py --ignore=integration-tests/test_concurrency_leases.py --ignore=integration-tests/test_deploy.py integration-tests/ -n auto -vv
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we adding this ignore?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test requires the flow-run execute CLI, which isn't available in prefect-client. I'm not quite sure how it was passing before, but pulling a flow from a different repo for the test caused issues.


sqlite-3-24-0:
name: Test SQLite 3.24.0 Compatibility
Expand Down
7 changes: 5 additions & 2 deletions integration-tests/test_deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ async def read_flow_run(flow_run_id):

def test_deploy():
tmp_dir = Path(tempfile.mkdtemp())
runner_dir = tmp_dir / "runner"
runner_dir.mkdir()

try:
subprocess.check_call(
Expand All @@ -40,13 +42,13 @@ def test_deploy():
# Create GitRepository and set base path to temp directory
# to avoid race conditions with parallel tests
git_repo = GitRepository(
url="https://github.com/PrefectHQ/prefect-recipes.git",
url="https://github.com/PrefectHQ/examples.git",
)
git_repo.set_base_path(tmp_dir)

flow_instance = prefect.flow.from_source(
source=git_repo,
entrypoint="flows-starter/hello.py:hello",
entrypoint="flows/hello_world.py:hello",
)

flow_instance.deploy(
Expand All @@ -69,6 +71,7 @@ def test_deploy():
],
stdout=sys.stdout,
stderr=sys.stderr,
cwd=runner_dir,
)

flow_run = anyio.run(read_flow_run, flow_run.id)
Expand Down
31 changes: 18 additions & 13 deletions integration-tests/test_load_flows_concurrently.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,38 @@
import asyncio
import tempfile
from pathlib import Path
from typing import Any

from prefect import Flow
from prefect.runner.storage import GitRepository


async def load_flow(entrypoint: str) -> Flow[..., Any]:
async def load_flow(entrypoint: str, base_dir: Path) -> Flow[..., Any]:
source = GitRepository(
url="https://github.com/PrefectHQ/examples.git",
)
source.set_base_path(base_dir)
return await Flow.from_source( # type: ignore # sync_compatible causes issues
source=GitRepository(
url="https://github.com/PrefectHQ/examples.git",
),
source=source,
entrypoint=entrypoint,
)


async def test_iteration():
entrypoints = [
"flows/hello_world.py:hello",
"flows/whoami.py:whoami",
] * 5 # Load each flow 5 times concurrently
futures = [load_flow(entrypoint) for entrypoint in entrypoints]
flows = await asyncio.gather(*futures)
return len(flows)
async def run_iteration():
with tempfile.TemporaryDirectory() as tmpdir:
entrypoints = [
"flows/hello_world.py:hello",
"flows/whoami.py:whoami",
] * 5 # Load each flow 5 times concurrently
futures = [load_flow(entrypoint, Path(tmpdir)) for entrypoint in entrypoints]
flows = await asyncio.gather(*futures)
return len(flows)


async def test_load_flows_concurrently():
for i in range(10): # Run 10 iterations
try:
count = await test_iteration()
count = await run_iteration()
print(f"Iteration {i + 1}: Successfully loaded {count} flows")
except Exception as e:
print(f"Iteration {i + 1}: Failed with error: {str(e)}")
Expand Down
96 changes: 1 addition & 95 deletions src/prefect/blocks/system.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from __future__ import annotations

import json
from datetime import datetime
from typing import Annotated, Any, Generic, TypeVar, Union
from typing import Annotated, Generic, TypeVar, Union

from pydantic import (
Field,
Expand All @@ -14,9 +13,7 @@
)
from pydantic import Secret as PydanticSecret

from prefect._internal.compatibility.deprecated import deprecated_class
from prefect.blocks.core import Block
from prefect.types import DateTime as PydanticDateTime

_SecretValueType = Union[
Annotated[StrictStr, Field(title="string")],
Expand All @@ -26,97 +23,6 @@
T = TypeVar("T", bound=_SecretValueType)


@deprecated_class(
start_date=datetime(2024, 6, 1),
end_date=datetime(2025, 6, 1),
help="Use Variables to store json data instead.",
)
class JSON(Block):
"""
A block that represents JSON. Deprecated, please use Variables to store JSON data instead.

Attributes:
value: A JSON-compatible value.

Example:
Load a stored JSON value:
```python
from prefect.blocks.system import JSON

json_block = JSON.load("BLOCK_NAME")
```
"""

_logo_url = HttpUrl(
"https://cdn.sanity.io/images/3ugk85nk/production/4fcef2294b6eeb423b1332d1ece5156bf296ff96-48x48.png"
)
_documentation_url = HttpUrl("https://docs.prefect.io/latest/develop/blocks")

value: Any = Field(default=..., description="A JSON-compatible value.")


@deprecated_class(
start_date=datetime(2024, 6, 1),
end_date=datetime(2025, 6, 1),
help="Use Variables to store string data instead.",
)
class String(Block):
"""
A block that represents a string. Deprecated, please use Variables to store string data instead.

Attributes:
value: A string value.

Example:
Load a stored string value:
```python
from prefect.blocks.system import String

string_block = String.load("BLOCK_NAME")
```
"""

_logo_url = HttpUrl(
"https://cdn.sanity.io/images/3ugk85nk/production/c262ea2c80a2c043564e8763f3370c3db5a6b3e6-48x48.png"
)
_documentation_url = HttpUrl("https://docs.prefect.io/latest/develop/blocks")

value: str = Field(default=..., description="A string value.")


@deprecated_class(
start_date=datetime(2024, 6, 1),
end_date=datetime(2025, 6, 1),
help="Use Variables to store datetime data instead.",
)
class DateTime(Block):
"""
A block that represents a datetime. Deprecated, please use Variables to store datetime data instead.

Attributes:
value: An ISO 8601-compatible datetime value.

Example:
Load a stored JSON value:
```python
from prefect.blocks.system import DateTime

data_time_block = DateTime.load("BLOCK_NAME")
```
"""

_block_type_name = "Date Time"
_logo_url = HttpUrl(
"https://cdn.sanity.io/images/3ugk85nk/production/8b3da9a6621e92108b8e6a75b82e15374e170ff7-48x48.png"
)
_documentation_url = HttpUrl("https://docs.prefect.io/latest/develop/blocks")

value: PydanticDateTime = Field(
default=...,
description="An ISO 8601-compatible datetime value.",
)


class Secret(Block, Generic[T]):
"""
A block that represents a secret value. The value stored in this block will be obfuscated when
Expand Down
29 changes: 0 additions & 29 deletions src/prefect/cli/cloud/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import warnings
import webbrowser
from contextlib import asynccontextmanager
from datetime import datetime
from typing import (
TYPE_CHECKING,
Iterable,
Expand Down Expand Up @@ -615,34 +614,6 @@ async def logout():
exit_with_success("Logged out from Prefect Cloud.")


@cloud_app.command(
deprecated=True,
deprecated_name="prefect cloud open",
deprecated_start_date=datetime(2024, 10, 1),
deprecated_help="Use `prefect dashboard open` to open the Prefect UI.",
)
async def open():
"""
Open the Prefect Cloud UI in the browser.
"""
confirm_logged_in()

async with get_cloud_client() as client:
try:
current_workspace = await client.read_current_workspace()
except ValueError:
exit_with_error(
"There is no current workspace set - set one with `prefect cloud workspace"
" set --workspace <workspace>`."
)

ui_url = current_workspace.ui_url()

await run_sync_in_worker_thread(webbrowser.open_new_tab, ui_url)

exit_with_success(f"Opened {current_workspace.handle!r} in browser.")


@workspace_app.command()
async def ls():
"""List available workspaces."""
Expand Down
46 changes: 0 additions & 46 deletions src/prefect/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import socket
import threading
import uuid
from datetime import datetime
from functools import partial
from operator import methodcaller
from pathlib import Path
Expand Down Expand Up @@ -35,7 +34,6 @@
import prefect.types._datetime
from prefect._internal.compatibility.async_dispatch import async_dispatch
from prefect._internal.compatibility.blocks import call_explicitly_async_block_method
from prefect._internal.compatibility.deprecated import deprecated_callable
from prefect._internal.concurrency.event_loop import get_running_loop
from prefect._result_records import R, ResultRecord, ResultRecordMetadata
from prefect.blocks.core import Block
Expand Down Expand Up @@ -965,50 +963,6 @@ async def await_for_lock(self, key: str, timeout: float | None = None) -> bool:
)
return await self.lock_manager.await_for_lock(key, timeout)

# TODO: These two methods need to find a new home

@deprecated_callable(
start_date=datetime(2025, 5, 10),
end_date=datetime(2025, 11, 10),
help="Use `store_parameters` from `prefect.task_worker` instead.",
)
@sync_compatible
async def store_parameters(self, identifier: UUID, parameters: dict[str, Any]):
record = ResultRecord(
result=parameters,
metadata=ResultRecordMetadata(
serializer=self.serializer, storage_key=str(identifier)
),
)

await call_explicitly_async_block_method(
self.result_storage,
"write_path",
(f"parameters/{identifier}",),
{"content": record.serialize()},
)

@deprecated_callable(
start_date=datetime(2025, 5, 10),
end_date=datetime(2025, 11, 10),
help="Use `read_parameters` from `prefect.task_worker` instead.",
)
@sync_compatible
async def read_parameters(self, identifier: UUID) -> dict[str, Any]:
if self.result_storage is None:
raise ValueError(
"Result store is not configured - must have a result storage block to read parameters"
)
record: ResultRecord[Any] = ResultRecord[Any].deserialize(
await call_explicitly_async_block_method(
self.result_storage,
"read_path",
(f"parameters/{identifier}",),
{},
)
)
return record.result


def get_result_store() -> ResultStore:
"""
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/runner/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from .runner import Runner
from .submit import submit_to_runner, wait_for_submitted_runs

__all__ = ["Runner", "submit_to_runner", "wait_for_submitted_runs"]
__all__ = ["Runner"]
Loading