from __future__ import annotations
import typing as t
from bentoml._internal.types import LazyType
from .io_models import IODescriptor
from .method import APIMethod
# Generic callable type: lets a decorator return exactly the callable type it
# received (used by ``on_shutdown``).
F = t.TypeVar("F", bound=t.Callable[..., t.Any])
# Generic placeholder for a wrapped function's return type, and for the object
# passed through the ``asgi_app`` decorator.
R = t.TypeVar("R")
# Type variable restricted to ``APIMethod`` instances. The bound is a string,
# so it is not evaluated when this module is imported.
T = t.TypeVar("T", bound="APIMethod[..., t.Any]")

if t.TYPE_CHECKING:
    # Imported for static analysis only; at runtime these names appear solely
    # inside annotations and in the ``LazyType["FastAPI"]`` string subscript.
    from fastapi import FastAPI  # noqa: F401

    from bentoml._internal.external_typing import ASGIApp

    # Type checkers see a real ParamSpec so the decorated method's parameters
    # are carried through to ``APIMethod[P, R]``.
    P = t.ParamSpec("P")
else:
    # Runtime stand-in with the same name.
    # NOTE(review): presumably avoids requiring ``typing.ParamSpec`` at import
    # time on interpreters that lack it -- confirm against supported versions.
    P = t.TypeVar("P")
def on_shutdown(func: F) -> F:
    """Flag ``func`` as a service shutdown hook.

    The callable is tagged in place by setting its
    ``__bentoml_shutdown_hook__`` attribute to ``True``; it is not wrapped.

    Args:
        func: The method to register as a shutdown hook.

    Returns:
        The very same ``func`` object that was passed in.
    """
    # Plain attribute assignment is equivalent to the setattr() call; the
    # ignore is only for type checkers, which do not know this attribute.
    func.__bentoml_shutdown_hook__ = True  # type: ignore[attr-defined]
    return func
def on_deployment(func: t.Callable[P, R] | staticmethod[P, R]) -> staticmethod[P, R]:
    """Flag ``func`` as a deployment hook and return it as a ``staticmethod``.

    The ``__bentoml_deployment_hook__`` attribute is set to ``True`` on the
    underlying plain function, never on the ``staticmethod`` wrapper.

    Args:
        func: A plain callable, or one already wrapped in ``staticmethod``.

    Returns:
        ``func`` itself when it is already a ``staticmethod``; otherwise a new
        ``staticmethod`` wrapping ``func``.
    """
    marker = "__bentoml_deployment_hook__"
    if isinstance(func, staticmethod):
        # Already wrapped: tag the inner function and hand the wrapper back.
        setattr(func.__func__, marker, True)
        return func
    setattr(func, marker, True)
    return staticmethod(func)  # type: ignore
@t.overload
def api(func: t.Callable[t.Concatenate[t.Any, P], R]) -> APIMethod[P, R]: ...


@t.overload
def api(
    *,
    route: str | None = ...,
    name: str | None = ...,
    input_spec: type[IODescriptor] | None = ...,
    output_spec: type[IODescriptor] | None = ...,
    batchable: bool = ...,
    batch_dim: int | tuple[int, int] = ...,
    max_batch_size: int = ...,
    max_latency_ms: int = ...,
) -> t.Callable[[t.Callable[t.Concatenate[t.Any, P], R]], APIMethod[P, R]]: ...


def api(
    func: t.Callable[t.Concatenate[t.Any, P], R] | None = None,
    *,
    route: str | None = None,
    name: str | None = None,
    input_spec: type[IODescriptor] | None = None,
    output_spec: type[IODescriptor] | None = None,
    batchable: bool = False,
    batch_dim: int | tuple[int, int] = 0,
    max_batch_size: int = 100,
    max_latency_ms: int = 60000,
) -> (
    APIMethod[P, R]
    | t.Callable[[t.Callable[t.Concatenate[t.Any, P], R]], APIMethod[P, R]]
):
    """Make a BentoML API method.

    This decorator can be used either with or without arguments: ``@api``
    wraps the method immediately, while ``@api(...)`` returns a decorator
    that wraps the method it is later applied to.

    Args:
        func: The function to be wrapped.
        route: The route of the API. e.g. "/predict"
        name: The name of the API.
        input_spec: The input spec of the API; an ``IODescriptor`` subclass.
        output_spec: The output spec of the API; an ``IODescriptor`` subclass.
        batchable: Whether the API is batchable.
        batch_dim: The batch dimension of the API.
        max_batch_size: The maximum batch size of the API.
        max_latency_ms: The maximum latency of the API, in milliseconds.

    Returns:
        An ``APIMethod`` when ``func`` is given, otherwise a decorator that
        produces one.

    Raises:
        ValueError: If the wrapped function's name starts with ``"__"``.
    """

    def wrapper(fn: t.Callable[t.Concatenate[t.Any, P], R]) -> APIMethod[P, R]:
        if fn.__name__.startswith("__"):
            raise ValueError("API methods cannot start with '__'")
        params: dict[str, t.Any] = {
            "batchable": batchable,
            "batch_dim": batch_dim,
            "max_batch_size": max_batch_size,
            "max_latency_ms": max_latency_ms,
        }
        # Optional settings are forwarded only when supplied, so APIMethod's
        # own defaults apply otherwise. ``name`` was previously accepted but
        # never forwarded, which silently discarded a user-chosen API name.
        optional: dict[str, t.Any] = {
            "route": route,
            "name": name,
            "input_spec": input_spec,
            "output_spec": output_spec,
        }
        params.update(
            {key: value for key, value in optional.items() if value is not None}
        )
        return APIMethod(fn, **params)

    if func is not None:
        return wrapper(func)
    return wrapper
def asgi_app(
    app: ASGIApp, *, path: str = "/", name: str | None = None
) -> t.Callable[[R], R]:
    """Mount an ASGI app to the service.

    The returned decorator accepts either a ``Service`` instance, on which the
    app is mounted right away, or any other object (typically the class being
    decorated), on which the mount request is recorded in its
    ``__bentoml_mounted_apps__`` list. When ``app`` is a FastAPI application,
    ``make_fastapi_class_views`` is additionally invoked for the target.

    Args:
        app: The ASGI app to be mounted.
        path: The path to mount the app.
        name: The name of the app.

    Returns:
        A decorator that returns the object it was given.
    """
    from ._internals import make_fastapi_class_views
    from .service import Service

    def decorator(obj: R) -> R:
        # FastAPI is referenced lazily by dotted path rather than imported here.
        fastapi_type = LazyType["FastAPI"]("fastapi.FastAPI")

        if isinstance(obj, Service):
            obj.mount_asgi_app(app, path=path, name=name)
            if fastapi_type.isinstance(app):
                make_fastapi_class_views(obj.inner, app)
            return obj

        # Not a Service: stash the mount request on the object itself.
        pending = getattr(obj, "__bentoml_mounted_apps__", [])
        pending.append((app, path, name))
        setattr(obj, "__bentoml_mounted_apps__", pending)
        if fastapi_type.isinstance(app):
            make_fastapi_class_views(obj, app)
        return obj

    return decorator
@t.overload
def task(func: t.Callable[t.Concatenate[t.Any, P], R]) -> APIMethod[P, R]: ...


@t.overload
def task(
    *,
    route: str | None = ...,
    name: str | None = ...,
    input_spec: type[IODescriptor] | None = ...,
    output_spec: type[IODescriptor] | None = ...,
    batchable: bool = ...,
    batch_dim: int | tuple[int, int] = ...,
    max_batch_size: int = ...,
    max_latency_ms: int = ...,
) -> t.Callable[[t.Callable[t.Concatenate[t.Any, P], R]], APIMethod[P, R]]: ...


def task(
    func: t.Callable[t.Concatenate[t.Any, P], R] | None = None,
    *,
    route: str | None = None,
    name: str | None = None,
    input_spec: type[IODescriptor] | None = None,
    output_spec: type[IODescriptor] | None = None,
    batchable: bool = False,
    batch_dim: int | tuple[int, int] = 0,
    max_batch_size: int = 100,
    max_latency_ms: int = 60000,
) -> (
    APIMethod[P, R]
    | t.Callable[[t.Callable[t.Concatenate[t.Any, P], R]], APIMethod[P, R]]
):
    """Mark a method as a BentoML async task.

    This decorator can be used either with or without arguments: ``@task``
    wraps the method immediately, while ``@task(...)`` returns a decorator
    that wraps the method it is later applied to. The resulting ``APIMethod``
    is created with ``is_task=True``.

    Args:
        func: The function to be wrapped.
        route: The route of the API. e.g. "/predict"
        name: The name of the API.
        input_spec: The input spec of the API; an ``IODescriptor`` subclass.
        output_spec: The output spec of the API; an ``IODescriptor`` subclass.
        batchable: Whether the API is batchable.
        batch_dim: The batch dimension of the API.
        max_batch_size: The maximum batch size of the API.
        max_latency_ms: The maximum latency of the API, in milliseconds.

    Returns:
        An ``APIMethod`` when ``func`` is given, otherwise a decorator that
        produces one.

    Raises:
        ValueError: If the wrapped function's name starts with ``"__"``, or
            if the resulting method reports ``is_stream``.
    """

    def wrapper(fn: t.Callable[t.Concatenate[t.Any, P], R]) -> APIMethod[P, R]:
        if fn.__name__.startswith("__"):
            raise ValueError("API methods cannot start with '__'")
        params: dict[str, t.Any] = {
            "batchable": batchable,
            "batch_dim": batch_dim,
            "max_batch_size": max_batch_size,
            "max_latency_ms": max_latency_ms,
        }
        # Optional settings are forwarded only when supplied, so APIMethod's
        # own defaults apply otherwise. ``name`` was previously accepted but
        # never forwarded, which silently discarded a user-chosen task name.
        optional: dict[str, t.Any] = {
            "route": route,
            "name": name,
            "input_spec": input_spec,
            "output_spec": output_spec,
        }
        params.update(
            {key: value for key, value in optional.items() if value is not None}
        )
        meth = APIMethod(fn, **params, is_task=True)
        # The stream check runs on the constructed method, which exposes
        # ``is_stream``.
        if meth.is_stream:
            raise ValueError("Async task cannot return a stream.")
        return meth

    if func is not None:
        return wrapper(func)
    return wrapper