Skip to content

Strategies

Token strategies validate or issue credentials and pair with transports inside an AuthenticationBackend (see Backends: transports and strategies). Three concrete implementations ship with litestar-auth:

  • JWTStrategy issues and verifies stateless signed JWTs with your configured signing keys. Library-issued access tokens include JOSE typ=JWT; decode rejects tokens with a missing or unexpected typ header before the normal signed validation. This header check is defense-in-depth against token-class confusion and does not replace signature, algorithm allowlist, audience, issuer, or required-claim validation. Use this strategy when you want bearer or cookie flows without storing each access token in a database or Redis—scaling and rotation are typically driven by expiry and refresh semantics rather than per-token rows. Use JWTStrategyConfig(...) when you want the signing, validation, revocation, lifetime, and session-fingerprint settings carried as one typed object.

  • DatabaseTokenStrategy stores opaque tokens in your application database (hashed at rest). Use it when you need durable revocation, per-token metadata, refresh-session/device listing, or audit trails aligned with your ORM models. Use DatabaseTokenStrategyConfig(...) when the session, token models, token hash secret, access lifetime, refresh lifetime, and token-size settings should travel together.

  • RedisTokenStrategy keeps opaque token state in Redis with TTL-backed keys and a per-user token index. Use it when you want fast invalidation and shared token state across app instances without adding DB round-trips for every validation. invalidate_all_tokens(...) deletes only tokens present in that per-user index; orphaned keys from older deployments that never wrote the index are left to expire by their Redis TTL instead of being discovered by a keyspace scan. The per-user index key hashes the serialized user id before adding it to the Redis key, so custom id values cannot inject key delimiters or reshape the namespace. You can construct it with RedisTokenStrategyConfig(...) when you want the Redis client, hash secret, TTL, key prefix, token byte count, and optional subject decoder carried as one typed settings object.

  • ApiKeyStrategy verifies user-owned API keys against a BaseApiKeyStore. Configure it with ApiKeyStrategyConfig or equivalent keyword arguments. Bearer keys compare the presented secret with the stored HMAC digest. Signing-required keys use the encrypted stored secret to verify LSA1-HMAC-SHA256 normalized request signatures, validate X-Auth-Date within signing_skew_seconds, and reject replayed X-Auth-Nonce values through an ApiKeyNonceStore. Successful reads return ApiKeyAuthenticationResult with the resolved user and ApiKeyContext; middleware exposes that context as request.auth.

For plugin-oriented setup, DatabaseTokenAuthConfig on LitestarAuthConfig is the direct shortcut for wiring opaque database-backed tokens (hash secret, optional backend naming, and related compatibility flags) without hand-assembling the strategy and related pieces in isolation. Full wiring for the preset, route flags, and related options is covered in Backends; ORM mixins, token tables, and SQLAlchemyUserDatabase contracts are covered in User and manager.

Refresh-session management support

The session/device HTTP API is backed by a strategy protocol rather than by controller-side database queries. DatabaseTokenStrategy implements that protocol and can:

  • list the authenticated user's active, non-expired refresh sessions;
  • revoke one current-user session by public session_id;
  • revoke all other current-user sessions, preserving the current session when the current refresh credential can be identified;
  • identify a public session_id from a raw refresh token by hashing the supplied value and comparing it with stored digests.

JWTStrategy and RedisTokenStrategy do not currently provide the session/device dashboard contract. If plugin-owned session/device routes are enabled against an unsupported strategy, the route returns 400 with SESSION_MANAGEMENT_UNSUPPORTED; it does not synthesize empty session data. The API never returns raw tokens, access tokens, refresh tokens, stored token digests, or keyed token digests.

litestar_auth.authentication.strategy

Issue, validate, rotate, and revoke tokens (JWT, database, or Redis).

Strategies pair with :mod:litestar_auth.authentication.transport implementations inside :class:~litestar_auth.authentication.backend.AuthenticationBackend.

DatabaseTokenModels is the explicit contract for DatabaseTokenStrategy when you swap in mixin-composed token ORM classes. The explicit bundled-token bootstrap helper lives at litestar_auth.models.import_token_orm_models().

ApiKeyContext(key_id, scopes, prefix_env, scope_subset_check=True, scope_authority=None) dataclass

Authentication context exposed as request.auth for API-key requests.

ApiKeyNonceStore

Bases: Protocol

Persistence contract for API-key signing nonces.

mark_used(*, key_id, nonce, ttl_seconds) async

Record nonce for key_id if it was not already seen.

Source code in litestar_auth/authentication/strategy/_api_key_nonce_store.py
async def mark_used(self, *, key_id: str, nonce: str, ttl_seconds: int) -> ApiKeyNonceStoreResult:
    """Record ``nonce`` for ``key_id`` if it was not already seen."""

ApiKeyNonceStoreResult(stored, rejected_as_replay=False) dataclass

Outcome of recording a request-signing nonce.

ApiKeyStrategy(*, config=None, **options)

ApiKeyStrategy(*, config: ApiKeyStrategyConfig)
ApiKeyStrategy(**options: Unpack[ApiKeyStrategyOptions])

Bases: Strategy[UP, ID]

Verify API-key credentials against indexed persisted key rows.

Initialize the API-key strategy.

Raises:

Type Description
ValueError

If config and keyword options are combined.

ConfigurationError

If api_key_hash_secret is not production-safe.

Source code in litestar_auth/authentication/strategy/api_key.py
def __init__(
    self,
    *,
    config: ApiKeyStrategyConfig | None = None,
    **options: Unpack[ApiKeyStrategyOptions],
) -> None:
    """Initialize the API-key strategy.

    Raises:
        ValueError: If ``config`` and keyword options are combined.
        ConfigurationError: If ``api_key_hash_secret`` is not production-safe.
    """
    if config is not None and options:
        msg = "Pass either ApiKeyStrategyConfig or keyword options, not both."
        raise ValueError(msg)
    settings = ApiKeyStrategyConfig(**options) if config is None else config
    try:
        validate_production_secret(
            settings.api_key_hash_secret,
            label="ApiKeyStrategy api_key_hash_secret",
            unsafe_testing=settings.unsafe_testing,
        )
    except ConfigurationError as exc:
        raise ConfigurationError(str(exc)) from exc

    self.api_key_store = settings.api_key_store
    self._api_key_hash_secret = settings.api_key_hash_secret.encode()
    self.prefix_env = settings.prefix_env
    self.prefix = settings.prefix
    self.scope_subset_check = settings.scope_subset_check
    self.scope_authority = settings.scope_authority
    self.signing_skew_seconds = settings.signing_skew_seconds
    self.nonce_store = settings.nonce_store
    self.secret_encryption_keyring = settings.secret_encryption_keyring
    self.unsafe_testing = settings.unsafe_testing

classify_failure_code(token) async

Return the most specific API-key authentication failure code for token.

Source code in litestar_auth/authentication/strategy/api_key.py
async def classify_failure_code(self, token: str | None) -> ErrorCode:  # noqa: PLR0911
    """Return the most specific API-key authentication failure code for ``token``."""
    if token == API_KEY_HMAC_SCHEME:
        return await self._classify_signed_failure_code()
    if token is None:
        return ErrorCode.API_KEY_INVALID
    parsed = parse_api_key(token, expected_prefix_env=self.prefix_env, prefix=self.prefix)
    if parsed is None:
        return ErrorCode.API_KEY_INVALID
    api_key = await self.api_key_store.get_by_key_id(parsed.key_id, include_inactive=True)
    if api_key is None or api_key.prefix_env != parsed.prefix_env:
        return ErrorCode.API_KEY_INVALID
    if getattr(api_key, "signing_required", False):
        return ErrorCode.API_KEY_SIGNATURE_INVALID
    if api_key.revoked_at is not None:
        return ErrorCode.API_KEY_REVOKED
    if _expires_in_past(api_key.expires_at):
        return ErrorCode.API_KEY_EXPIRED
    if not api_key_secret_matches(
        stored_digest=api_key.hashed_secret,
        api_key_hash_secret=self._api_key_hash_secret,
        secret=parsed.secret,
    ):
        return ErrorCode.API_KEY_INVALID
    return ErrorCode.API_KEY_INVALID

destroy_token(token, user) async

Do nothing because API-key revocation is handled by API-key management flows.

Source code in litestar_auth/authentication/strategy/api_key.py
@override
async def destroy_token(self, token: str, user: UP) -> None:
    """Do nothing because API-key revocation is handled by API-key management flows."""
    del token, user

read_token(token, user_manager) async

Resolve a user from an API-key token.

Returns:

Type Description
UP | None

Resolved user, or None when verification fails.

Source code in litestar_auth/authentication/strategy/api_key.py
@override
async def read_token(self, token: str | None, user_manager: object) -> UP | None:
    """Resolve a user from an API-key token.

    Returns:
        Resolved user, or ``None`` when verification fails.
    """
    result = await self.read_token_with_context(
        token,
        user_manager=cast("UserManagerProtocol[UP, ID]", user_manager),
    )
    return None if result is None else result.user

read_token_with_context(token, user_manager) async

Resolve a user and API-key context from a canonical API-key token.

Returns:

Type Description
ApiKeyAuthenticationResult[UP] | None

Resolved user and API-key context, or None when verification fails.

Source code in litestar_auth/authentication/strategy/api_key.py
async def read_token_with_context(  # noqa: PLR0911
    self,
    token: str | None,
    user_manager: UserManagerProtocol[UP, ID],
) -> ApiKeyAuthenticationResult[UP] | None:
    """Resolve a user and API-key context from a canonical API-key token.

    Returns:
        Resolved user and API-key context, or ``None`` when verification fails.
    """
    if token is None:
        return None
    if token == API_KEY_HMAC_SCHEME:
        return await self._read_signed_request(user_manager)

    parsed = parse_api_key(token, expected_prefix_env=self.prefix_env, prefix=self.prefix)
    if parsed is None:
        return None

    api_key = await self.api_key_store.get_by_key_id(parsed.key_id, include_inactive=True)
    if api_key is None or not self._api_key_can_be_verified(api_key, prefix_env=parsed.prefix_env):
        return None
    if getattr(api_key, "signing_required", False):
        return None
    if not api_key_secret_matches(
        stored_digest=api_key.hashed_secret,
        api_key_hash_secret=self._api_key_hash_secret,
        secret=parsed.secret,
    ):
        return None

    user = await user_manager.get(cast("ID", api_key.user_id))
    if user is None:
        return None
    return ApiKeyAuthenticationResult(
        user=user,
        context=ApiKeyContext(
            key_id=api_key.key_id,
            scopes=tuple(api_key.scopes),
            prefix_env=api_key.prefix_env,
            scope_subset_check=self.scope_subset_check,
            scope_authority=self.scope_authority,
        ),
    )

write_token(user) async

Reject login-token issuance because API keys are manager-issued credentials.

Raises:

Type Description
TokenError

Always, because API keys are not login-flow tokens.

Source code in litestar_auth/authentication/strategy/api_key.py
@override
async def write_token(self, user: UP) -> str:
    """Reject login-token issuance because API keys are manager-issued credentials.

    Raises:
        TokenError: Always, because API keys are not login-flow tokens.
    """
    del user
    msg = "ApiKeyStrategy does not issue login tokens."
    raise TokenError(msg)

ApiKeyStrategyConfig(api_key_store, api_key_hash_secret, prefix_env=None, prefix=API_KEY_PREFIX, scope_subset_check=True, scope_authority=None, signing_skew_seconds=300, nonce_store=None, secret_encryption_keyring=None, unsafe_testing=False) dataclass

Configuration for :class:ApiKeyStrategy.

ContextualStrategy

Bases: Protocol

Protocol for strategies that return custom request auth context.

read_token_with_context(token, user_manager) async

Resolve a user plus strategy-specific authentication context.

Source code in litestar_auth/authentication/strategy/base.py
async def read_token_with_context(
    self,
    token: str | None,
    user_manager: UserManagerProtocol[UP, ID],
) -> AuthT | None:
    """Resolve a user plus strategy-specific authentication context."""

DatabaseTokenModels(access_token_model=AccessToken, refresh_token_model=RefreshToken) dataclass

Explicit access-token and refresh-token ORM contract for DatabaseTokenStrategy.

The supplied access-token model must expose mapped token, created_at, user_id, and user attributes compatible with the persistence operations performed by the DB token strategy. The supplied refresh-token model must also expose session_id, last_used_at, and client_metadata so DB-backed refresh sessions have a non-sensitive public session identifier and bounded client metadata. Defaults preserve the bundled AccessToken / RefreshToken behavior.

__post_init__()

Validate the supplied token-model classes eagerly.

Source code in litestar_auth/authentication/strategy/db_models.py
def __post_init__(self) -> None:
    """Validate the supplied token-model classes eagerly."""
    _validate_token_model_contract(
        self.access_token_model,
        field_name="access_token_model",
        required_attributes=_REQUIRED_ACCESS_TOKEN_MODEL_ATTRIBUTES,
    )
    _validate_token_model_contract(
        self.refresh_token_model,
        field_name="refresh_token_model",
        required_attributes=_REQUIRED_REFRESH_TOKEN_MODEL_ATTRIBUTES,
    )

DatabaseTokenStrategy(*, config=None, **options)

DatabaseTokenStrategy(*, config: DatabaseTokenStrategyConfig)
DatabaseTokenStrategy(**options: Unpack[DatabaseTokenStrategyOptions])

Bases: Strategy[UP, ID], RefreshableStrategy[UP, ID]

Stateful strategy that persists opaque tokens in the database.

Initialize the strategy.

Parameters:

Name Type Description Default
config DatabaseTokenStrategyConfig | None

Database-token strategy configuration.

None
**options Unpack[DatabaseTokenStrategyOptions]

Individual database-token strategy settings. Do not combine with config.

{}

Raises:

Type Description
ValueError

If config and keyword options are combined.

ConfigurationError

When token_hash_secret fails minimum-length requirements.

Source code in litestar_auth/authentication/strategy/db.py
def __init__(
    self,
    *,
    config: DatabaseTokenStrategyConfig | None = None,
    **options: Unpack[DatabaseTokenStrategyOptions],
) -> None:
    """Initialize the strategy.

    Args:
        config: Database-token strategy configuration.
        **options: Individual database-token strategy settings. Do not combine
            with ``config``.

    Raises:
        ValueError: If ``config`` and keyword options are combined.
        ConfigurationError: When ``token_hash_secret`` fails minimum-length requirements.
    """
    if config is not None and options:
        msg = "Pass either DatabaseTokenStrategyConfig or keyword options, not both."
        raise ValueError(msg)
    settings = DatabaseTokenStrategyConfig(**options) if config is None else config
    try:
        validate_production_secret(settings.token_hash_secret, label="DatabaseTokenStrategy token_hash_secret")
    except ConfigurationError as exc:
        raise ConfigurationError(str(exc)) from exc
    validate_token_bytes(settings.token_bytes, label="DatabaseTokenStrategy")

    self.session = settings.session
    self._token_hash_secret = settings.token_hash_secret.encode()
    self.token_models = DatabaseTokenModels() if settings.token_models is None else settings.token_models
    self.access_token_model = self.token_models.access_token_model
    self.refresh_token_model = self.token_models.refresh_token_model
    self._access_token_repository_type = build_token_repository(self.access_token_model)
    self._refresh_token_repository_type = build_token_repository(self.refresh_token_model)
    self.max_age = settings.max_age
    self.refresh_max_age = settings.refresh_max_age
    self.token_bytes = settings.token_bytes
    self.unsafe_testing = settings.unsafe_testing
    self._refresh_token_request_metadata: dict[str, str] | None = None

cleanup_expired_tokens(session) async

Delete expired access and refresh tokens for the configured TTLs.

Returns:

Type Description
int

Total number of deleted access-token and refresh-token rows.

Source code in litestar_auth/authentication/strategy/db.py
async def cleanup_expired_tokens(self, session: AsyncSession) -> int:
    """Delete expired access and refresh tokens for the configured TTLs.

    Returns:
        Total number of deleted access-token and refresh-token rows.
    """
    now = datetime.now(tz=UTC)
    access_cutoff = now - self.max_age
    refresh_cutoff = now - self.refresh_max_age

    access_result = await session.execute(
        delete(self.access_token_model).where(self.access_token_model.created_at <= access_cutoff),
    )
    refresh_result = await session.execute(
        delete(self.refresh_token_model).where(self.refresh_token_model.created_at <= refresh_cutoff),
    )
    await session.commit()

    access_rowcount = getattr(access_result, "rowcount", 0) or 0
    refresh_rowcount = getattr(refresh_result, "rowcount", 0) or 0
    return access_rowcount + refresh_rowcount

destroy_token(token, user) async

Delete a persisted token.

Source code in litestar_auth/authentication/strategy/db.py
@override
async def destroy_token(self, token: str, user: UP) -> None:
    """Delete a persisted token."""
    del user
    token_digest = self._token_digest(token)
    await self._repository(self._access_token_repository_type).delete_where(token=token_digest, auto_commit=False)
    await self.session.commit()

has_recent_totp_verification(user, session_id) async

Return whether a DB-backed refresh session has a live TOTP step-up marker.

Source code in litestar_auth/authentication/strategy/db.py
async def has_recent_totp_verification(self, user: UP, session_id: str) -> bool:
    """Return whether a DB-backed refresh session has a live TOTP step-up marker."""
    row = await self._load_refresh_session_row(user, session_id)
    if row is None:
        return False
    metadata = row.client_metadata or {}
    try:
        expires_at = float(metadata.get(_CLIENT_METADATA_TOTP_STEPUP_EXPIRES_AT_KEY, ""))
    except ValueError:
        return False
    if expires_at > datetime.now(tz=UTC).timestamp():
        return True
    metadata = dict(metadata)
    metadata.pop(_CLIENT_METADATA_TOTP_STEPUP_EXPIRES_AT_KEY, None)
    row.client_metadata = metadata or None
    await self.session.commit()
    return False

identify_refresh_session(user, refresh_token) async

Return the public refresh-session id for refresh_token when it belongs to user.

Returns:

Type Description
str | None

Public refresh-session id, or None when the token is missing, expired, or not owned by user.

Source code in litestar_auth/authentication/strategy/db.py
async def identify_refresh_session(self, user: UP, refresh_token: str) -> str | None:
    """Return the public refresh-session id for ``refresh_token`` when it belongs to ``user``.

    Returns:
        Public refresh-session id, or ``None`` when the token is missing, expired, or not owned by ``user``.
    """
    persisted_token = cast(
        "_RefreshTokenRow | None",
        await self._resolve_token(
            self._repository(self._refresh_token_repository_type),
            refresh_token,
            load=[],
        ),
    )
    if persisted_token is None or persisted_token.user_id != user.id:
        return None
    if self._is_token_expired(persisted_token.created_at, self.refresh_max_age):
        await self._delete_refresh_token_row(persisted_token)
        await self.session.commit()
        return None
    return persisted_token.session_id

invalidate_all_tokens(user) async

Delete all persisted access and refresh tokens for the given user.

Source code in litestar_auth/authentication/strategy/db.py
async def invalidate_all_tokens(self, user: UP) -> None:
    """Delete all persisted access and refresh tokens for the given user."""
    await self._repository(self._access_token_repository_type).delete_where(user_id=user.id, auto_commit=False)
    await self._repository(self._refresh_token_repository_type).delete_where(user_id=user.id, auto_commit=False)
    await self.session.commit()

issue_totp_stepup(user, session_id, *, ttl_seconds) async

Store a short-lived TOTP step-up marker on a DB-backed refresh session.

Source code in litestar_auth/authentication/strategy/db.py
async def issue_totp_stepup(self, user: UP, session_id: str, *, ttl_seconds: int) -> None:
    """Store a short-lived TOTP step-up marker on a DB-backed refresh session."""
    row = await self._load_refresh_session_row(user, session_id)
    if row is None:
        return
    metadata = dict(row.client_metadata or {})
    if ttl_seconds <= 0:
        metadata.pop(_CLIENT_METADATA_TOTP_STEPUP_EXPIRES_AT_KEY, None)
    else:
        expires_at = datetime.now(tz=UTC).timestamp() + ttl_seconds
        metadata[_CLIENT_METADATA_TOTP_STEPUP_EXPIRES_AT_KEY] = str(expires_at)
    row.client_metadata = metadata or None
    await self.session.commit()

list_refresh_sessions(user) async

Return active refresh sessions belonging to user.

Expired refresh-token rows are deleted before active sessions are returned.

Returns:

Type Description
list[RefreshSession]

Active refresh-session metadata ordered by creation time.

Source code in litestar_auth/authentication/strategy/db.py
async def list_refresh_sessions(self, user: UP) -> list[RefreshSession]:
    """Return active refresh sessions belonging to ``user``.

    Expired refresh-token rows are deleted before active sessions are returned.

    Returns:
        Active refresh-session metadata ordered by creation time.
    """
    expired_count = await self._delete_expired_refresh_sessions_for_user(user)
    if expired_count:
        await self.session.commit()

    result = await self.session.execute(
        select(self.refresh_token_model)
        .where(self.refresh_token_model.user_id == user.id)
        .order_by(self.refresh_token_model.created_at),
    )
    rows = cast("list[_RefreshTokenRow]", result.scalars().all())
    return [self._refresh_session_from_row(row) for row in rows]

read_token(token, user_manager) async

Resolve a user from an opaque database token.

Returns:

Type Description
UP | None

Related user when the token exists and is not expired, otherwise None.

Source code in litestar_auth/authentication/strategy/db.py
@override
async def read_token(
    self,
    token: str | None,
    user_manager: UserManagerProtocol[UP, ID],
) -> UP | None:
    """Resolve a user from an opaque database token.

    Returns:
        Related user when the token exists and is not expired, otherwise ``None``.
    """
    del user_manager
    if token is None:
        return None

    access_token = cast(
        "Any",
        await self._resolve_token(
            self._repository(self._access_token_repository_type),
            token,
            load=[self.access_token_model.user],
        ),
    )
    if access_token is None or self._is_token_expired(access_token.created_at, self.max_age):
        return None

    return cast("UP", access_token.user)

revoke_other_refresh_sessions(user, current_session_id) async

Revoke active refresh sessions for user except current_session_id.

Returns:

Type Description
int

Number of active refresh sessions revoked.

Source code in litestar_auth/authentication/strategy/db.py
async def revoke_other_refresh_sessions(self, user: UP, current_session_id: str | None) -> int:
    """Revoke active refresh sessions for ``user`` except ``current_session_id``.

    Returns:
        Number of active refresh sessions revoked.
    """
    expired_count = await self._delete_expired_refresh_sessions_for_user(user)
    conditions = [self.refresh_token_model.user_id == user.id]
    if current_session_id is not None:
        conditions.append(self.refresh_token_model.session_id != current_session_id)
    deleted_count = await self._execute_delete(delete(self.refresh_token_model).where(*conditions))
    if expired_count or deleted_count:
        await self.session.commit()
    return deleted_count

revoke_refresh_session(user, session_id) async

Revoke one active refresh session for user by public session id.

Returns:

Type Description
bool

True when a matching active session was deleted, otherwise False.

Source code in litestar_auth/authentication/strategy/db.py
async def revoke_refresh_session(self, user: UP, session_id: str) -> bool:
    """Revoke one active refresh session for ``user`` by public session id.

    Returns:
        ``True`` when a matching active session was deleted, otherwise ``False``.
    """
    expired_count = await self._delete_expired_refresh_sessions_for_user(user)
    deleted_count = await self._execute_delete(
        delete(self.refresh_token_model).where(
            self.refresh_token_model.user_id == user.id,
            self.refresh_token_model.session_id == session_id,
        ),
    )
    if expired_count or deleted_count:
        await self.session.commit()
    return deleted_count > 0

rotate_refresh_token(refresh_token, user_manager) async

Rotate a refresh token and return the related user plus replacement.

Returns:

Type Description
tuple[UP, str] | None

Tuple of the resolved user and rotated refresh token, or None when invalid.

Source code in litestar_auth/authentication/strategy/db.py
@override
async def rotate_refresh_token(
    self,
    refresh_token: str,
    user_manager: UserManagerProtocol[UP, ID],
) -> tuple[UP, str] | None:
    """Rotate a refresh token and return the related user plus replacement.

    Returns:
        Tuple of the resolved user and rotated refresh token, or ``None`` when invalid.
    """
    del user_manager
    client_metadata = self._consume_refresh_token_request_metadata()
    persisted_token = await self._load_refresh_token_for_rotation(refresh_token)
    if persisted_token is None:
        return None
    if self._is_token_expired(persisted_token.created_at, self.refresh_max_age):
        await self._delete_refresh_token_row(persisted_token)
        return None

    user = cast("UP", persisted_token.user)
    await self._delete_refresh_token_row(persisted_token)
    rotated_refresh_token = await self._mint_replacement_refresh_token(
        user,
        persisted_token,
        client_metadata=client_metadata,
    )
    return user, rotated_refresh_token

set_refresh_token_request_context(request)

Capture safe request metadata for the next refresh-token write or rotation.

Source code in litestar_auth/authentication/strategy/db.py
def set_refresh_token_request_context(self, request: object) -> None:
    """Capture safe request metadata for the next refresh-token write or rotation."""
    self._refresh_token_request_metadata = self._extract_refresh_token_client_metadata(request)

with_session(session)

Return a copy of the strategy bound to the provided async session.

Source code in litestar_auth/authentication/strategy/db.py
def with_session(self, session: AsyncSessionT) -> DatabaseTokenStrategy[UP, ID]:
    """Return a copy of the strategy bound to the provided async session."""
    return type(self)(
        session=session,
        token_hash_secret=self._token_hash_secret.decode(),
        token_models=self.token_models,
        max_age=self.max_age,
        refresh_max_age=self.refresh_max_age,
        token_bytes=self.token_bytes,
        unsafe_testing=self.unsafe_testing,
    )

write_refresh_token(user) async

Persist and return a new opaque refresh token for the user.

Returns:

Type Description
str

Newly created opaque refresh-token string.

Source code in litestar_auth/authentication/strategy/db.py
@override
async def write_refresh_token(self, user: UP) -> str:
    """Persist and return a new opaque refresh token for the user.

    Returns:
        Newly created opaque refresh-token string.
    """
    token, token_digest = mint_opaque_token(token_bytes=self.token_bytes, token_hash_secret=self._token_hash_secret)
    refresh_token = self.refresh_token_model(
        token=token_digest,
        user_id=user.id,
        client_metadata=self._consume_refresh_token_request_metadata(),
    )
    await self._repository(self._refresh_token_repository_type).add(refresh_token, auto_refresh=True)
    return token

write_token(user) async

Persist and return a new opaque token for the user.

Returns:

Type Description
str

Newly created opaque token string.

Source code in litestar_auth/authentication/strategy/db.py
@override
async def write_token(self, user: UP) -> str:
    """Persist and return a new opaque token for the user.

    Returns:
        Newly created opaque token string.
    """
    token, token_digest = mint_opaque_token(token_bytes=self.token_bytes, token_hash_secret=self._token_hash_secret)
    access_token = self.access_token_model(token=token_digest, user_id=user.id)
    await self._repository(self._access_token_repository_type).add(access_token, auto_refresh=True)
    return token

DatabaseTokenStrategyConfig(session, token_hash_secret, token_models=None, max_age=DEFAULT_MAX_AGE, refresh_max_age=DEFAULT_REFRESH_MAX_AGE, token_bytes=DEFAULT_TOKEN_BYTES, unsafe_testing=False) dataclass

Configuration for :class:DatabaseTokenStrategy.

InMemoryApiKeyNonceStore(*, clock=time.monotonic, max_entries=50000)

Async-safe process-local API-key signing nonce store.

Initialize an empty nonce cache.

Raises:

Type Description
ValueError

If max_entries is less than one.

Source code in litestar_auth/authentication/strategy/_api_key_nonce_store.py
def __init__(self, *, clock: Clock = time.monotonic, max_entries: int = 50_000) -> None:
    """Initialize an empty nonce cache.

    Raises:
        ValueError: If ``max_entries`` is less than one.
    """
    if max_entries < 1:
        msg = "max_entries must be at least 1."
        raise ValueError(msg)
    self._clock = clock
    self.max_entries = max_entries
    self._entries: dict[tuple[str, str], float] = {}
    self._lock = asyncio.Lock()

is_shared_across_workers property

In-memory state is process-local.

mark_used(*, key_id, nonce, ttl_seconds) async

Record a nonce until TTL expiry, rejecting replays fail-closed.

Returns:

Type Description
ApiKeyNonceStoreResult

Stored/replay outcome for the nonce insert.

Source code in litestar_auth/authentication/strategy/_api_key_nonce_store.py
async def mark_used(self, *, key_id: str, nonce: str, ttl_seconds: int) -> ApiKeyNonceStoreResult:
    """Record a nonce until TTL expiry, rejecting replays fail-closed.

    Returns:
        Stored/replay outcome for the nonce insert.
    """
    async with self._lock:
        now = read_clock(self._clock)
        self._prune(now)
        key = (key_id, nonce)
        if key in self._entries:
            return ApiKeyNonceStoreResult(stored=False, rejected_as_replay=True)
        if len(self._entries) >= self.max_entries:
            return ApiKeyNonceStoreResult(stored=False, rejected_as_replay=False)
        self._entries[key] = now + ttl_seconds
        return ApiKeyNonceStoreResult(stored=True)

JWTStrategy(*, config=None, **options)

JWTStrategy(*, config: JWTStrategyConfig[UP, ID])
JWTStrategy(**options: Unpack[JWTStrategyOptions[UP, ID]])

Bases: Strategy[UP, ID]

Stateless strategy that stores user identifiers inside JWTs.

JWT access tokens issued by this strategy are designed to be short-lived and stateless. Revocation uses the configured denylist keyed by the jti claim so individual tokens can be explicitly revoked before expiration when :meth:destroy_token is called.

Production deployments should pass a shared denylist store such as :class:RedisJWTDenylistStore. Single-process tests, development apps, and consciously single-process deployments can opt into :class:InMemoryJWTDenylistStore with allow_inmemory_denylist=True. Inspect :attr:revocation_posture to determine whether a concrete strategy instance uses process-local or durable shared-store revocation.

Initialize the JWT strategy.

Parameters:

Name Type Description Default
config JWTStrategyConfig[UP, ID] | None

JWT strategy configuration.

None
**options Unpack[JWTStrategyOptions[UP, ID]]

Individual JWT strategy settings. Do not combine with config.

{}
Source code in litestar_auth/authentication/strategy/jwt.py
def __init__(
    self,
    *,
    config: JWTStrategyConfig[UP, ID] | None = None,
    **options: Unpack[JWTStrategyOptions[UP, ID]],
) -> None:
    """Initialize the JWT strategy.

    Args:
        config: JWT strategy configuration.
        **options: Individual JWT strategy settings. Do not combine with
            ``config``.
    """
    settings = _resolve_jwt_strategy_config(config=config, options=options)
    _validate_jwt_algorithm_settings(settings)
    self._apply_settings(settings)

revocation_is_durable property

Return whether token revocation is backed by a shared store.

revocation_posture property

Return the explicit revocation durability contract for this strategy.

destroy_token(token, user) async

Revoke the given token by adding its jti to the configured denylist.

Tokens without a jti claim, or tokens that fail to decode, are ignored.

Raises:

Type Description
TokenError

When the denylist refuses a new revocation (for example, the compatibility in-memory store is at max_entries with no reclaimable slots).

Source code in litestar_auth/authentication/strategy/jwt.py
@override
async def destroy_token(self, token: str, user: UP) -> None:
    """Revoke the given token by adding its ``jti`` to the configured denylist.

    Tokens without a ``jti`` claim, or tokens that fail to decode, are ignored.

    Raises:
        TokenError: When the denylist refuses a new revocation (for example, the
            compatibility in-memory store is at ``max_entries`` with no reclaimable slots).
    """
    del user

    try:
        payload = decode_signed_jwt(
            token,
            config=JwtDecodeConfig(
                key=self.verify_key,
                algorithms=[self.algorithm],
                audience=JWT_ACCESS_TOKEN_AUDIENCE,
                options={"verify_exp": False},
                issuer=self.issuer,
            ),
        )
    except InvalidTokenError:
        return

    jti = payload.get("jti")
    exp = payload.get("exp")
    if not isinstance(jti, str):
        return
    ttl_seconds = 1
    if isinstance(exp, int):
        ttl_seconds = max(exp - int(time.time()), 1)
    recorded = await self._denylist_store.deny(jti, ttl_seconds=ttl_seconds)
    if not recorded:
        msg = (
            "Could not record JWT revocation in the denylist (in-memory store at capacity). "
            "Use RedisJWTDenylistStore or increase max_entries."
        )
        raise TokenError(msg)

read_token(token, user_manager) async

Decode a JWT token and load its user.

Returns:

Type Description
UP | None

The matching user, or None when the token is invalid.

Source code in litestar_auth/authentication/strategy/jwt.py
@override
async def read_token(  # noqa: PLR0911
    self,
    token: str | None,
    user_manager: UserManagerProtocol[UP, ID],
) -> UP | None:
    """Decode a JWT token and load its user.

    Returns:
        The matching user, or ``None`` when the token is invalid.
    """
    if token is None:
        return None

    payload = self._decode_verified_access_token(token)
    if payload is None:
        logger.info("JWT decode failed (expired or invalid signature)")
        return None

    if await self._is_token_denied(payload):
        logger.info("JWT denied (revoked)")
        return None

    subject = payload.get("sub")
    if not isinstance(subject, str) or not subject:
        logger.info("JWT missing or invalid 'sub' claim")
        return None

    try:
        user_id = self.subject_decoder(subject) if self.subject_decoder is not None else subject
    except ValueError:
        # Security: avoid logging the subject itself to prevent user enumeration
        # via authentication-failure log analysis (OWASP / NIST SP 800-63B §5.2.2).
        logger.info("JWT subject could not be decoded")
        return None

    if user_id is None:
        return None

    user = await user_manager.get(cast("ID", user_id))
    if user is None:
        logger.info("JWT subject references non-existent user")
        return None

    if not self._validate_fingerprint(payload, user):
        logger.info("JWT fingerprint mismatch")
        return None

    return user

write_token(user) async

Generate a JWT token for the provided user.

Returns:

Type Description
str

The encoded JWT token string.

Source code in litestar_auth/authentication/strategy/jwt.py
@override
async def write_token(self, user: UP) -> str:
    """Generate a JWT token for the provided user.

    Returns:
        The encoded JWT token string.
    """
    issued_at = datetime.now(tz=UTC)
    payload = {
        "sub": str(user.id),
        "aud": JWT_ACCESS_TOKEN_AUDIENCE,
        "iat": issued_at,
        "nbf": issued_at,
        "exp": issued_at + self.lifetime,
        "jti": secrets.token_hex(16),
    }
    if self.issuer is not None:
        payload["iss"] = self.issuer

    fingerprint = self.session_fingerprint_getter(user)
    if fingerprint is not None:
        payload[self.session_fingerprint_claim] = fingerprint
    return jwt.encode(payload, self.secret, algorithm=self.algorithm, headers=jwt_encode_headers())

JWTStrategyConfig(secret, verify_key=None, algorithm=DEFAULT_ALGORITHM, lifetime=DEFAULT_LIFETIME, subject_decoder=None, issuer=None, denylist_store=None, allow_inmemory_denylist=False, session_fingerprint_getter=None, session_fingerprint_claim='sfp') dataclass

Configuration for :class:JWTStrategy.

RedisApiKeyNonceStore(*, redis, key_prefix=DEFAULT_API_KEY_NONCE_PREFIX)

Redis-backed API-key signing nonce store.

Store the Redis client and key namespace.

Source code in litestar_auth/authentication/strategy/_api_key_nonce_store.py
def __init__(
    self,
    *,
    redis: RedisApiKeyNonceStoreClient,
    key_prefix: str = DEFAULT_API_KEY_NONCE_PREFIX,
) -> None:
    """Store the Redis client and key namespace."""
    _require_redis_asyncio(feature_name="RedisApiKeyNonceStore")
    self._redis = redis
    self._key_prefix = key_prefix

is_shared_across_workers property

Redis state is shared across workers using the same server.

mark_used(*, key_id, nonce, ttl_seconds) async

Atomically record a nonce with SET NX PX.

Returns:

Type Description
ApiKeyNonceStoreResult

Stored/replay outcome for the nonce insert.

Source code in litestar_auth/authentication/strategy/_api_key_nonce_store.py
async def mark_used(self, *, key_id: str, nonce: str, ttl_seconds: int) -> ApiKeyNonceStoreResult:
    """Atomically record a nonce with ``SET NX PX``.

    Returns:
        Stored/replay outcome for the nonce insert.
    """
    result = await self._redis.set(self._key(key_id, nonce), "1", nx=True, px=max(ttl_seconds, 1) * 1000)
    if result is True:
        return ApiKeyNonceStoreResult(stored=True)
    return ApiKeyNonceStoreResult(stored=False, rejected_as_replay=True)

RedisApiKeyNonceStoreClient

Bases: RedisConditionalSetClient, Protocol

Minimal Redis client for API-key signing nonce storage.

RedisTokenStrategy(*, config=None, **options)

RedisTokenStrategy(*, config: RedisTokenStrategyConfig[ID])
RedisTokenStrategy(**options: Unpack[RedisTokenStrategyOptions[ID]])

Bases: Strategy[UP, ID]

Stateful strategy that stores opaque tokens in Redis with TTL.

Initialize the strategy.

Parameters:

Name Type Description Default
config RedisTokenStrategyConfig[ID] | None

Redis strategy configuration.

None
**options Unpack[RedisTokenStrategyOptions[ID]]

Individual Redis strategy settings. Do not combine with config.

{}

Raises:

Type Description
ValueError

If config and keyword options are combined.

ConfigurationError

When token_hash_secret fails minimum-length requirements.

Source code in litestar_auth/authentication/strategy/redis.py
def __init__(
    self,
    *,
    config: RedisTokenStrategyConfig[ID] | None = None,
    **options: Unpack[RedisTokenStrategyOptions[ID]],
) -> None:
    """Initialize the strategy.

    Args:
        config: Redis strategy configuration.
        **options: Individual Redis strategy settings. Do not combine with
            ``config``.

    Raises:
        ValueError: If ``config`` and keyword options are combined.
        ConfigurationError: When ``token_hash_secret`` fails minimum-length requirements.
    """
    if config is not None and options:
        msg = "Pass either RedisTokenStrategyConfig or keyword options, not both."
        raise ValueError(msg)
    settings = RedisTokenStrategyConfig(**options) if config is None else config
    _load_redis_asyncio()
    try:
        validate_production_secret(settings.token_hash_secret, label="RedisTokenStrategy token_hash_secret")
    except ConfigurationError as exc:
        raise ConfigurationError(str(exc)) from exc
    validate_token_bytes(settings.token_bytes, label="RedisTokenStrategy")

    self.redis = settings.redis
    self._token_hash_secret = settings.token_hash_secret.encode()
    self.lifetime = settings.lifetime
    self.token_bytes = settings.token_bytes
    self.key_prefix = settings.key_prefix
    self.subject_decoder = settings.subject_decoder

destroy_token(token, user) async

Delete a persisted Redis token.

Source code in litestar_auth/authentication/strategy/redis.py
@override
async def destroy_token(self, token: str, user: UP) -> None:
    """Delete a persisted Redis token."""
    token_key = self._key(token)
    user_id = str(user.id)
    index_key = self._user_index_key(user_id)
    await self.redis.delete(token_key)
    await self.redis.srem(index_key, token_key)

has_recent_totp_verification(user, session_id) async

Return whether a Redis-backed session has a live TOTP step-up marker.

Source code in litestar_auth/authentication/strategy/redis.py
async def has_recent_totp_verification(self, user: UP, session_id: str) -> bool:
    """Return whether a Redis-backed session has a live TOTP step-up marker."""
    return await self.redis.get(self._totp_stepup_key(str(user.id), session_id)) is not None

invalidate_all_tokens(user) async

Delete all Redis-backed tokens associated with the given user.

This uses a per-user index to delete only the keys associated with the user, avoiding keyspace scans under the global prefix. Tokens that do not have a per-user index entry are left to expire naturally by TTL.

Source code in litestar_auth/authentication/strategy/redis.py
async def invalidate_all_tokens(self, user: UP) -> None:
    """Delete all Redis-backed tokens associated with the given user.

    This uses a per-user index to delete only the keys associated with the
    user, avoiding keyspace scans under the global prefix. Tokens that do
    not have a per-user index entry are left to expire naturally by TTL.
    """
    user_id = str(user.id)
    await self._invalidate_via_index(user_id)
    await self._invalidate_stepup_markers(user_id)

issue_totp_stepup(user, session_id, *, ttl_seconds) async

Store a short-lived TOTP step-up marker for a Redis-backed session.

Source code in litestar_auth/authentication/strategy/redis.py
async def issue_totp_stepup(self, user: UP, session_id: str, *, ttl_seconds: int) -> None:
    """Store a short-lived TOTP step-up marker for a Redis-backed session."""
    user_id = str(user.id)
    key = self._totp_stepup_key(user_id, session_id)
    index_key = self._totp_stepup_index_key(user_id)
    if ttl_seconds <= 0:
        await self.redis.delete(key)
        await self.redis.srem(index_key, key)
        return
    await self.redis.setex(key, ttl_seconds, "1")
    await self.redis.sadd(index_key, key)
    await self.redis.expire(index_key, ttl_seconds)

read_token(token, user_manager) async

Resolve a user from a Redis-backed token.

Returns:

Type Description
UP | None

The resolved user when the token exists and decodes successfully,

UP | None

otherwise None.

Source code in litestar_auth/authentication/strategy/redis.py
@override
async def read_token(
    self,
    token: str | None,
    user_manager: UserManagerProtocol[UP, ID],
) -> UP | None:
    """Resolve a user from a Redis-backed token.

    Returns:
        The resolved user when the token exists and decodes successfully,
        otherwise ``None``.
    """
    if token is None:
        return None

    stored_user_id = await self.redis.get(self._key(token))
    if stored_user_id is None:
        return None

    user_id_text = self._decode_user_id(stored_user_id)

    try:
        user_id = self.subject_decoder(user_id_text) if self.subject_decoder is not None else user_id_text
    except (TypeError, ValueError):
        return None

    return await user_manager.get(cast("ID", user_id))

write_token(user) async

Persist a new opaque token in Redis and return it.

Returns:

Type Description
str

Newly created opaque token string.

Source code in litestar_auth/authentication/strategy/redis.py
@override
async def write_token(self, user: UP) -> str:
    """Persist a new opaque token in Redis and return it.

    Returns:
        Newly created opaque token string.
    """
    token, token_key = self._mint_token_key()
    user_id = str(user.id)
    await self.redis.setex(token_key, self._ttl_seconds, user_id)
    index_key = self._user_index_key(user_id)
    await self.redis.sadd(index_key, token_key)
    await self.redis.expire(index_key, self._ttl_seconds)
    return token

RedisTokenStrategyConfig(redis, token_hash_secret, lifetime=DEFAULT_LIFETIME, token_bytes=DEFAULT_TOKEN_BYTES, key_prefix=DEFAULT_KEY_PREFIX, subject_decoder=None) dataclass

Configuration for :class:RedisTokenStrategy.

RefreshableStrategy

Bases: Protocol

Protocol for strategies that support refresh-token rotation.

Note

Refresh tokens are intentionally modeled as a separate lifecycle artifact from access tokens. In particular, Strategy.destroy_token() only targets the access token used for request authentication; refresh-token invalidation (if any) is managed by the refresh strategy itself.

rotate_refresh_token(refresh_token, user_manager) async

Consume a refresh token and return the user plus a rotated replacement.

Source code in litestar_auth/authentication/strategy/base.py
async def rotate_refresh_token(
    self,
    refresh_token: str,
    user_manager: UserManagerProtocol[UP, ID],
) -> tuple[UP, str] | None:
    """Consume a refresh token and return the user plus a rotated replacement."""

write_refresh_token(user) async

Issue a refresh token for the provided user.

Source code in litestar_auth/authentication/strategy/base.py
async def write_refresh_token(self, user: UP) -> str:
    """Issue a refresh token for the provided user."""

Strategy

Bases: ABC

Abstract base class for token storage and validation strategies.

destroy_token(token, user) abstractmethod async

Invalidate a token for the provided user.

Source code in litestar_auth/authentication/strategy/base.py
@abstractmethod
async def destroy_token(self, token: str, user: UP) -> None:
    """Invalidate a token for the provided user."""

read_token(token, user_manager) abstractmethod async

Resolve a user from a token.

Source code in litestar_auth/authentication/strategy/base.py
@abstractmethod
async def read_token(self, token: str | None, user_manager: UserManagerProtocol[UP, ID]) -> UP | None:
    """Resolve a user from a token."""

write_token(user) abstractmethod async

Issue a token for the provided user.

Source code in litestar_auth/authentication/strategy/base.py
@abstractmethod
async def write_token(self, user: UP) -> str:
    """Issue a token for the provided user."""

UserManagerProtocol

Bases: Protocol

Protocol for user manager lookups used by token strategies.

get(user_id) async

Return the user for the given identifier.

Source code in litestar_auth/authentication/strategy/base.py
async def get(self, user_id: ID) -> UP | None:
    """Return the user for the given identifier."""