Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# crowdsec_service_api

**crowdsec_service_api** is a Python SDK for the [CrowdSec Service API](https://docs.crowdsec.net/u/service_api/intro/).
**crowdsec_service_api** is a Python SDK for the [CrowdSec Service API](https://docs.crowdsec.net/u/console/service_api/getting_started).
This library enables you to manage CrowdSec resources such as blocklists, integrations in your python applications.

## Installation
Expand All @@ -11,10 +11,11 @@ pip install crowdsec_service_api

## Usage

You can follow this [documentation](https://docs.crowdsec.net/u/service_api/quickstart/blocklists) to see the basic usage of the SDK.
You can follow this [documentation](https://docs.crowdsec.net/u/console/service_api/sdks/python) to see the basic usage of the SDK.

## Documentation
You can access the full usage documentation [here](https://github.com/crowdsecurity/crowdsec-service-api-sdk-python/tree/main/doc).
You can access [the quickstart guide here](https://docs.crowdsec.net/u/console/service_api/quickstart/authentication).
Or you have the full usage documentation [here](https://github.com/crowdsecurity/crowdsec-service-api-sdk-python/tree/main/doc).

## Contributing

Expand Down
20 changes: 2 additions & 18 deletions crowdsec_service_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
from .services.info import Info
from .services.metrics import Metrics
from .services.hub import Hub
from .services.cves import Cves
from .http_client import ApiKeyAuth

class Server(Enum):
production_server = 'https://admin.api.crowdsec.net/v1/'
production_server = 'https://admin.api.crowdsec.net/v1'

__all__ = [
'Allowlists',
Expand All @@ -20,7 +19,6 @@ class Server(Enum):
'Info',
'Metrics',
'Hub',
'Cves',
'AllowlistCreateRequest',
'AllowlistCreateResponse',
'AllowlistGetItemsResponse',
Expand Down Expand Up @@ -61,6 +59,7 @@ class Server(Enum):
'BlocklistUpdateRequest',
'BlocklistUsageStats',
'Body_uploadBlocklistContent',
'CVESubscription',
'ComputedMetrics',
'ComputedSavedMetrics',
'CtiAs',
Expand Down Expand Up @@ -106,21 +105,6 @@ class Server(Enum):
'PostoverflowIndex',
'ScenarioIndex',
'VersionDetail',
'AffectedComponent',
'AttackDetail',
'Behavior',
'Classification',
'Classifications',
'GetCVEIPsResponsePage',
'GetCVEResponse',
'History',
'IPItem',
'Location',
'MitreTechnique',
'Reference',
'ScoreBreakdown',
'Scores',
'SubscribeCVEIntegrationRequest',
'ApiKeyAuth',
'Server',
'Page'
Expand Down
Binary file modified crowdsec_service_api/__pycache__/base_model.cpython-311.pyc
Binary file not shown.
Binary file modified crowdsec_service_api/__pycache__/http_client.cpython-311.pyc
Binary file not shown.
42 changes: 26 additions & 16 deletions crowdsec_service_api/base_model.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from urllib.parse import urlparse
from pydantic import BaseModel, ConfigDict
from typing import Generic, Sequence, Optional, TypeVar
from pydantic import BaseModel, ConfigDict, PrivateAttr, RootModel
from typing import Generic, Sequence, Optional, TypeVar, Any
from httpx import Auth
from .http_client import HttpClient

Expand All @@ -9,42 +9,52 @@ class BaseModelSdk(BaseModel):
model_config = ConfigDict(
extra="ignore",
)
_client: Optional["Service"] = PrivateAttr(default=None)

def __init__(self, /, _client: "Service" = None, **data):
super().__init__(**data)
self._client = _client

def next(self, client: "Service" = None) -> Optional["BaseModelSdk"]:
return (client if client is not None else self._client).next_page(self)


class RootModelSdk(RootModel):
def __getattr__(self, item: str) -> Any:
return getattr(self.root, item)


T = TypeVar("T")


class Page(BaseModelSdk, Generic[T]):
_client: "Service"
items: Sequence[T]
total: Optional[int]
page: Optional[int]
size: Optional[int]
pages: Optional[int] = None
links: Optional[dict] = None

def __init__(self, _client: "Service", **data):
super().__init__(**data)
self._client = _client

def next(self, client: "Service" = None) -> "Page[T]":
return (client if client is not None else self._client).next_page(self)


class Service:
def __init__(self, base_url: str, auth: Auth) -> None:
self.http_client = HttpClient(base_url=base_url, auth=auth)
def __init__(self, base_url: str, auth: Auth, user_agent: str = None) -> None:
self.http_client = HttpClient(
base_url=base_url, auth=auth, user_agent=user_agent
)

def next_page(self, page: Page[T]) -> Page[T]:
if not page.links:
def next_page(self, page: BaseModelSdk) -> Optional[BaseModelSdk]:
if not hasattr(page, "links") or not page.links:
raise ValueError(
"No links found in the response, this is not a paginated response."
)
if page.links.get("next"):
if page.links.next:
# links are relative to host not to full base url. We need to pass a full formatted url here
parsed_url = urlparse(self.http_client.base_url)
response = self.http_client.get(
f"{parsed_url.scheme}://{parsed_url.netloc}{page.links['next']}", path_params=None, params=None, headers=None
f"{parsed_url.scheme}://{parsed_url.netloc}{page.links.next}",
path_params=None,
params=None,
headers=None,
)
return page.__class__(_client=self, **response.json())
return None
13 changes: 11 additions & 2 deletions crowdsec_service_api/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,20 @@ def auth_flow(self, request):


class HttpClient:
def __init__(self, base_url: str, auth: httpx.Auth, aws_region="eu-west-1") -> None:
def __init__(
self,
base_url: str,
auth: httpx.Auth,
user_agent: str = None,
aws_region="eu-west-1",
) -> None:
self.aws_region = aws_region
self.base_url = base_url
self.auth = auth
self.client = httpx.Client()
headers = {"Accept-Encoding": "gzip"}
if user_agent:
headers["User-Agent"] = user_agent
self.client = httpx.Client(headers=headers)
self.timeout = 30

def _replace_path_params(self, url: str, path_params: dict):
Expand Down
Loading