Skip to content

API Reference

This page documents every public class and function exported from FasterAPI. For the full list of re-exports, see FasterAPI/__init__.py.

Application

Faster

The main ASGI application class.

from FasterAPI import Faster

app = Faster(
    title="My API",           # shown in Swagger UI
    version="1.0.0",          # shown in Swagger UI
    description="...",        # Markdown description
    openapi_url="/openapi.json",  # set to None to disable
    docs_url="/docs",         # Swagger UI; None to disable
    redoc_url="/redoc",       # ReDoc; None to disable
)

Route decorators: @app.get, @app.post, @app.put, @app.delete, @app.patch, @app.websocket

Lifecycle: @app.on_startup, @app.on_shutdown

Middleware: app.add_middleware(MiddlewareClass, **kwargs)

Exception handlers: app.add_exception_handler(ExcClass, handler)

Router inclusion: app.include_router(router, prefix="", tags=())


FasterRouter

Groups related routes into a reusable router.

from FasterAPI import FasterRouter

router = FasterRouter()

@router.get("/")
async def list_items(): ...

app.include_router(router, prefix="/items", tags=["items"])

Request & Response

Request

Represents an incoming HTTP request.

from FasterAPI import Request

@app.get("/info")
async def info(request: Request):
    return {
        "method": request.method,
        "path": request.url.path,
        "client": request.client,
    }

Key attributes:

Attribute Type Description
method str HTTP verb
url URL Full URL
headers Headers Request headers (case-insensitive)
query_params QueryParams Parsed query string
cookies dict[str, str] Parsed cookies
client tuple[str, int] \| None Client IP and port
path_params dict[str, str] Matched path segments

Async methods: await request.body(), await request.json(), await request.form()


Response

Base HTTP response. All response classes accept content, status_code, headers, and optionally media_type.

from FasterAPI import Response

return Response(content=b"raw bytes", status_code=200, media_type="text/plain")

JSONResponse

Serialises content with msgspec.json.encode.

from FasterAPI import JSONResponse

return JSONResponse({"key": "value"}, status_code=200)

HTMLResponse

Sets Content-Type: text/html.

from FasterAPI import HTMLResponse

return HTMLResponse("<h1>Hello</h1>")

PlainTextResponse

Sets Content-Type: text/plain.

from FasterAPI import PlainTextResponse

return PlainTextResponse("OK")

RedirectResponse

Issues an HTTP redirect.

from FasterAPI import RedirectResponse

return RedirectResponse(url="/new-path", status_code=307)

StreamingResponse

Streams body from an async or sync iterator.

from FasterAPI import StreamingResponse

async def gen():
    yield b"chunk1"
    yield b"chunk2"

return StreamingResponse(gen(), media_type="text/plain")

FileResponse

Serves a file from disk with Content-Disposition: attachment.

from FasterAPI import FileResponse

return FileResponse("report.pdf", filename="report.pdf")

Parameters

Path

Marks a parameter as coming from the URL path. Usage: item_id: int = Path().

Query

Marks a parameter as coming from the query string.

from FasterAPI import Query

async def search(q: str | None = Query(default=None, alias="search")): ...

Marks a parameter as coming from a request header. Underscores in the parameter name are converted to hyphens by default (convert_underscores=True).

from FasterAPI import Header

async def handler(user_agent: str | None = Header(default=None)): ...
# reads "User-Agent" header

Marks a parameter as coming from a cookie.

from FasterAPI import Cookie

async def handler(session: str | None = Cookie(default=None)): ...

Body

Marks a parameter as coming from the raw JSON request body.

from FasterAPI import Body

async def handler(data: dict = Body()): ...

Form

Marks a parameter as coming from form data.

from FasterAPI import Form

async def login(username: str = Form(), password: str = Form()): ...

File

Marks a parameter as an uploaded file.

from FasterAPI import File, UploadFile

async def upload(file: UploadFile = File()): ...

Dependency Injection

Depends

Declares a dependency to be resolved before the route handler.

from FasterAPI import Depends

async def get_db(): ...

@app.get("/items")
async def handler(db = Depends(get_db)): ...

Parameters: - dependency — callable to resolve - use_cache=True — if True, calls dependency once per request


Exceptions

HTTPException

from FasterAPI import HTTPException

raise HTTPException(status_code=404, detail="Not found")
raise HTTPException(status_code=401, headers={"WWW-Authenticate": "Bearer"})

RequestValidationError

Raised automatically when a path/query/body parameter fails validation.

from FasterAPI.exceptions import RequestValidationError

app.add_exception_handler(RequestValidationError, my_handler)

Middleware

BaseHTTPMiddleware

Subclass to write custom middleware:

from FasterAPI import BaseHTTPMiddleware

class MyMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, scope, receive, send):
        # before
        await self.app(scope, receive, send)
        # after

CORSMiddleware

from FasterAPI import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=False,
    max_age=600,
)

GZipMiddleware

from FasterAPI import GZipMiddleware

app.add_middleware(GZipMiddleware, minimum_size=1000)

TrustedHostMiddleware

from FasterAPI import TrustedHostMiddleware

app.add_middleware(TrustedHostMiddleware, allowed_hosts=["example.com"])

HTTPSRedirectMiddleware

from FasterAPI import HTTPSRedirectMiddleware

app.add_middleware(HTTPSRedirectMiddleware)

Background Tasks

BackgroundTasks

from FasterAPI import BackgroundTasks

@app.post("/items")
async def create(tasks: BackgroundTasks):
    tasks.add_task(send_email, "user@example.com")
    return {"queued": True}

BackgroundTask

Single task wrapper:

from FasterAPI import BackgroundTask

task = BackgroundTask(send_email, "user@example.com")

WebSocket

WebSocket

from FasterAPI import WebSocket

@app.websocket("/ws")
async def ws_handler(ws: WebSocket):
    await ws.accept()
    data = await ws.receive_text()
    await ws.send_text(f"Echo: {data}")

Methods: accept(), receive_text(), receive_bytes(), receive_json(), send_text(), send_bytes(), send_json(), close(code=1000)

WebSocketDisconnect

Exception raised when the client disconnects.

WebSocketState

Enum: CONNECTING, CONNECTED, DISCONNECTED


Data Structures

UploadFile

Represents an uploaded file:

Attribute / method Description
filename Original filename
content_type MIME type
await file.read() Read all bytes

FormData

Mapping-like object returned by await request.form().


Concurrency

SubInterpreterPool

CPU-parallel worker pool using Python 3.13 sub-interpreters (falls back to ProcessPoolExecutor on earlier versions).

from FasterAPI import SubInterpreterPool

pool = SubInterpreterPool(max_workers=4)

run_in_subinterpreter

Run a function in a sub-interpreter and return an asyncio.Future:

from FasterAPI import run_in_subinterpreter

result = await run_in_subinterpreter(heavy_function, arg1, arg2)

Auto-generated docs

FasterAPI

FasterAPI — A high-performance ASGI web framework.

Drop-in FastAPI replacement powered by msgspec (C extension JSON), radix-tree routing, uvloop, and Python 3.13 sub-interpreters.

APIKeyCookie

Bases: _APIKeyBase

API key extracted from a cookie.

api_key_cookie = APIKeyCookie(name="session")

@app.get("/secure") async def secure(key: str = Depends(api_key_cookie)): ...

Source code in FasterAPI/security.py
class APIKeyCookie(_APIKeyBase):
    """API key extracted from a cookie.

    api_key_cookie = APIKeyCookie(name="session")

    @app.get("/secure")
    async def secure(key: str = Depends(api_key_cookie)):
        ...
    """

    async def __call__(self, request: Request) -> str | None:
        key = request.cookies.get(self.name)
        if key is None:
            self._deny()
            return None
        return key

APIKeyHeader

Bases: _APIKeyBase

API key extracted from an HTTP request header.

api_key_header = APIKeyHeader(name="X-API-Key")

@app.get("/secure") async def secure(key: str = Depends(api_key_header)): ...

Source code in FasterAPI/security.py
class APIKeyHeader(_APIKeyBase):
    """API key extracted from an HTTP request header.

    api_key_header = APIKeyHeader(name="X-API-Key")

    @app.get("/secure")
    async def secure(key: str = Depends(api_key_header)):
        ...
    """

    async def __call__(self, request: Request) -> str | None:
        key = request.headers.get(self.name.lower())
        if key is None:
            self._deny()
            return None
        return key

APIKeyQuery

Bases: _APIKeyBase

API key extracted from a query parameter.

api_key_query = APIKeyQuery(name="api_key")

@app.get("/secure") async def secure(key: str = Depends(api_key_query)): ...

Source code in FasterAPI/security.py
class APIKeyQuery(_APIKeyBase):
    """API key extracted from a query parameter.

    api_key_query = APIKeyQuery(name="api_key")

    @app.get("/secure")
    async def secure(key: str = Depends(api_key_query)):
        ...
    """

    async def __call__(self, request: Request) -> str | None:
        raw = request.query_params.get(self.name)
        if raw is None:
            self._deny()
            return None
        key: str = str(raw)
        return key

BackgroundTask

A single background task to be executed after a response is sent.

Source code in FasterAPI/background.py
class BackgroundTask:
    """A single background task to be executed after a response is sent."""

    __slots__ = ("func", "args", "kwargs")

    def __init__(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        self.func = func
        self.args = args
        self.kwargs = kwargs

    async def run(self) -> None:
        """Execute the background task."""
        if is_coroutine(self.func):
            await self.func(*self.args, **self.kwargs)
        else:
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, self._run_sync)

    def _run_sync(self) -> None:
        self.func(*self.args, **self.kwargs)

run() async

Execute the background task.

Source code in FasterAPI/background.py
async def run(self) -> None:
    """Execute the background task."""
    if is_coroutine(self.func):
        await self.func(*self.args, **self.kwargs)
    else:
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._run_sync)

BackgroundTasks

A collection of background tasks to be executed after a response is sent.

Source code in FasterAPI/background.py
class BackgroundTasks:
    """A collection of background tasks to be executed after a response is sent."""

    def __init__(self) -> None:
        self._tasks: list[BackgroundTask] = []

    def add_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        """Add a new background task to the collection."""
        self._tasks.append(BackgroundTask(func, *args, **kwargs))

    async def run(self) -> None:
        """Execute all background tasks sequentially."""
        for task in self._tasks:
            await task.run()

add_task(func, *args, **kwargs)

Add a new background task to the collection.

Source code in FasterAPI/background.py
def add_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
    """Add a new background task to the collection."""
    self._tasks.append(BackgroundTask(func, *args, **kwargs))

run() async

Execute all background tasks sequentially.

Source code in FasterAPI/background.py
async def run(self) -> None:
    """Execute all background tasks sequentially."""
    for task in self._tasks:
        await task.run()

BaseHTTPMiddleware

Base class for HTTP middleware that wraps an ASGI application.

Source code in FasterAPI/middleware.py
class BaseHTTPMiddleware:
    """Base class for HTTP middleware that wraps an ASGI application."""

    def __init__(self, app: ASGIApp) -> None:
        self.app = app

    async def __call__(self, scope: dict[str, Any], receive: ASGIApp, send: ASGIApp) -> None:
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        await self.dispatch(scope, receive, send)

    async def dispatch(
        self,
        scope: dict[str, Any],
        receive: ASGIApp,
        send: ASGIApp,
    ) -> None:
        """Process the request. Override this method in subclasses."""

        async def call_next() -> None:
            await self.app(scope, receive, send)

        await call_next()

dispatch(scope, receive, send) async

Process the request. Override this method in subclasses.

Source code in FasterAPI/middleware.py
async def dispatch(
    self,
    scope: dict[str, Any],
    receive: ASGIApp,
    send: ASGIApp,
) -> None:
    """Process the request. Override this method in subclasses."""

    async def call_next() -> None:
        await self.app(scope, receive, send)

    await call_next()

Body

Declare a request body parameter with optional default and embed mode.

Source code in FasterAPI/params.py
class Body:
    """Declare a request body parameter with optional default and embed mode."""

    __slots__ = ("default", "description", "embed")

    def __init__(
        self,
        default: Any = _MISSING,
        *,
        description: str = "",
        embed: bool = False,
    ) -> None:
        self.default = default
        self.description = description
        self.embed = embed

    def __repr__(self) -> str:
        if self.default is _MISSING:
            return "Body()"
        return f"Body(default={self.default!r})"

CORSMiddleware

Bases: BaseHTTPMiddleware

Middleware that handles Cross-Origin Resource Sharing (CORS) headers.

Source code in FasterAPI/middleware.py
class CORSMiddleware(BaseHTTPMiddleware):
    """Middleware that handles Cross-Origin Resource Sharing (CORS) headers."""

    def __init__(
        self,
        app: ASGIApp,
        *,
        allow_origins: Sequence[str] = ("*",),
        allow_methods: Sequence[str] = ("*",),
        allow_headers: Sequence[str] = ("*",),
        allow_credentials: bool = False,
        max_age: int = 600,
        expose_headers: Sequence[str] = (),
    ) -> None:
        super().__init__(app)
        self.allow_origins = set(allow_origins)
        self.allow_methods = set(allow_methods)
        self.allow_headers = set(allow_headers)
        self.allow_credentials = allow_credentials
        self.max_age = max_age
        self.expose_headers = set(expose_headers)
        self.allow_all_origins = "*" in self.allow_origins
        self.allow_all_methods = "*" in self.allow_methods
        self.allow_all_headers = "*" in self.allow_headers

    async def dispatch(
        self,
        scope: dict[str, Any],
        receive: ASGIApp,
        send: ASGIApp,
    ) -> None:
        """Handle CORS preflight requests and inject CORS headers into responses."""
        headers_raw: list[tuple[bytes, bytes]] = scope.get("headers", [])
        request_headers = {k.decode("latin-1").lower(): v.decode("latin-1") for k, v in headers_raw}
        origin = request_headers.get("origin")
        method = scope.get("method", "GET")

        # Preflight
        if method == "OPTIONS" and "access-control-request-method" in request_headers:
            await self._preflight_response(send, origin, request_headers)
            return

        # Normal request — intercept send to inject CORS headers
        cors_headers = self._build_cors_headers(origin)

        async def send_with_cors(message: dict[str, Any]) -> None:
            if message["type"] == "http.response.start":
                existing = list(message.get("headers", []))
                existing.extend(cors_headers)
                message = {**message, "headers": existing}
            await send(message)

        await self.app(scope, receive, send_with_cors)

    def _origin_allowed(self, origin: str | None) -> bool:
        if origin is None:
            return False
        if self.allow_all_origins:
            return True
        return origin in self.allow_origins

    def _build_cors_headers(self, origin: str | None) -> list[tuple[bytes, bytes]]:
        if not self._origin_allowed(origin):
            return []

        headers: list[tuple[bytes, bytes]] = []
        if self.allow_all_origins and not self.allow_credentials:
            headers.append((b"access-control-allow-origin", b"*"))
        elif origin:
            headers.append((b"access-control-allow-origin", origin.encode("latin-1")))
            headers.append((b"vary", b"Origin"))

        if self.allow_credentials:
            headers.append((b"access-control-allow-credentials", b"true"))

        if self.expose_headers:
            headers.append(
                (
                    b"access-control-expose-headers",
                    ", ".join(self.expose_headers).encode("latin-1"),
                )
            )
        return headers

    async def _preflight_response(
        self,
        send: ASGIApp,
        origin: str | None,
        request_headers: dict[str, str],
    ) -> None:
        headers: list[tuple[bytes, bytes]] = []

        if self._origin_allowed(origin):
            if self.allow_all_origins and not self.allow_credentials:
                headers.append((b"access-control-allow-origin", b"*"))
            elif origin:
                headers.append((b"access-control-allow-origin", origin.encode("latin-1")))
                headers.append((b"vary", b"Origin"))

            # Methods
            if self.allow_all_methods:
                req_method = request_headers.get("access-control-request-method", "")
                headers.append((b"access-control-allow-methods", req_method.encode("latin-1")))
            else:
                headers.append(
                    (
                        b"access-control-allow-methods",
                        ", ".join(self.allow_methods).encode("latin-1"),
                    )
                )

            # Headers
            if self.allow_all_headers:
                req_headers = request_headers.get("access-control-request-headers", "")
                headers.append((b"access-control-allow-headers", req_headers.encode("latin-1")))
            else:
                headers.append(
                    (
                        b"access-control-allow-headers",
                        ", ".join(self.allow_headers).encode("latin-1"),
                    )
                )

            if self.allow_credentials:
                headers.append((b"access-control-allow-credentials", b"true"))

            headers.append((b"access-control-max-age", str(self.max_age).encode()))

        await send(
            {
                "type": "http.response.start",
                "status": 200,
                "headers": headers,
            }
        )
        await send({"type": "http.response.body", "body": b""})

dispatch(scope, receive, send) async

Handle CORS preflight requests and inject CORS headers into responses.

Source code in FasterAPI/middleware.py
async def dispatch(
    self,
    scope: dict[str, Any],
    receive: ASGIApp,
    send: ASGIApp,
) -> None:
    """Handle CORS preflight requests and inject CORS headers into responses."""
    headers_raw: list[tuple[bytes, bytes]] = scope.get("headers", [])
    request_headers = {k.decode("latin-1").lower(): v.decode("latin-1") for k, v in headers_raw}
    origin = request_headers.get("origin")
    method = scope.get("method", "GET")

    # Preflight
    if method == "OPTIONS" and "access-control-request-method" in request_headers:
        await self._preflight_response(send, origin, request_headers)
        return

    # Normal request — intercept send to inject CORS headers
    cors_headers = self._build_cors_headers(origin)

    async def send_with_cors(message: dict[str, Any]) -> None:
        if message["type"] == "http.response.start":
            existing = list(message.get("headers", []))
            existing.extend(cors_headers)
            message = {**message, "headers": existing}
        await send(message)

    await self.app(scope, receive, send_with_cors)

Cookie

Declare a cookie parameter with an optional default value.

Source code in FasterAPI/params.py
class Cookie:
    """Declare a cookie parameter with an optional default value."""

    __slots__ = ("default",)

    def __init__(self, default: Any = None) -> None:
        self.default = default

    def __repr__(self) -> str:
        return f"Cookie(default={self.default!r})"

DatabasePoolMiddleware

Bases: BaseHTTPMiddleware

Attach a shared pool/engine object to scope["state"] for handlers.

Typical usage with SQLAlchemy::

engine = create_async_engine(url, pool_size=20)

app.add_middleware(DatabasePoolMiddleware, pool=engine, state_key="engine")

def get_engine(request: Request):
    return request.state["engine"]

For asyncpg, pass the asyncpg.Pool instance as pool.

Source code in FasterAPI/production.py
class DatabasePoolMiddleware(BaseHTTPMiddleware):
    """Attach a shared pool/engine object to ``scope["state"]`` for handlers.

    Typical usage with SQLAlchemy::

        engine = create_async_engine(url, pool_size=20)

        app.add_middleware(DatabasePoolMiddleware, pool=engine, state_key="engine")

        def get_engine(request: Request):
            return request.state["engine"]

    For **asyncpg**, pass the ``asyncpg.Pool`` instance as ``pool``.
    """

    def __init__(
        self,
        app: ASGIApp,
        *,
        pool: Any,
        state_key: str = "db_pool",
    ) -> None:
        super().__init__(app)
        self.pool = pool
        self.state_key = state_key

    async def dispatch(
        self,
        scope: dict[str, Any],
        receive: ASGIApp,
        send: ASGIApp,
    ) -> None:
        scope.setdefault("state", {})[self.state_key] = self.pool
        await self.app(scope, receive, send)

Depends

Declare a dependency to be resolved and injected into a route handler.

Source code in FasterAPI/dependencies.py
class Depends:
    """Declare a dependency to be resolved and injected into a route handler."""

    __slots__ = ("dependency", "use_cache")

    def __init__(self, dependency: Callable[..., Any] | None = None, *, use_cache: bool = True) -> None:
        self.dependency = dependency
        self.use_cache = use_cache

    def __repr__(self) -> str:
        name = self.dependency.__name__ if self.dependency else "..."
        return f"Depends({name})"

EventSourceResponse

Server-Sent Events (SSE) response.

Streams events to the client in the text/event-stream format.

Usage::

async def event_generator():
    yield {"data": "hello"}
    yield {"event": "update", "data": "world", "id": "1"}

@app.get("/stream")
async def stream():
    return EventSourceResponse(event_generator())
Source code in FasterAPI/response.py
class EventSourceResponse:
    """Server-Sent Events (SSE) response.

    Streams events to the client in the ``text/event-stream`` format.

    Usage::

        async def event_generator():
            yield {"data": "hello"}
            yield {"event": "update", "data": "world", "id": "1"}

        @app.get("/stream")
        async def stream():
            return EventSourceResponse(event_generator())
    """

    def __init__(
        self,
        content: AsyncIterator[dict[str, str]] | Iterator[dict[str, str]],
        status_code: int = 200,
        headers: dict[str, str] | None = None,
        ping_interval: float | None = None,
    ) -> None:
        self.content = content
        self.status_code = status_code
        self.headers = headers or {}
        self.ping_interval = ping_interval

    @staticmethod
    def _format_event(event: dict[str, str] | str) -> bytes:
        if isinstance(event, str):
            return f"data: {event}\n\n".encode()
        lines: list[str] = []
        if "id" in event:
            lines.append(f"id: {event['id']}")
        if "event" in event:
            lines.append(f"event: {event['event']}")
        if "data" in event:
            for line in event["data"].splitlines():
                lines.append(f"data: {line}")
        if "retry" in event:
            lines.append(f"retry: {event['retry']}")
        return ("\n".join(lines) + "\n\n").encode()

    def _build_headers(self) -> list[tuple[bytes, bytes]]:
        raw: list[tuple[bytes, bytes]] = [
            (b"content-type", b"text/event-stream"),
            (b"cache-control", b"no-cache"),
            (b"connection", b"keep-alive"),
            (b"x-accel-buffering", b"no"),
        ]
        for key, value in self.headers.items():
            raw.append((key.lower().encode("latin-1"), value.encode("latin-1")))
        return raw

    async def to_asgi(self, send: ASGIApp) -> None:
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self._build_headers(),
            }
        )
        if hasattr(self.content, "__aiter__"):
            async for event in self.content:
                chunk = self._format_event(event)
                await send({"type": "http.response.body", "body": chunk, "more_body": True})
        else:
            for event in self.content:
                chunk = self._format_event(event)
                await send({"type": "http.response.body", "body": chunk, "more_body": True})
        await send({"type": "http.response.body", "body": b"", "more_body": False})

Faster

The main FasterAPI application class, implementing the ASGI interface.

Source code in FasterAPI/app.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
class Faster:
    """The main FasterAPI application class, implementing the ASGI interface."""

    __slots__ = (
        "title",
        "version",
        "description",
        "openapi_url",
        "docs_url",
        "redoc_url",
        "openapi_tags",
        "terms_of_service",
        "contact",
        "license_info",
        "routes",
        "startup_handlers",
        "shutdown_handlers",
        "lifespan",
        "middleware",
        "exception_handlers",
        "_router",
        "_openapi_cache",
        "_middleware_app",
        "_ws_routes",
        "_mounts",
        "max_body_size",
        "stream_request_body",
        "stream_multipart",
    )

    def __init__(
        self,
        *,
        title: str = "FasterAPI",
        version: str | None = None,
        description: str = "",
        openapi_url: str | None = "/openapi.json",
        docs_url: str | None = "/docs",
        redoc_url: str | None = "/redoc",
        openapi_tags: list[dict[str, Any]] | None = None,
        terms_of_service: str | None = None,
        contact: dict[str, str] | None = None,
        license_info: dict[str, str] | None = None,
        lifespan: Callable[[Faster], Any] | None = None,
        max_body_size: int | None = None,
        stream_request_body: bool = False,
        stream_multipart: bool = False,
    ) -> None:
        self.title = title
        self.version = version if version is not None else get_version()
        self.description = description
        self.openapi_url = openapi_url
        self.docs_url = docs_url
        self.redoc_url = redoc_url
        self.openapi_tags = openapi_tags
        self.terms_of_service = terms_of_service
        self.contact = contact
        self.license_info = license_info
        self.lifespan = lifespan
        self.routes: list[dict[str, Any]] = []
        self.startup_handlers: list[Callable[[], Any]] = []
        self.shutdown_handlers: list[Callable[[], Any]] = []
        self.middleware: list[dict[str, Any]] = []
        self.exception_handlers: dict[type, Any] = {}
        self._router = RadixRouter()
        self._openapi_cache: dict[str, Any] | None = None
        self._middleware_app: ASGIApp | None = None
        self._ws_routes: dict[str, ASGIApp] = {}
        self._mounts: list[tuple[str, ASGIApp]] = []
        self.max_body_size = max_body_size
        self.stream_request_body = stream_request_body
        self.stream_multipart = stream_multipart
        self._setup_openapi_routes()

    def __repr__(self) -> str:
        return f"<Faster routes={len(self.routes)}>"

    # ------------------------------------------------------------------
    #  OpenAPI auto-routes
    # ------------------------------------------------------------------

    def _setup_openapi_routes(self) -> None:
        if self.openapi_url is not None:
            api_url = self.openapi_url
            app_ref = self

            async def openapi_schema() -> JSONResponse:
                spec = generate_openapi(
                    app_ref,
                    title=app_ref.title,
                    version=app_ref.version,
                    description=app_ref.description,
                    openapi_tags=app_ref.openapi_tags,
                    terms_of_service=app_ref.terms_of_service,
                    contact=app_ref.contact,
                    license_info=app_ref.license_info,
                )
                return JSONResponse(spec)

            self._add_route(
                "GET",
                api_url,
                openapi_schema,
                tags=["openapi"],
                summary="OpenAPI Schema",
                response_model=None,
                status_code=200,
                deprecated=False,
                responses=None,
                dependencies=None,
            )

        if self.docs_url is not None and self.openapi_url is not None:
            ourl, t = self.openapi_url, self.title

            async def swagger_docs() -> HTMLResponse:
                return HTMLResponse(swagger_ui_html(ourl, title=f"{t} - Swagger UI"))

            self._add_route(
                "GET",
                self.docs_url,
                swagger_docs,
                tags=["openapi"],
                summary="Swagger UI",
                response_model=None,
                status_code=200,
                deprecated=False,
                responses=None,
                dependencies=None,
            )

        if self.redoc_url is not None and self.openapi_url is not None:
            ourl, t = self.openapi_url, self.title

            async def redoc_docs() -> HTMLResponse:
                return HTMLResponse(redoc_html(ourl, title=f"{t} - ReDoc"))

            self._add_route(
                "GET",
                self.redoc_url,
                redoc_docs,
                tags=["openapi"],
                summary="ReDoc",
                response_model=None,
                status_code=200,
                deprecated=False,
                responses=None,
                dependencies=None,
            )

    # ------------------------------------------------------------------
    #  ASGI interface
    # ------------------------------------------------------------------

    async def __call__(
        self,
        scope: dict[str, Any],
        receive: Any,
        send: Any,
    ) -> None:
        if self.middleware:
            if self._middleware_app is None:
                self._middleware_app = self._build_middleware_chain()
            await self._middleware_app(scope, receive, send)
        else:
            await self._asgi_app(scope, receive, send)

    async def _asgi_app(
        self,
        scope: dict[str, Any],
        receive: Any,
        send: Any,
    ) -> None:
        scope_type = scope["type"]
        if scope_type == "http":
            # Check mounts first
            path: str = scope.get("path", "/")
            for prefix, mounted_app in self._mounts:
                if path == prefix or path.startswith(prefix + "/"):
                    sub_scope = dict(scope)
                    sub_scope["path"] = path[len(prefix) :] or "/"
                    sub_scope["root_path"] = scope.get("root_path", "") + prefix
                    await mounted_app(sub_scope, receive, send)
                    return
            await self._handle_http(scope, receive, send)
        elif scope_type == "websocket":
            await self._handle_websocket(scope, receive, send)
        elif scope_type == "lifespan":
            await self._handle_lifespan(scope, receive, send)

    def _build_middleware_chain(self) -> ASGIApp:
        app = self._asgi_app
        for entry in reversed(self.middleware):
            app = entry["class"](app, **entry["kwargs"])
        return app

    # ------------------------------------------------------------------
    #  HTTP dispatch — hot path
    # ------------------------------------------------------------------

    async def _handle_http(
        self,
        scope: dict[str, Any],
        receive: Any,
        send: Any,
    ) -> None:
        result = self._router.resolve(scope["method"], scope["path"])
        if result is None:
            await _send_error(send, 404, "Not Found")
            return

        handler, path_params, metadata = result
        scope["path_params"] = path_params
        st = scope.setdefault("state", {})
        st["max_body_size"] = self.max_body_size
        st["stream_body_no_buffer"] = self.stream_request_body
        st["stream_multipart"] = self.stream_multipart
        request = Request(scope, receive)
        bg_tasks = None

        extra_deps: list[Depends] | None = metadata.get("dependencies")

        try:
            response, bg_tasks = await _resolve_handler(handler, request, path_params, extra_deps)
        except RequestValidationError as exc:
            status, body, headers = await self._handle_exc(
                request,
                exc,
                RequestValidationError,
                _default_validation_exception_handler,
            )
            await _send_raw(send, status, body, headers)
            return
        except HTTPException as exc:
            status, body, headers = await self._handle_exc(
                request,
                exc,
                HTTPException,
                _default_http_exception_handler,
            )
            await _send_raw(send, status, body, headers)
            return
        except Exception as exc:
            for exc_class in type(exc).__mro__:
                if exc_class in self.exception_handlers:
                    response = self.exception_handlers[exc_class](request, exc)
                    if asyncio.iscoroutine(response):
                        response = await response
                    break
            else:
                await _send_error(send, 500, "Internal Server Error")
                return

        # Apply response_model filtering if configured
        response_model = metadata.get("response_model")
        response_model_include = metadata.get("response_model_include")
        response_model_exclude = metadata.get("response_model_exclude")
        if response_model is not None and not hasattr(response, "to_asgi"):
            response = _apply_response_model(response, response_model, response_model_include, response_model_exclude)

        await _send_response(send, metadata.get("status_code", 200), response)

        if bg_tasks is not None:
            await bg_tasks.run()

    async def _handle_exc(
        self,
        request: Request,
        exc: Exception,
        exc_class: type,
        default_handler: Any,
    ) -> tuple[int, bytes, list[tuple[bytes, bytes]]]:
        handler = self.exception_handlers.get(exc_class, default_handler)
        result = handler(request, exc)
        if asyncio.iscoroutine(result):
            result = await result
        return cast(tuple[int, bytes, list[tuple[bytes, bytes]]], result)

    # ------------------------------------------------------------------
    #  WebSocket dispatch
    # ------------------------------------------------------------------

    async def _handle_websocket(
        self,
        scope: dict[str, Any],
        receive: Any,
        send: Any,
    ) -> None:
        path = scope.get("path", "/")
        handler = self._ws_routes.get(path.rstrip("/") or "/")
        if handler is None:
            await send({"type": "websocket.close", "code": 4004})
            return
        ws = WebSocket(scope, receive, send)
        await handler(ws)

    # ------------------------------------------------------------------
    #  Lifespan
    # ------------------------------------------------------------------

    async def _handle_lifespan(
        self,
        scope: dict[str, Any],
        receive: Any,
        send: Any,
    ) -> None:
        if self.lifespan is not None:
            await self._run_lifespan_context(receive, send)
            return

        while True:
            message = await receive()
            if message["type"] == "lifespan.startup":
                try:
                    for h in self.startup_handlers:
                        r = h()
                        if asyncio.iscoroutine(r):
                            await r
                    await send({"type": "lifespan.startup.complete"})
                except Exception as exc:
                    await send({"type": "lifespan.startup.failed", "message": str(exc)})
                    return
            elif message["type"] == "lifespan.shutdown":
                try:
                    for h in self.shutdown_handlers:
                        r = h()
                        if asyncio.iscoroutine(r):
                            await r
                    await send({"type": "lifespan.shutdown.complete"})
                except Exception:
                    pass
                return

    async def _run_lifespan_context(self, receive: Any, send: Any) -> None:
        """Run the lifespan async context manager."""
        assert self.lifespan is not None
        ctx = self.lifespan(self)
        # Support both @asynccontextmanager functions and plain async generators
        if hasattr(ctx, "__aenter__"):
            async with ctx:
                message = await receive()
                if message["type"] == "lifespan.startup":
                    await send({"type": "lifespan.startup.complete"})
                message = await receive()
                if message["type"] == "lifespan.shutdown":
                    await send({"type": "lifespan.shutdown.complete"})
        else:
            # Treat as async generator
            gen = ctx.__aiter__() if hasattr(ctx, "__aiter__") else ctx
            with contextlib.suppress(StopAsyncIteration):
                await gen.__anext__()
            message = await receive()
            if message["type"] == "lifespan.startup":
                await send({"type": "lifespan.startup.complete"})
            message = await receive()
            if message["type"] == "lifespan.shutdown":
                with contextlib.suppress(StopAsyncIteration):
                    await gen.__anext__()
                await send({"type": "lifespan.shutdown.complete"})

    # ------------------------------------------------------------------
    #  Route registration
    # ------------------------------------------------------------------

    def _add_route(
        self,
        method: str,
        path: str,
        handler: ASGIApp,
        *,
        tags: list[str],
        summary: str,
        response_model: Any,
        status_code: int,
        deprecated: bool,
        responses: dict[int | str, dict[str, Any]] | None,
        dependencies: list[Depends] | None,
        response_model_include: set[str] | None = None,
        response_model_exclude: set[str] | None = None,
        openapi_extra: dict[str, Any] | None = None,
    ) -> None:
        metadata: dict[str, Any] = {
            "tags": tags,
            "summary": summary,
            "response_model": response_model,
            "response_model_include": response_model_include,
            "response_model_exclude": response_model_exclude,
            "status_code": status_code,
            "deprecated": deprecated,
            "responses": responses,
            "dependencies": dependencies,
            "openapi_extra": openapi_extra,
        }
        self.routes.append({"method": method, "path": path, "handler": handler, **metadata})
        self._router.add_route(method, path, handler, metadata)
        compile_handler(handler)

    def _route_decorator(self, method: str, path: str, **kw: Any) -> Callable[[ASGIApp], ASGIApp]:
        def decorator(handler: ASGIApp) -> ASGIApp:
            self._add_route(
                method,
                path,
                handler,
                tags=kw.get("tags") or [],
                summary=kw.get("summary", ""),
                response_model=kw.get("response_model"),
                response_model_include=kw.get("response_model_include"),
                response_model_exclude=kw.get("response_model_exclude"),
                status_code=kw.get("status_code", 200),
                deprecated=kw.get("deprecated", False),
                responses=kw.get("responses"),
                dependencies=kw.get("dependencies"),
                openapi_extra=kw.get("openapi_extra"),
            )
            return handler

        return decorator

    def get(self, path: str, **kw: Any) -> Callable[[ASGIApp], ASGIApp]:
        return self._route_decorator("GET", path, **kw)

    def post(self, path: str, **kw: Any) -> Callable[[ASGIApp], ASGIApp]:
        return self._route_decorator("POST", path, **kw)

    def put(self, path: str, **kw: Any) -> Callable[[ASGIApp], ASGIApp]:
        return self._route_decorator("PUT", path, **kw)

    def delete(self, path: str, **kw: Any) -> Callable[[ASGIApp], ASGIApp]:
        return self._route_decorator("DELETE", path, **kw)

    def patch(self, path: str, **kw: Any) -> Callable[[ASGIApp], ASGIApp]:
        return self._route_decorator("PATCH", path, **kw)

    def websocket(self, path: str) -> Callable[[ASGIApp], ASGIApp]:
        def decorator(handler: ASGIApp) -> ASGIApp:
            self._ws_routes[path.rstrip("/") or "/"] = handler
            return handler

        return decorator

    # ------------------------------------------------------------------
    #  Sub-application mounting
    # ------------------------------------------------------------------

    def mount(self, path: str, app: ASGIApp, name: str | None = None) -> None:
        """Mount an ASGI sub-application (e.g. StaticFiles) at *path*.

        Example::

            app.mount("/static", StaticFiles(directory="static"), name="static")
        """
        prefix = path.rstrip("/")
        self._mounts.append((prefix, app))

    # ------------------------------------------------------------------
    #  Lifecycle hooks
    # ------------------------------------------------------------------

    def on_startup(self, handler: Callable[[], Any]) -> Callable[[], Any]:
        self.startup_handlers.append(handler)
        return handler

    def on_shutdown(self, handler: Callable[[], Any]) -> Callable[[], Any]:
        self.shutdown_handlers.append(handler)
        return handler

    # ------------------------------------------------------------------
    #  Middleware & exception handlers
    # ------------------------------------------------------------------

    def add_middleware(self, middleware_class: type, **kwargs: Any) -> None:
        self.middleware.append({"class": middleware_class, "kwargs": kwargs})
        self._middleware_app = None  # invalidate cached chain

    def add_exception_handler(self, exc_class: type, handler: Any) -> None:
        self.exception_handlers[exc_class] = handler

    # ------------------------------------------------------------------
    #  Router inclusion
    # ------------------------------------------------------------------

    def include_router(
        self,
        router: Any,
        *,
        prefix: str = "",
        tags: Sequence[str] = (),
        dependencies: list[Depends] | None = None,
    ) -> None:
        pfx = prefix.rstrip("/")
        for route in router.routes:
            merged = dict(route)
            merged["path"] = pfx + merged["path"]
            merged["tags"] = list(tags) + merged["tags"]
            # Merge router-level dependencies with route-level dependencies
            route_deps: list[Depends] = merged.get("dependencies") or []
            router_deps: list[Depends] = getattr(router, "dependencies", None) or []
            caller_deps: list[Depends] = dependencies or []
            merged_deps = caller_deps + router_deps + route_deps
            merged["dependencies"] = merged_deps if merged_deps else None
            self.routes.append(merged)
            metadata = {k: v for k, v in merged.items() if k not in ("method", "path", "handler")}
            self._router.add_route(merged["method"], merged["path"], merged["handler"], metadata)
            compile_handler(merged["handler"])

mount(path, app, name=None)

Mount an ASGI sub-application (e.g. StaticFiles) at path.

Example::

app.mount("/static", StaticFiles(directory="static"), name="static")
Source code in FasterAPI/app.py
def mount(self, path: str, app: ASGIApp, name: str | None = None) -> None:
    """Mount an ASGI sub-application (e.g. StaticFiles) at *path*.

    Example::

        app.mount("/static", StaticFiles(directory="static"), name="static")
    """
    prefix = path.rstrip("/")
    self._mounts.append((prefix, app))

FasterRouter

API router for grouping routes with a common prefix, tags, and dependencies.

Source code in FasterAPI/router.py
class FasterRouter:
    """API router for grouping routes with a common prefix, tags, and dependencies."""

    __slots__ = ("prefix", "tags", "routes", "dependencies")

    def __init__(
        self,
        prefix: str = "",
        tags: list[str] | None = None,
        dependencies: list[Any] | None = None,
    ) -> None:
        self.prefix = prefix.rstrip("/")
        self.tags: list[str] = tags or []
        self.routes: list[dict[str, Any]] = []
        self.dependencies: list[Any] = dependencies or []

    def _add_route(
        self,
        method: str,
        path: str,
        handler: ASGIApp,
        *,
        tags: list[str],
        summary: str,
        response_model: Any,
        status_code: int,
        deprecated: bool,
        responses: dict[int | str, dict[str, Any]] | None,
        dependencies: list[Any] | None,
        response_model_include: set[str] | None = None,
        response_model_exclude: set[str] | None = None,
        openapi_extra: dict[str, Any] | None = None,
    ) -> None:
        full_path = self.prefix + path
        self.routes.append(
            {
                "method": method,
                "path": full_path,
                "handler": handler,
                "tags": self.tags + tags,
                "summary": summary,
                "response_model": response_model,
                "response_model_include": response_model_include,
                "response_model_exclude": response_model_exclude,
                "status_code": status_code,
                "deprecated": deprecated,
                "responses": responses,
                "dependencies": dependencies,
                "openapi_extra": openapi_extra,
            }
        )

    # Decorator factories — identical API to Faster app

    def get(self, path: str, **kw: Any) -> Callable[[ASGIApp], ASGIApp]:
        def decorator(handler: ASGIApp) -> ASGIApp:
            self._add_route("GET", path, handler, **_route_kw(kw))
            return handler

        return decorator

    def post(self, path: str, **kw: Any) -> Callable[[ASGIApp], ASGIApp]:
        def decorator(handler: ASGIApp) -> ASGIApp:
            self._add_route("POST", path, handler, **_route_kw(kw))
            return handler

        return decorator

    def put(self, path: str, **kw: Any) -> Callable[[ASGIApp], ASGIApp]:
        def decorator(handler: ASGIApp) -> ASGIApp:
            self._add_route("PUT", path, handler, **_route_kw(kw))
            return handler

        return decorator

    def delete(self, path: str, **kw: Any) -> Callable[[ASGIApp], ASGIApp]:
        def decorator(handler: ASGIApp) -> ASGIApp:
            self._add_route("DELETE", path, handler, **_route_kw(kw))
            return handler

        return decorator

    def patch(self, path: str, **kw: Any) -> Callable[[ASGIApp], ASGIApp]:
        def decorator(handler: ASGIApp) -> ASGIApp:
            self._add_route("PATCH", path, handler, **_route_kw(kw))
            return handler

        return decorator

File

Declare a file upload parameter.

Source code in FasterAPI/params.py
class File:
    """Declare a file upload parameter."""

    __slots__ = ("description",)

    def __init__(self, *, description: str = "") -> None:
        self.description = description

    def __repr__(self) -> str:
        return "File()"

FileResponse

Response that sends a file as an attachment.

Source code in FasterAPI/response.py
class FileResponse:
    """Response that sends a file as an attachment."""

    def __init__(
        self,
        path: str | Path,
        filename: str | None = None,
        media_type: str | None = None,
        headers: dict[str, str] | None = None,
        status_code: int = 200,
    ) -> None:
        self.path = Path(path)
        self.filename = filename or self.path.name
        self.status_code = status_code
        self.headers = headers or {}
        if media_type is not None:
            self.media_type = media_type
        else:
            mt, _ = mimetypes.guess_type(str(self.path))
            self.media_type = mt or "application/octet-stream"

    def _build_headers(self) -> list[tuple[bytes, bytes]]:
        raw: list[tuple[bytes, bytes]] = [
            (b"content-type", self.media_type.encode("latin-1")),
            (
                b"content-disposition",
                f'attachment; filename="{self.filename}"'.encode("latin-1"),
            ),
        ]
        for key, value in self.headers.items():
            raw.append((key.lower().encode("latin-1"), value.encode("latin-1")))
        return raw

    async def to_asgi(self, send: ASGIApp) -> None:
        """Read the file and send it through the ASGI interface."""
        content = await asyncio.get_running_loop().run_in_executor(
            None,
            self.path.read_bytes,
        )
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self._build_headers(),
            }
        )
        await send({"type": "http.response.body", "body": content})

to_asgi(send) async

Read the file and send it through the ASGI interface.

Source code in FasterAPI/response.py
async def to_asgi(self, send: ASGIApp) -> None:
    """Read the file and send it through the ASGI interface."""
    content = await asyncio.get_running_loop().run_in_executor(
        None,
        self.path.read_bytes,
    )
    await send(
        {
            "type": "http.response.start",
            "status": self.status_code,
            "headers": self._build_headers(),
        }
    )
    await send({"type": "http.response.body", "body": content})

Form

Declare a form field parameter with an optional default value.

Source code in FasterAPI/params.py
class Form:
    """Declare a form field parameter with an optional default value."""

    __slots__ = ("default", "description")

    def __init__(self, default: Any = _MISSING, *, description: str = "") -> None:
        self.default = default
        self.description = description

    def __repr__(self) -> str:
        if self.default is _MISSING:
            return "Form()"
        return f"Form(default={self.default!r})"

FormData

Bases: dict[str, Any]

Dict subclass for form data that may contain UploadFile values.

Source code in FasterAPI/datastructures.py
class FormData(dict[str, Any]):
    """Dict subclass for form data that may contain UploadFile values."""

    async def close(self) -> None:
        """Close all UploadFile instances in this form data."""
        for value in self.values():
            if isinstance(value, UploadFile):
                await value.close()

close() async

Close all UploadFile instances in this form data.

Source code in FasterAPI/datastructures.py
async def close(self) -> None:
    """Close all UploadFile instances in this form data."""
    for value in self.values():
        if isinstance(value, UploadFile):
            await value.close()

GZipMiddleware

Bases: BaseHTTPMiddleware

Middleware that compresses responses using gzip when the client supports it.

Source code in FasterAPI/middleware.py
class GZipMiddleware(BaseHTTPMiddleware):
    """Middleware that compresses responses using gzip when the client supports it."""

    def __init__(self, app: ASGIApp, *, minimum_size: int = 1000) -> None:
        super().__init__(app)
        self.minimum_size = minimum_size

    async def dispatch(
        self,
        scope: dict[str, Any],
        receive: ASGIApp,
        send: ASGIApp,
    ) -> None:
        """Compress the response body with gzip if it exceeds the minimum size."""
        headers_raw: list[tuple[bytes, bytes]] = scope.get("headers", [])
        accept_encoding = ""
        for k, v in headers_raw:
            if k.decode("latin-1").lower() == "accept-encoding":
                accept_encoding = v.decode("latin-1")
                break

        if "gzip" not in accept_encoding.lower():
            await self.app(scope, receive, send)
            return

        # Collect response to potentially compress
        initial_message: dict[str, Any] | None = None
        body_parts: list[bytes] = []

        async def buffered_send(message: dict[str, Any]) -> None:
            nonlocal initial_message
            if message["type"] == "http.response.start":
                initial_message = message
            elif message["type"] == "http.response.body":
                body_parts.append(message.get("body", b""))
                if not message.get("more_body", False):
                    # All body collected — decide whether to compress
                    full_body = b"".join(body_parts)
                    if len(full_body) >= self.minimum_size and initial_message is not None:
                        compressed = gzip.compress(full_body)
                        headers = list(initial_message.get("headers", []))
                        headers.append((b"content-encoding", b"gzip"))
                        headers.append((b"vary", b"Accept-Encoding"))
                        # Update content-length
                        headers = [(k, v) for k, v in headers if k.lower() != b"content-length"]
                        headers.append((b"content-length", str(len(compressed)).encode()))
                        await send({**initial_message, "headers": headers})
                        await send({"type": "http.response.body", "body": compressed})
                    else:
                        if initial_message is not None:
                            await send(initial_message)
                        await send({"type": "http.response.body", "body": full_body})

        await self.app(scope, receive, buffered_send)

dispatch(scope, receive, send) async

Compress the response body with gzip if it exceeds the minimum size.

Source code in FasterAPI/middleware.py
async def dispatch(
    self,
    scope: dict[str, Any],
    receive: ASGIApp,
    send: ASGIApp,
) -> None:
    """Compress the response body with gzip if it exceeds the minimum size."""
    headers_raw: list[tuple[bytes, bytes]] = scope.get("headers", [])
    accept_encoding = ""
    for k, v in headers_raw:
        if k.decode("latin-1").lower() == "accept-encoding":
            accept_encoding = v.decode("latin-1")
            break

    if "gzip" not in accept_encoding.lower():
        await self.app(scope, receive, send)
        return

    # Collect response to potentially compress
    initial_message: dict[str, Any] | None = None
    body_parts: list[bytes] = []

    async def buffered_send(message: dict[str, Any]) -> None:
        nonlocal initial_message
        if message["type"] == "http.response.start":
            initial_message = message
        elif message["type"] == "http.response.body":
            body_parts.append(message.get("body", b""))
            if not message.get("more_body", False):
                # All body collected — decide whether to compress
                full_body = b"".join(body_parts)
                if len(full_body) >= self.minimum_size and initial_message is not None:
                    compressed = gzip.compress(full_body)
                    headers = list(initial_message.get("headers", []))
                    headers.append((b"content-encoding", b"gzip"))
                    headers.append((b"vary", b"Accept-Encoding"))
                    # Update content-length
                    headers = [(k, v) for k, v in headers if k.lower() != b"content-length"]
                    headers.append((b"content-length", str(len(compressed)).encode()))
                    await send({**initial_message, "headers": headers})
                    await send({"type": "http.response.body", "body": compressed})
                else:
                    if initial_message is not None:
                        await send(initial_message)
                    await send({"type": "http.response.body", "body": full_body})

    await self.app(scope, receive, buffered_send)

HTMLResponse

Bases: Response

Response with HTML content type.

Source code in FasterAPI/response.py
class HTMLResponse(Response):
    """Response with HTML content type."""

    media_type = "text/html"

    def __init__(
        self,
        content: Any = None,
        status_code: int = 200,
        headers: dict[str, str] | None = None,
    ) -> None:
        super().__init__(content, status_code, headers)

HTTPBasic

Extracts credentials from an HTTP Basic Authorization header.

Use as a dependency:

http_basic = HTTPBasic()

@app.get("/protected")
async def protected(creds: HTTPBasicCredentials = Depends(http_basic)):
    ...
Source code in FasterAPI/security.py
class HTTPBasic:
    """Extracts credentials from an HTTP Basic Authorization header.

    Use as a dependency:

        http_basic = HTTPBasic()

        @app.get("/protected")
        async def protected(creds: HTTPBasicCredentials = Depends(http_basic)):
            ...
    """

    __slots__ = ("scheme_name", "realm", "auto_error")

    def __init__(
        self,
        *,
        scheme_name: str | None = None,
        realm: str | None = None,
        auto_error: bool = True,
    ) -> None:
        self.scheme_name = scheme_name or self.__class__.__name__
        self.realm = realm
        self.auto_error = auto_error

    async def __call__(self, request: Request) -> HTTPBasicCredentials | None:
        authorization = request.headers.get("authorization", "")
        if not authorization.startswith("Basic "):
            if self.auto_error:
                www_auth = f'Basic realm="{self.realm}"' if self.realm else "Basic"
                raise HTTPException(
                    status_code=401,
                    detail="Not authenticated",
                    headers={"WWW-Authenticate": www_auth},
                )
            return None
        try:
            decoded = base64.b64decode(authorization[6:]).decode("latin-1")
            username, _, password = decoded.partition(":")
        except (binascii.Error, UnicodeDecodeError) as exc:
            if self.auto_error:
                raise HTTPException(status_code=400, detail="Invalid authentication credentials") from exc
            return None
        return HTTPBasicCredentials(username=username, password=password)

HTTPBasicCredentials

Username and password extracted from an HTTP Basic Authorization header.

Source code in FasterAPI/security.py
class HTTPBasicCredentials:
    """Username and password extracted from an HTTP Basic Authorization header."""

    __slots__ = ("username", "password")

    def __init__(self, username: str, password: str) -> None:
        self.username = username
        self.password = password

    def __repr__(self) -> str:
        return f"HTTPBasicCredentials(username={self.username!r})"

HTTPException

Bases: Exception

An HTTP exception that results in an error response with the given status code.

Source code in FasterAPI/exceptions.py
class HTTPException(Exception):
    """An HTTP exception that results in an error response with the given status code."""

    def __init__(
        self,
        status_code: int = 500,
        detail: Any = None,
        headers: dict[str, str] | None = None,
    ) -> None:
        self.status_code = status_code
        self.detail = detail
        self.headers = headers

    def __repr__(self) -> str:
        return f"HTTPException(status_code={self.status_code}, detail={self.detail!r})"

HTTPSRedirectMiddleware

Bases: BaseHTTPMiddleware

Middleware that redirects all HTTP requests to HTTPS.

Source code in FasterAPI/middleware.py
class HTTPSRedirectMiddleware(BaseHTTPMiddleware):
    """Middleware that redirects all HTTP requests to HTTPS."""

    async def dispatch(
        self,
        scope: dict[str, Any],
        receive: ASGIApp,
        send: ASGIApp,
    ) -> None:
        if scope.get("scheme", "http") == "https":
            await self.app(scope, receive, send)
            return

        # Build redirect URL
        headers_raw: list[tuple[bytes, bytes]] = scope.get("headers", [])
        host = "localhost"
        for k, v in headers_raw:
            if k.decode("latin-1").lower() == "host":
                host = v.decode("latin-1")
                break

        path = scope.get("path", "/")
        qs = scope.get("query_string", b"")
        url = f"https://{host}{path}"
        if qs:
            url += f"?{qs.decode('latin-1')}"

        await send(
            {
                "type": "http.response.start",
                "status": 301,
                "headers": [
                    (b"location", url.encode("latin-1")),
                    (b"content-type", b"text/plain"),
                ],
            }
        )
        await send({"type": "http.response.body", "body": b"Redirecting to HTTPS"})

Header

Declare a header parameter with optional default and alias.

Source code in FasterAPI/params.py
class Header:
    """Declare a header parameter with optional default and alias."""

    __slots__ = ("default", "alias", "convert_underscores")

    def __init__(
        self,
        default: Any = None,
        *,
        alias: str | None = None,
        convert_underscores: bool = True,
    ) -> None:
        self.default = default
        self.alias = alias
        self.convert_underscores = convert_underscores

    def __repr__(self) -> str:
        return f"Header(default={self.default!r})"

JSONResponse

Bases: Response

Response that serializes content as JSON using msgspec (with datetime/UUID/Decimal support).

Pass bytes, bytearray, or memoryview to skip encoding and send pre-serialised JSON (hot-path optimisation when the payload is fixed at import time or cached externally).

Source code in FasterAPI/response.py
class JSONResponse(Response):
    """Response that serializes content as JSON using msgspec (with datetime/UUID/Decimal support).

    Pass ``bytes``, ``bytearray``, or ``memoryview`` to skip encoding and send **pre-serialised**
    JSON (hot-path optimisation when the payload is fixed at import time or cached externally).
    """

    media_type = "application/json"

    def _render(self, content: Any) -> bytes:
        if isinstance(content, memoryview):
            return bytes(content)
        if isinstance(content, (bytes, bytearray)):
            return bytes(content)
        return encode_json(content)

JWTBearer

Decode a Bearer JWT from Authorization and inject claims as a dict.

Usage::

jwt_scheme = JWTBearer(secret=settings.jwt_secret)

@app.get("/me")
async def me(claims: dict = Depends(jwt_scheme)):
    user_id = claims.get("sub")
Source code in FasterAPI/jwt_auth.py
class JWTBearer:
    """Decode a Bearer JWT from ``Authorization`` and inject claims as a ``dict``.

    Usage::

        jwt_scheme = JWTBearer(secret=settings.jwt_secret)

        @app.get(\"/me\")
        async def me(claims: dict = Depends(jwt_scheme)):
            user_id = claims.get(\"sub\")
    """

    __slots__ = ("secret", "public_key", "algorithms", "audience", "issuer", "auto_error", "scheme_name")

    def __init__(
        self,
        secret: str | None = None,
        *,
        algorithms: list[str] | None = None,
        audience: str | None = None,
        issuer: str | None = None,
        auto_error: bool = True,
        scheme_name: str | None = None,
        public_key: str | None = None,
    ) -> None:
        if secret is None and public_key is None:
            raise ValueError("JWTBearer requires secret (HMAC) or public_key (asymmetric)")
        if secret is not None and public_key is not None:
            raise ValueError("Provide either secret or public_key, not both")
        self.secret = secret
        self.public_key = public_key
        self.algorithms = algorithms or ["HS256"]
        self.audience = audience
        self.issuer = issuer
        self.auto_error = auto_error
        self.scheme_name = scheme_name or self.__class__.__name__

    async def __call__(self, request: Request) -> dict[str, Any] | None:
        authorization = request.headers.get("authorization", "")
        if not authorization.startswith("Bearer "):
            if self.auto_error:
                raise HTTPException(
                    status_code=401,
                    detail="Not authenticated",
                    headers={"WWW-Authenticate": "Bearer"},
                )
            return None

        token = authorization[7:]
        jwt_mod = _jwt_module()
        key = self.secret if self.public_key is None else self.public_key
        try:
            return jwt_mod.decode(
                token,
                key,
                algorithms=self.algorithms,
                audience=self.audience,
                issuer=self.issuer,
                options={"require": ["exp"]},
            )
        except getattr(getattr(jwt_mod, "exceptions", jwt_mod), "InvalidTokenError", Exception) as exc:
            if self.auto_error:
                raise HTTPException(
                    status_code=401,
                    detail="Could not validate credentials",
                    headers={"WWW-Authenticate": "Bearer"},
                ) from exc
            return None

Jinja2Templates

Render Jinja2 templates as HTML responses.

Usage::

templates = Jinja2Templates(directory="templates")

@app.get("/hello/{name}")
async def hello(request: Request, name: str):
    return templates.TemplateResponse(request, "hello.html", {"name": name})

Requires jinja2 to be installed: pip install jinja2.

Source code in FasterAPI/templating.py
class Jinja2Templates:
    """Render Jinja2 templates as HTML responses.

    Usage::

        templates = Jinja2Templates(directory="templates")

        @app.get("/hello/{name}")
        async def hello(request: Request, name: str):
            return templates.TemplateResponse(request, "hello.html", {"name": name})

    Requires ``jinja2`` to be installed: ``pip install jinja2``.
    """

    def __init__(self, directory: str | Path) -> None:
        try:
            import jinja2
        except ImportError as exc:
            raise ImportError("Jinja2Templates requires jinja2. Install with: pip install jinja2") from exc

        self.env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(str(directory)),
            autoescape=jinja2.select_autoescape(["html", "xml"]),
        )

    def get_template(self, name: str) -> Any:
        return self.env.get_template(name)

    def TemplateResponse(  # noqa: N802
        self,
        request: Request,
        name: str,
        context: dict[str, Any] | None = None,
        status_code: int = 200,
        headers: dict[str, str] | None = None,
        media_type: str = "text/html",
    ) -> Response:
        ctx = dict(context) if context else {}
        ctx.setdefault("request", request)
        template = self.get_template(name)
        content = template.render(ctx)
        return HTMLResponse(content=content, status_code=status_code, headers=headers)

OAuth2PasswordBearer

Extracts a Bearer token from the Authorization header.

Use as a dependency:

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")

@app.get("/me")
async def me(token: str = Depends(oauth2_scheme)):
    ...
Source code in FasterAPI/security.py
class OAuth2PasswordBearer:
    """Extracts a Bearer token from the Authorization header.

    Use as a dependency:

        oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")

        @app.get("/me")
        async def me(token: str = Depends(oauth2_scheme)):
            ...
    """

    __slots__ = ("tokenUrl", "scheme_name", "scopes", "auto_error")

    def __init__(
        self,
        tokenUrl: str,
        *,
        scheme_name: str | None = None,
        scopes: dict[str, str] | None = None,
        auto_error: bool = True,
    ) -> None:
        self.tokenUrl = tokenUrl
        self.scheme_name = scheme_name or self.__class__.__name__
        self.scopes: dict[str, str] = scopes or {}
        self.auto_error = auto_error

    async def __call__(self, request: Request) -> str | None:
        authorization = request.headers.get("authorization", "")
        if not authorization.startswith("Bearer "):
            if self.auto_error:
                raise HTTPException(
                    status_code=401,
                    detail="Not authenticated",
                    headers={"WWW-Authenticate": "Bearer"},
                )
            return None
        return authorization[7:]

OAuth2PasswordRequestForm

Parses an OAuth2 password flow form submission.

Use as a dependency:

@app.post("/token")
async def login(form: OAuth2PasswordRequestForm = Depends()):
    form.username, form.password, form.scopes
Source code in FasterAPI/security.py
class OAuth2PasswordRequestForm:
    """Parses an OAuth2 password flow form submission.

    Use as a dependency:

        @app.post("/token")
        async def login(form: OAuth2PasswordRequestForm = Depends()):
            form.username, form.password, form.scopes
    """

    __slots__ = ("grant_type", "username", "password", "scopes", "client_id", "client_secret")

    def __init__(
        self,
        *,
        grant_type: str | None = None,
        username: str = "",
        password: str = "",
        scope: str = "",
        client_id: str | None = None,
        client_secret: str | None = None,
    ) -> None:
        self.grant_type = grant_type
        self.username = username
        self.password = password
        self.scopes: list[str] = scope.split() if scope else []
        self.client_id = client_id
        self.client_secret = client_secret

    @classmethod
    async def from_request(cls, request: Request) -> OAuth2PasswordRequestForm:
        """Parse form data from a request and return a populated instance."""
        form_data = await request.form()
        return cls(
            grant_type=str(form_data.get("grant_type")) if form_data.get("grant_type") is not None else None,
            username=str(form_data.get("username", "")),
            password=str(form_data.get("password", "")),
            scope=str(form_data.get("scope", "")),
            client_id=str(form_data.get("client_id")) if form_data.get("client_id") is not None else None,
            client_secret=str(form_data.get("client_secret")) if form_data.get("client_secret") is not None else None,
        )

from_request(request) async classmethod

Parse form data from a request and return a populated instance.

Source code in FasterAPI/security.py
@classmethod
async def from_request(cls, request: Request) -> OAuth2PasswordRequestForm:
    """Parse form data from a request and return a populated instance."""
    form_data = await request.form()
    return cls(
        grant_type=str(form_data.get("grant_type")) if form_data.get("grant_type") is not None else None,
        username=str(form_data.get("username", "")),
        password=str(form_data.get("password", "")),
        scope=str(form_data.get("scope", "")),
        client_id=str(form_data.get("client_id")) if form_data.get("client_id") is not None else None,
        client_secret=str(form_data.get("client_secret")) if form_data.get("client_secret") is not None else None,
    )

Path

Declare a path parameter with optional default and description.

Source code in FasterAPI/params.py
class Path:
    """Declare a path parameter with optional default and description."""

    __slots__ = ("default", "description")

    def __init__(self, default: Any = _MISSING, *, description: str = "") -> None:
        self.default = default
        self.description = description

    def __repr__(self) -> str:
        if self.default is _MISSING:
            return "Path()"
        return f"Path(default={self.default!r})"

PlainTextResponse

Bases: Response

Response with plain text content type.

Source code in FasterAPI/response.py
class PlainTextResponse(Response):
    """Response with plain text content type."""

    media_type = "text/plain"

    def __init__(
        self,
        content: Any = None,
        status_code: int = 200,
        headers: dict[str, str] | None = None,
    ) -> None:
        super().__init__(content, status_code, headers)

Query

Declare a query parameter with optional default, description, and alias.

Source code in FasterAPI/params.py
class Query:
    """Declare a query parameter with optional default, description, and alias."""

    __slots__ = ("default", "description", "alias")

    def __init__(
        self,
        default: Any = None,
        *,
        description: str = "",
        alias: str | None = None,
    ) -> None:
        self.default = default
        self.description = description
        self.alias = alias

    def __repr__(self) -> str:
        return f"Query(default={self.default!r})"

RadixRouter

O(k) URL router using a compressed radix tree (k = path segments).

Source code in FasterAPI/router.py
class RadixRouter:
    """O(k) URL router using a compressed radix tree (k = path segments)."""

    __slots__ = ("root",)

    def __init__(self) -> None:
        self.root = RadixNode()

    # ------------------------------------------------------------------
    #  Route registration (called at startup — speed is less critical)
    # ------------------------------------------------------------------

    def add_route(
        self,
        method: str,
        path: str,
        handler: ASGIApp,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Register a handler for the given HTTP method and path pattern."""
        node = self.root
        for segment in _split(path):
            if segment[0] == "{" and segment[-1] == "}":
                param_name = segment[1:-1]
                child = node.children.get("*")
                if child is None:
                    child = RadixNode()
                    child.is_param = True
                    child.param_name = param_name
                    node.children["*"] = child
                node = child
            else:
                child = node.children.get(segment)
                if child is None:
                    child = RadixNode()
                    node.children[segment] = child
                node = child

        node.handlers[method.upper()] = (handler, metadata or {})

    # ------------------------------------------------------------------
    #  Route resolution — HOT PATH, called on every request
    # ------------------------------------------------------------------

    def resolve(
        self,
        method: str,
        path: str,
    ) -> tuple[ASGIApp, dict[str, str], dict[str, Any]] | None:
        """Resolve a path to (handler, path_params, metadata) or None."""
        segments = _split(path)
        params: dict[str, str] = {}

        node = self._walk(self.root, segments, 0, params)
        if node is None:
            return None

        entry = node.handlers.get(method.upper())
        if entry is None:
            return None

        return entry[0], params, entry[1]

    def _walk(
        self,
        node: RadixNode,
        segments: list[str],
        idx: int,
        params: dict[str, str],
    ) -> RadixNode | None:
        """Iterative-first tree walk with recursive fallback for param backtracking."""
        n = len(segments)
        while idx < n:
            seg = segments[idx]
            child = node.children.get(seg)
            if child is not None:
                node = child
                idx += 1
                continue
            param_child = node.children.get("*")
            if param_child is not None:
                assert param_child.param_name is not None
                params[param_child.param_name] = seg
                node = param_child
                idx += 1
                continue
            return None

        return node if node.handlers else None

add_route(method, path, handler, metadata=None)

Register a handler for the given HTTP method and path pattern.

Source code in FasterAPI/router.py
def add_route(
    self,
    method: str,
    path: str,
    handler: ASGIApp,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Register a handler for the given HTTP method and path pattern."""
    node = self.root
    for segment in _split(path):
        if segment[0] == "{" and segment[-1] == "}":
            param_name = segment[1:-1]
            child = node.children.get("*")
            if child is None:
                child = RadixNode()
                child.is_param = True
                child.param_name = param_name
                node.children["*"] = child
            node = child
        else:
            child = node.children.get(segment)
            if child is None:
                child = RadixNode()
                node.children[segment] = child
            node = child

    node.handlers[method.upper()] = (handler, metadata or {})

resolve(method, path)

Resolve a path to (handler, path_params, metadata) or None.

Source code in FasterAPI/router.py
def resolve(
    self,
    method: str,
    path: str,
) -> tuple[ASGIApp, dict[str, str], dict[str, Any]] | None:
    """Resolve a path to (handler, path_params, metadata) or None."""
    segments = _split(path)
    params: dict[str, str] = {}

    node = self._walk(self.root, segments, 0, params)
    if node is None:
        return None

    entry = node.handlers.get(method.upper())
    if entry is None:
        return None

    return entry[0], params, entry[1]

RateLimitMiddleware

Bases: BaseHTTPMiddleware

Simple sliding-window rate limit per client IP (in-memory).

Not suitable for multi-process deployments without a shared store—use Redis etc. for horizontal scale. For single-worker or development this is enough.

client from ASGI scope is used unless forwarded_for_header is set and the header exists (first hop).

Source code in FasterAPI/production.py
class RateLimitMiddleware(BaseHTTPMiddleware):
    """Simple sliding-window rate limit per client IP (in-memory).

    Not suitable for multi-process deployments without a shared store—use Redis
    etc. for horizontal scale. For single-worker or development this is enough.

    ``client`` from ASGI scope is used unless ``forwarded_for_header`` is set and
    the header exists (first hop).
    """

    def __init__(
        self,
        app: ASGIApp,
        *,
        requests_per_minute: int = 120,
        window_seconds: float = 60.0,
        forwarded_for_header: str | None = None,
    ) -> None:
        super().__init__(app)
        if requests_per_minute < 1:
            raise ValueError("requests_per_minute must be >= 1")
        self.requests_per_minute = requests_per_minute
        self.window_seconds = window_seconds
        self.forwarded_for_header = forwarded_for_header.lower() if forwarded_for_header else None
        self._hits: dict[str, deque[float]] = defaultdict(deque)

    def _client_key(self, scope: dict[str, Any]) -> str:
        if self.forwarded_for_header:
            for k, v in scope.get("headers", []):
                if k.decode("latin-1").lower() == self.forwarded_for_header:
                    first = v.decode("latin-1").split(",")[0].strip()
                    if first:
                        return first
                    break
        client = scope.get("client")
        return client[0] if client else "unknown"

    async def dispatch(
        self,
        scope: dict[str, Any],
        receive: ASGIApp,
        send: ASGIApp,
    ) -> None:
        key = self._client_key(scope)
        now = time.monotonic()
        cutoff = now - self.window_seconds
        dq = self._hits[key]
        while dq and dq[0] < cutoff:
            dq.popleft()

        if len(dq) >= self.requests_per_minute:
            await send(
                {
                    "type": "http.response.start",
                    "status": 429,
                    "headers": [
                        (b"content-type", b"application/json"),
                        (b"retry-after", b"60"),
                    ],
                }
            )
            await send({"type": "http.response.body", "body": b'{"detail":"Too Many Requests"}'})
            return

        dq.append(now)
        await self.app(scope, receive, send)

RedirectResponse

Bases: Response

Response that redirects to a different URL.

Source code in FasterAPI/response.py
class RedirectResponse(Response):
    """Response that redirects to a different URL."""

    def __init__(
        self,
        url: str,
        status_code: int = 307,
        headers: dict[str, str] | None = None,
    ) -> None:
        super().__init__(b"", status_code, headers)
        self.headers["location"] = url

    def _render(self, content: Any) -> bytes:
        if isinstance(content, bytes):
            return content
        return b""

RedisCacheMiddleware

Bases: BaseHTTPMiddleware

Cache GET responses (small bodies) in Redis.

Skips caching when the client sends Cache-Control: no-cache or when the response status is not in cacheable_statuses. Suitable for idempotent JSON APIs; validate before caching authenticated routes.

Requires redis>=5 with redis.asyncio and pip install redis.

Source code in FasterAPI/redis_cache.py
class RedisCacheMiddleware(BaseHTTPMiddleware):
    """Cache **GET** responses (small bodies) in Redis.

    Skips caching when the client sends ``Cache-Control: no-cache`` or when the
    response status is not in *cacheable_statuses*. Suitable for idempotent JSON
    APIs; validate before caching authenticated routes.

    Requires ``redis>=5`` with ``redis.asyncio`` and ``pip install redis``.
    """

    def __init__(
        self,
        app: ASGIApp,
        redis_client: Any,
        *,
        ttl: int = 300,
        key_prefix: str = "fasterapi:http:",
        methods: tuple[str, ...] = ("GET",),
        cacheable_statuses: tuple[int, ...] = (200,),
        max_body_bytes: int = 1_048_576,
    ) -> None:
        super().__init__(app)
        self.redis = redis_client
        self.ttl = ttl
        self.key_prefix = key_prefix
        self.methods = {m.upper() for m in methods}
        self.cacheable_statuses = cacheable_statuses
        self.max_body_bytes = max_body_bytes

    def _cache_key(self, scope: dict[str, Any]) -> str:
        path = scope.get("path", "/")
        qs = scope.get("query_string", b"").decode("latin-1")
        raw = f"{path}?{qs}".encode()
        digest = hashlib.sha256(raw).hexdigest()
        return f"{self.key_prefix}{digest}"

    async def dispatch(
        self,
        scope: dict[str, Any],
        receive: ASGIApp,
        send: ASGIApp,
    ) -> None:
        if scope["method"].upper() not in self.methods:
            await self.app(scope, receive, send)
            return

        headers_in = {k.decode("latin-1").lower(): v.decode("latin-1") for k, v in scope.get("headers", [])}
        if "no-cache" in headers_in.get("cache-control", "").lower():
            await self.app(scope, receive, send)
            return

        cache_key = self._cache_key(scope)
        try:
            cached = await self.redis.get(cache_key)
        except Exception:
            await self.app(scope, receive, send)
            return

        if cached:
            try:
                payload = json.loads(cached)
                status = int(payload["status"])
                res_headers = [(k.encode("latin-1"), v.encode("latin-1")) for k, v in payload["headers"]]
                body = bytes.fromhex(payload["body_hex"])
            except (KeyError, ValueError, TypeError):
                await self.app(scope, receive, send)
                return

            await send({"type": "http.response.start", "status": status, "headers": res_headers})
            await send({"type": "http.response.body", "body": body})
            return

        start_msg: dict[str, Any] | None = None
        body_parts: list[bytes] = []

        async def capturing_send(message: dict[str, Any]) -> None:
            nonlocal start_msg
            if message["type"] == "http.response.start":
                start_msg = message
            elif message["type"] == "http.response.body":
                body_parts.append(message.get("body", b""))
            await send(message)

        await self.app(scope, receive, capturing_send)

        if start_msg is None:
            return
        status = int(start_msg.get("status", 200))
        if status not in self.cacheable_statuses:
            return
        full = b"".join(body_parts)
        if len(full) > self.max_body_bytes:
            return
        raw_headers = start_msg.get("headers", [])
        headers_list = [(k.decode("latin-1"), v.decode("latin-1")) for k, v in raw_headers]
        payload_dict = {"status": status, "headers": headers_list, "body_hex": full.hex()}
        payload_str = json.dumps(payload_dict)
        try:
            await self.redis.set(cache_key, payload_str, ex=self.ttl)
        except Exception:
            return

Request

Represents an incoming HTTP request with lazy attribute parsing.

Source code in FasterAPI/request.py
class Request:
    """Represents an incoming HTTP request with lazy attribute parsing."""

    __slots__ = (
        "_scope",
        "_receive",
        "_body",
        "_body_read",
        "_form_cache",
        "_headers",
        "_query_params",
        "_cookies",
        "_max_body_size",
        "_stream_no_buffer",
        "method",
        "path",
        "path_params",
    )

    def __init__(self, scope: dict[str, Any], receive: ASGIApp) -> None:
        self._scope = scope
        self._receive = receive
        self._body: bytes = b""
        self._body_read: bool = False
        self._form_cache: FormData | None = None
        self._headers: dict[str, str] | None = None
        self._query_params: dict[str, Any] | None = None
        self._cookies: dict[str, str] | None = None

        st = scope.setdefault("state", {})
        self._max_body_size = st.get("max_body_size")
        self._stream_no_buffer = bool(st.get("stream_body_no_buffer"))

        self.method: str = scope.get("method", "GET")
        self.path: str = scope.get("path", "/")
        self.path_params: dict[str, str] = scope.get("path_params", {})

    @property
    def state(self) -> dict[str, Any]:
        """Mutable per-request state (ASGI ``scope["state"]``)."""
        return self._scope.setdefault("state", {})

    # ------------------------------------------------------------------
    #  Lazy-parsed properties (only computed when first accessed)
    # ------------------------------------------------------------------

    @property
    def headers(self) -> dict[str, str]:
        h = self._headers
        if h is None:
            raw: list[tuple[bytes, bytes]] = self._scope.get("headers", [])
            h = {k.decode("latin-1").lower(): v.decode("latin-1") for k, v in raw}
            self._headers = h
        return h

    @property
    def query_params(self) -> dict[str, Any]:
        qp = self._query_params
        if qp is None:
            qs = self._scope.get("query_string", b"").decode("latin-1")
            if qs:
                parsed = parse_qs(qs, keep_blank_values=True)
                qp = {k: v[0] if len(v) == 1 else v for k, v in parsed.items()}
            else:
                qp = {}
            self._query_params = qp
        return qp

    @property
    def cookies(self) -> dict[str, str]:
        c = self._cookies
        if c is None:
            cookie_header = self.headers.get("cookie", "")
            if cookie_header:
                sc = SimpleCookie()
                sc.load(cookie_header)
                c = {key: morsel.value for key, morsel in sc.items()}
            else:
                c = {}
            self._cookies = c
        return c

    @property
    def content_type(self) -> str | None:
        return self.headers.get("content-type")

    @property
    def client(self) -> tuple[str, int] | None:
        val = self._scope.get("client")
        return (val[0], val[1]) if val is not None else None

    @property
    def body(self) -> bytes:
        return self._body

    # ------------------------------------------------------------------
    #  Body reading
    # ------------------------------------------------------------------

    def _enforce_limit(self, total_so_far: int, chunk_len: int) -> None:
        if self._max_body_size is None:
            return
        if total_so_far + chunk_len > self._max_body_size:
            raise HTTPException(status_code=413, detail="Request entity too large")

    async def stream(self) -> AsyncIterator[bytes]:
        """Yield body chunks from the ASGI ``receive`` channel.

        When ``Faster(stream_request_body=False)`` (default), chunks are also
        concatenated so :meth:`body`, :meth:`json`, and :meth:`form` still work.

        With ``stream_request_body=True``, bytes are **not** retained after
        iteration—use this for large uploads written straight to disk.
        """
        if self._body_read:
            if self._body:
                yield self._body
            return

        buffer_chunks: list[bytes] = []
        total = 0
        try:
            while True:
                message = await self._receive()
                chunk = message.get("body", b"")
                self._enforce_limit(total, len(chunk))
                total += len(chunk)
                if not self._stream_no_buffer and chunk:
                    buffer_chunks.append(chunk)
                if chunk:
                    yield chunk
                if not message.get("more_body", False):
                    break
        finally:
            self._body_read = True
            if not self._stream_no_buffer:
                self._body = b"".join(buffer_chunks)
            else:
                self._body = b""

    async def _read_body(self) -> bytes:
        if self._body_read:
            return self._body
        async for _ in self.stream():
            pass
        return self._body

    async def json(self) -> Any:
        """Read the request body and decode as JSON via msgspec (zero-copy)."""
        raw = await self._read_body()
        return msgspec.json.decode(raw)

    async def form(self) -> FormData:
        """Read the request body and parse as form / multipart data."""
        if self._form_cache is not None:
            return self._form_cache
        ct = self.content_type or ""

        if "multipart/form-data" in ct and self.state.get("stream_multipart"):
            self._form_cache = await self._parse_multipart_stream(ct)
            return self._form_cache

        raw = await self._read_body()
        result = _parse_multipart(raw, ct) if "multipart/form-data" in ct else _parse_urlencoded(raw)
        self._form_cache = result
        return result

    async def _parse_multipart_stream(self, content_type: str) -> FormData:
        """Parse multipart by feeding ASGI chunks into ``MultipartParser`` (no full-body buffer)."""
        _, params = parse_options_header(content_type)
        boundary = params.get(b"boundary", b"")

        header_field = bytearray()
        header_value = bytearray()
        current_headers: dict[str, str] = {}
        current_data = bytearray()
        fields: dict[str, str | UploadFile] = {}
        part_info: dict[str, Any] = {}

        def on_part_begin() -> None:
            header_field.clear()
            header_value.clear()
            current_headers.clear()
            current_data.clear()
            part_info.clear()

        def on_header_field(data: bytes, start: int, end: int) -> None:
            header_field.extend(data[start:end])

        def on_header_value(data: bytes, start: int, end: int) -> None:
            header_value.extend(data[start:end])

        def on_header_end() -> None:
            current_headers[bytes(header_field).decode("latin-1").lower()] = bytes(header_value).decode("latin-1")
            header_field.clear()
            header_value.clear()

        def on_headers_finished() -> None:
            disposition = current_headers.get("content-disposition", "")
            _, disp_params = parse_options_header(disposition)
            part_info["name"] = disp_params.get(b"name", b"").decode("utf-8")
            filename = disp_params.get(b"filename")
            if filename is not None:
                part_info["filename"] = filename.decode("utf-8")
                part_info["content_type"] = current_headers.get(
                    "content-type",
                    "application/octet-stream",
                )
                part_info["headers"] = dict(current_headers)
                part_info["upload"] = UploadFile(
                    filename=part_info["filename"],
                    content_type=part_info["content_type"],
                    headers=part_info["headers"],
                )
                part_info["is_file"] = True
            else:
                part_info["is_file"] = False
                part_info["headers"] = dict(current_headers)

        def on_part_data(data: bytes, start: int, end: int) -> None:
            sl = data[start:end]
            if part_info.get("is_file") and "upload" in part_info:
                part_info["upload"].file.write(sl)
            else:
                current_data.extend(sl)

        def on_part_end() -> None:
            name = part_info.get("name", "")
            if not name:
                return
            if part_info.get("is_file") and "upload" in part_info:
                upload = part_info["upload"]
                upload.file.seek(0, 2)
                upload._size = upload.file.tell()
                upload.file.seek(0)
                fields[name] = upload
            else:
                fields[name] = bytes(current_data).decode("utf-8")

        parser = MultipartParser(
            boundary,
            cast(
                Any,
                {
                    "on_part_begin": on_part_begin,
                    "on_header_field": on_header_field,
                    "on_header_value": on_header_value,
                    "on_header_end": on_header_end,
                    "on_headers_finished": on_headers_finished,
                    "on_part_data": on_part_data,
                    "on_part_end": on_part_end,
                },
            ),
        )

        total = 0
        while True:
            message = await self._receive()
            chunk = message.get("body", b"")
            self._enforce_limit(total, len(chunk))
            total += len(chunk)
            if chunk:
                parser.write(chunk)
            if not message.get("more_body", False):
                break

        parser.finalize()
        self._body_read = True
        self._body = b""
        return FormData(fields)

state property

Mutable per-request state (ASGI scope["state"]).

form() async

Read the request body and parse as form / multipart data.

Source code in FasterAPI/request.py
async def form(self) -> FormData:
    """Read the request body and parse as form / multipart data."""
    if self._form_cache is not None:
        return self._form_cache
    ct = self.content_type or ""

    if "multipart/form-data" in ct and self.state.get("stream_multipart"):
        self._form_cache = await self._parse_multipart_stream(ct)
        return self._form_cache

    raw = await self._read_body()
    result = _parse_multipart(raw, ct) if "multipart/form-data" in ct else _parse_urlencoded(raw)
    self._form_cache = result
    return result

json() async

Read the request body and decode as JSON via msgspec (zero-copy).

Source code in FasterAPI/request.py
async def json(self) -> Any:
    """Read the request body and decode as JSON via msgspec (zero-copy)."""
    raw = await self._read_body()
    return msgspec.json.decode(raw)

stream() async

Yield body chunks from the ASGI receive channel.

When Faster(stream_request_body=False) (default), chunks are also concatenated so :meth:body, :meth:json, and :meth:form still work.

With stream_request_body=True, bytes are not retained after iteration—use this for large uploads written straight to disk.

Source code in FasterAPI/request.py
async def stream(self) -> AsyncIterator[bytes]:
    """Yield body chunks from the ASGI ``receive`` channel.

    When ``Faster(stream_request_body=False)`` (default), chunks are also
    concatenated so :meth:`body`, :meth:`json`, and :meth:`form` still work.

    With ``stream_request_body=True``, bytes are **not** retained after
    iteration—use this for large uploads written straight to disk.
    """
    if self._body_read:
        if self._body:
            yield self._body
        return

    buffer_chunks: list[bytes] = []
    total = 0
    try:
        while True:
            message = await self._receive()
            chunk = message.get("body", b"")
            self._enforce_limit(total, len(chunk))
            total += len(chunk)
            if not self._stream_no_buffer and chunk:
                buffer_chunks.append(chunk)
            if chunk:
                yield chunk
            if not message.get("more_body", False):
                break
    finally:
        self._body_read = True
        if not self._stream_no_buffer:
            self._body = b"".join(buffer_chunks)
        else:
            self._body = b""

RequestIDMiddleware

Bases: BaseHTTPMiddleware

Ensure each request has X-Request-ID (generate or propagate).

The ID is stored in scope["state"]["request_id"] (also available as request.state["request_id"]).

Source code in FasterAPI/production.py
class RequestIDMiddleware(BaseHTTPMiddleware):
    """Ensure each request has ``X-Request-ID`` (generate or propagate).

    The ID is stored in ``scope["state"]["request_id"]`` (also available as
    ``request.state["request_id"]``).
    """

    def __init__(
        self,
        app: ASGIApp,
        *,
        header_name: str = "x-request-id",
        outgoing_header_name: str = "x-request-id",
        generator: Callable[[], str] | None = None,
    ) -> None:
        super().__init__(app)
        self.header_name = header_name.lower()
        self.outgoing_header_name = outgoing_header_name.lower()
        self.generator = generator or (lambda: uuid.uuid4().hex)

    async def dispatch(
        self,
        scope: dict[str, Any],
        receive: ASGIApp,
        send: ASGIApp,
    ) -> None:
        raw_headers: list[tuple[bytes, bytes]] = scope.get("headers", [])
        incoming = ""
        for k, v in raw_headers:
            if k.decode("latin-1").lower() == self.header_name:
                incoming = v.decode("latin-1").strip()
                break

        rid = incoming or self.generator()
        scope.setdefault("state", {})["request_id"] = rid

        async def send_with_header(message: dict[str, Any]) -> None:
            if message["type"] == "http.response.start":
                headers = list(message.get("headers", []))
                keys = {x[0].lower() for x in headers}
                out_key = self.outgoing_header_name.encode("latin-1")
                if out_key not in keys:
                    headers.append((out_key, rid.encode("latin-1")))
                message = {**message, "headers": headers}
            await send(message)

        await self.app(scope, receive, send_with_header)

RequestValidationError

Bases: Exception

Raised when request data fails validation.

Source code in FasterAPI/exceptions.py
class RequestValidationError(Exception):
    """Raised when request data fails validation."""

    def __init__(self, errors: list[dict[str, Any]]) -> None:
        self.errors = errors

    def __repr__(self) -> str:
        return f"RequestValidationError(errors={self.errors!r})"

Response

Base HTTP response class.

Source code in FasterAPI/response.py
class Response:
    """Base HTTP response class."""

    media_type: str | None = None
    charset: str = "utf-8"

    def __init__(
        self,
        content: Any = None,
        status_code: int = 200,
        headers: dict[str, str] | None = None,
        media_type: str | None = None,
    ) -> None:
        self.status_code = status_code
        self.headers = headers or {}
        if media_type is not None:
            self.media_type = media_type
        self.body = self._render(content)

    def _render(self, content: Any) -> bytes:
        if content is None:
            return b""
        if isinstance(content, bytes):
            return content
        return bytes(content.encode(self.charset))

    def _build_headers(self) -> list[tuple[bytes, bytes]]:
        raw: list[tuple[bytes, bytes]] = []
        if self.media_type is not None:
            ct = self.media_type
            if self.charset and "text" in ct:
                ct = f"{ct}; charset={self.charset}"
            raw.append((b"content-type", ct.encode("latin-1")))
        for key, value in self.headers.items():
            raw.append((key.lower().encode("latin-1"), value.encode("latin-1")))
        return raw

    async def to_asgi(self, send: ASGIApp) -> None:
        """Send the response through the ASGI interface."""
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self._build_headers(),
            }
        )
        await send(
            {
                "type": "http.response.body",
                "body": self.body,
            }
        )

to_asgi(send) async

Send the response through the ASGI interface.

Source code in FasterAPI/response.py
async def to_asgi(self, send: ASGIApp) -> None:
    """Send the response through the ASGI interface."""
    await send(
        {
            "type": "http.response.start",
            "status": self.status_code,
            "headers": self._build_headers(),
        }
    )
    await send(
        {
            "type": "http.response.body",
            "body": self.body,
        }
    )

SecurityScopes

Holds the list of OAuth2 security scopes required by a dependency tree.

Source code in FasterAPI/security.py
class SecurityScopes:
    """Holds the list of OAuth2 security scopes required by a dependency tree."""

    __slots__ = ("scopes", "scope_str")

    def __init__(self, scopes: list[str] | None = None) -> None:
        self.scopes: list[str] = scopes or []
        self.scope_str: str = " ".join(self.scopes)

    def __repr__(self) -> str:
        return f"SecurityScopes(scopes={self.scopes!r})"

StaticFiles

Serve static files from a local directory as an ASGI application.

Behaviour aligns with common Starlette patterns: conditional GET (ETag / Last-Modified, 304 Not Modified), Range requests (206 including multipart byte ranges), HEAD without a body, and async chunked reads via anyio.

Source code in FasterAPI/staticfiles.py
class StaticFiles:
    """Serve static files from a local directory as an ASGI application.

    Behaviour aligns with common Starlette patterns: conditional GET (``ETag`` /
    ``Last-Modified``, ``304 Not Modified``), ``Range`` requests (``206`` including
    multipart byte ranges), ``HEAD`` without a body, and async chunked reads via
    ``anyio``.
    """

    def __init__(self, *, directory: str | Path, html: bool = False, check_dir: bool = True) -> None:
        self.directory = Path(directory).resolve()
        self.html = html
        if check_dir and not self.directory.is_dir():
            raise RuntimeError(f"StaticFiles directory '{directory}' does not exist")

    async def __call__(
        self,
        scope: dict[str, Any],
        receive: Any,
        send: Any,
    ) -> None:
        if scope["type"] != "http":
            return
        if scope["method"] not in ("GET", "HEAD"):
            await _send_error(send, 405, _METHOD_NOT_ALLOWED)
            return
        await self._handle(scope, send)

    def _sync_lookup(self, raw_path: str) -> tuple[Path, os.stat_result] | None:
        """Resolve *raw_path* under ``directory``; return path and stat, or ``None``."""
        rel = raw_path.lstrip("/")
        file_path = (self.directory / rel).resolve()
        try:
            file_path.relative_to(self.directory)
        except ValueError:
            return None
        if file_path.is_dir():
            if self.html:
                file_path = file_path / "index.html"
            else:
                return None
        try:
            st = os.stat(file_path)
        except OSError:
            return None
        if not stat_mod.S_ISREG(st.st_mode):
            return None
        return file_path, st

    def _build_full_headers(
        self,
        file_path: Path,
        st: os.stat_result,
        media_type: str,
        encoding: str | None,
    ) -> list[tuple[bytes, bytes]]:
        etag = _etag_from_stat(st)
        lm = _last_modified_http(st)
        headers: list[tuple[bytes, bytes]] = [
            (b"content-type", media_type.encode("latin-1")),
            (b"content-length", str(st.st_size).encode()),
            (b"accept-ranges", b"bytes"),
            (b"etag", etag.encode("latin-1")),
            (b"last-modified", lm.encode("latin-1")),
        ]
        if encoding:
            headers.append((b"content-encoding", encoding.encode("latin-1")))
        return headers

    async def _handle(self, scope: dict[str, Any], send: Any) -> None:
        raw_path: str = scope.get("path", "/")
        looked_up = await anyio.to_thread.run_sync(self._sync_lookup, raw_path)
        if looked_up is None:
            await _send_error(send, 404, _NOT_FOUND)
            return

        file_path, st = looked_up
        media_type, encoding = mimetypes.guess_type(str(file_path))
        if media_type is None:
            media_type = "application/octet-stream"

        etag_http = _etag_from_stat(st)
        lm_http = _last_modified_http(st)
        full_headers = self._build_full_headers(file_path, st, media_type, encoding)

        req = _scope_headers(scope)
        method = scope["method"]

        if _check_not_modified(req, etag_http, lm_http):
            h304 = _headers_for_304(full_headers)
            await send({"type": "http.response.start", "status": 304, "headers": h304})
            await send({"type": "http.response.body", "body": b"", "more_body": False})
            return

        http_range = req.get("range")
        http_if_range = req.get("if-range")
        use_range = http_range is not None and (
            http_if_range is None or _if_range_matches(http_if_range, lm_http, etag_http)
        )

        if use_range:
            try:
                ranges = _parse_range_header(http_range or "", st.st_size)
            except MalformedRangeHeader as exc:
                await _send_plain(send, 400, exc.content.encode("utf-8"))
                return
            except RangeNotSatisfiable as exc:
                await send(
                    {
                        "type": "http.response.start",
                        "status": 416,
                        "headers": [
                            (b"content-type", _CT_PLAIN),
                            (b"content-range", f"bytes */{exc.max_size}".encode("latin-1")),
                        ],
                    }
                )
                await send({"type": "http.response.body", "body": b"", "more_body": False})
                return

            if len(ranges) == 1:
                start, end = ranges[0]
                await self._send_single_range(send, file_path, full_headers, st.st_size, start, end, method)
                return

            await self._send_multipart_ranges(
                send, file_path, full_headers, media_type, st.st_size, ranges, method
            )
            return

        await self._send_full_file(send, file_path, full_headers, method)

    async def _send_full_file(
        self,
        send: Any,
        file_path: Path,
        headers: list[tuple[bytes, bytes]],
        method: str,
    ) -> None:
        await send({"type": "http.response.start", "status": 200, "headers": headers})
        if method == "HEAD":
            await send({"type": "http.response.body", "body": b"", "more_body": False})
            return

        async with await anyio.open_file(file_path, "rb") as file:
            more_body = True
            while more_body:
                chunk = await file.read(_CHUNK_SIZE)
                more_body = len(chunk) == _CHUNK_SIZE
                await send({"type": "http.response.body", "body": chunk, "more_body": more_body})

    async def _send_single_range(
        self,
        send: Any,
        file_path: Path,
        base_headers: list[tuple[bytes, bytes]],
        file_size: int,
        start: int,
        end: int,
        method: str,
    ) -> None:
        """Serve ``bytes [start, end)`` with status 206."""
        rh: list[tuple[bytes, bytes]] = []
        for k, v in base_headers:
            if k.lower() in (b"content-length", b"content-range"):
                continue
            rh.append((k, v))
        rh.append((b"content-range", f"bytes {start}-{end - 1}/{file_size}".encode("latin-1")))
        rh.append((b"content-length", str(end - start).encode()))

        await send({"type": "http.response.start", "status": 206, "headers": rh})

        if method == "HEAD":
            await send({"type": "http.response.body", "body": b"", "more_body": False})
            return

        async with await anyio.open_file(file_path, "rb") as file:
            await file.seek(start)
            pos = start
            while pos < end:
                chunk = await file.read(min(_CHUNK_SIZE, end - pos))
                if not chunk:
                    break
                pos += len(chunk)
                more_body = pos < end
                await send({"type": "http.response.body", "body": chunk, "more_body": more_body})

    async def _send_multipart_ranges(
        self,
        send: Any,
        file_path: Path,
        base_headers: list[tuple[bytes, bytes]],
        media_type: str,
        file_size: int,
        ranges: list[tuple[int, int]],
        method: str,
    ) -> None:
        boundary = token_hex(13)
        mp_len, header_block = _multipart_payload_length_and_headers(boundary, media_type, file_size, ranges)

        out_headers: list[tuple[bytes, bytes]] = []
        for k, v in base_headers:
            if k.lower() in (b"content-length", b"content-type"):
                continue
            out_headers.append((k, v))
        out_headers.append((b"content-type", f"multipart/byteranges; boundary={boundary}".encode("latin-1")))
        out_headers.append((b"content-length", str(mp_len).encode()))

        await send({"type": "http.response.start", "status": 206, "headers": out_headers})

        if method == "HEAD":
            await send({"type": "http.response.body", "body": b"", "more_body": False})
            return

        async with await anyio.open_file(file_path, "rb") as file:
            for part_start, part_end in ranges:
                await send(
                    {
                        "type": "http.response.body",
                        "body": header_block(part_start, part_end),
                        "more_body": True,
                    }
                )
                await file.seek(part_start)
                pos = part_start
                while pos < part_end:
                    chunk = await file.read(min(_CHUNK_SIZE, part_end - pos))
                    if not chunk:
                        break
                    pos += len(chunk)
                    await send({"type": "http.response.body", "body": chunk, "more_body": True})
                await send({"type": "http.response.body", "body": b"\r\n", "more_body": True})

            await send(
                {
                    "type": "http.response.body",
                    "body": f"--{boundary}--".encode("latin-1"),
                    "more_body": False,
                }
            )

StreamingResponse

Response that streams content from an async or sync iterator.

Source code in FasterAPI/response.py
class StreamingResponse:
    """Response that streams content from an async or sync iterator."""

    def __init__(
        self,
        content: AsyncIterator[bytes] | Iterator[bytes],
        status_code: int = 200,
        headers: dict[str, str] | None = None,
        media_type: str | None = None,
    ) -> None:
        self.content = content
        self.status_code = status_code
        self.headers = headers or {}
        self.media_type = media_type

    def _build_headers(self) -> list[tuple[bytes, bytes]]:
        raw: list[tuple[bytes, bytes]] = []
        if self.media_type is not None:
            raw.append((b"content-type", self.media_type.encode("latin-1")))
        for key, value in self.headers.items():
            raw.append((key.lower().encode("latin-1"), value.encode("latin-1")))
        return raw

    async def to_asgi(self, send: ASGIApp) -> None:
        """Stream the response body through the ASGI interface."""
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self._build_headers(),
            }
        )
        if hasattr(self.content, "__aiter__"):
            async for chunk in self.content:
                await send(
                    {
                        "type": "http.response.body",
                        "body": chunk if isinstance(chunk, bytes) else chunk.encode(),
                        "more_body": True,
                    }
                )
        else:
            for chunk in self.content:
                await send(
                    {
                        "type": "http.response.body",
                        "body": chunk if isinstance(chunk, bytes) else chunk.encode(),
                        "more_body": True,
                    }
                )
        await send({"type": "http.response.body", "body": b"", "more_body": False})

to_asgi(send) async

Stream the response body through the ASGI interface.

Source code in FasterAPI/response.py
async def to_asgi(self, send: ASGIApp) -> None:
    """Stream the response body through the ASGI interface."""
    await send(
        {
            "type": "http.response.start",
            "status": self.status_code,
            "headers": self._build_headers(),
        }
    )
    if hasattr(self.content, "__aiter__"):
        async for chunk in self.content:
            await send(
                {
                    "type": "http.response.body",
                    "body": chunk if isinstance(chunk, bytes) else chunk.encode(),
                    "more_body": True,
                }
            )
    else:
        for chunk in self.content:
            await send(
                {
                    "type": "http.response.body",
                    "body": chunk if isinstance(chunk, bytes) else chunk.encode(),
                    "more_body": True,
                }
            )
    await send({"type": "http.response.body", "body": b"", "more_body": False})

SubInterpreterPool

Fallback pool using ProcessPoolExecutor.

Provides the same run() / shutdown() API as the sub-interpreter pool. Upgrade to a Python version with the interpreters module for true per-interpreter GIL support.

Source code in FasterAPI/concurrency.py
class SubInterpreterPool:  # type: ignore[no-redef]
    """Fallback pool using ProcessPoolExecutor.

    Provides the same ``run()`` / ``shutdown()`` API as the
    sub-interpreter pool. Upgrade to a Python version with the
    ``interpreters`` module for true per-interpreter GIL support.
    """

    def __init__(self, max_workers: int | None = None) -> None:
        self._executor = ProcessPoolExecutor(
            max_workers=max_workers or _CPU_COUNT,
        )

    async def run(self, func: Callable, *args: Any) -> Any:  # type: ignore[type-arg]
        """Execute *func* in a worker process (pickle-based)."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            partial(func, *args),
        )

    def shutdown(self) -> None:
        """Shut down the process pool."""
        self._executor.shutdown(wait=False)

run(func, *args) async

Execute func in a worker process (pickle-based).

Source code in FasterAPI/concurrency.py
async def run(self, func: Callable, *args: Any) -> Any:  # type: ignore[type-arg]
    """Execute *func* in a worker process (pickle-based)."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        self._executor,
        partial(func, *args),
    )

shutdown()

Shut down the process pool.

Source code in FasterAPI/concurrency.py
def shutdown(self) -> None:
    """Shut down the process pool."""
    self._executor.shutdown(wait=False)

TestClient

Synchronous test client for FasterAPI applications.

Wraps ASGI app with httpx.ASGITransport for HTTP testing. Provides websocket_connect() for WebSocket testing.

Source code in FasterAPI/testclient.py
class TestClient:
    """Synchronous test client for FasterAPI applications.

    Wraps ASGI app with httpx.ASGITransport for HTTP testing.
    Provides websocket_connect() for WebSocket testing.
    """

    __test__ = False  # Prevent pytest collection

    def __init__(
        self,
        app: ASGIApp,
        base_url: str = "http://testserver",
    ) -> None:
        self.app = app
        self.base_url = base_url
        self._transport = httpx.ASGITransport(app=app)
        self._client = httpx.AsyncClient(
            transport=self._transport,
            base_url=base_url,
        )

    def __enter__(self) -> TestClient:
        self._lifespan_startup_done = threading.Event()
        self._lifespan_shutdown_trigger = threading.Event()
        self._lifespan_thread = threading.Thread(target=self._run_lifespan_thread, daemon=True)
        self._lifespan_thread.start()
        self._lifespan_startup_done.wait(timeout=5.0)
        return self

    def __exit__(self, *args: Any) -> None:
        if hasattr(self, "_lifespan_shutdown_trigger"):
            self._lifespan_shutdown_trigger.set()
            self._lifespan_thread.join(timeout=5.0)
        self._run(self._client.aclose())

    def _run_lifespan_thread(self) -> None:
        """Run the app's lifespan protocol in a dedicated thread."""
        startup_done = self._lifespan_startup_done
        shutdown_trigger = self._lifespan_shutdown_trigger

        async def _run() -> None:
            messages: list[dict[str, Any]] = [
                {"type": "lifespan.startup"},
            ]
            idx = [0]

            async def receive() -> dict[str, Any]:
                if idx[0] < len(messages):
                    msg = messages[idx[0]]
                    idx[0] += 1
                    return msg
                # Wait for shutdown trigger
                await asyncio.get_event_loop().run_in_executor(None, shutdown_trigger.wait)
                return {"type": "lifespan.shutdown"}

            async def send(msg: dict[str, Any]) -> None:
                if msg.get("type") == "lifespan.startup.complete":
                    startup_done.set()

            try:
                await self.app({"type": "lifespan"}, receive, send)
            except Exception:
                startup_done.set()

        asyncio.run(_run())

    def _run(self, coro: Any) -> Any:
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        if loop and loop.is_running():
            import concurrent.futures

            with concurrent.futures.ThreadPoolExecutor() as pool:
                return pool.submit(asyncio.run, coro).result()
        return asyncio.run(coro)

    # --- HTTP methods ---

    def get(self, url: str, **kwargs: Any) -> httpx.Response:
        """Send a GET request."""
        result: httpx.Response = self._run(self._client.get(url, **kwargs))
        return result

    def post(self, url: str, **kwargs: Any) -> httpx.Response:
        """Send a POST request."""
        result: httpx.Response = self._run(self._client.post(url, **kwargs))
        return result

    def put(self, url: str, **kwargs: Any) -> httpx.Response:
        """Send a PUT request."""
        result: httpx.Response = self._run(self._client.put(url, **kwargs))
        return result

    def delete(self, url: str, **kwargs: Any) -> httpx.Response:
        """Send a DELETE request."""
        result: httpx.Response = self._run(self._client.delete(url, **kwargs))
        return result

    def patch(self, url: str, **kwargs: Any) -> httpx.Response:
        """Send a PATCH request."""
        result: httpx.Response = self._run(self._client.patch(url, **kwargs))
        return result

    def options(self, url: str, **kwargs: Any) -> httpx.Response:
        """Send an OPTIONS request."""
        result: httpx.Response = self._run(self._client.options(url, **kwargs))
        return result

    def head(self, url: str, **kwargs: Any) -> httpx.Response:
        """Send a HEAD request."""
        result: httpx.Response = self._run(self._client.head(url, **kwargs))
        return result

    # --- WebSocket ---

    @contextmanager
    def websocket_connect(
        self,
        path: str,
        headers: dict[str, str] | None = None,
        query_string: str = "",
    ) -> Generator[_WebSocketSession, None, None]:
        """Context manager for testing WebSocket endpoints."""
        session = _WebSocketSession()

        scope = {
            "type": "websocket",
            "path": path,
            "headers": [(k.lower().encode(), v.encode()) for k, v in (headers or {}).items()],
            "query_string": query_string.encode() if query_string else b"",
            "client": ("testclient", 0),
        }

        async def run_ws() -> None:
            await self.app(scope, session._asgi_receive, session._asgi_send)

        loop = asyncio.new_event_loop()
        task = loop.create_task(run_ws())

        # Drive the loop briefly to let the handler start and accept
        def _step() -> None:
            loop.run_until_complete(asyncio.sleep(0))

        _step()

        try:
            yield session
        finally:
            if not session._closed:
                session.close()
            # Drain remaining events
            for _ in range(100):
                try:
                    loop.run_until_complete(asyncio.wait_for(asyncio.shield(task), timeout=0.05))
                    break
                except (asyncio.TimeoutError, Exception):
                    break
            loop.close()

delete(url, **kwargs)

Send a DELETE request.

Source code in FasterAPI/testclient.py
def delete(self, url: str, **kwargs: Any) -> httpx.Response:
    """Send a DELETE request."""
    result: httpx.Response = self._run(self._client.delete(url, **kwargs))
    return result

get(url, **kwargs)

Send a GET request.

Source code in FasterAPI/testclient.py
def get(self, url: str, **kwargs: Any) -> httpx.Response:
    """Send a GET request."""
    result: httpx.Response = self._run(self._client.get(url, **kwargs))
    return result

head(url, **kwargs)

Send a HEAD request.

Source code in FasterAPI/testclient.py
def head(self, url: str, **kwargs: Any) -> httpx.Response:
    """Send a HEAD request."""
    result: httpx.Response = self._run(self._client.head(url, **kwargs))
    return result

options(url, **kwargs)

Send an OPTIONS request.

Source code in FasterAPI/testclient.py
def options(self, url: str, **kwargs: Any) -> httpx.Response:
    """Send an OPTIONS request."""
    result: httpx.Response = self._run(self._client.options(url, **kwargs))
    return result

patch(url, **kwargs)

Send a PATCH request.

Source code in FasterAPI/testclient.py
def patch(self, url: str, **kwargs: Any) -> httpx.Response:
    """Send a PATCH request."""
    result: httpx.Response = self._run(self._client.patch(url, **kwargs))
    return result

post(url, **kwargs)

Send a POST request.

Source code in FasterAPI/testclient.py
def post(self, url: str, **kwargs: Any) -> httpx.Response:
    """Send a POST request."""
    result: httpx.Response = self._run(self._client.post(url, **kwargs))
    return result

put(url, **kwargs)

Send a PUT request.

Source code in FasterAPI/testclient.py
def put(self, url: str, **kwargs: Any) -> httpx.Response:
    """Send a PUT request."""
    result: httpx.Response = self._run(self._client.put(url, **kwargs))
    return result

websocket_connect(path, headers=None, query_string='')

Context manager for testing WebSocket endpoints.

Source code in FasterAPI/testclient.py
@contextmanager
def websocket_connect(
    self,
    path: str,
    headers: dict[str, str] | None = None,
    query_string: str = "",
) -> Generator[_WebSocketSession, None, None]:
    """Context manager for testing WebSocket endpoints."""
    session = _WebSocketSession()

    scope = {
        "type": "websocket",
        "path": path,
        "headers": [(k.lower().encode(), v.encode()) for k, v in (headers or {}).items()],
        "query_string": query_string.encode() if query_string else b"",
        "client": ("testclient", 0),
    }

    async def run_ws() -> None:
        await self.app(scope, session._asgi_receive, session._asgi_send)

    loop = asyncio.new_event_loop()
    task = loop.create_task(run_ws())

    # Drive the loop briefly to let the handler start and accept
    def _step() -> None:
        loop.run_until_complete(asyncio.sleep(0))

    _step()

    try:
        yield session
    finally:
        if not session._closed:
            session.close()
        # Drain remaining events
        for _ in range(100):
            try:
                loop.run_until_complete(asyncio.wait_for(asyncio.shield(task), timeout=0.05))
                break
            except (asyncio.TimeoutError, Exception):
                break
        loop.close()

TrustedHostMiddleware

Bases: BaseHTTPMiddleware

Middleware that validates the Host header against a list of allowed hosts.

Source code in FasterAPI/middleware.py
class TrustedHostMiddleware(BaseHTTPMiddleware):
    """Middleware that validates the Host header against a list of allowed hosts."""

    def __init__(
        self,
        app: ASGIApp,
        *,
        allowed_hosts: Sequence[str] = ("*",),
    ) -> None:
        super().__init__(app)
        self.allowed_hosts = set(allowed_hosts)
        self.allow_all = "*" in self.allowed_hosts

    async def dispatch(
        self,
        scope: dict[str, Any],
        receive: ASGIApp,
        send: ASGIApp,
    ) -> None:
        if self.allow_all:
            await self.app(scope, receive, send)
            return

        host = get_server_host(scope) or ""
        if host:
            host = host.split(":")[0]

        if host not in self.allowed_hosts:
            await send(
                {
                    "type": "http.response.start",
                    "status": 400,
                    "headers": [(b"content-type", b"text/plain")],
                }
            )
            await send(
                {
                    "type": "http.response.body",
                    "body": b"Invalid host header",
                }
            )
            return

        await self.app(scope, receive, send)

UploadFile

Represents an uploaded file from a multipart/form-data request.

Uses SpooledTemporaryFile: data stays in memory up to 1 MB, then spills to disk.

Source code in FasterAPI/datastructures.py
class UploadFile:
    """Represents an uploaded file from a multipart/form-data request.

    Uses SpooledTemporaryFile: data stays in memory up to 1 MB, then spills to disk.
    """

    __slots__ = ("filename", "content_type", "file", "headers", "_size")

    def __init__(
        self,
        filename: str,
        content_type: str = "application/octet-stream",
        file: IO[bytes] | None = None,
        headers: dict[str, str] | None = None,
    ) -> None:
        self.filename = filename
        self.content_type = content_type
        self.file: IO[bytes] = file or tempfile.SpooledTemporaryFile(
            max_size=1024 * 1024,  # 1 MB
        )
        self.headers = headers or {}
        self._size: int | None = None

    async def read(self, size: int = -1) -> bytes:
        """Read up to ``size`` bytes from the file (-1 means read all)."""
        return self.file.read(size)

    async def write(self, data: bytes) -> int:
        """Write data to the file and return the number of bytes written."""
        return self.file.write(data)

    async def seek(self, offset: int) -> None:
        """Seek to the given byte offset in the file."""
        self.file.seek(offset)

    async def close(self) -> None:
        """Close the underlying file."""
        self.file.close()

    @property
    def size(self) -> int | None:
        """Return the file size in bytes, or None if unknown."""
        return self._size

    def __repr__(self) -> str:
        return f"UploadFile(filename={self.filename!r}, content_type={self.content_type!r})"

size property

Return the file size in bytes, or None if unknown.

close() async

Close the underlying file.

Source code in FasterAPI/datastructures.py
async def close(self) -> None:
    """Close the underlying file."""
    self.file.close()

read(size=-1) async

Read up to size bytes from the file (-1 means read all).

Source code in FasterAPI/datastructures.py
async def read(self, size: int = -1) -> bytes:
    """Read up to ``size`` bytes from the file (-1 means read all)."""
    return self.file.read(size)

seek(offset) async

Seek to the given byte offset in the file.

Source code in FasterAPI/datastructures.py
async def seek(self, offset: int) -> None:
    """Seek to the given byte offset in the file."""
    self.file.seek(offset)

write(data) async

Write data to the file and return the number of bytes written.

Source code in FasterAPI/datastructures.py
async def write(self, data: bytes) -> int:
    """Write data to the file and return the number of bytes written."""
    return self.file.write(data)

WebSocket

Represents a WebSocket connection.

Source code in FasterAPI/websocket.py
class WebSocket:
    """Represents a WebSocket connection."""

    __slots__ = (
        "_scope",
        "_receive",
        "_send",
        "path",
        "path_params",
        "client",
        "headers",
        "query_params",
        "_state",
    )

    def __init__(self, scope: dict[str, Any], receive: ASGIApp, send: ASGIApp) -> None:
        self._scope = scope
        self._receive = receive
        self._send = send
        self._state = WebSocketState.CONNECTING
        self.path: str = scope.get("path", "/")
        self.path_params: dict[str, str] = scope.get("path_params", {})
        self.client: tuple[str, int] | None = scope.get("client")

        raw_headers: list[tuple[bytes, bytes]] = scope.get("headers", [])
        self.headers: dict[str, str] = {k.decode("latin-1").lower(): v.decode("latin-1") for k, v in raw_headers}

        qs = scope.get("query_string", b"")
        parsed = parse_qs(qs.decode("latin-1") if isinstance(qs, bytes) else qs)
        self.query_params: dict[str, Any] = {k: v[0] if len(v) == 1 else v for k, v in parsed.items()}

    async def accept(self, subprotocol: str | None = None) -> None:
        """Accept the WebSocket connection, optionally selecting a subprotocol."""
        if self._state != WebSocketState.CONNECTING:
            raise RuntimeError("WebSocket is not in CONNECTING state")
        msg: dict[str, Any] = {"type": "websocket.accept"}
        if subprotocol is not None:
            msg["subprotocol"] = subprotocol
        await self._send(msg)
        self._state = WebSocketState.CONNECTED

    async def _receive_message(self) -> dict[str, Any]:
        message: dict[str, Any] = await self._receive()
        if message["type"] == "websocket.disconnect":
            self._state = WebSocketState.DISCONNECTED
            raise WebSocketDisconnect(message.get("code", 1000))
        return message

    async def receive_text(self) -> str:
        """Receive a text message from the WebSocket."""
        message = await self._receive_message()
        return str(message.get("text", ""))

    async def receive_bytes(self) -> bytes:
        """Receive a binary message from the WebSocket."""
        message = await self._receive_message()
        result = message.get("bytes", b"")
        return bytes(result) if not isinstance(result, bytes) else result

    async def receive_json(self) -> Any:
        """Receive a message from the WebSocket and parse it as JSON."""
        text = await self.receive_text()
        return msgspec.json.decode(text.encode())

    async def send_text(self, data: str) -> None:
        """Send a text message through the WebSocket."""
        await self._send({"type": "websocket.send", "text": data})

    async def send_bytes(self, data: bytes) -> None:
        """Send a binary message through the WebSocket."""
        await self._send({"type": "websocket.send", "bytes": data})

    async def send_json(self, data: Any) -> None:
        """Send data as a JSON-encoded text message through the WebSocket."""
        encoded = msgspec.json.encode(data)
        await self.send_text(encoded.decode())

    async def close(self, code: int = 1000, reason: str = "") -> None:
        """Close the WebSocket connection."""
        await self._send({"type": "websocket.close", "code": code, "reason": reason})
        self._state = WebSocketState.DISCONNECTED

accept(subprotocol=None) async

Accept the WebSocket connection, optionally selecting a subprotocol.

Source code in FasterAPI/websocket.py
async def accept(self, subprotocol: str | None = None) -> None:
    """Accept the WebSocket connection, optionally selecting a subprotocol."""
    if self._state != WebSocketState.CONNECTING:
        raise RuntimeError("WebSocket is not in CONNECTING state")
    msg: dict[str, Any] = {"type": "websocket.accept"}
    if subprotocol is not None:
        msg["subprotocol"] = subprotocol
    await self._send(msg)
    self._state = WebSocketState.CONNECTED

close(code=1000, reason='') async

Close the WebSocket connection.

Source code in FasterAPI/websocket.py
async def close(self, code: int = 1000, reason: str = "") -> None:
    """Close the WebSocket connection."""
    await self._send({"type": "websocket.close", "code": code, "reason": reason})
    self._state = WebSocketState.DISCONNECTED

receive_bytes() async

Receive a binary message from the WebSocket.

Source code in FasterAPI/websocket.py
async def receive_bytes(self) -> bytes:
    """Receive a binary message from the WebSocket."""
    message = await self._receive_message()
    result = message.get("bytes", b"")
    return bytes(result) if not isinstance(result, bytes) else result

receive_json() async

Receive a message from the WebSocket and parse it as JSON.

Source code in FasterAPI/websocket.py
async def receive_json(self) -> Any:
    """Receive a message from the WebSocket and parse it as JSON."""
    text = await self.receive_text()
    return msgspec.json.decode(text.encode())

receive_text() async

Receive a text message from the WebSocket.

Source code in FasterAPI/websocket.py
async def receive_text(self) -> str:
    """Receive a text message from the WebSocket."""
    message = await self._receive_message()
    return str(message.get("text", ""))

send_bytes(data) async

Send a binary message through the WebSocket.

Source code in FasterAPI/websocket.py
async def send_bytes(self, data: bytes) -> None:
    """Send a binary message through the WebSocket."""
    await self._send({"type": "websocket.send", "bytes": data})

send_json(data) async

Send data as a JSON-encoded text message through the WebSocket.

Source code in FasterAPI/websocket.py
async def send_json(self, data: Any) -> None:
    """Send data as a JSON-encoded text message through the WebSocket."""
    encoded = msgspec.json.encode(data)
    await self.send_text(encoded.decode())

send_text(data) async

Send a text message through the WebSocket.

Source code in FasterAPI/websocket.py
async def send_text(self, data: str) -> None:
    """Send a text message through the WebSocket."""
    await self._send({"type": "websocket.send", "text": data})

WebSocketDisconnect

Bases: Exception

Raised when a WebSocket connection is disconnected.

Source code in FasterAPI/websocket.py
class WebSocketDisconnect(Exception):
    """Raised when a WebSocket connection is disconnected."""

    def __init__(self, code: int = 1000) -> None:
        self.code = code

WebSocketState

Enumeration of WebSocket connection states.

Source code in FasterAPI/websocket.py
class WebSocketState:
    """Enumeration of WebSocket connection states."""

    CONNECTING = 0
    CONNECTED = 1
    DISCONNECTED = 2

async_engine_from_url_optional(url, **kwargs)

Create an async engine if SQLAlchemy is installed; raise ImportError otherwise.

Source code in FasterAPI/sqlalchemy_ext.py
def async_engine_from_url_optional(url: str, **kwargs: Any) -> Any:
    """Create an async engine if SQLAlchemy is installed; raise ``ImportError`` otherwise."""
    try:
        from sqlalchemy.ext.asyncio import create_async_engine
    except ImportError as exc:
        raise ImportError(
            "SQLAlchemy is required. Install with: pip install 'sqlalchemy[asyncio]'",
        ) from exc
    return create_async_engine(url, **kwargs)

configure_structlog(*, json_format=False, log_level='INFO')

Configure structlog for JSON or console output.

Requires pip install structlog. Safe to call once at process startup.

Bind fields per request (often request_id from middleware) via::

import structlog
structlog.contextvars.bind_contextvars(request_id=...)
Source code in FasterAPI/log_config.py
def configure_structlog(
    *,
    json_format: bool = False,
    log_level: str = "INFO",
) -> Any:
    """Configure ``structlog`` for JSON or console output.

    Requires ``pip install structlog``. Safe to call once at process startup.

    Bind fields per request (often ``request_id`` from middleware) via::

        import structlog
        structlog.contextvars.bind_contextvars(request_id=...)
    """
    if structlog_module is None:
        raise ImportError("structlog is not installed. Install with: pip install structlog")

    lvl = getattr(logging_module, log_level.upper(), logging_module.INFO)
    if not isinstance(lvl, int):
        lvl = logging_module.INFO

    processors: list[Any] = [
        structlog_module.contextvars.merge_contextvars,
        structlog_module.processors.add_log_level,
        structlog_module.processors.StackInfoRenderer(),
        structlog_module.processors.format_exc_info,
    ]
    if json_format:
        processors.append(structlog_module.processors.TimeStamper(fmt="iso", utc=True))
        processors.append(structlog_module.processors.JSONRenderer())
    else:
        processors.append(structlog_module.dev.ConsoleRenderer())

    structlog_module.configure(
        processors=processors,
        wrapper_class=structlog_module.make_filtering_bound_logger(lvl),
        context_class=dict,
        logger_factory=structlog_module.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )
    return structlog_module

create_access_token(subject, secret, *, algorithm='HS256', expires_delta=None, expires_minutes=60, audience=None, issuer=None, extra_claims=None)

Create a signed JWT string (sub claim from subject or merge subject dict).

Source code in FasterAPI/jwt_auth.py
def create_access_token(
    subject: str | dict[str, Any],
    secret: str,
    *,
    algorithm: str = "HS256",
    expires_delta: timedelta | None = None,
    expires_minutes: int = 60,
    audience: str | None = None,
    issuer: str | None = None,
    extra_claims: dict[str, Any] | None = None,
) -> str:
    """Create a signed JWT string (``sub`` claim from *subject* or merge *subject* dict)."""
    from datetime import datetime, timezone

    jwt = _jwt_module()
    now = datetime.now(timezone.utc)
    exp = now + (expires_delta or timedelta(minutes=expires_minutes))

    if isinstance(subject, str):
        payload: dict[str, Any] = {"sub": subject}
    else:
        payload = dict(subject)
    payload["exp"] = exp
    payload.setdefault("iat", now)
    if audience is not None:
        payload["aud"] = audience
    if issuer is not None:
        payload["iss"] = issuer
    if extra_claims:
        payload.update(extra_claims)

    return jwt.encode(payload, secret, algorithm=algorithm)

get_header(scope, name)

Case-insensitive header lookup from ASGI scope["headers"].

Source code in FasterAPI/asgi_compat.py
def get_header(scope: dict[str, Any], name: str) -> str | None:
    """Case-insensitive header lookup from ASGI ``scope[\"headers\"]``."""
    want = name.lower().encode("latin-1")
    for k, v in scope.get("headers", []):
        if k.lower() == want:
            return v.decode("latin-1")
    return None

get_server_host(scope)

Hostname for virtual hosting: Host or :authority, then server tuple.

Source code in FasterAPI/asgi_compat.py
def get_server_host(scope: dict[str, Any]) -> str | None:
    """Hostname for virtual hosting: ``Host`` or ``:authority``, then ``server`` tuple."""
    for k, v in scope.get("headers", []):
        lk = k.lower()
        if lk in (b"host", b":authority"):
            return v.decode("latin-1").split(":")[0]
    server = scope.get("server")
    if isinstance(server, tuple | list) and len(server) >= 1:
        return str(server[0])
    return None

http_version(scope)

Return HTTP version string from scope (e.g. "1.1", "2").

Source code in FasterAPI/asgi_compat.py
def http_version(scope: dict[str, Any]) -> str:
    """Return HTTP version string from scope (e.g. ``\"1.1\"``, ``\"2\"``)."""
    ver = scope.get("http_version")
    if isinstance(ver, str):
        return ver
    return "1.1"

is_http2(scope)

True when the ASGI server reports HTTP/2 for this connection.

Source code in FasterAPI/asgi_compat.py
def is_http2(scope: dict[str, Any]) -> bool:
    """True when the ASGI server reports HTTP/2 for this connection."""
    return http_version(scope).startswith("2")

oauth2_access_token_json(access_token, *, token_type='bearer', expires_in=None)

JSON body shape for POST /token OAuth2 password / client responses (RFC 6749 §5.1).

Source code in FasterAPI/jwt_auth.py
def oauth2_access_token_json(
    access_token: str,
    *,
    token_type: str = "bearer",
    expires_in: int | None = None,
) -> dict[str, str | int]:
    """JSON body shape for ``POST /token`` OAuth2 password / client responses (RFC 6749 §5.1)."""
    body: dict[str, str | int] = {"access_token": access_token, "token_type": token_type}
    if expires_in is not None:
        body["expires_in"] = expires_in
    return body

oauth2_password_token_response(form, *, secret, authenticate, expires_minutes=60) async

Validate credentials via authenticate and return an OAuth2-style token JSON dict.

Typical handler::

@app.post("/token")
async def token(form: OAuth2PasswordRequestForm = Depends(OAuth2PasswordRequestForm)):
    return await oauth2_password_token_response(form, secret=SECRET, authenticate=verify_user)

authenticate should return a subject string (e.g. user id) or None if invalid.

Source code in FasterAPI/jwt_auth.py
async def oauth2_password_token_response(
    form: OAuth2PasswordRequestForm,
    *,
    secret: str,
    authenticate: Callable[[str, str], Awaitable[str | None]],
    expires_minutes: int = 60,
) -> dict[str, str | int]:
    """Validate credentials via *authenticate* and return an OAuth2-style token JSON dict.

    Typical handler::

        @app.post(\"/token\")
        async def token(form: OAuth2PasswordRequestForm = Depends(OAuth2PasswordRequestForm)):
            return await oauth2_password_token_response(form, secret=SECRET, authenticate=verify_user)

    *authenticate* should return a subject string (e.g. user id) or ``None`` if invalid.
    """
    uid = await authenticate(form.username, form.password)
    if uid is None:
        raise HTTPException(status_code=401, detail="Incorrect username or password")
    token = create_access_token(uid, secret, expires_minutes=expires_minutes)
    return oauth2_access_token_json(token, expires_in=expires_minutes * 60)

run_in_subinterpreter(func, *args) async

Execute func with maximum available parallelism.

Python 3.13+: Runs in a sub-interpreter with its own GIL. True parallel execution, no pickling, ~100x lighter than a process.

Python 3.11–3.12: Falls back to ProcessPoolExecutor. Arguments must be picklable. Still achieves parallelism via multiprocessing.

Python < 3.11: Same as 3.11 fallback with older asyncio internals.

Usage::

result = await run_in_subinterpreter(heavy_computation, n)
Source code in FasterAPI/concurrency.py
async def run_in_subinterpreter(func: Callable, *args: Any) -> Any:  # type: ignore[type-arg]
    """Execute *func* with maximum available parallelism.

    **Python 3.13+**: Runs in a sub-interpreter with its own GIL.
    True parallel execution, no pickling, ~100x lighter than a process.

    **Python 3.11–3.12**: Falls back to ``ProcessPoolExecutor``.
    Arguments must be picklable. Still achieves parallelism via
    multiprocessing.

    **Python < 3.11**: Same as 3.11 fallback with older asyncio internals.

    Usage::

        result = await run_in_subinterpreter(heavy_computation, n)
    """
    pool = _get_subinterp_pool()
    return await pool.run(func, *args)

sqlalchemy_session_dependency(session_factory)

Return an async dependency that yields one :class:sqlalchemy.ext.asyncio.AsyncSession.

The factory must be an :class:async_sessionmaker (or any callable returning an object usable as async with session_factory() as session:.

Usage::

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from FasterAPI import Depends, Faster
from FasterAPI.sqlalchemy_ext import sqlalchemy_session_dependency

engine = create_async_engine("postgresql+asyncpg://...", echo=False)
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
get_session = sqlalchemy_session_dependency(SessionLocal)

app = Faster()

@app.get("/rows")
async def rows(session: AsyncSession = Depends(get_session)):
    ...

Install: pip install faster-api-web[ecosystem] or sqlalchemy[asyncio].

Source code in FasterAPI/sqlalchemy_ext.py
def sqlalchemy_session_dependency(session_factory: Any) -> Callable[..., AsyncGenerator[Any, None]]:
    """Return an async dependency that yields one :class:`sqlalchemy.ext.asyncio.AsyncSession`.

    The factory must be an :class:`async_sessionmaker` (or any callable returning
    an object usable as ``async with session_factory() as session:``.

    Usage::

        from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
        from FasterAPI import Depends, Faster
        from FasterAPI.sqlalchemy_ext import sqlalchemy_session_dependency

        engine = create_async_engine(\"postgresql+asyncpg://...\", echo=False)
        SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
        get_session = sqlalchemy_session_dependency(SessionLocal)

        app = Faster()

        @app.get(\"/rows\")
        async def rows(session: AsyncSession = Depends(get_session)):
            ...

    Install: ``pip install faster-api-web[ecosystem]`` or ``sqlalchemy[asyncio]``.
    """
    if session_factory is None:
        raise ValueError("session_factory is required")

    async def get_session() -> AsyncGenerator[Any, None]:
        async with session_factory() as session:
            yield session

    return get_session