2 changes: 1 addition & 1 deletion pyproject.toml
@@ -35,7 +35,7 @@ keywords = [
     "scraping",
 ]
 dependencies = [
-    "apify-client>=2.3.0,<3.0.0",
+    "apify-client @ git+https://github.com/apify/apify-client-python.git@typed-clients",
     "apify-shared>=2.0.0,<3.0.0",
     "crawlee>=1.0.4,<2.0.0",
     "cachetools>=5.5.0",
6 changes: 3 additions & 3 deletions src/apify/storage_clients/_apify/_alias_resolving.py
@@ -14,7 +14,7 @@
 from collections.abc import Callable
 from types import TracebackType

-from apify_client.clients import (
+from apify_client._resource_clients import (
     DatasetClientAsync,
     DatasetCollectionClientAsync,
     KeyValueStoreClientAsync,
@@ -105,8 +105,8 @@ async def open_by_alias(
     # Create new unnamed storage and store alias mapping
     raw_metadata = await collection_client.get_or_create()

-    await alias_resolver.store_mapping(storage_id=raw_metadata['id'])
-    return get_resource_client_by_id(raw_metadata['id'])
+    await alias_resolver.store_mapping(storage_id=raw_metadata.id)
+    return get_resource_client_by_id(raw_metadata.id)


 class AliasResolver:
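A note on the `raw_metadata['id']` → `raw_metadata.id` changes here and in the files below: the `typed-clients` branch returns Pydantic models instead of raw JSON dicts, so fields become attributes. A minimal sketch of the pattern; `StorageMetadata` is an invented stand-in, not the real client type:

```python
from pydantic import BaseModel


# Assumed shape only: get_or_create() used to return this payload as a dict;
# with typed clients it comes back as a model, so fields are attributes.
class StorageMetadata(BaseModel):
    id: str
    name: str | None = None


payload = {'id': 'abc123xyz', 'name': None}
metadata = StorageMetadata.model_validate(payload)

assert metadata.id == payload['id']  # attribute access replaces dict subscripting
```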
6 changes: 3 additions & 3 deletions src/apify/storage_clients/_apify/_api_client_creation.py
@@ -8,7 +8,7 @@
 from apify.storage_clients._apify._alias_resolving import open_by_alias

 if TYPE_CHECKING:
-    from apify_client.clients import DatasetClientAsync, KeyValueStoreClientAsync, RequestQueueClientAsync
+    from apify_client._resource_clients import DatasetClientAsync, KeyValueStoreClientAsync, RequestQueueClientAsync

     from apify._configuration import Configuration

@@ -137,13 +137,13 @@ def get_resource_client(storage_id: str) -> DatasetClientAsync:
             # Default storage does not exist. Create a new one.
             if not raw_metadata:
                 raw_metadata = await collection_client.get_or_create()
-            resource_client = get_resource_client(raw_metadata['id'])
+            resource_client = get_resource_client(raw_metadata.id)
             return resource_client

         # Open by name.
         case (None, str(), None, _):
             raw_metadata = await collection_client.get_or_create(name=name)
-            return get_resource_client(raw_metadata['id'])
+            return get_resource_client(raw_metadata.id)

         # Open by ID.
         case (None, None, str(), _):
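For readers unfamiliar with the dispatch above: which open mode runs is decided by structural pattern matching on the identifier tuple. A simplified, self-contained sketch (three identifiers instead of the real code's four-element tuple; the mode strings are invented):

```python
def describe_open_mode(alias: str | None, name: str | None, storage_id: str | None) -> str:
    """Toy dispatcher mirroring the match/case structure in the diff above."""
    match (alias, name, storage_id):
        case (None, None, None):
            return 'default: get_or_create() with no arguments'
        case (None, str(), None):
            return 'by name: get_or_create(name=...)'
        case (None, None, str()):
            return 'by ID: resource client for that ID directly'
        case (str(), None, None):
            return 'by alias: resolve the alias to an unnamed storage ID'
        case _:
            raise ValueError('alias, name and storage_id are mutually exclusive')


assert describe_open_mode(None, 'my-dataset', None) == 'by name: get_or_create(name=...)'
```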
16 changes: 14 additions & 2 deletions src/apify/storage_clients/_apify/_dataset_client.py
@@ -2,6 +2,7 @@

 import asyncio
 import warnings
+from datetime import datetime
 from logging import getLogger
 from typing import TYPE_CHECKING, Any

@@ -17,7 +18,7 @@
 if TYPE_CHECKING:
     from collections.abc import AsyncIterator

-    from apify_client.clients import DatasetClientAsync
+    from apify_client._resource_clients import DatasetClientAsync
     from crawlee._types import JsonSerializable

     from apify import Configuration
@@ -65,7 +66,18 @@ def __init__(
     @override
     async def get_metadata(self) -> DatasetMetadata:
         metadata = await self._api_client.get()
-        return DatasetMetadata.model_validate(metadata)
+
+        if metadata is None:
+            raise ValueError('Failed to retrieve dataset metadata.')
+
+        return DatasetMetadata(
+            id=metadata.id,
+            name=metadata.name,
+            created_at=datetime.fromisoformat(metadata.created_at.replace('Z', '+00:00')),
+            modified_at=datetime.fromisoformat(metadata.modified_at.replace('Z', '+00:00')),
+            accessed_at=datetime.fromisoformat(metadata.accessed_at.replace('Z', '+00:00')),
+            item_count=int(metadata.item_count),
+        )

     @classmethod
     async def open(
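The repeated `replace('Z', '+00:00')` in the new `get_metadata` is there because `datetime.fromisoformat()` only accepts a trailing `Z` from Python 3.11 onward; older interpreters raise `ValueError`. A quick demonstration of the workaround:

```python
from datetime import datetime, timedelta

stamp = '2024-05-01T12:30:00.000Z'  # example value in the API's timestamp format

# On Python < 3.11, fromisoformat() rejects the trailing 'Z' (UTC), so the
# diff normalizes it to an explicit offset before parsing.
parsed = datetime.fromisoformat(stamp.replace('Z', '+00:00'))

assert parsed.utcoffset() == timedelta(0)  # parsed as timezone-aware UTC
```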
23 changes: 17 additions & 6 deletions src/apify/storage_clients/_apify/_key_value_store_client.py
@@ -2,6 +2,7 @@

 import asyncio
 import warnings
+from datetime import datetime
 from logging import getLogger
 from typing import TYPE_CHECKING, Any

@@ -11,12 +12,12 @@
 from crawlee.storage_clients.models import KeyValueStoreRecord, KeyValueStoreRecordMetadata

 from ._api_client_creation import create_storage_api_client
-from ._models import ApifyKeyValueStoreMetadata, KeyValueStoreListKeysPage
+from ._models import ApifyKeyValueStoreMetadata

 if TYPE_CHECKING:
     from collections.abc import AsyncIterator

-    from apify_client.clients import KeyValueStoreClientAsync
+    from apify_client._resource_clients import KeyValueStoreClientAsync

     from apify import Configuration

@@ -54,7 +55,18 @@ def __init__(
     @override
     async def get_metadata(self) -> ApifyKeyValueStoreMetadata:
         metadata = await self._api_client.get()
-        return ApifyKeyValueStoreMetadata.model_validate(metadata)
+
+        if metadata is None:
+            raise ValueError('Failed to retrieve key-value store metadata.')
+
+        return ApifyKeyValueStoreMetadata(
+            id=metadata.id,
+            name=metadata.name,
+            created_at=datetime.fromisoformat(metadata.created_at.replace('Z', '+00:00')),
+            modified_at=datetime.fromisoformat(metadata.modified_at.replace('Z', '+00:00')),
+            accessed_at=datetime.fromisoformat(metadata.accessed_at.replace('Z', '+00:00')),
+            url_signing_secret_key=metadata.url_signing_secret_key,
+        )

     @classmethod
     async def open(
@@ -143,14 +155,13 @@ async def iterate_keys(
         count = 0

         while True:
-            response = await self._api_client.list_keys(exclusive_start_key=exclusive_start_key)
-            list_key_page = KeyValueStoreListKeysPage.model_validate(response)
+            list_key_page = await self._api_client.list_keys(exclusive_start_key=exclusive_start_key)

             for item in list_key_page.items:
                 # Convert KeyValueStoreKeyInfo to KeyValueStoreRecordMetadata
                 record_metadata = KeyValueStoreRecordMetadata(
                     key=item.key,
-                    size=item.size,
+                    size=int(item.size),
                     content_type='application/octet-stream',  # Content type not available from list_keys
                 )
                 yield record_metadata
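Context for the `iterate_keys` hunk: `list_keys` pages through the store with an exclusive-start-key cursor. A self-contained sketch of that loop against a toy page type; the field names (`is_truncated`, `next_exclusive_start_key`) are assumptions modeled on the API's camelCase fields, not the real client types:

```python
from dataclasses import dataclass


@dataclass
class KeyPage:
    items: list[str]
    is_truncated: bool
    next_exclusive_start_key: str | None


# Fake two-page listing keyed by the cursor value (None = first page).
_PAGES: dict[str | None, KeyPage] = {
    None: KeyPage(items=['a', 'b'], is_truncated=True, next_exclusive_start_key='b'),
    'b': KeyPage(items=['c'], is_truncated=False, next_exclusive_start_key=None),
}


def list_keys(exclusive_start_key: str | None = None) -> KeyPage:
    return _PAGES[exclusive_start_key]


def iterate_keys():
    exclusive_start_key = None
    while True:
        page = list_keys(exclusive_start_key=exclusive_start_key)
        yield from page.items
        if not page.is_truncated:  # no further pages to fetch
            break
        exclusive_start_key = page.next_exclusive_start_key


assert list(iterate_keys()) == ['a', 'b', 'c']
```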
39 changes: 24 additions & 15 deletions src/apify/storage_clients/_apify/_request_queue_client.py
@@ -1,5 +1,6 @@
 from __future__ import annotations

+from datetime import datetime
 from logging import getLogger
 from typing import TYPE_CHECKING, Final, Literal

@@ -15,7 +16,7 @@
 if TYPE_CHECKING:
     from collections.abc import Sequence

-    from apify_client.clients import RequestQueueClientAsync
+    from apify_client._resource_clients import RequestQueueClientAsync
     from crawlee import Request
     from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata

@@ -77,26 +78,34 @@ async def get_metadata(self) -> ApifyRequestQueueMetadata:
         Returns:
             Request queue metadata with accurate counts and timestamps, combining API data with local estimates.
         """
-        response = await self._api_client.get()
+        metadata = await self._api_client.get()

-        if response is None:
+        if metadata is None:
             raise ValueError('Failed to fetch request queue metadata from the API.')

+        total_request_count = int(metadata.total_request_count)
+        handled_request_count = int(metadata.handled_request_count)
+        pending_request_count = int(metadata.pending_request_count)
+        created_at = datetime.fromisoformat(metadata.created_at.replace('Z', '+00:00'))
+        modified_at = datetime.fromisoformat(metadata.modified_at.replace('Z', '+00:00'))
+        accessed_at = datetime.fromisoformat(metadata.accessed_at.replace('Z', '+00:00'))
+
         # Enhance API response with local estimations to account for propagation delays (API data can be delayed
         # by a few seconds, while local estimates are immediately accurate).
         return ApifyRequestQueueMetadata(
-            id=response['id'],
-            name=response['name'],
-            total_request_count=max(response['totalRequestCount'], self._implementation.metadata.total_request_count),
-            handled_request_count=max(
-                response['handledRequestCount'], self._implementation.metadata.handled_request_count
-            ),
-            pending_request_count=response['pendingRequestCount'],
-            created_at=min(response['createdAt'], self._implementation.metadata.created_at),
-            modified_at=max(response['modifiedAt'], self._implementation.metadata.modified_at),
-            accessed_at=max(response['accessedAt'], self._implementation.metadata.accessed_at),
-            had_multiple_clients=response['hadMultipleClients'] or self._implementation.metadata.had_multiple_clients,
-            stats=RequestQueueStats.model_validate(response['stats'], by_alias=True),
+            id=metadata.id,
+            name=metadata.name,
+            total_request_count=max(total_request_count, self._implementation.metadata.total_request_count),
+            handled_request_count=max(handled_request_count, self._implementation.metadata.handled_request_count),
+            pending_request_count=pending_request_count,
+            created_at=min(created_at, self._implementation.metadata.created_at),
+            modified_at=max(modified_at, self._implementation.metadata.modified_at),
+            accessed_at=max(accessed_at, self._implementation.metadata.accessed_at),
+            had_multiple_clients=metadata.had_multiple_clients or self._implementation.metadata.had_multiple_clients,
+            stats=RequestQueueStats.model_validate(
+                metadata.stats.model_dump(by_alias=True) if metadata.stats else {},
+                by_alias=True,
+            ),
         )

     @classmethod
@@ -145,7 +154,7 @@ async def open(
         raw_metadata = await api_client.get()
         if raw_metadata is None:
             raise ValueError('Failed to retrieve request queue metadata from the API.')
-        metadata = ApifyRequestQueueMetadata.model_validate(raw_metadata)
+        metadata = ApifyRequestQueueMetadata.model_validate(raw_metadata.model_dump(by_alias=True))

         return cls(
             api_client=api_client,
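On the max/min merging in `get_metadata`: the API's view of the queue can lag local activity by a few seconds, so each field takes whichever source is further along, while `created_at` takes the earlier of the two. A toy illustration of the policy with made-up numbers:

```python
from datetime import datetime, timezone

# Example values only: the API view lags; the local client already saw newer writes.
api_total, local_total = 100, 103
api_modified = datetime(2024, 5, 1, 12, 0, 0, tzinfo=timezone.utc)
local_modified = datetime(2024, 5, 1, 12, 0, 5, tzinfo=timezone.utc)

merged_total = max(api_total, local_total)            # counters only ever grow
merged_modified = max(api_modified, local_modified)   # freshest write wins
# created_at uses min() instead: the earliest observation is the true creation time.

assert (merged_total, merged_modified) == (103, local_modified)
```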
37 changes: 19 additions & 18 deletions src/apify/storage_clients/_apify/_request_queue_shared_client.py
@@ -17,7 +17,7 @@
 if TYPE_CHECKING:
     from collections.abc import Callable, Coroutine, Sequence

-    from apify_client.clients import RequestQueueClientAsync
+    from apify_client._resource_clients import RequestQueueClientAsync

 logger = getLogger(__name__)

@@ -121,18 +121,17 @@ async def add_batch_of_requests(

         if new_requests:
             # Prepare requests for API by converting to dictionaries.
-            requests_dict = [
-                request.model_dump(
-                    by_alias=True,
-                )
-                for request in new_requests
-            ]
+            requests_dict = [request.model_dump(by_alias=True) for request in new_requests]

             # Send requests to API.
-            api_response = AddRequestsResponse.model_validate(
-                await self._api_client.batch_add_requests(requests=requests_dict, forefront=forefront)
+            batch_response = await self._api_client.batch_add_requests(
+                requests=requests_dict,
+                forefront=forefront,
             )

+            batch_response_dict = batch_response.model_dump(by_alias=True)
+            api_response = AddRequestsResponse.model_validate(batch_response_dict)
+
             # Add the locally known already present processed requests based on the local cache.
             api_response.processed_requests.extend(already_present_requests)

@@ -312,7 +311,8 @@ async def _get_request_by_id(self, request_id: str) -> Request | None:
         if response is None:
             return None

-        return Request.model_validate(response)
+        response_dict = response.model_dump(by_alias=True)
+        return Request.model_validate(response_dict)

     async def _ensure_head_is_non_empty(self) -> None:
         """Ensure that the queue head has requests if they are available in the queue."""
@@ -388,7 +388,7 @@ async def _update_request(
         )

         return ProcessedRequest.model_validate(
-            {'uniqueKey': request.unique_key} | response,
+            {'uniqueKey': request.unique_key} | response.model_dump(by_alias=True),
         )

@@ -431,19 +431,19 @@ async def _list_head(
             self._should_check_for_forefront_requests = False

         # Otherwise fetch from API
-        response = await self._api_client.list_and_lock_head(
+        list_and_lock_data = await self._api_client.list_and_lock_head(
             lock_secs=int(self._DEFAULT_LOCK_TIME.total_seconds()),
             limit=limit,
         )

         # Update the queue head cache
-        self._queue_has_locked_requests = response.get('queueHasLockedRequests', False)
+        self._queue_has_locked_requests = list_and_lock_data.queue_has_locked_requests
         # Check if there is another client working with the RequestQueue
-        self.metadata.had_multiple_clients = response.get('hadMultipleClients', False)
+        self.metadata.had_multiple_clients = list_and_lock_data.had_multiple_clients

-        for request_data in response.get('items', []):
-            request = Request.model_validate(request_data)
-            request_id = request_data.get('id')
+        for request_data in list_and_lock_data.items:
+            request = Request.model_validate(request_data.model_dump(by_alias=True))
+            request_id = request_data.id

             # Skip requests without ID or unique key
             if not request.unique_key or not request_id:
@@ -473,7 +473,8 @@
             # After adding new requests to the forefront, any existing leftover locked request is kept in the end.
             self._queue_head.append(leftover_id)

-        return RequestQueueHead.model_validate(response)
+        list_and_lock_dict = list_and_lock_data.model_dump(by_alias=True)
+        return RequestQueueHead.model_validate(list_and_lock_dict)

     def _cache_request(
         self,
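Several hunks in this file convert a typed client model into the corresponding crawlee model via `model_dump(by_alias=True)` followed by `model_validate(...)`. The round-trip works because both models declare the same camelCase API aliases. A minimal sketch with invented stand-in models:

```python
from pydantic import BaseModel, Field


class ClientProcessedRequest(BaseModel):  # stand-in for the apify-client model
    unique_key: str = Field(alias='uniqueKey')
    was_already_present: bool = Field(alias='wasAlreadyPresent')


class ProcessedRequest(BaseModel):  # stand-in for the crawlee model
    unique_key: str = Field(alias='uniqueKey')
    was_already_present: bool = Field(alias='wasAlreadyPresent')


client_model = ClientProcessedRequest.model_validate(
    {'uniqueKey': 'https://example.com', 'wasAlreadyPresent': False}
)

# Dump with aliases so the dict carries the camelCase keys the target model
# expects, then re-validate into the other library's model.
converted = ProcessedRequest.model_validate(client_model.model_dump(by_alias=True))

assert converted.unique_key == 'https://example.com'
```

Dumping by alias matters here: a plain `model_dump()` would emit snake_case keys, which the target model would reject unless it also enabled `populate_by_name`.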