Skip to content
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions jina/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ class ProviderType(BetterEnum):

NONE = 0 #: no provider
SAGEMAKER = 1 #: AWS SageMaker
GCP = 2 #: GCP


def replace_enum_to_str(obj):
Expand Down
10 changes: 5 additions & 5 deletions jina/orchestrate/deployments/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ def __init__(
:param port_monitoring: The port on which the prometheus server is exposed, default is a random port between [49152, 65535]
:param prefer_platform: The preferred target Docker platform. (e.g. "linux/amd64", "linux/arm64")
:param protocol: Communication protocol of the server exposed by the Executor. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER', 'GCP'].
:param py_modules: The customized python modules need to be imported before loading the executor

Note that the recommended way is to only import a single module - a simple python file, if your
Expand Down Expand Up @@ -476,21 +476,21 @@ def __init__(
args = ArgNamespace.kwargs2namespace(kwargs, parser, True)
self.args = args
self._gateway_load_balancer = False
if self.args.provider == ProviderType.SAGEMAKER:
if self.args.provider in (ProviderType.SAGEMAKER, ProviderType.GCP):
if self._gateway_kwargs.get('port', 0) == 8080:
raise ValueError(
'Port 8080 is reserved for Sagemaker deployment. '
'Port 8080 is reserved for CSP deployment. '
'Please use another port'
)
if self.args.port != [8080]:
warnings.warn(
'Port is changed to 8080 for Sagemaker deployment. '
'Port is changed to 8080 for CSP deployment. '
f'Port {self.args.port} is ignored'
)
self.args.port = [8080]
if self.args.protocol != [ProtocolType.HTTP]:
warnings.warn(
'Protocol is changed to HTTP for Sagemaker deployment. '
'Protocol is changed to HTTP for CSP deployment. '
f'Protocol {self.args.protocol} is ignored'
)
self.args.protocol = [ProtocolType.HTTP]
Expand Down
12 changes: 6 additions & 6 deletions jina/orchestrate/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ def __init__(

Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
:param protocol: Communication protocol of the server exposed by the Gateway. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER', 'GCP'].
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param py_modules: The customized python modules need to be imported before loading the gateway

Expand Down Expand Up @@ -464,7 +464,7 @@ def __init__(

Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
:param protocol: Communication protocol of the server exposed by the Gateway. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER', 'GCP'].
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param py_modules: The customized python modules need to be imported before loading the gateway

Expand Down Expand Up @@ -969,7 +969,7 @@ def add(
:param port_monitoring: The port on which the prometheus server is exposed, default is a random port between [49152, 65535]
:param prefer_platform: The preferred target Docker platform. (e.g. "linux/amd64", "linux/arm64")
:param protocol: Communication protocol of the server exposed by the Executor. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER', 'GCP'].
:param py_modules: The customized python modules need to be imported before loading the executor

Note that the recommended way is to only import a single module - a simple python file, if your
Expand Down Expand Up @@ -1132,7 +1132,7 @@ def add(
:param port_monitoring: The port on which the prometheus server is exposed, default is a random port between [49152, 65535]
:param prefer_platform: The preferred target Docker platform. (e.g. "linux/amd64", "linux/arm64")
:param protocol: Communication protocol of the server exposed by the Executor. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER', 'GCP'].
:param py_modules: The customized python modules need to be imported before loading the executor

Note that the recommended way is to only import a single module - a simple python file, if your
Expand Down Expand Up @@ -1396,7 +1396,7 @@ def config_gateway(

Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
:param protocol: Communication protocol of the server exposed by the Gateway. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER', 'GCP'].
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param py_modules: The customized python modules need to be imported before loading the gateway

Expand Down Expand Up @@ -1496,7 +1496,7 @@ def config_gateway(

Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
:param protocol: Communication protocol of the server exposed by the Gateway. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER', 'GCP'].
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param py_modules: The customized python modules need to be imported before loading the gateway

Expand Down
2 changes: 1 addition & 1 deletion jina/serve/executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1084,7 +1084,7 @@ def serve(
:param port_monitoring: The port on which the prometheus server is exposed, default is a random port between [49152, 65535]
:param prefer_platform: The preferred target Docker platform. (e.g. "linux/amd64", "linux/arm64")
:param protocol: Communication protocol of the server exposed by the Executor. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER', 'GCP'].
:param py_modules: The customized python modules need to be imported before loading the executor

Note that the recommended way is to only import a single module - a simple python file, if your
Expand Down
37 changes: 37 additions & 0 deletions jina/serve/runtimes/servers/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,3 +297,40 @@ def app(self):
cors=self.cors,
logger=self.logger,
)


class GCPHTTPServer(FastAPIBaseServer):
"""
:class:`GCPHTTPServer` is a FastAPIBaseServer that uses a custom FastAPI app for GCP endpoints

"""

@property
def port(self):
"""Get the port for the GCP server
:return: Return the port for the GCP server, always 8080"""
return 8080

@property
def ports(self):
"""Get the port for the GCP server
:return: Return the port for the GCP server, always 8080"""
return [8080]

@property
def app(self):
"""Get the GCP fastapi app
:return: Return a FastAPI app for the GCP container
"""
return self._request_handler._http_fastapi_gcp_app(
title=self.title,
description=self.description,
no_crud_endpoints=self.no_crud_endpoints,
no_debug_endpoints=self.no_debug_endpoints,
expose_endpoints=self.expose_endpoints,
expose_graphql_endpoint=self.expose_graphql_endpoint,
tracing=self.tracing,
tracer_provider=self.tracer_provider,
cors=self.cors,
logger=self.logger,
)
192 changes: 192 additions & 0 deletions jina/serve/runtimes/worker/http_gcp_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Union, Any

from jina._docarray import docarray_v2
from jina.importer import ImportExtensions
from jina.types.request.data import DataRequest

if TYPE_CHECKING:
from jina.logging.logger import JinaLogger

if docarray_v2:
from docarray import BaseDoc, DocList


def get_fastapi_app(
request_models_map: Dict,
caller: Callable,
logger: 'JinaLogger',
cors: bool = False,
**kwargs,
):
"""
Get the app from FastAPI as the REST interface.

:param request_models_map: Map describing the endpoints and its Pydantic models
:param caller: Callable to be handled by the endpoints of the returned FastAPI app
:param logger: Logger object
:param cors: If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access.
:param kwargs: Extra kwargs to make it compatible with other methods
:return: fastapi app
"""
with ImportExtensions(required=True):
import pydantic
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from pydantic.config import BaseConfig, inherit_config

import os

from jina.proto import jina_pb2
from jina.serve.runtimes.gateway.models import _to_camel_case

if not docarray_v2:
logger.warning('Only docarray v2 is supported with Sagemaker. ')
return

class Header(BaseModel):
request_id: Optional[str] = Field(
description='Request ID', example=os.urandom(16).hex()
)

class Config(BaseConfig):
alias_generator = _to_camel_case
allow_population_by_field_name = True

class InnerConfig(BaseConfig):
alias_generator = _to_camel_case
allow_population_by_field_name = True

class VertexAIResponse(BaseModel):
predictions: Any = Field(
description='Prediction results',
)

app = FastAPI()

if cors:
app.add_middleware(
CORSMiddleware,
allow_origins=['*'],
allow_credentials=True,
allow_methods=['*'],
allow_headers=['*'],
)
logger.warning('CORS is enabled. This service is accessible from any website!')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why warning?


def add_post_route(
endpoint_path,
input_model,
output_model,
input_doc_list_model=None,
):
from docarray.base_doc.docarray_response import DocArrayResponse

app_kwargs = dict(
path=f'/{endpoint_path.strip("/")}',
methods=['POST'],
summary=f'Endpoint {endpoint_path}',
response_model=Union[output_model, List[output_model]],
response_class=DocArrayResponse,
)

def is_valid_csv(content: str) -> bool:
import csv
from io import StringIO

try:
f = StringIO(content)
reader = csv.DictReader(f)
for _ in reader:
pass

return True
except Exception:
return False

async def process(body) -> output_model:
req = DataRequest()
if body.header is not None:
req.header.request_id = body.header.request_id

if body.parameters is not None:
req.parameters = body.parameters
req.header.exec_endpoint = endpoint_path
req.document_array_cls = DocList[input_doc_model]

data = body.data
if isinstance(data, list):
req.data.docs = DocList[input_doc_list_model](data)
else:
req.data.docs = DocList[input_doc_list_model]([data])
if body.header is None:
req.header.request_id = req.docs[0].id

resp = await caller(req)
status = resp.header.status

if status.code == jina_pb2.StatusProto.ERROR:
raise HTTPException(status_code=499, detail=status.description)
else:
return {"predictions": resp.docs}
return output_model(predictions=resp.docs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the first return is not good


@app.api_route(**app_kwargs)
async def post(request: Request):
content_type = request.headers.get('content-type')
if content_type == 'application/json':
json_body = await request.json()
transformed_json_body = {"data": [{"text": instance} for instance in json_body["instances"]]}
return await process(input_model(**transformed_json_body))

elif content_type in ('text/csv', 'application/csv'):
# TODO: fix here
return await process(input_model(data=[]))
else:
raise HTTPException(
status_code=400,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please use constants and not magic numbers.

detail=f'Invalid content-type: {content_type}. '
f'Please use either application/json or text/csv.',
)

for endpoint, input_output_map in request_models_map.items():
if endpoint != '_jina_dry_run_':
input_doc_model = input_output_map['input']['model']
parameters_model = input_output_map['parameters']['model'] or Optional[Dict]
default_parameters = (
... if input_output_map['parameters']['model'] else None
)

_config = inherit_config(InnerConfig, BaseDoc.__config__)
endpoint_input_model = pydantic.create_model(
f'{endpoint.strip("/")}_input_model',
data=(Union[List[input_doc_model], input_doc_model], ...),
parameters=(parameters_model, default_parameters),
header=(Optional[Header], None),
__config__=_config,
)

add_post_route(
endpoint,
input_model=endpoint_input_model,
output_model=VertexAIResponse,
input_doc_list_model=input_doc_model,
)

from jina.serve.runtimes.gateway.health_model import JinaHealthModel

# `/ping` route is required by AWS Sagemaker
@app.get(
path='/ping',
summary='Get the health of Jina Executor service',
response_model=JinaHealthModel,
)
async def _executor_health():
"""
Get the health of this Gateway service.
.. # noqa: DAR201

"""
return {}

return app
22 changes: 22 additions & 0 deletions jina/serve/runtimes/worker/request_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,28 @@ async def _shutdown():

return app

def _http_fastapi_gcp_app(self, **kwargs):
from jina.serve.runtimes.worker.http_gcp_app import get_fastapi_app

request_models_map = self._executor._get_endpoint_models_dict()

def call_handle(request):
is_generator = request_models_map[request.header.exec_endpoint][
'is_generator'
]

return self.process_single_data(request, None, is_generator=is_generator)

app = get_fastapi_app(
request_models_map=request_models_map, caller=call_handle, **kwargs
)

@app.on_event('shutdown')
async def _shutdown():
await self.close()

return app

async def _hot_reload(self):
import inspect

Expand Down
7 changes: 7 additions & 0 deletions tests/integration/docarray_v2/gcp/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
FROM jinaai/jina:test-pip

COPY . /executor_root/

WORKDIR /executor_root/SampleExecutor

ENTRYPOINT ["jina", "executor", "--uses", "config.yml"]
2 changes: 2 additions & 0 deletions tests/integration/docarray_v2/gcp/SampleExecutor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# SampleExecutor

8 changes: 8 additions & 0 deletions tests/integration/docarray_v2/gcp/SampleExecutor/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
jtype: SampleExecutor
py_modules:
- executor.py
metas:
name: SampleExecutor
description:
url:
keywords: []
Loading