Skip to content

API reference

Generated from the installed package. For the full list of re-exports, see FasterAPI/__init__.py.

FasterAPI

FasterAPI — A high-performance ASGI web framework.

Drop-in FastAPI replacement powered by msgspec (C extension JSON), radix-tree routing, uvloop, and Python 3.13 sub-interpreters.

BackgroundTask

A single background task to be executed after a response is sent.

Source code in FasterAPI/background.py
class BackgroundTask:
    """A deferred call (callable plus its arguments) to run once the response is sent."""

    __slots__ = ("func", "args", "kwargs")

    def __init__(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        # Only capture the call here; nothing is invoked until run().
        self.func, self.args, self.kwargs = func, args, kwargs

    async def run(self) -> None:
        """Invoke the stored callable.

        Callables accepted by ``is_coroutine`` are awaited directly; anything
        else is handed to the running loop's default executor.
        """
        if not is_coroutine(self.func):
            event_loop = asyncio.get_running_loop()
            await event_loop.run_in_executor(None, self._run_sync)
            return
        await self.func(*self.args, **self.kwargs)

    def _run_sync(self) -> None:
        # Zero-argument entry point that is handed to run_in_executor.
        self.func(*self.args, **self.kwargs)

run() async

Execute the background task.

Source code in FasterAPI/background.py
async def run(self) -> None:
    """Invoke the stored callable: await it directly, or run it in the default executor."""
    if not is_coroutine(self.func):
        # Not awaitable as-is: push the call onto the loop's default executor.
        event_loop = asyncio.get_running_loop()
        await event_loop.run_in_executor(None, self._run_sync)
        return
    await self.func(*self.args, **self.kwargs)

BackgroundTasks

A collection of background tasks to be executed after a response is sent.

Source code in FasterAPI/background.py
class BackgroundTasks:
    """An ordered queue of background tasks to run after a response is sent."""

    def __init__(self) -> None:
        self._tasks: list[BackgroundTask] = list()

    def add_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        """Queue ``func`` to be called later with the given arguments."""
        queued = BackgroundTask(func, *args, **kwargs)
        self._tasks.append(queued)

    async def run(self) -> None:
        """Await every queued task one at a time, in the order it was added."""
        for job in self._tasks:
            # Each task finishes before the next one starts.
            await job.run()

add_task(func, *args, **kwargs)

Add a new background task to the collection.

Source code in FasterAPI/background.py
def add_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
    """Queue ``func`` to be called later with the given arguments."""
    queued = BackgroundTask(func, *args, **kwargs)
    self._tasks.append(queued)

run() async

Execute all background tasks sequentially.

Source code in FasterAPI/background.py
async def run(self) -> None:
    """Await every queued task one at a time, in the order it was added."""
    for job in self._tasks:
        # Each task finishes before the next one starts.
        await job.run()

BaseHTTPMiddleware

Base class for HTTP middleware that wraps an ASGI application.

Source code in FasterAPI/middleware.py
class BaseHTTPMiddleware:
    """ASGI wrapper that routes HTTP scopes through an overridable ``dispatch`` hook."""

    def __init__(self, app: ASGIApp) -> None:
        # The wrapped (downstream) ASGI application.
        self.app = app

    async def __call__(self, scope: dict[str, Any], receive: ASGIApp, send: ASGIApp) -> None:
        # Only "http" scopes go through dispatch; every other scope type is
        # forwarded to the wrapped app untouched.
        if scope["type"] == "http":
            await self.dispatch(scope, receive, send)
        else:
            await self.app(scope, receive, send)

    async def dispatch(
        self,
        scope: dict[str, Any],
        receive: ASGIApp,
        send: ASGIApp,
    ) -> None:
        """Handle one HTTP request; subclasses override this.

        The base implementation simply forwards the call to the wrapped app.
        """
        await self.app(scope, receive, send)

dispatch(scope, receive, send) async

Process the request. Override this method in subclasses.

Source code in FasterAPI/middleware.py
async def dispatch(
    self,
    scope: dict[str, Any],
    receive: ASGIApp,
    send: ASGIApp,
) -> None:
    """Handle one HTTP request; subclasses override this.

    The base implementation simply forwards the call to the wrapped app.
    """
    await self.app(scope, receive, send)

Body

Declare a request body parameter with optional default and embed mode.

Source code in FasterAPI/params.py
class Body:
    """Marker describing a request-body parameter: its default, description and embed flag."""

    __slots__ = ("default", "description", "embed")

    def __init__(
        self,
        default: Any = _MISSING,
        *,
        description: str = "",
        embed: bool = False,
    ) -> None:
        self.default, self.description, self.embed = default, description, embed

    def __repr__(self) -> str:
        # The _MISSING sentinel means no default was supplied.
        return "Body()" if self.default is _MISSING else f"Body(default={self.default!r})"

CORSMiddleware

Bases: BaseHTTPMiddleware

Middleware that handles Cross-Origin Resource Sharing (CORS) headers.

Source code in FasterAPI/middleware.py
class CORSMiddleware(BaseHTTPMiddleware):
    """Middleware that handles Cross-Origin Resource Sharing (CORS) headers."""

    def __init__(
        self,
        app: ASGIApp,
        *,
        allow_origins: Sequence[str] = ("*",),
        allow_methods: Sequence[str] = ("*",),
        allow_headers: Sequence[str] = ("*",),
        allow_credentials: bool = False,
        max_age: int = 600,
        expose_headers: Sequence[str] = (),
    ) -> None:
        super().__init__(app)
        self.allow_origins = set(allow_origins)
        self.allow_methods = set(allow_methods)
        self.allow_headers = set(allow_headers)
        self.allow_credentials = allow_credentials
        self.max_age = max_age
        self.expose_headers = set(expose_headers)
        self.allow_all_origins = "*" in self.allow_origins
        self.allow_all_methods = "*" in self.allow_methods
        self.allow_all_headers = "*" in self.allow_headers

    async def dispatch(
        self,
        scope: dict[str, Any],
        receive: ASGIApp,
        send: ASGIApp,
    ) -> None:
        """Handle CORS preflight requests and inject CORS headers into responses."""
        headers_raw: list[tuple[bytes, bytes]] = scope.get("headers", [])
        request_headers = {k.decode("latin-1").lower(): v.decode("latin-1") for k, v in headers_raw}
        origin = request_headers.get("origin")
        method = scope.get("method", "GET")

        # Preflight
        if method == "OPTIONS" and "access-control-request-method" in request_headers:
            await self._preflight_response(send, origin, request_headers)
            return

        # Normal request — intercept send to inject CORS headers
        cors_headers = self._build_cors_headers(origin)

        async def send_with_cors(message: dict[str, Any]) -> None:
            if message["type"] == "http.response.start":
                existing = list(message.get("headers", []))
                existing.extend(cors_headers)
                message = {**message, "headers": existing}
            await send(message)

        await self.app(scope, receive, send_with_cors)

    def _origin_allowed(self, origin: str | None) -> bool:
        if origin is None:
            return False
        if self.allow_all_origins:
            return True
        return origin in self.allow_origins

    def _build_cors_headers(self, origin: str | None) -> list[tuple[bytes, bytes]]:
        if not self._origin_allowed(origin):
            return []

        headers: list[tuple[bytes, bytes]] = []
        if self.allow_all_origins and not self.allow_credentials:
            headers.append((b"access-control-allow-origin", b"*"))
        elif origin:
            headers.append((b"access-control-allow-origin", origin.encode("latin-1")))
            headers.append((b"vary", b"Origin"))

        if self.allow_credentials:
            headers.append((b"access-control-allow-credentials", b"true"))

        if self.expose_headers:
            headers.append(
                (
                    b"access-control-expose-headers",
                    ", ".join(self.expose_headers).encode("latin-1"),
                )
            )
        return headers

    async def _preflight_response(
        self,
        send: ASGIApp,
        origin: str | None,
        request_headers: dict[str, str],
    ) -> None:
        headers: list[tuple[bytes, bytes]] = []

        if self._origin_allowed(origin):
            if self.allow_all_origins and not self.allow_credentials:
                headers.append((b"access-control-allow-origin", b"*"))
            elif origin:
                headers.append((b"access-control-allow-origin", origin.encode("latin-1")))
                headers.append((b"vary", b"Origin"))

            # Methods
            if self.allow_all_methods:
                req_method = request_headers.get("access-control-request-method", "")
                headers.append((b"access-control-allow-methods", req_method.encode("latin-1")))
            else:
                headers.append(
                    (
                        b"access-control-allow-methods",
                        ", ".join(self.allow_methods).encode("latin-1"),
                    )
                )

            # Headers
            if self.allow_all_headers:
                req_headers = request_headers.get("access-control-request-headers", "")
                headers.append((b"access-control-allow-headers", req_headers.encode("latin-1")))
            else:
                headers.append(
                    (
                        b"access-control-allow-headers",
                        ", ".join(self.allow_headers).encode("latin-1"),
                    )
                )

            if self.allow_credentials:
                headers.append((b"access-control-allow-credentials", b"true"))

            headers.append((b"access-control-max-age", str(self.max_age).encode()))

        await send(
            {
                "type": "http.response.start",
                "status": 200,
                "headers": headers,
            }
        )
        await send({"type": "http.response.body", "body": b""})

dispatch(scope, receive, send) async

Handle CORS preflight requests and inject CORS headers into responses.

Source code in FasterAPI/middleware.py
async def dispatch(
    self,
    scope: dict[str, Any],
    receive: ASGIApp,
    send: ASGIApp,
) -> None:
    """Answer CORS preflights directly; otherwise forward and append CORS headers."""
    # Header names are lower-cased so lookups below are case-insensitive.
    request_headers = {
        name.decode("latin-1").lower(): value.decode("latin-1")
        for name, value in scope.get("headers", [])
    }
    origin = request_headers.get("origin")

    is_preflight = (
        scope.get("method", "GET") == "OPTIONS"
        and "access-control-request-method" in request_headers
    )
    if is_preflight:
        # Preflights are answered here; the wrapped app is not called.
        await self._preflight_response(send, origin, request_headers)
        return

    extra_headers = self._build_cors_headers(origin)

    async def send_with_cors(message: dict[str, Any]) -> None:
        # Only the response-start message carries headers; pass the rest through.
        if message["type"] != "http.response.start":
            await send(message)
            return
        merged = [*message.get("headers", []), *extra_headers]
        await send({**message, "headers": merged})

    await self.app(scope, receive, send_with_cors)

Cookie

Declare a cookie parameter with an optional default value.

Source code in FasterAPI/params.py
class Cookie:
    """Marker describing a cookie parameter and the default used for it."""

    __slots__ = ("default",)

    def __init__(self, default: Any = None) -> None:
        # None doubles as "no default supplied" for cookies.
        self.default = default

    def __repr__(self) -> str:
        shown = repr(self.default)
        return "Cookie(default=" + shown + ")"

Depends

Declare a dependency to be resolved and injected into a route handler.

Source code in FasterAPI/dependencies.py
class Depends:
    """Declare a dependency to be resolved and injected into a route handler.

    Attributes are stored as given: ``dependency`` is the callable to resolve
    and ``use_cache`` is the caching flag passed at construction.
    """

    __slots__ = ("dependency", "use_cache")

    def __init__(self, dependency: Callable[..., Any], *, use_cache: bool = True) -> None:
        self.dependency = dependency
        self.use_cache = use_cache

    def __repr__(self) -> str:
        dep = self.dependency
        # Not every callable has __name__ (functools.partial objects and
        # instances defining __call__ do not); fall back to their repr so
        # that repr() itself never raises.
        name = getattr(dep, "__name__", None)
        if name is None:
            name = repr(dep)
        return f"Depends({name})"

Faster

The main FasterAPI application class, implementing the ASGI interface.

Source code in FasterAPI/app.py
class Faster:
    """The main FasterAPI application class, implementing the ASGI interface."""

    __slots__ = (
        "title",
        "version",
        "description",
        "openapi_url",
        "docs_url",
        "redoc_url",
        "routes",
        "startup_handlers",
        "shutdown_handlers",
        "middleware",
        "exception_handlers",
        "_router",
        "_openapi_cache",
        "_middleware_app",
        "_ws_routes",
    )

    def __init__(
        self,
        *,
        title: str = "FasterAPI",
        version: str | None = None,
        description: str = "",
        openapi_url: str | None = "/openapi.json",
        docs_url: str | None = "/docs",
        redoc_url: str | None = "/redoc",
    ) -> None:
        self.title = title
        self.version = version if version is not None else get_version()
        self.description = description
        self.openapi_url = openapi_url
        self.docs_url = docs_url
        self.redoc_url = redoc_url
        self.routes: list[dict[str, Any]] = []
        self.startup_handlers: list[ASGIApp] = []
        self.shutdown_handlers: list[ASGIApp] = []
        self.middleware: list[dict[str, Any]] = []
        self.exception_handlers: dict[type, ASGIApp] = {}
        self._router = RadixRouter()
        self._openapi_cache: dict[str, Any] | None = None
        self._middleware_app: ASGIApp | None = None
        self._ws_routes: dict[str, ASGIApp] = {}
        self._setup_openapi_routes()

    def __repr__(self) -> str:
        return f"<Faster routes={len(self.routes)}>"

    # ------------------------------------------------------------------
    #  OpenAPI auto-routes
    # ------------------------------------------------------------------

    def _setup_openapi_routes(self) -> None:
        if self.openapi_url is not None:
            api_url = self.openapi_url
            app_ref = self

            async def openapi_schema() -> JSONResponse:
                spec = generate_openapi(
                    app_ref,
                    title=app_ref.title,
                    version=app_ref.version,
                    description=app_ref.description,
                )
                return JSONResponse(spec)

            self._add_route(
                "GET",
                api_url,
                openapi_schema,
                tags=["openapi"],
                summary="OpenAPI Schema",
                response_model=None,
                status_code=200,
                deprecated=False,
            )

        if self.docs_url is not None and self.openapi_url is not None:
            ourl, t = self.openapi_url, self.title

            async def swagger_docs() -> HTMLResponse:
                return HTMLResponse(swagger_ui_html(ourl, title=f"{t} - Swagger UI"))

            self._add_route(
                "GET",
                self.docs_url,
                swagger_docs,
                tags=["openapi"],
                summary="Swagger UI",
                response_model=None,
                status_code=200,
                deprecated=False,
            )

        if self.redoc_url is not None and self.openapi_url is not None:
            ourl, t = self.openapi_url, self.title

            async def redoc_docs() -> HTMLResponse:
                return HTMLResponse(redoc_html(ourl, title=f"{t} - ReDoc"))

            self._add_route(
                "GET",
                self.redoc_url,
                redoc_docs,
                tags=["openapi"],
                summary="ReDoc",
                response_model=None,
                status_code=200,
                deprecated=False,
            )

    # ------------------------------------------------------------------
    #  ASGI interface
    # ------------------------------------------------------------------

    async def __call__(
        self,
        scope: dict[str, Any],
        receive: ASGIApp,
        send: ASGIApp,
    ) -> None:
        if self.middleware:
            if self._middleware_app is None:
                self._middleware_app = self._build_middleware_chain()
            await self._middleware_app(scope, receive, send)
        else:
            await self._asgi_app(scope, receive, send)

    async def _asgi_app(
        self,
        scope: dict[str, Any],
        receive: ASGIApp,
        send: ASGIApp,
    ) -> None:
        scope_type = scope["type"]
        if scope_type == "http":
            await self._handle_http(scope, receive, send)
        elif scope_type == "websocket":
            await self._handle_websocket(scope, receive, send)
        elif scope_type == "lifespan":
            await self._handle_lifespan(scope, receive, send)

    def _build_middleware_chain(self) -> ASGIApp:
        app = self._asgi_app
        for entry in reversed(self.middleware):
            app = entry["class"](app, **entry["kwargs"])
        return app

    # ------------------------------------------------------------------
    #  HTTP dispatch — hot path
    # ------------------------------------------------------------------

    async def _handle_http(
        self,
        scope: dict[str, Any],
        receive: ASGIApp,
        send: ASGIApp,
    ) -> None:
        result = self._router.resolve(scope["method"], scope["path"])
        if result is None:
            await _send_error(send, 404, "Not Found")
            return

        handler, path_params, metadata = result
        scope["path_params"] = path_params
        request = Request(scope, receive)
        bg_tasks = None

        try:
            response, bg_tasks = await _resolve_handler(handler, request, path_params)
        except RequestValidationError as exc:
            status, body, headers = await self._handle_exc(
                request,
                exc,
                RequestValidationError,
                _default_validation_exception_handler,
            )
            await _send_raw(send, status, body, headers)
            return
        except HTTPException as exc:
            status, body, headers = await self._handle_exc(
                request,
                exc,
                HTTPException,
                _default_http_exception_handler,
            )
            await _send_raw(send, status, body, headers)
            return
        except Exception as exc:
            for exc_class in type(exc).__mro__:
                if exc_class in self.exception_handlers:
                    response = self.exception_handlers[exc_class](request, exc)
                    if asyncio.iscoroutine(response):
                        response = await response
                    break
            else:
                await _send_error(send, 500, "Internal Server Error")
                return

        await _send_response(send, metadata.get("status_code", 200), response)

        if bg_tasks is not None:
            await bg_tasks.run()

    async def _handle_exc(
        self,
        request: Request,
        exc: Exception,
        exc_class: type,
        default_handler: ASGIApp,
    ) -> tuple[int, bytes, list[tuple[bytes, bytes]]]:
        handler = self.exception_handlers.get(exc_class, default_handler)
        result = handler(request, exc)
        if asyncio.iscoroutine(result):
            result = await result
        return cast(tuple[int, bytes, list[tuple[bytes, bytes]]], result)

    # ------------------------------------------------------------------
    #  WebSocket dispatch
    # ------------------------------------------------------------------

    async def _handle_websocket(
        self,
        scope: dict[str, Any],
        receive: ASGIApp,
        send: ASGIApp,
    ) -> None:
        path = scope.get("path", "/")
        handler = self._ws_routes.get(path.rstrip("/") or "/")
        if handler is None:
            await send({"type": "websocket.close", "code": 4004})
            return
        ws = WebSocket(scope, receive, send)
        await handler(ws)

    # ------------------------------------------------------------------
    #  Lifespan
    # ------------------------------------------------------------------

    async def _handle_lifespan(
        self,
        scope: dict[str, Any],
        receive: ASGIApp,
        send: ASGIApp,
    ) -> None:
        while True:
            message = await receive()
            if message["type"] == "lifespan.startup":
                try:
                    for h in self.startup_handlers:
                        r = h()
                        if asyncio.iscoroutine(r):
                            await r
                    await send({"type": "lifespan.startup.complete"})
                except Exception as exc:
                    await send({"type": "lifespan.startup.failed", "message": str(exc)})
                    return
            elif message["type"] == "lifespan.shutdown":
                try:
                    for h in self.shutdown_handlers:
                        r = h()
                        if asyncio.iscoroutine(r):
                            await r
                    await send({"type": "lifespan.shutdown.complete"})
                except Exception:
                    pass
                return

    # ------------------------------------------------------------------
    #  Route registration
    # ------------------------------------------------------------------

    def _add_route(
        self,
        method: str,
        path: str,
        handler: ASGIApp,
        *,
        tags: list[str],
        summary: str,
        response_model: Any,
        status_code: int,
        deprecated: bool,
    ) -> None:
        metadata = {
            "tags": tags,
            "summary": summary,
            "response_model": response_model,
            "status_code": status_code,
            "deprecated": deprecated,
        }
        self.routes.append({"method": method, "path": path, "handler": handler, **metadata})
        self._router.add_route(method, path, handler, metadata)
        compile_handler(handler)  # pre-compile at registration time

    def _route_decorator(self, method: str, path: str, **kw: Any) -> Callable[[ASGIApp], ASGIApp]:
        def decorator(handler: ASGIApp) -> ASGIApp:
            self._add_route(
                method,
                path,
                handler,
                tags=kw.get("tags") or [],
                summary=kw.get("summary", ""),
                response_model=kw.get("response_model"),
                status_code=kw.get("status_code", 200),
                deprecated=kw.get("deprecated", False),
            )
            return handler

        return decorator

    def get(self, path: str, **kw: Any) -> Callable[[ASGIApp], ASGIApp]:
        return self._route_decorator("GET", path, **kw)

    def post(self, path: str, **kw: Any) -> Callable[[ASGIApp], ASGIApp]:
        return self._route_decorator("POST", path, **kw)

    def put(self, path: str, **kw: Any) -> Callable[[ASGIApp], ASGIApp]:
        return self._route_decorator("PUT", path, **kw)

    def delete(self, path: str, **kw: Any) -> Callable[[ASGIApp], ASGIApp]:
        return self._route_decorator("DELETE", path, **kw)

    def patch(self, path: str, **kw: Any) -> Callable[[ASGIApp], ASGIApp]:
        return self._route_decorator("PATCH", path, **kw)

    def websocket(self, path: str) -> Callable[[ASGIApp], ASGIApp]:
        def decorator(handler: ASGIApp) -> ASGIApp:
            self._ws_routes[path.rstrip("/") or "/"] = handler
            return handler

        return decorator

    # ------------------------------------------------------------------
    #  Lifecycle hooks
    # ------------------------------------------------------------------

    def on_startup(self, handler: ASGIApp) -> ASGIApp:
        self.startup_handlers.append(handler)
        return handler

    def on_shutdown(self, handler: ASGIApp) -> ASGIApp:
        self.shutdown_handlers.append(handler)
        return handler

    # ------------------------------------------------------------------
    #  Middleware & exception handlers
    # ------------------------------------------------------------------

    def add_middleware(self, middleware_class: type, **kwargs: Any) -> None:
        self.middleware.append({"class": middleware_class, "kwargs": kwargs})
        self._middleware_app = None  # invalidate cached chain

    def add_exception_handler(self, exc_class: type, handler: ASGIApp) -> None:
        self.exception_handlers[exc_class] = handler

    # ------------------------------------------------------------------
    #  Router inclusion
    # ------------------------------------------------------------------

    def include_router(
        self,
        router: Any,
        *,
        prefix: str = "",
        tags: Sequence[str] = (),
    ) -> None:
        pfx = prefix.rstrip("/")
        for route in router.routes:
            merged = dict(route)
            merged["path"] = pfx + merged["path"]
            merged["tags"] = list(tags) + merged["tags"]
            self.routes.append(merged)
            metadata = {k: v for k, v in merged.items() if k not in ("method", "path", "handler")}
            self._router.add_route(merged["method"], merged["path"], merged["handler"], metadata)
            compile_handler(merged["handler"])

FasterRouter

API router for grouping routes with a common prefix and tags.

Source code in FasterAPI/router.py
class FasterRouter:
    """API router for grouping routes with a common prefix and tags."""

    __slots__ = ("prefix", "tags", "routes")

    def __init__(self, prefix: str = "", tags: list[str] | None = None) -> None:
        self.prefix = prefix.rstrip("/")
        self.tags: list[str] = tags or []
        self.routes: list[dict[str, Any]] = []

    def _add_route(
        self,
        method: str,
        path: str,
        handler: ASGIApp,
        *,
        tags: list[str],
        summary: str,
        response_model: Any,
        status_code: int,
        deprecated: bool,
    ) -> None:
        full_path = self.prefix + path
        self.routes.append(
            {
                "method": method,
                "path": full_path,
                "handler": handler,
                "tags": self.tags + tags,
                "summary": summary,
                "response_model": response_model,
                "status_code": status_code,
                "deprecated": deprecated,
            }
        )

    # Decorator factories — identical API to Faster app

    def get(self, path: str, **kw: Any) -> Callable[[ASGIApp], ASGIApp]:
        def decorator(handler: ASGIApp) -> ASGIApp:
            self._add_route("GET", path, handler, **_route_kw(kw))
            return handler

        return decorator

    def post(self, path: str, **kw: Any) -> Callable[[ASGIApp], ASGIApp]:
        def decorator(handler: ASGIApp) -> ASGIApp:
            self._add_route("POST", path, handler, **_route_kw(kw))
            return handler

        return decorator

    def put(self, path: str, **kw: Any) -> Callable[[ASGIApp], ASGIApp]:
        def decorator(handler: ASGIApp) -> ASGIApp:
            self._add_route("PUT", path, handler, **_route_kw(kw))
            return handler

        return decorator

    def delete(self, path: str, **kw: Any) -> Callable[[ASGIApp], ASGIApp]:
        def decorator(handler: ASGIApp) -> ASGIApp:
            self._add_route("DELETE", path, handler, **_route_kw(kw))
            return handler

        return decorator

    def patch(self, path: str, **kw: Any) -> Callable[[ASGIApp], ASGIApp]:
        def decorator(handler: ASGIApp) -> ASGIApp:
            self._add_route("PATCH", path, handler, **_route_kw(kw))
            return handler

        return decorator

File

Declare a file upload parameter.

Source code in FasterAPI/params.py
class File:
    """Marker describing a file-upload parameter."""

    __slots__ = ("description",)

    def __init__(self, *, description: str = "") -> None:
        # description is keyword-only and is the only state kept.
        self.description = description

    def __repr__(self) -> str:
        # The description is intentionally not part of the repr.
        return "File" + "()"

FileResponse

Response that sends a file as an attachment.

Source code in FasterAPI/response.py
class FileResponse:
    """Response that sends a file as an attachment."""

    def __init__(
        self,
        path: str | Path,
        filename: str | None = None,
        media_type: str | None = None,
        headers: dict[str, str] | None = None,
        status_code: int = 200,
    ) -> None:
        self.path = Path(path)
        self.filename = filename or self.path.name
        self.status_code = status_code
        self.headers = headers or {}
        if media_type is not None:
            self.media_type = media_type
        else:
            mt, _ = mimetypes.guess_type(str(self.path))
            self.media_type = mt or "application/octet-stream"

    def _build_headers(self) -> list[tuple[bytes, bytes]]:
        raw: list[tuple[bytes, bytes]] = [
            (b"content-type", self.media_type.encode("latin-1")),
            (
                b"content-disposition",
                f'attachment; filename="{self.filename}"'.encode("latin-1"),
            ),
        ]
        for key, value in self.headers.items():
            raw.append((key.lower().encode("latin-1"), value.encode("latin-1")))
        return raw

    async def to_asgi(self, send: ASGIApp) -> None:
        """Read the file and send it through the ASGI interface."""
        content = await asyncio.get_running_loop().run_in_executor(
            None,
            self.path.read_bytes,
        )
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self._build_headers(),
            }
        )
        await send({"type": "http.response.body", "body": content})

to_asgi(send) async

Read the file and send it through the ASGI interface.

Source code in FasterAPI/response.py
async def to_asgi(self, send: ASGIApp) -> None:
    """Load the file's bytes, then emit the response start and body messages."""
    # The blocking read is pushed to the loop's default executor.
    event_loop = asyncio.get_running_loop()
    payload = await event_loop.run_in_executor(None, self.path.read_bytes)

    start_message = {
        "type": "http.response.start",
        "status": self.status_code,
        "headers": self._build_headers(),
    }
    await send(start_message)
    await send({"type": "http.response.body", "body": payload})

Form

Declare a form field parameter with an optional default value.

Source code in FasterAPI/params.py
class Form:
    """Marker describing a form-field parameter: its default and description."""

    __slots__ = ("default", "description")

    def __init__(self, default: Any = _MISSING, *, description: str = "") -> None:
        self.default, self.description = default, description

    def __repr__(self) -> str:
        # The _MISSING sentinel means no default was supplied.
        return "Form()" if self.default is _MISSING else f"Form(default={self.default!r})"

FormData

Bases: dict[str, Any]

Dict subclass for form data that may contain UploadFile values.

Source code in FasterAPI/datastructures.py
class FormData(dict[str, Any]):
    """A ``dict`` of parsed form fields whose values may include ``UploadFile`` objects."""

    async def close(self) -> None:
        """Close each ``UploadFile`` stored as a value; other values are left alone."""
        uploads = (item for item in self.values() if isinstance(item, UploadFile))
        for upload in uploads:
            await upload.close()

close() async

Close all UploadFile instances in this form data.

Source code in FasterAPI/datastructures.py
async def close(self) -> None:
    """Close each ``UploadFile`` stored as a value; other values are left alone."""
    uploads = (item for item in self.values() if isinstance(item, UploadFile))
    for upload in uploads:
        await upload.close()

GZipMiddleware

Bases: BaseHTTPMiddleware

Middleware that compresses responses using gzip when the client supports it.

Source code in FasterAPI/middleware.py
class GZipMiddleware(BaseHTTPMiddleware):
    """Middleware that compresses responses using gzip when the client supports it.

    Responses shorter than ``minimum_size`` bytes, and responses that already
    carry a ``content-encoding`` header, are passed through unchanged.
    """

    def __init__(self, app: ASGIApp, *, minimum_size: int = 1000) -> None:
        super().__init__(app)
        self.minimum_size = minimum_size

    async def dispatch(
        self,
        scope: dict[str, Any],
        receive: ASGIApp,
        send: ASGIApp,
    ) -> None:
        """Gzip the downstream response when the client accepts it and it is large enough.

        The request is forwarded untouched when its ``Accept-Encoding`` header
        does not mention gzip. Otherwise the whole response is buffered and,
        once the final body message arrives, sent either compressed (with
        ``content-encoding``, ``vary`` and a recomputed ``content-length``)
        or unchanged.

        Args:
            scope: ASGI connection scope; only its ``"headers"`` entry is read here.
            receive: ASGI receive callable, passed through to the wrapped app.
            send: ASGI send callable that receives the final response messages.
        """
        accept_encoding = ""
        for name, value in scope.get("headers", []):
            if name.decode("latin-1").lower() == "accept-encoding":
                accept_encoding = value.decode("latin-1")
                break

        if "gzip" not in accept_encoding.lower():
            await self.app(scope, receive, send)
            return

        # Collect the response so the size is known before deciding to compress.
        initial_message: dict[str, Any] | None = None
        body_parts: list[bytes] = []

        async def buffered_send(message: dict[str, Any]) -> None:
            nonlocal initial_message
            if message["type"] == "http.response.start":
                initial_message = message
                return
            if message["type"] != "http.response.body":
                return
            body_parts.append(message.get("body", b""))
            if message.get("more_body", False):
                return

            # Final chunk received: decide whether to compress.
            full_body = b"".join(body_parts)
            headers: list[tuple[bytes, bytes]] = (
                list(initial_message.get("headers", [])) if initial_message is not None else []
            )
            # Compressing an already-encoded payload would corrupt it for the
            # client, which only undoes the encodings listed in the header.
            already_encoded = any(k.lower() == b"content-encoding" for k, _ in headers)

            if initial_message is None or already_encoded or len(full_body) < self.minimum_size:
                if initial_message is not None:
                    await send(initial_message)
                await send({"type": "http.response.body", "body": full_body})
                return

            compressed = gzip.compress(full_body)
            # The old content-length describes the uncompressed body; replace it.
            headers = [(k, v) for k, v in headers if k.lower() != b"content-length"]
            headers.append((b"content-encoding", b"gzip"))
            headers.append((b"vary", b"Accept-Encoding"))
            headers.append((b"content-length", str(len(compressed)).encode()))
            await send({**initial_message, "headers": headers})
            await send({"type": "http.response.body", "body": compressed})

        await self.app(scope, receive, buffered_send)

dispatch(scope, receive, send) async

Compress the response body with gzip if it exceeds the minimum size.

Source code in FasterAPI/middleware.py
async def dispatch(
    self,
    scope: dict[str, Any],
    receive: ASGIApp,
    send: ASGIApp,
) -> None:
    """Gzip the downstream response when the client accepts it and it is large enough.

    The request is forwarded untouched when its ``Accept-Encoding`` header
    does not mention gzip. Otherwise the whole response is buffered and,
    once the final body message arrives, sent either compressed (with
    ``content-encoding``, ``vary`` and a recomputed ``content-length``) or
    unchanged. Responses shorter than ``self.minimum_size`` bytes, and
    responses that already carry a ``content-encoding`` header, are never
    compressed.

    Args:
        scope: ASGI connection scope; only its ``"headers"`` entry is read here.
        receive: ASGI receive callable, passed through to the wrapped app.
        send: ASGI send callable that receives the final response messages.
    """
    accept_encoding = ""
    for name, value in scope.get("headers", []):
        if name.decode("latin-1").lower() == "accept-encoding":
            accept_encoding = value.decode("latin-1")
            break

    if "gzip" not in accept_encoding.lower():
        await self.app(scope, receive, send)
        return

    # Collect the response so the size is known before deciding to compress.
    initial_message: dict[str, Any] | None = None
    body_parts: list[bytes] = []

    async def buffered_send(message: dict[str, Any]) -> None:
        nonlocal initial_message
        if message["type"] == "http.response.start":
            initial_message = message
            return
        if message["type"] != "http.response.body":
            return
        body_parts.append(message.get("body", b""))
        if message.get("more_body", False):
            return

        # Final chunk received: decide whether to compress.
        full_body = b"".join(body_parts)
        headers: list[tuple[bytes, bytes]] = (
            list(initial_message.get("headers", [])) if initial_message is not None else []
        )
        # Compressing an already-encoded payload would corrupt it for the
        # client, which only undoes the encodings listed in the header.
        already_encoded = any(k.lower() == b"content-encoding" for k, _ in headers)

        if initial_message is None or already_encoded or len(full_body) < self.minimum_size:
            if initial_message is not None:
                await send(initial_message)
            await send({"type": "http.response.body", "body": full_body})
            return

        compressed = gzip.compress(full_body)
        # The old content-length describes the uncompressed body; replace it.
        headers = [(k, v) for k, v in headers if k.lower() != b"content-length"]
        headers.append((b"content-encoding", b"gzip"))
        headers.append((b"vary", b"Accept-Encoding"))
        headers.append((b"content-length", str(len(compressed)).encode()))
        await send({**initial_message, "headers": headers})
        await send({"type": "http.response.body", "body": compressed})

    await self.app(scope, receive, buffered_send)

HTMLResponse

Bases: Response

Response with HTML content type.

Source code in FasterAPI/response.py
class HTMLResponse(Response):
    """``Response`` subclass whose ``media_type`` is ``text/html``."""

    media_type = "text/html"

    def __init__(
        self,
        content: Any = None,
        status_code: int = 200,
        headers: dict[str, str] | None = None,
    ) -> None:
        # The media type comes from the class attribute, so it is not a parameter here.
        super().__init__(content=content, status_code=status_code, headers=headers)

HTTPException

Bases: Exception

An HTTP exception that results in an error response with the given status code.

Source code in FasterAPI/exceptions.py
class HTTPException(Exception):
    """An HTTP exception that results in an error response with the given status code.

    Attributes:
        status_code: HTTP status code to respond with.
        detail: Arbitrary payload describing the error.
        headers: Optional extra headers stored for the error response.
    """

    def __init__(
        self,
        status_code: int = 500,
        detail: Any = None,
        headers: dict[str, str] | None = None,
    ) -> None:
        # Populate Exception.args so the error is not blank in tracebacks and logs.
        super().__init__(status_code, detail)
        self.status_code = status_code
        self.detail = detail
        self.headers = headers

    def __str__(self) -> str:
        return f"{self.status_code}: {self.detail}"

    def __repr__(self) -> str:
        return f"HTTPException(status_code={self.status_code}, detail={self.detail!r})"

HTTPSRedirectMiddleware

Bases: BaseHTTPMiddleware

Middleware that redirects all HTTP requests to HTTPS.

Source code in FasterAPI/middleware.py
class HTTPSRedirectMiddleware(BaseHTTPMiddleware):
    """Middleware that answers non-HTTPS requests with a 301 redirect to the ``https://`` URL."""

    async def dispatch(
        self,
        scope: dict[str, Any],
        receive: ASGIApp,
        send: ASGIApp,
    ) -> None:
        """Pass HTTPS requests through; redirect everything else to HTTPS.

        The redirect target is built from the ``Host`` header (``localhost``
        when absent), the request path and the raw query string.

        Args:
            scope: ASGI connection scope; ``scheme``, ``headers``, ``path`` and
                ``query_string`` are read.
            receive: ASGI receive callable, passed through to the wrapped app.
            send: ASGI send callable used for the redirect response.
        """
        if scope.get("scheme", "http") == "https":
            await self.app(scope, receive, send)
            return

        from urllib.parse import quote

        # Build redirect URL
        headers_raw: list[tuple[bytes, bytes]] = scope.get("headers", [])
        host = "localhost"
        for k, v in headers_raw:
            if k.decode("latin-1").lower() == "host":
                host = v.decode("latin-1")
                break

        # scope["path"] arrives percent-decoded, so it must be re-encoded before
        # going into a Location header. Otherwise spaces give an invalid URL and
        # characters outside latin-1 make the header encode below raise.
        path = quote(scope.get("path", "/"), safe="/:@!$&'()*+,;=")
        qs = scope.get("query_string", b"")
        url = f"https://{host}{path}"
        if qs:
            # The query string is raw bytes from the wire; no re-encoding needed.
            url += f"?{qs.decode('latin-1')}"

        await send(
            {
                "type": "http.response.start",
                "status": 301,
                "headers": [
                    (b"location", url.encode("latin-1")),
                    (b"content-type", b"text/plain"),
                ],
            }
        )
        await send({"type": "http.response.body", "body": b"Redirecting to HTTPS"})

Header

Declare a header parameter with optional default and alias.

Source code in FasterAPI/params.py
class Header:
    """Describe a parameter sourced from a request header.

    Stores the declared ``default``, an optional ``alias`` and the
    ``convert_underscores`` flag exactly as passed.
    """

    __slots__ = ("default", "alias", "convert_underscores")

    def __init__(
        self,
        default: Any = None,
        *,
        alias: str | None = None,
        convert_underscores: bool = True,
    ) -> None:
        self.default, self.alias = default, alias
        self.convert_underscores = convert_underscores

    def __repr__(self) -> str:
        # Only the default is shown; alias and convert_underscores are omitted.
        shown = repr(self.default)
        return f"Header(default={shown})"

JSONResponse

Bases: Response

Response that serializes content as JSON using msgspec.

Source code in FasterAPI/response.py
class JSONResponse(Response):
    """``Response`` whose body is ``content`` encoded to JSON bytes by msgspec."""

    media_type = "application/json"

    def _render(self, content: Any) -> bytes:
        """Encode ``content`` with ``msgspec.json.encode`` and return the bytes."""
        encoded: bytes = msgspec.json.encode(content)
        return encoded

Path

Declare a path parameter with optional default and description.

Source code in FasterAPI/params.py
class Path:
    """Marker describing a path parameter.

    ``default`` stays at the ``_MISSING`` sentinel when none was supplied;
    ``description`` is free-form text stored alongside it.
    """

    __slots__ = ("default", "description")

    def __init__(self, default: Any = _MISSING, *, description: str = "") -> None:
        self.default, self.description = default, description

    def __repr__(self) -> str:
        # An unset default renders as a bare call so the sentinel never leaks.
        return "Path()" if self.default is _MISSING else f"Path(default={self.default!r})"

PlainTextResponse

Bases: Response

Response with plain text content type.

Source code in FasterAPI/response.py
class PlainTextResponse(Response):
    """``Response`` subclass whose ``media_type`` is ``text/plain``."""

    media_type = "text/plain"

    def __init__(
        self,
        content: Any = None,
        status_code: int = 200,
        headers: dict[str, str] | None = None,
    ) -> None:
        # The media type comes from the class attribute, so it is not a parameter here.
        super().__init__(content=content, status_code=status_code, headers=headers)

Query

Declare a query parameter with optional default, description, and alias.

Source code in FasterAPI/params.py
class Query:
    """Describe a parameter sourced from the URL query string.

    Holds the declared ``default``, a free-text ``description`` and an
    optional ``alias``, stored exactly as passed.
    """

    __slots__ = ("default", "description", "alias")

    def __init__(
        self,
        default: Any = None,
        *,
        description: str = "",
        alias: str | None = None,
    ) -> None:
        self.default, self.description, self.alias = default, description, alias

    def __repr__(self) -> str:
        # Only the default is shown; description and alias are omitted.
        shown = repr(self.default)
        return f"Query(default={shown})"

RadixRouter

O(k) URL router using a compressed radix tree (k = path segments).

Source code in FasterAPI/router.py
class RadixRouter:
    """URL router backed by a tree keyed on path segments.

    Static segments are stored under their own text; ``{name}`` segments are
    stored under the wildcard key ``"*"``. Lookup cost is proportional to the
    number of segments in the common case.
    """

    __slots__ = ("root",)

    def __init__(self) -> None:
        self.root = RadixNode()

    # ------------------------------------------------------------------
    #  Route registration (called at startup — speed is less critical)
    # ------------------------------------------------------------------

    def add_route(
        self,
        method: str,
        path: str,
        handler: ASGIApp,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Register a handler for the given HTTP method and path pattern.

        Args:
            method: HTTP method; stored upper-cased.
            path: Route pattern; segments of the form ``{name}`` are parameters.
            handler: Callable stored for this method/path pair.
            metadata: Optional mapping stored next to the handler (``{}`` if omitted).
        """
        node = self.root
        for segment in _split(path):
            if segment[0] == "{" and segment[-1] == "}":
                param_name = segment[1:-1]
                child = node.children.get("*")
                if child is None:
                    child = RadixNode()
                    child.is_param = True
                    child.param_name = param_name
                    node.children["*"] = child
                # NOTE: an existing wildcard child is reused as-is, so the
                # parameter name from the first registered route wins.
                node = child
            else:
                child = node.children.get(segment)
                if child is None:
                    child = RadixNode()
                    node.children[segment] = child
                node = child

        node.handlers[method.upper()] = (handler, metadata or {})

    # ------------------------------------------------------------------
    #  Route resolution — HOT PATH, called on every request
    # ------------------------------------------------------------------

    def resolve(
        self,
        method: str,
        path: str,
    ) -> tuple[ASGIApp, dict[str, str], dict[str, Any]] | None:
        """Resolve a path to (handler, path_params, metadata) or None.

        ``None`` is returned both when no route matches the path and when the
        matched route has no handler for ``method``.
        """
        segments = _split(path)
        params: dict[str, str] = {}

        node = self._walk(self.root, segments, 0, params)
        if node is None:
            return None

        entry = node.handlers.get(method.upper())
        if entry is None:
            return None

        return entry[0], params, entry[1]

    def _walk(
        self,
        node: RadixNode,
        segments: list[str],
        idx: int,
        params: dict[str, str],
    ) -> RadixNode | None:
        """Find the node matching ``segments[idx:]`` below ``node``.

        Static children are preferred. Where a node has both a matching
        static child and a parameter child, the static subtree is explored
        recursively first and the parameter child is the fallback, so
        ``/users/{id}/posts`` is still found when ``/users/me`` is also
        registered. Captured values are written into ``params``; values
        captured on an abandoned branch are discarded.

        Returns:
            The matched node if it has at least one handler, otherwise ``None``.
        """
        n = len(segments)
        while idx < n:
            seg = segments[idx]
            children = node.children
            static_child = children.get(seg)
            param_child = children.get("*")

            if static_child is not None:
                if param_child is None or param_child is static_child:
                    # Only one way forward: stay on the iterative fast path.
                    node = static_child
                    idx += 1
                    continue
                # Two candidates: try the static subtree, remembering the
                # captures so a failed attempt leaves no trace in params.
                snapshot = dict(params)
                found = self._walk(static_child, segments, idx + 1, params)
                if found is not None:
                    return found
                params.clear()
                params.update(snapshot)

            if param_child is None:
                return None
            assert param_child.param_name is not None
            params[param_child.param_name] = seg
            node = param_child
            idx += 1

        return node if node.handlers else None

add_route(method, path, handler, metadata=None)

Register a handler for the given HTTP method and path pattern.

Source code in FasterAPI/router.py
def add_route(
    self,
    method: str,
    path: str,
    handler: ASGIApp,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Store ``handler`` (plus ``metadata``) for ``method`` at the node reached by ``path``.

    Missing nodes are created on the way down. ``{name}`` segments live under
    the wildcard key ``"*"``; all other segments are keyed by their own text.
    The method is stored upper-cased and ``metadata`` defaults to ``{}``.
    """
    current = self.root
    for part in _split(path):
        wildcard = part[0] == "{" and part[-1] == "}"
        key = "*" if wildcard else part
        nxt = current.children.get(key)
        if nxt is None:
            nxt = RadixNode()
            if wildcard:
                # Only a freshly created wildcard node records the parameter name.
                nxt.is_param = True
                nxt.param_name = part[1:-1]
            current.children[key] = nxt
        current = nxt

    current.handlers[method.upper()] = (handler, metadata or {})

resolve(method, path)

Resolve a path to (handler, path_params, metadata) or None.

Source code in FasterAPI/router.py
def resolve(
    self,
    method: str,
    path: str,
) -> tuple[ASGIApp, dict[str, str], dict[str, Any]] | None:
    """Look up ``path`` and return ``(handler, path_params, metadata)``.

    Returns ``None`` when no node matches the path or when the matched node
    has no handler registered for ``method`` (compared upper-cased).
    """
    captured: dict[str, str] = {}
    matched = self._walk(self.root, _split(path), 0, captured)
    entry = None if matched is None else matched.handlers.get(method.upper())
    if entry is None:
        return None
    return entry[0], captured, entry[1]

RedirectResponse

Bases: Response

Response that redirects to a different URL.

Source code in FasterAPI/response.py
class RedirectResponse(Response):
    """Response that redirects to a different URL.

    The target is sent in the ``location`` header; the body is empty.
    """

    def __init__(
        self,
        url: str,
        status_code: int = 307,
        headers: dict[str, str] | None = None,
    ) -> None:
        # Pass a copy: the base class keeps the dict it is given, so setting
        # "location" below would otherwise write into the caller's own dict.
        super().__init__(b"", status_code, dict(headers) if headers else None)
        self.headers["location"] = url

    def _render(self, content: Any) -> bytes:
        """Return ``content`` unchanged if it is ``bytes``, else an empty body."""
        if isinstance(content, bytes):
            return content
        return b""

Request

Represents an incoming HTTP request with lazy attribute parsing.

Source code in FasterAPI/request.py
class Request:
    """Wrapper around an ASGI HTTP scope and receive callable.

    Headers, query parameters and cookies are parsed on first access and
    cached; the body is read from ``receive`` only when requested.
    """

    __slots__ = (
        "_scope",
        "_receive",
        "_body",
        "_body_read",
        "_form_cache",
        "_headers",
        "_query_params",
        "_cookies",
        "method",
        "path",
        "path_params",
    )

    def __init__(self, scope: dict[str, Any], receive: ASGIApp) -> None:
        self._scope = scope
        self._receive = receive

        # Caches; each stays empty until the matching accessor is first used.
        self._body: bytes = b""
        self._body_read: bool = False
        self._form_cache: FormData | None = None
        self._headers: dict[str, str] | None = None
        self._query_params: dict[str, Any] | None = None
        self._cookies: dict[str, str] | None = None

        # Cheap scope lookups are resolved eagerly.
        self.method: str = scope.get("method", "GET")
        self.path: str = scope.get("path", "/")
        self.path_params: dict[str, str] = scope.get("path_params", {})

    # ------------------------------------------------------------------
    #  Lazily computed, cached views of the scope
    # ------------------------------------------------------------------

    @property
    def headers(self) -> dict[str, str]:
        """Header names (lower-cased) mapped to values, both decoded as latin-1."""
        if self._headers is None:
            pairs: list[tuple[bytes, bytes]] = self._scope.get("headers", [])
            self._headers = {
                name.decode("latin-1").lower(): value.decode("latin-1") for name, value in pairs
            }
        return self._headers

    @property
    def query_params(self) -> dict[str, Any]:
        """Parsed query string; a key maps to a string, or to a list when repeated."""
        if self._query_params is None:
            raw_query = self._scope.get("query_string", b"").decode("latin-1")
            collapsed: dict[str, Any] = {}
            if raw_query:
                for key, values in parse_qs(raw_query, keep_blank_values=True).items():
                    collapsed[key] = values[0] if len(values) == 1 else values
            self._query_params = collapsed
        return self._query_params

    @property
    def cookies(self) -> dict[str, str]:
        """Cookie names mapped to values, parsed from the ``cookie`` header."""
        if self._cookies is None:
            jar: dict[str, str] = {}
            header_value = self.headers.get("cookie", "")
            if header_value:
                parsed = SimpleCookie()
                parsed.load(header_value)
                jar = {name: morsel.value for name, morsel in parsed.items()}
            self._cookies = jar
        return self._cookies

    @property
    def content_type(self) -> str | None:
        """Value of the ``content-type`` header, or ``None`` when absent."""
        return self.headers.get("content-type")

    @property
    def client(self) -> tuple[str, int] | None:
        """First two items of the scope's ``client`` entry as a tuple, or ``None``."""
        peer = self._scope.get("client")
        if peer is None:
            return None
        return (peer[0], peer[1])

    @property
    def body(self) -> bytes:
        """Body bytes cached so far; ``b""`` until the body has been read."""
        return self._body

    # ------------------------------------------------------------------
    #  Body access
    # ------------------------------------------------------------------

    async def _read_body(self) -> bytes:
        """Drain ``receive`` until ``more_body`` is falsy and cache the joined bytes."""
        if not self._body_read:
            chunks: list[bytes] = []
            more = True
            while more:
                message = await self._receive()
                piece = message.get("body", b"")
                if piece:
                    chunks.append(piece)
                more = message.get("more_body", False)
            self._body = b"".join(chunks)
            self._body_read = True
        return self._body

    async def json(self) -> Any:
        """Read the request body and decode it as JSON with msgspec."""
        payload = await self._read_body()
        return msgspec.json.decode(payload)

    async def form(self) -> FormData:
        """Read the body and parse it as multipart or URL-encoded form data (cached)."""
        cached = self._form_cache
        if cached is None:
            payload = await self._read_body()
            content_type = self.content_type or ""
            if "multipart/form-data" in content_type:
                cached = _parse_multipart(payload, content_type)
            else:
                cached = _parse_urlencoded(payload)
            self._form_cache = cached
        return cached

form() async

Read the request body and parse as form / multipart data.

Source code in FasterAPI/request.py
async def form(self) -> FormData:
    """Return the parsed form body, parsing and caching it on first use.

    Bodies whose content type contains ``multipart/form-data`` go through the
    multipart parser; everything else is parsed as URL-encoded data.
    """
    cached = self._form_cache
    if cached is None:
        payload = await self._read_body()
        content_type = self.content_type or ""
        if "multipart/form-data" in content_type:
            cached = _parse_multipart(payload, content_type)
        else:
            cached = _parse_urlencoded(payload)
        self._form_cache = cached
    return cached

json() async

Read the request body and decode as JSON via msgspec (zero-copy).

Source code in FasterAPI/request.py
async def json(self) -> Any:
    """Read the full request body and decode it as JSON with msgspec."""
    payload = await self._read_body()
    decoded = msgspec.json.decode(payload)
    return decoded

RequestValidationError

Bases: Exception

Raised when request data fails validation.

Source code in FasterAPI/exceptions.py
class RequestValidationError(Exception):
    """Raised when request data fails validation.

    Attributes:
        errors: The list of error dicts passed to the constructor.
    """

    def __init__(self, errors: list[dict[str, Any]]) -> None:
        # Populate Exception.args so the errors show up in tracebacks and logs.
        super().__init__(errors)
        self.errors = errors

    def __repr__(self) -> str:
        return f"RequestValidationError(errors={self.errors!r})"

Response

Base HTTP response class.

Source code in FasterAPI/response.py
class Response:
    """Plain HTTP response: status code, headers and a fully rendered body."""

    media_type: str | None = None
    charset: str = "utf-8"

    def __init__(
        self,
        content: Any = None,
        status_code: int = 200,
        headers: dict[str, str] | None = None,
        media_type: str | None = None,
    ) -> None:
        self.status_code = status_code
        self.headers = headers or {}
        # An explicit media type shadows the class-level default for this instance.
        if media_type is not None:
            self.media_type = media_type
        # Rendered last so _render sees the attributes set above.
        self.body = self._render(content)

    def _render(self, content: Any) -> bytes:
        """Turn ``content`` into bytes: bytes pass through, ``None`` is empty, else ``encode``."""
        if isinstance(content, bytes):
            return content
        if content is None:
            return b""
        return bytes(content.encode(self.charset))

    def _build_headers(self) -> list[tuple[bytes, bytes]]:
        """Return the raw ASGI header list: optional content-type first, then custom headers."""
        built: list[tuple[bytes, bytes]] = []
        content_type = self.media_type
        if content_type is not None:
            # The charset suffix is added only when the media type contains "text".
            if self.charset and "text" in content_type:
                content_type = f"{content_type}; charset={self.charset}"
            built.append((b"content-type", content_type.encode("latin-1")))
        built.extend(
            (name.lower().encode("latin-1"), value.encode("latin-1"))
            for name, value in self.headers.items()
        )
        return built

    async def to_asgi(self, send: ASGIApp) -> None:
        """Emit the start message, then the whole body in one message."""
        start_message = {
            "type": "http.response.start",
            "status": self.status_code,
            "headers": self._build_headers(),
        }
        await send(start_message)
        body_message = {"type": "http.response.body", "body": self.body}
        await send(body_message)

to_asgi(send) async

Send the response through the ASGI interface.

Source code in FasterAPI/response.py
async def to_asgi(self, send: ASGIApp) -> None:
    """Emit the start message, then the whole body in a single message."""
    start_message = {
        "type": "http.response.start",
        "status": self.status_code,
        "headers": self._build_headers(),
    }
    await send(start_message)
    body_message = {"type": "http.response.body", "body": self.body}
    await send(body_message)

StreamingResponse

Response that streams content from an async or sync iterator.

Source code in FasterAPI/response.py
class StreamingResponse:
    """Response whose body is produced chunk by chunk from an iterator.

    ``content`` may be an async iterator or a plain iterator. Chunks that are
    not ``bytes`` are converted with their ``encode()`` method before sending.
    """

    def __init__(
        self,
        content: AsyncIterator[bytes] | Iterator[bytes],
        status_code: int = 200,
        headers: dict[str, str] | None = None,
        media_type: str | None = None,
    ) -> None:
        self.content = content
        self.status_code = status_code
        self.headers = headers or {}
        self.media_type = media_type

    def _build_headers(self) -> list[tuple[bytes, bytes]]:
        """Return the raw ASGI header list: optional content-type first, then custom headers."""
        built: list[tuple[bytes, bytes]] = []
        if self.media_type is not None:
            built.append((b"content-type", self.media_type.encode("latin-1")))
        built.extend(
            (name.lower().encode("latin-1"), value.encode("latin-1"))
            for name, value in self.headers.items()
        )
        return built

    async def to_asgi(self, send: ASGIApp) -> None:
        """Send the start message, one body message per chunk, then an empty final one."""
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self._build_headers(),
            }
        )

        async def emit(piece: Any) -> None:
            payload = piece if isinstance(piece, bytes) else piece.encode()
            await send({"type": "http.response.body", "body": payload, "more_body": True})

        source = self.content
        if hasattr(source, "__aiter__"):
            async for piece in source:
                await emit(piece)
        else:
            for piece in source:
                await emit(piece)

        # An empty terminating message closes the body.
        await send({"type": "http.response.body", "body": b"", "more_body": False})

to_asgi(send) async

Stream the response body through the ASGI interface.

Source code in FasterAPI/response.py
async def to_asgi(self, send: ASGIApp) -> None:
    """Send the start message, one body message per chunk, then an empty final one.

    ``self.content`` is consumed with ``async for`` when it has ``__aiter__``
    and with a plain ``for`` otherwise. Chunks that are not ``bytes`` are
    converted with their ``encode()`` method.
    """
    await send(
        {
            "type": "http.response.start",
            "status": self.status_code,
            "headers": self._build_headers(),
        }
    )

    async def emit(piece: Any) -> None:
        payload = piece if isinstance(piece, bytes) else piece.encode()
        await send({"type": "http.response.body", "body": payload, "more_body": True})

    source = self.content
    if hasattr(source, "__aiter__"):
        async for piece in source:
            await emit(piece)
    else:
        for piece in source:
            await emit(piece)

    # An empty terminating message closes the body.
    await send({"type": "http.response.body", "body": b"", "more_body": False})

SubInterpreterPool

Fallback pool using ProcessPoolExecutor.

Provides the same run() / shutdown() API as the sub-interpreter pool. Upgrade to a Python version with the interpreters module for true per-interpreter GIL support.

Source code in FasterAPI/concurrency.py
class SubInterpreterPool:  # type: ignore[no-redef]
    """Process-based stand-in for the sub-interpreter pool.

    Exposes the same ``run()`` / ``shutdown()`` API as the sub-interpreter
    pool, backed by a ``ProcessPoolExecutor``. Upgrade to a Python version
    with the ``interpreters`` module for true per-interpreter GIL support.
    """

    def __init__(self, max_workers: int | None = None) -> None:
        # A falsy max_workers (None or 0) falls back to the module's CPU count.
        worker_count = max_workers or _CPU_COUNT
        self._executor = ProcessPoolExecutor(max_workers=worker_count)

    async def run(self, func: Callable, *args: Any) -> Any:  # type: ignore[type-arg]
        """Run ``func(*args)`` on the pool's executor and return its result."""
        loop = asyncio.get_running_loop()
        bound_call = partial(func, *args)
        return await loop.run_in_executor(self._executor, bound_call)

    def shutdown(self) -> None:
        """Shut the executor down without waiting for pending work."""
        executor = self._executor
        executor.shutdown(wait=False)

run(func, *args) async

Execute func in a worker process (pickle-based).

Source code in FasterAPI/concurrency.py
async def run(self, func: Callable, *args: Any) -> Any:  # type: ignore[type-arg]
    """Run ``func(*args)`` on ``self._executor`` and return its result."""
    loop = asyncio.get_running_loop()
    bound_call = partial(func, *args)
    return await loop.run_in_executor(self._executor, bound_call)

shutdown()

Shut down the process pool.

Source code in FasterAPI/concurrency.py
def shutdown(self) -> None:
    """Shut the executor down without waiting for pending work."""
    executor = self._executor
    executor.shutdown(wait=False)

TestClient

Synchronous test client for FasterAPI applications.

Wraps ASGI app with httpx.ASGITransport for HTTP testing. Provides websocket_connect() for WebSocket testing.

Source code in FasterAPI/testclient.py
class TestClient:
    """Synchronous test client for FasterAPI applications.

    Wraps ASGI app with httpx.ASGITransport for HTTP testing.
    Provides websocket_connect() for WebSocket testing.
    """

    __test__ = False  # Prevent pytest collection

    def __init__(
        self,
        app: ASGIApp,
        base_url: str = "http://testserver",
    ) -> None:
        self.app = app
        self.base_url = base_url
        self._transport = httpx.ASGITransport(app=app)
        self._client = httpx.AsyncClient(
            transport=self._transport,
            base_url=base_url,
        )

    def __enter__(self) -> TestClient:
        return self

    def __exit__(self, *args: Any) -> None:
        self._run(self._client.aclose())

    def _run(self, coro: Any) -> Any:
        """Run ``coro`` to completion from synchronous code and return its result.

        When called while an event loop is already running in this thread, the
        coroutine is run with ``asyncio.run`` on a worker thread instead,
        because ``asyncio.run`` cannot be nested inside a running loop.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        if loop and loop.is_running():
            import concurrent.futures

            with concurrent.futures.ThreadPoolExecutor() as pool:
                return pool.submit(asyncio.run, coro).result()
        return asyncio.run(coro)

    # --- HTTP methods ---

    def get(self, url: str, **kwargs: Any) -> httpx.Response:
        """Send a GET request."""
        result: httpx.Response = self._run(self._client.get(url, **kwargs))
        return result

    def post(self, url: str, **kwargs: Any) -> httpx.Response:
        """Send a POST request."""
        result: httpx.Response = self._run(self._client.post(url, **kwargs))
        return result

    def put(self, url: str, **kwargs: Any) -> httpx.Response:
        """Send a PUT request."""
        result: httpx.Response = self._run(self._client.put(url, **kwargs))
        return result

    def delete(self, url: str, **kwargs: Any) -> httpx.Response:
        """Send a DELETE request."""
        result: httpx.Response = self._run(self._client.delete(url, **kwargs))
        return result

    def patch(self, url: str, **kwargs: Any) -> httpx.Response:
        """Send a PATCH request."""
        result: httpx.Response = self._run(self._client.patch(url, **kwargs))
        return result

    def options(self, url: str, **kwargs: Any) -> httpx.Response:
        """Send an OPTIONS request."""
        result: httpx.Response = self._run(self._client.options(url, **kwargs))
        return result

    def head(self, url: str, **kwargs: Any) -> httpx.Response:
        """Send a HEAD request."""
        result: httpx.Response = self._run(self._client.head(url, **kwargs))
        return result

    # --- WebSocket ---

    @staticmethod
    def _finish_ws_task(loop: asyncio.AbstractEventLoop, task: asyncio.Task[None]) -> None:
        """Give the handler task a short grace period, then cancel it if still running.

        Cancelling (rather than abandoning) an unfinished task lets the loop
        be closed without a pending task left behind.
        """
        try:
            loop.run_until_complete(asyncio.wait_for(asyncio.shield(task), timeout=0.05))
        except asyncio.TimeoutError:
            task.cancel()
            # Bounded wait, so a handler that ignores cancellation cannot hang the test.
            loop.run_until_complete(asyncio.wait({task}, timeout=0.05))
            if task.done() and not task.cancelled():
                task.exception()  # mark any late error as retrieved
        except Exception:
            # Best effort, as before: handler errors are not re-raised on exit.
            pass

    @contextmanager
    def websocket_connect(
        self,
        path: str,
        headers: dict[str, str] | None = None,
        query_string: str = "",
    ) -> Generator[_WebSocketSession, None, None]:
        """Context manager for testing WebSocket endpoints.

        Starts the app on a private event loop with a ``websocket`` scope and
        yields the session object wired to it. On exit the session is closed
        if still open, the handler is given a moment to finish (and cancelled
        otherwise), and the loop is closed.

        Args:
            path: Request path placed in the scope.
            headers: Optional headers; names are lower-cased and encoded.
            query_string: Query string without the leading ``?``.
        """
        session = _WebSocketSession()

        scope = {
            "type": "websocket",
            "path": path,
            "headers": [(k.lower().encode(), v.encode()) for k, v in (headers or {}).items()],
            "query_string": query_string.encode() if query_string else b"",
            "client": ("testclient", 0),
        }

        async def run_ws() -> None:
            await self.app(scope, session._asgi_receive, session._asgi_send)

        loop = asyncio.new_event_loop()
        task = loop.create_task(run_ws())

        # Drive the loop briefly to let the handler start and accept
        loop.run_until_complete(asyncio.sleep(0))

        try:
            yield session
        finally:
            try:
                if not session._closed:
                    session.close()
                self._finish_ws_task(loop, task)
            finally:
                # Close the loop even if closing the session raised.
                loop.close()

delete(url, **kwargs)

Send a DELETE request.

Source code in FasterAPI/testclient.py
def delete(self, url: str, **kwargs: Any) -> httpx.Response:
    """Send a DELETE request.

    ``url`` and any extra keyword arguments are forwarded unchanged to
    ``self._client.delete``; the resulting call is handed to ``self._run``
    and whatever that returns is given back to the caller.
    """
    pending = self._client.delete(url, **kwargs)
    response: httpx.Response = self._run(pending)
    return response

get(url, **kwargs)

Send a GET request.

Source code in FasterAPI/testclient.py
def get(self, url: str, **kwargs: Any) -> httpx.Response:
    """Send a GET request.

    ``url`` and any extra keyword arguments are forwarded unchanged to
    ``self._client.get``; the resulting call is handed to ``self._run``
    and whatever that returns is given back to the caller.
    """
    pending = self._client.get(url, **kwargs)
    response: httpx.Response = self._run(pending)
    return response

head(url, **kwargs)

Send a HEAD request.

Source code in FasterAPI/testclient.py
def head(self, url: str, **kwargs: Any) -> httpx.Response:
    """Send a HEAD request.

    ``url`` and any extra keyword arguments are forwarded unchanged to
    ``self._client.head``; the resulting call is handed to ``self._run``
    and whatever that returns is given back to the caller.
    """
    pending = self._client.head(url, **kwargs)
    response: httpx.Response = self._run(pending)
    return response

options(url, **kwargs)

Send an OPTIONS request.

Source code in FasterAPI/testclient.py
def options(self, url: str, **kwargs: Any) -> httpx.Response:
    """Send an OPTIONS request.

    ``url`` and any extra keyword arguments are forwarded unchanged to
    ``self._client.options``; the resulting call is handed to ``self._run``
    and whatever that returns is given back to the caller.
    """
    pending = self._client.options(url, **kwargs)
    response: httpx.Response = self._run(pending)
    return response

patch(url, **kwargs)

Send a PATCH request.

Source code in FasterAPI/testclient.py
def patch(self, url: str, **kwargs: Any) -> httpx.Response:
    """Send a PATCH request.

    ``url`` and any extra keyword arguments are forwarded unchanged to
    ``self._client.patch``; the resulting call is handed to ``self._run``
    and whatever that returns is given back to the caller.
    """
    pending = self._client.patch(url, **kwargs)
    response: httpx.Response = self._run(pending)
    return response

post(url, **kwargs)

Send a POST request.

Source code in FasterAPI/testclient.py
def post(self, url: str, **kwargs: Any) -> httpx.Response:
    """Send a POST request.

    ``url`` and any extra keyword arguments are forwarded unchanged to
    ``self._client.post``; the resulting call is handed to ``self._run``
    and whatever that returns is given back to the caller.
    """
    pending = self._client.post(url, **kwargs)
    response: httpx.Response = self._run(pending)
    return response

put(url, **kwargs)

Send a PUT request.

Source code in FasterAPI/testclient.py
def put(self, url: str, **kwargs: Any) -> httpx.Response:
    """Send a PUT request.

    ``url`` and any extra keyword arguments are forwarded unchanged to
    ``self._client.put``; the resulting call is handed to ``self._run``
    and whatever that returns is given back to the caller.
    """
    pending = self._client.put(url, **kwargs)
    response: httpx.Response = self._run(pending)
    return response

websocket_connect(path, headers=None, query_string='')

Context manager for testing WebSocket endpoints.

Source code in FasterAPI/testclient.py
@contextmanager
def websocket_connect(
    self,
    path: str,
    headers: dict[str, str] | None = None,
    query_string: str = "",
) -> Generator[_WebSocketSession, None, None]:
    """Context manager for testing WebSocket endpoints.

    Builds a ``websocket`` ASGI scope for ``path``, starts ``self.app`` as a
    task on a private event loop, and yields the session object whose
    ``_asgi_receive`` / ``_asgi_send`` callables were handed to the app.

    Args:
        path: Request path placed in the scope.
        headers: Optional request headers. Names are lower-cased, and both
            names and values are encoded to bytes.
        query_string: Raw query string; encoded to bytes (empty -> ``b""``).

    Yields:
        The ``_WebSocketSession`` wired to the running application task.

    On exit the session is closed if the caller has not already done so,
    the application task gets a short grace period to finish, and a task
    that is still pending is cancelled so the loop is not closed with work
    dangling. Exceptions raised by the application task are deliberately
    not propagated to the caller. The private loop is always closed.
    """
    session = _WebSocketSession()

    scope = {
        "type": "websocket",
        "path": path,
        "headers": [(k.lower().encode(), v.encode()) for k, v in (headers or {}).items()],
        "query_string": query_string.encode() if query_string else b"",
        "client": ("testclient", 0),
    }

    async def run_ws() -> None:
        await self.app(scope, session._asgi_receive, session._asgi_send)

    loop = asyncio.new_event_loop()
    try:
        task = loop.create_task(run_ws())

        # Drive the loop for one iteration to let the handler start and accept.
        loop.run_until_complete(asyncio.sleep(0))

        try:
            yield session
        finally:
            if not session._closed:
                session.close()
            # Bounded grace period. asyncio.wait neither cancels the task nor
            # raises on timeout or on a failure inside the task.
            loop.run_until_complete(asyncio.wait([task], timeout=0.05))
            if not task.done():
                # The handler did not finish on its own: cancel it and give the
                # cancellation one more bounded window to unwind.
                task.cancel()
                loop.run_until_complete(asyncio.wait([task], timeout=0.05))
            if task.done() and not task.cancelled():
                # Mark any handler exception as retrieved; it is intentionally
                # swallowed, matching the best-effort teardown.
                task.exception()
    finally:
        loop.close()

TrustedHostMiddleware

Bases: BaseHTTPMiddleware

Middleware that validates the Host header against a list of allowed hosts.

Source code in FasterAPI/middleware.py
class TrustedHostMiddleware(BaseHTTPMiddleware):
    """Middleware that validates the Host header against a list of allowed hosts.

    The port is stripped from the ``Host`` header before comparison and the
    comparison is case-insensitive. A bracketed IPv6 literal such as
    ``[::1]:8000`` matches an allowed entry written either as ``[::1]`` or
    as ``::1``. An allowed entry of ``"*"`` disables the check entirely.
    Requests whose host is not allowed receive a ``400`` plain-text response.
    """

    def __init__(
        self,
        app: ASGIApp,
        *,
        allowed_hosts: Sequence[str] = ("*",),
    ) -> None:
        """Store the wrapped app and the set of acceptable host names.

        Args:
            app: The ASGI application to forward accepted requests to.
            allowed_hosts: Host names that may appear in the ``Host`` header.
        """
        super().__init__(app)
        self.allowed_hosts = set(allowed_hosts)
        self.allow_all = "*" in self.allowed_hosts
        # Host names are case-insensitive, so matching is done against a
        # lower-cased snapshot taken at construction time.
        self._allowed_normalized = frozenset(h.lower() for h in self.allowed_hosts)

    @staticmethod
    def _strip_port(host_header: str) -> str:
        """Return the host part of a ``Host`` header value, without the port."""
        if host_header.startswith("["):
            # Bracketed IPv6 literal: the colons inside the brackets belong to
            # the address, so cut after the closing bracket instead.
            closing = host_header.find("]")
            if closing != -1:
                return host_header[: closing + 1]
        return host_header.split(":")[0]

    async def dispatch(
        self,
        scope: dict[str, Any],
        receive: ASGIApp,
        send: ASGIApp,
    ) -> None:
        """Forward the request if its host is allowed, otherwise answer 400."""
        if self.allow_all:
            await self.app(scope, receive, send)
            return

        headers_raw: list[tuple[bytes, bytes]] = scope.get("headers", [])
        host = ""
        for key, value in headers_raw:
            if key.lower() == b"host":
                host = self._strip_port(value.decode("latin-1")).lower()
                break

        # Accept IPv6 entries configured with or without the surrounding brackets.
        allowed = self._allowed_normalized
        if host not in allowed and host.strip("[]") not in allowed:
            await send(
                {
                    "type": "http.response.start",
                    "status": 400,
                    "headers": [(b"content-type", b"text/plain")],
                }
            )
            await send(
                {
                    "type": "http.response.body",
                    "body": b"Invalid host header",
                }
            )
            return

        await self.app(scope, receive, send)

UploadFile

Represents an uploaded file from a multipart/form-data request.

Uses SpooledTemporaryFile: data stays in memory up to 1 MB, then spills to disk.

Source code in FasterAPI/datastructures.py
class UploadFile:
    """Represents an uploaded file from a multipart/form-data request.

    Uses SpooledTemporaryFile: data stays in memory up to 1 MB, then spills to disk.
    The async methods are thin wrappers that call straight through to the
    underlying file object.
    """

    __slots__ = ("filename", "content_type", "file", "headers", "_size")

    def __init__(
        self,
        filename: str,
        content_type: str = "application/octet-stream",
        file: IO[bytes] | None = None,
        headers: dict[str, str] | None = None,
    ) -> None:
        """Record the upload metadata and attach (or create) its backing file.

        Args:
            filename: Name reported for the uploaded file.
            content_type: Declared media type of the upload.
            file: Existing binary file object to use; when falsy, a fresh
                ``SpooledTemporaryFile`` with a 1 MB in-memory limit is created.
            headers: Part headers; a falsy value becomes an empty dict.
        """
        if not file:
            file = tempfile.SpooledTemporaryFile(max_size=1 << 20)  # 1 MB
        if not headers:
            headers = {}

        self.filename = filename
        self.content_type = content_type
        self.file: IO[bytes] = file
        self.headers = headers
        # Not updated by any method of this class; stays None unless assigned elsewhere.
        self._size: int | None = None

    async def read(self, size: int = -1) -> bytes:
        """Read up to ``size`` bytes from the file (-1 means read all)."""
        chunk = self.file.read(size)
        return chunk

    async def write(self, data: bytes) -> int:
        """Write data to the file and return the number of bytes written."""
        written = self.file.write(data)
        return written

    async def seek(self, offset: int) -> None:
        """Seek to the given byte offset in the file."""
        self.file.seek(offset)

    async def close(self) -> None:
        """Close the underlying file."""
        self.file.close()

    @property
    def size(self) -> int | None:
        """Return the file size in bytes, or None if unknown."""
        return self._size

    def __repr__(self) -> str:
        return "UploadFile(filename={!r}, content_type={!r})".format(self.filename, self.content_type)

size property

Return the file size in bytes, or None if unknown.

close() async

Close the underlying file.

Source code in FasterAPI/datastructures.py
async def close(self) -> None:
    """Close the underlying file.

    Calls ``close()`` on ``self.file`` directly; nothing is returned.
    """
    backing = self.file
    backing.close()

read(size=-1) async

Read up to size bytes from the file (-1 means read all).

Source code in FasterAPI/datastructures.py
async def read(self, size: int = -1) -> bytes:
    """Read up to ``size`` bytes from the file (-1 means read all).

    The call is passed straight to ``self.file.read`` and its result returned.
    """
    chunk = self.file.read(size)
    return chunk

seek(offset) async

Seek to the given byte offset in the file.

Source code in FasterAPI/datastructures.py
async def seek(self, offset: int) -> None:
    """Seek to the given byte offset in the file.

    The position returned by ``self.file.seek`` is discarded.
    """
    backing = self.file
    backing.seek(offset)

write(data) async

Write data to the file and return the number of bytes written.

Source code in FasterAPI/datastructures.py
async def write(self, data: bytes) -> int:
    """Write data to the file and return the number of bytes written.

    The count is whatever ``self.file.write`` reports.
    """
    written = self.file.write(data)
    return written

WebSocket

Represents a WebSocket connection.

Source code in FasterAPI/websocket.py
class WebSocket:
    """Represents a WebSocket connection.

    Wraps an ASGI ``(scope, receive, send)`` triple and exposes the request
    path, path parameters, client address, headers and query parameters as
    attributes, plus coroutine helpers for the accept / receive / send / close
    message exchange.
    """

    __slots__ = (
        "_scope",
        "_receive",
        "_send",
        "path",
        "path_params",
        "client",
        "headers",
        "query_params",
        "_state",
    )

    def __init__(self, scope: dict[str, Any], receive: ASGIApp, send: ASGIApp) -> None:
        """Capture the ASGI callables and decode request metadata from ``scope``."""
        self._scope = scope
        self._receive = receive
        self._send = send
        self._state = WebSocketState.CONNECTING
        self.path: str = scope.get("path", "/")
        self.path_params: dict[str, str] = scope.get("path_params", {})
        self.client: tuple[str, int] | None = scope.get("client")

        # Header names are lower-cased; a repeated header keeps only its last value.
        raw_headers: list[tuple[bytes, bytes]] = scope.get("headers", [])
        self.headers: dict[str, str] = {k.decode("latin-1").lower(): v.decode("latin-1") for k, v in raw_headers}

        # Single-valued query parameters are unwrapped; repeated ones stay as lists.
        qs = scope.get("query_string", b"")
        parsed = parse_qs(qs.decode("latin-1") if isinstance(qs, bytes) else qs)
        self.query_params: dict[str, Any] = {k: v[0] if len(v) == 1 else v for k, v in parsed.items()}

    async def accept(self, subprotocol: str | None = None) -> None:
        """Accept the WebSocket connection, optionally selecting a subprotocol.

        Raises:
            RuntimeError: If the connection is not in the CONNECTING state.
        """
        if self._state != WebSocketState.CONNECTING:
            raise RuntimeError("WebSocket is not in CONNECTING state")
        msg: dict[str, Any] = {"type": "websocket.accept"}
        if subprotocol is not None:
            msg["subprotocol"] = subprotocol
        await self._send(msg)
        self._state = WebSocketState.CONNECTED

    async def _receive_message(self) -> dict[str, Any]:
        """Await the next ASGI message, raising on a disconnect event.

        Raises:
            WebSocketDisconnect: When a ``websocket.disconnect`` message
                arrives; carries its ``code`` (default 1000).
        """
        message: dict[str, Any] = await self._receive()
        if message["type"] == "websocket.disconnect":
            self._state = WebSocketState.DISCONNECTED
            raise WebSocketDisconnect(message.get("code", 1000))
        return message

    async def receive_text(self) -> str:
        """Receive a text message from the WebSocket.

        Returns an empty string when the message has no text payload, whether
        the ``text`` key is absent or present with a ``None`` value.
        """
        message = await self._receive_message()
        text = message.get("text")
        # A binary frame may carry ``text: None``; str(None) would yield "None".
        return "" if text is None else str(text)

    async def receive_bytes(self) -> bytes:
        """Receive a binary message from the WebSocket.

        Returns empty bytes when the message has no binary payload, whether
        the ``bytes`` key is absent or present with a ``None`` value.
        """
        message = await self._receive_message()
        payload = message.get("bytes")
        if payload is None:
            # A text frame may carry ``bytes: None``; bytes(None) would raise TypeError.
            return b""
        return payload if isinstance(payload, bytes) else bytes(payload)

    async def receive_json(self) -> Any:
        """Receive a message from the WebSocket and parse it as JSON."""
        text = await self.receive_text()
        return msgspec.json.decode(text.encode())

    async def send_text(self, data: str) -> None:
        """Send a text message through the WebSocket."""
        await self._send({"type": "websocket.send", "text": data})

    async def send_bytes(self, data: bytes) -> None:
        """Send a binary message through the WebSocket."""
        await self._send({"type": "websocket.send", "bytes": data})

    async def send_json(self, data: Any) -> None:
        """Send data as a JSON-encoded text message through the WebSocket."""
        encoded = msgspec.json.encode(data)
        await self.send_text(encoded.decode())

    async def close(self, code: int = 1000, reason: str = "") -> None:
        """Close the WebSocket connection and mark it DISCONNECTED."""
        await self._send({"type": "websocket.close", "code": code, "reason": reason})
        self._state = WebSocketState.DISCONNECTED

accept(subprotocol=None) async

Accept the WebSocket connection, optionally selecting a subprotocol.

Source code in FasterAPI/websocket.py
async def accept(self, subprotocol: str | None = None) -> None:
    """Accept the WebSocket connection, optionally selecting a subprotocol.

    Sends a ``websocket.accept`` message (with a ``subprotocol`` entry only
    when one was given) and then moves the connection to CONNECTED.

    Raises:
        RuntimeError: If the connection is not in the CONNECTING state.
    """
    if self._state != WebSocketState.CONNECTING:
        raise RuntimeError("WebSocket is not in CONNECTING state")

    handshake: dict[str, Any]
    if subprotocol is None:
        handshake = {"type": "websocket.accept"}
    else:
        handshake = {"type": "websocket.accept", "subprotocol": subprotocol}

    await self._send(handshake)
    self._state = WebSocketState.CONNECTED

close(code=1000, reason='') async

Close the WebSocket connection.

Source code in FasterAPI/websocket.py
async def close(self, code: int = 1000, reason: str = "") -> None:
    """Close the WebSocket connection.

    Sends a ``websocket.close`` message carrying ``code`` and ``reason``,
    then marks the connection as DISCONNECTED.
    """
    closing_frame = {"type": "websocket.close", "code": code, "reason": reason}
    await self._send(closing_frame)
    self._state = WebSocketState.DISCONNECTED

receive_bytes() async

Receive a binary message from the WebSocket.

Source code in FasterAPI/websocket.py
async def receive_bytes(self) -> bytes:
    """Receive a binary message from the WebSocket.

    Returns empty bytes when the message has no binary payload, whether the
    ``bytes`` key is absent or present with a ``None`` value. Other
    bytes-like payloads are converted to ``bytes``.
    """
    message = await self._receive_message()
    payload = message.get("bytes")
    if payload is None:
        # A text frame may carry ``bytes: None``; bytes(None) would raise TypeError.
        return b""
    return payload if isinstance(payload, bytes) else bytes(payload)

receive_json() async

Receive a message from the WebSocket and parse it as JSON.

Source code in FasterAPI/websocket.py
async def receive_json(self) -> Any:
    """Receive a message from the WebSocket and parse it as JSON.

    The text obtained from ``receive_text`` is encoded to bytes and decoded
    with ``msgspec.json.decode``.
    """
    raw = (await self.receive_text()).encode()
    return msgspec.json.decode(raw)

receive_text() async

Receive a text message from the WebSocket.

Source code in FasterAPI/websocket.py
async def receive_text(self) -> str:
    """Receive a text message from the WebSocket.

    Returns an empty string when the message has no text payload, whether the
    ``text`` key is absent or present with a ``None`` value.
    """
    message = await self._receive_message()
    text = message.get("text")
    # A binary frame may carry ``text: None``; str(None) would yield "None".
    return "" if text is None else str(text)

send_bytes(data) async

Send a binary message through the WebSocket.

Source code in FasterAPI/websocket.py
async def send_bytes(self, data: bytes) -> None:
    """Send a binary message through the WebSocket.

    Emits a ``websocket.send`` message whose ``bytes`` entry is ``data``.
    """
    frame = {"type": "websocket.send", "bytes": data}
    await self._send(frame)

send_json(data) async

Send data as a JSON-encoded text message through the WebSocket.

Source code in FasterAPI/websocket.py
async def send_json(self, data: Any) -> None:
    """Send data as a JSON-encoded text message through the WebSocket.

    ``data`` is encoded with ``msgspec.json.encode``, decoded to ``str`` and
    passed to ``send_text``.
    """
    payload = msgspec.json.encode(data).decode()
    await self.send_text(payload)

send_text(data) async

Send a text message through the WebSocket.

Source code in FasterAPI/websocket.py
async def send_text(self, data: str) -> None:
    """Send a text message through the WebSocket.

    Emits a ``websocket.send`` message whose ``text`` entry is ``data``.
    """
    frame = {"type": "websocket.send", "text": data}
    await self._send(frame)

WebSocketDisconnect

Bases: Exception

Raised when a WebSocket connection is disconnected.

Source code in FasterAPI/websocket.py
class WebSocketDisconnect(Exception):
    """Raised when a WebSocket connection is disconnected.

    The close code of the disconnect is exposed as the ``code`` attribute.
    """

    def __init__(self, code: int = 1000) -> None:
        """Remember the close ``code`` (1000 when none is given)."""
        # Only the attribute is set here; Exception.__init__ is not invoked explicitly.
        self.code: int = code

WebSocketState

Enumeration of WebSocket connection states.

Source code in FasterAPI/websocket.py
class WebSocketState:
    """Enumeration of WebSocket connection states.

    A plain namespace of integer constants: CONNECTING is 0, CONNECTED is 1
    and DISCONNECTED is 2.
    """

    CONNECTING, CONNECTED, DISCONNECTED = range(3)

run_in_subinterpreter(func, *args) async

Execute func with maximum available parallelism.

Python 3.13+: Runs in a sub-interpreter with its own GIL. True parallel execution, no pickling, ~100x lighter than a process.

Python 3.11–3.12: Falls back to ProcessPoolExecutor. Arguments must be picklable. Still achieves parallelism via multiprocessing.

Python < 3.11: Same as 3.11 fallback with older asyncio internals.

Usage:

result = await run_in_subinterpreter(heavy_computation, n)
Source code in FasterAPI/concurrency.py
async def run_in_subinterpreter(func: Callable, *args: Any) -> Any:  # type: ignore[type-arg]
    """Execute *func* with maximum available parallelism.

    The call is delegated to the pool returned by ``_get_subinterp_pool()``;
    this coroutine awaits ``pool.run(func, *args)`` and returns its result.

    **Python 3.13+**: Runs in a sub-interpreter with its own GIL.
    True parallel execution, no pickling, ~100x lighter than a process.

    **Python 3.11–3.12**: Falls back to ``ProcessPoolExecutor``.
    Arguments must be picklable. Still achieves parallelism via
    multiprocessing.

    **Python < 3.11**: Same as 3.11 fallback with older asyncio internals.

    Usage::

        result = await run_in_subinterpreter(heavy_computation, n)
    """
    return await _get_subinterp_pool().run(func, *args)