Strategies

litestar_auth.authentication.strategy

Issue, validate, rotate, and revoke tokens (JWT, database, or Redis).

Strategies pair with litestar_auth.authentication.transport implementations inside litestar_auth.authentication.backend.AuthenticationBackend.

DatabaseTokenModels is the explicit contract for DatabaseTokenStrategy when you swap in mixin-composed token ORM classes. import_token_orm_models() remains re-exported here only for compatibility with existing imports; the canonical public bootstrap helper lives at litestar_auth.models.import_token_orm_models().

DatabaseTokenModels(access_token_model=AccessToken, refresh_token_model=RefreshToken) dataclass

Explicit access-token and refresh-token ORM contract for DatabaseTokenStrategy.

The supplied models must expose mapped token, created_at, user_id, and user attributes compatible with the persistence operations performed by the DB token strategy. Defaults preserve the bundled AccessToken / RefreshToken behavior.

__post_init__()

Validate the supplied token-model classes eagerly.

Source code in litestar_auth/authentication/strategy/db_models.py
def __post_init__(self) -> None:
    """Validate the supplied token-model classes eagerly."""
    _validate_token_model_contract(self.access_token_model, field_name="access_token_model")
    _validate_token_model_contract(self.refresh_token_model, field_name="refresh_token_model")
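
The eager check can be pictured with the following sketch. `_validate_token_model_contract` is not shown on this page, so the `validate_contract` helper, the `TokenModels` stand-in, and the choice of `TypeError` are assumptions made for illustration only.

```python
from dataclasses import dataclass

# Attributes the page says a token model must expose.
REQUIRED_ATTRS = ("token", "created_at", "user_id", "user")


def validate_contract(model: type, *, field_name: str) -> None:
    """Hypothetical validator: reject classes missing a required attribute."""
    missing = [name for name in REQUIRED_ATTRS if not hasattr(model, name)]
    if missing:
        msg = f"{field_name} is missing attributes: {', '.join(missing)}"
        raise TypeError(msg)


class GoodToken:
    token = created_at = user_id = user = None


class BadToken:
    token = None  # created_at, user_id, and user are absent


@dataclass
class TokenModels:
    """Stand-in for DatabaseTokenModels; validates as soon as it is built."""

    access_token_model: type = GoodToken
    refresh_token_model: type = GoodToken

    def __post_init__(self) -> None:
        validate_contract(self.access_token_model, field_name="access_token_model")
        validate_contract(self.refresh_token_model, field_name="refresh_token_model")
```

Validating in `__post_init__` means a mismatched model fails at configuration time rather than on the first token write.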

DatabaseTokenStrategy(*, session, token_hash_secret, token_models=None, max_age=DEFAULT_MAX_AGE, refresh_max_age=DEFAULT_REFRESH_MAX_AGE, token_bytes=DEFAULT_TOKEN_BYTES, accept_legacy_plaintext_tokens=False)

Bases: Strategy[UP, ID], RefreshableStrategy[UP, ID]

Stateful strategy that persists opaque tokens in the database.

Initialize the strategy.

Parameters:

session (AsyncSessionT, required): SQLAlchemy session used by the repository.

token_hash_secret (str, required): High-entropy secret used for keyed token hashing (HMAC-SHA256).

token_models (DatabaseTokenModels | None, default None): Access-token and refresh-token ORM models used for persistence. Omit to keep the bundled AccessToken / RefreshToken behavior.

max_age (timedelta, default DEFAULT_MAX_AGE): Maximum token age before it is rejected.

refresh_max_age (timedelta, default DEFAULT_REFRESH_MAX_AGE): Maximum refresh-token age before it is rejected.

token_bytes (int, default DEFAULT_TOKEN_BYTES): Number of random bytes used for token generation.

accept_legacy_plaintext_tokens (bool, default False): When enabled, accept previously persisted raw tokens stored before digest-at-rest hardening. This is intended for migrations only.

Raises:

Type Description
ConfigurationError

When token_hash_secret fails minimum-length requirements.

Source code in litestar_auth/authentication/strategy/db.py
def __init__(  # noqa: PLR0913
    self,
    *,
    session: AsyncSessionT,
    token_hash_secret: str,
    token_models: DatabaseTokenModels | None = None,
    max_age: timedelta = DEFAULT_MAX_AGE,
    refresh_max_age: timedelta = DEFAULT_REFRESH_MAX_AGE,
    token_bytes: int = DEFAULT_TOKEN_BYTES,
    accept_legacy_plaintext_tokens: bool = False,
) -> None:
    """Initialize the strategy.

    Args:
        session: SQLAlchemy session used by the repository.
        token_hash_secret: High-entropy secret used for keyed token hashing (HMAC-SHA256).
        token_models: Access-token and refresh-token ORM models used for persistence. Omit to
            keep the bundled ``AccessToken`` / ``RefreshToken`` behavior.
        max_age: Maximum token age before it is rejected.
        refresh_max_age: Maximum refresh-token age before it is rejected.
        token_bytes: Number of random bytes used for token generation.
        accept_legacy_plaintext_tokens: When enabled, accept previously persisted raw tokens
            stored before digest-at-rest hardening. This is intended for migrations only.

    Raises:
        ConfigurationError: When ``token_hash_secret`` fails minimum-length requirements.
    """
    try:
        validate_secret_length(token_hash_secret, label="DatabaseTokenStrategy token_hash_secret")
    except ConfigurationError as exc:
        raise ConfigurationError(str(exc)) from exc

    self.session = session
    self._token_hash_secret = token_hash_secret.encode()
    self.token_models = DatabaseTokenModels() if token_models is None else token_models
    self.access_token_model = self.token_models.access_token_model
    self.refresh_token_model = self.token_models.refresh_token_model
    self._access_token_repository_type = _build_token_repository(self.access_token_model)
    self._refresh_token_repository_type = _build_token_repository(self.refresh_token_model)
    self.max_age = max_age
    self.refresh_max_age = refresh_max_age
    self.token_bytes = token_bytes
    self.accept_legacy_plaintext_tokens = accept_legacy_plaintext_tokens
    if accept_legacy_plaintext_tokens and not is_testing():
        logger.warning(
            build_legacy_plaintext_tokens_warning_message(),
            extra={"event": "db_tokens_accept_legacy_plaintext"},
        )
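
The keyed hashing described for token_hash_secret can be sketched as follows. The private `_token_digest` method is not shown on this page, so the `token_digest` helper and its hex encoding are assumptions; only the use of HMAC-SHA256 and `secrets.token_urlsafe` comes from the text above.

```python
import hashlib
import hmac
import secrets


def token_digest(secret: bytes, token: str) -> str:
    """Hypothetical helper: hex-encoded HMAC-SHA256 of the raw token."""
    return hmac.new(secret, token.encode(), hashlib.sha256).hexdigest()


hash_secret = b"an-example-secret-of-32-bytes!!!"  # placeholder, not a real secret
raw_token = secrets.token_urlsafe(32)  # value handed back to the client
stored_value = token_digest(hash_secret, raw_token)  # value persisted at rest
```

Because only the digest is stored, a leaked table does not reveal usable tokens unless the hashing secret leaks as well.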

cleanup_expired_tokens(session) async

Delete expired access and refresh tokens for the configured TTLs.

Returns:

Type Description
int

Total number of deleted access-token and refresh-token rows.

Source code in litestar_auth/authentication/strategy/db.py
async def cleanup_expired_tokens(self, session: AsyncSession) -> int:
    """Delete expired access and refresh tokens for the configured TTLs.

    Returns:
        Total number of deleted access-token and refresh-token rows.
    """
    now = datetime.now(tz=UTC)
    access_cutoff = now - self.max_age
    refresh_cutoff = now - self.refresh_max_age

    access_result = await session.execute(
        delete(self.access_token_model).where(self.access_token_model.created_at <= access_cutoff),
    )
    refresh_result = await session.execute(
        delete(self.refresh_token_model).where(self.refresh_token_model.created_at <= refresh_cutoff),
    )
    await session.commit()

    access_rowcount = getattr(access_result, "rowcount", 0) or 0
    refresh_rowcount = getattr(refresh_result, "rowcount", 0) or 0
    return access_rowcount + refresh_rowcount
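
The cutoff comparison used by the two delete statements can be looked at in isolation. This is a minimal sketch with illustrative values; `is_expired` is a hypothetical helper, and the one-hour age is not the library default.

```python
from datetime import datetime, timedelta, timezone


def is_expired(created_at: datetime, max_age: timedelta, now: datetime) -> bool:
    """A row is expired once its creation time is at or before the cutoff."""
    cutoff = now - max_age
    return created_at <= cutoff


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
max_age = timedelta(hours=1)  # illustrative value only
fresh = now - timedelta(minutes=59)
stale = now - timedelta(hours=1)
```

Note that the comparison is inclusive, so a row created exactly `max_age` ago is already eligible for deletion.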

destroy_token(token, user) async

Delete a persisted token.

Source code in litestar_auth/authentication/strategy/db.py
@override
async def destroy_token(self, token: str, user: UP) -> None:
    """Delete a persisted token."""
    del user
    token_digest = self._token_digest(token)
    await self._repository().delete_where(token=token_digest, auto_commit=False)
    if self.accept_legacy_plaintext_tokens:
        await self._repository().delete_where(token=token, auto_commit=False)
    await self.session.commit()

invalidate_all_tokens(user) async

Delete all persisted access and refresh tokens for the given user.

Source code in litestar_auth/authentication/strategy/db.py
async def invalidate_all_tokens(self, user: UP) -> None:
    """Delete all persisted access and refresh tokens for the given user."""
    await self._repository().delete_where(user_id=user.id, auto_commit=False)
    await self._refresh_repository().delete_where(user_id=user.id, auto_commit=False)
    await self.session.commit()

read_token(token, user_manager) async

Resolve a user from an opaque database token.

Returns:

Type Description
UP | None

Related user when the token exists and is not expired, otherwise None.

Source code in litestar_auth/authentication/strategy/db.py
@override
async def read_token(
    self,
    token: str | None,
    user_manager: UserManagerProtocol[UP, ID],
) -> UP | None:
    """Resolve a user from an opaque database token.

    Returns:
        Related user when the token exists and is not expired, otherwise ``None``.
    """
    del user_manager
    if token is None:
        return None

    access_token = cast(
        "Any",
        await self._resolve_token(
            self._repository(),
            token,
            load=[self.access_token_model.user],
        ),
    )
    if access_token is None or self._is_token_expired(access_token.created_at, self.max_age):
        return None

    return cast("UP", access_token.user)

rotate_refresh_token(refresh_token, user_manager) async

Rotate a refresh token and return the related user plus replacement.

Returns:

Type Description
tuple[UP, str] | None

Tuple of the resolved user and rotated refresh token, or None when invalid.

Source code in litestar_auth/authentication/strategy/db.py
@override
async def rotate_refresh_token(
    self,
    refresh_token: str,
    user_manager: UserManagerProtocol[UP, ID],
) -> tuple[UP, str] | None:
    """Rotate a refresh token and return the related user plus replacement.

    Returns:
        Tuple of the resolved user and rotated refresh token, or ``None`` when invalid.
    """
    del user_manager
    persisted_token = cast(
        "Any",
        await self._resolve_token(
            self._refresh_repository(),
            refresh_token,
            load=[self.refresh_token_model.user],
        ),
    )
    if persisted_token is None:
        return None
    if self._is_token_expired(persisted_token.created_at, self.refresh_max_age):
        await self._refresh_repository().delete_where(token=persisted_token.token, auto_commit=False)
        return None

    user = cast("UP", persisted_token.user)
    await self._refresh_repository().delete_where(token=persisted_token.token, auto_commit=False)
    rotated_refresh_token = secrets.token_urlsafe(self.token_bytes)
    rotated_model = self.refresh_token_model(token=self._token_digest(rotated_refresh_token), user_id=user.id)
    await self._refresh_repository().add(rotated_model, auto_commit=False, auto_refresh=True)
    return user, rotated_refresh_token
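
The rotation flow (look up the digest, delete the old row, issue a replacement) can be sketched without a database. `InMemoryRefreshStore` is a hypothetical stand-in for the refresh-token table; expiry handling and commits are left out.

```python
import hashlib
import hmac
import secrets
from typing import Optional


class InMemoryRefreshStore:
    """Toy stand-in for the refresh-token table (digest -> user id)."""

    def __init__(self, secret: bytes) -> None:
        self._secret = secret
        self._rows: dict[str, str] = {}

    def _digest(self, token: str) -> str:
        return hmac.new(self._secret, token.encode(), hashlib.sha256).hexdigest()

    def issue(self, user_id: str) -> str:
        token = secrets.token_urlsafe(32)
        self._rows[self._digest(token)] = user_id
        return token

    def rotate(self, token: str) -> Optional[tuple[str, str]]:
        # Consume the presented token; unknown or already-used tokens yield None.
        user_id = self._rows.pop(self._digest(token), None)
        if user_id is None:
            return None
        return user_id, self.issue(user_id)


store = InMemoryRefreshStore(b"k" * 32)
first = store.issue("user-1")
```

Each refresh token is single-use in this sketch: presenting `first` a second time returns `None` because its row was removed during the first rotation.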

with_session(session)

Return a copy of the strategy bound to the provided async session.

Source code in litestar_auth/authentication/strategy/db.py
def with_session(self, session: AsyncSessionT) -> DatabaseTokenStrategy[UP, ID]:
    """Return a copy of the strategy bound to the provided async session."""
    return type(self)(
        session=session,
        token_hash_secret=self._token_hash_secret.decode(),
        token_models=self.token_models,
        max_age=self.max_age,
        refresh_max_age=self.refresh_max_age,
        token_bytes=self.token_bytes,
        accept_legacy_plaintext_tokens=self.accept_legacy_plaintext_tokens,
    )

write_refresh_token(user) async

Persist and return a new opaque refresh token for the user.

Returns:

Type Description
str

Newly created opaque refresh-token string.

Source code in litestar_auth/authentication/strategy/db.py
@override
async def write_refresh_token(self, user: UP) -> str:
    """Persist and return a new opaque refresh token for the user.

    Returns:
        Newly created opaque refresh-token string.
    """
    token = secrets.token_urlsafe(self.token_bytes)
    refresh_token = self.refresh_token_model(token=self._token_digest(token), user_id=user.id)
    await self._refresh_repository().add(refresh_token, auto_refresh=True)
    return token

write_token(user) async

Persist and return a new opaque token for the user.

Returns:

Type Description
str

Newly created opaque token string.

Source code in litestar_auth/authentication/strategy/db.py
@override
async def write_token(self, user: UP) -> str:
    """Persist and return a new opaque token for the user.

    Returns:
        Newly created opaque token string.
    """
    token = secrets.token_urlsafe(self.token_bytes)
    access_token = self.access_token_model(token=self._token_digest(token), user_id=user.id)
    await self._repository().add(access_token, auto_refresh=True)
    return token

JWTStrategy(*, secret, verify_key=None, algorithm=DEFAULT_ALGORITHM, lifetime=DEFAULT_LIFETIME, subject_decoder=None, issuer=None, denylist_store=None, session_fingerprint_getter=None, session_fingerprint_claim='sfp')

Bases: Strategy[UP, ID]

Stateless strategy that stores user identifiers inside JWTs.

JWT access tokens issued by this strategy are designed to be short-lived and stateless. The strategy also maintains a denylist keyed by the jti claim so that individual tokens can be explicitly revoked before expiration when destroy_token() is called.

By default the denylist is held in memory: it is process-local and is not persisted across worker restarts. Deployments that require stronger revocation guarantees should pass a shared denylist_store, for example one backed by Redis.

Initialize the JWT strategy.

Parameters:

secret (str, required): Signing secret or private key used for JWT encoding.

verify_key (str | None, default None): Optional verification key used for JWT decoding. When omitted, secret is used for both encoding and decoding.

algorithm (str, default DEFAULT_ALGORITHM): JWT signing algorithm.

lifetime (timedelta, default DEFAULT_LIFETIME): Token lifetime added to the expiration claim.

subject_decoder (Callable[[str], ID] | None, default None): Optional callable that converts the sub claim into the identifier type expected by the user manager.

issuer (str | None, default None): Optional issuer string to embed in the iss claim and validate when decoding.

denylist_store (JWTDenylistStore | None, default None): Optional shared denylist backend for durable token revocation across workers.

session_fingerprint_getter (Callable[[UP], str | None] | None, default None): Optional callable that returns a fingerprint representing a user's current security state. When the returned value is embedded into tokens, password/email changes can invalidate old tokens without server-side session storage.

session_fingerprint_claim (str, default 'sfp'): JWT claim name used to store the session fingerprint.

Raises:

Type Description
ValueError

Raised when the configured algorithm is not allow-listed.

Source code in litestar_auth/authentication/strategy/jwt.py
def __init__(  # noqa: PLR0913
    self,
    *,
    secret: str,
    verify_key: str | None = None,
    algorithm: str = DEFAULT_ALGORITHM,
    lifetime: timedelta = DEFAULT_LIFETIME,
    subject_decoder: Callable[[str], ID] | None = None,
    issuer: str | None = None,
    denylist_store: JWTDenylistStore | None = None,
    session_fingerprint_getter: Callable[[UP], str | None] | None = None,
    session_fingerprint_claim: str = "sfp",
) -> None:
    """Initialize the JWT strategy.

    Args:
        secret: Signing secret or private key used for JWT encoding.
        verify_key: Optional verification key used for JWT decoding. When
            omitted, ``secret`` is used for both encoding and decoding.
        algorithm: JWT signing algorithm.
        lifetime: Token lifetime added to the expiration claim.
        subject_decoder: Optional callable that converts the ``sub`` claim
            into the identifier type expected by the user manager.
        issuer: Optional issuer string to embed in the ``iss`` claim and
            validate when decoding.
        denylist_store: Optional shared denylist backend for durable token
            revocation across workers.
        session_fingerprint_getter: Optional callable that returns a fingerprint
            representing a user's current security state. When the returned value
            is embedded into tokens, password/email changes can invalidate old
            tokens without server-side session storage.
        session_fingerprint_claim: JWT claim name used to store the session
            fingerprint.

    Raises:
        ValueError: Raised when the configured algorithm is not allow-listed.
    """
    if algorithm not in _ALLOWED_ALGORITHMS:
        msg = f"Unsupported JWT algorithm '{algorithm}'. Allowed algorithms: {sorted(_ALLOWED_ALGORITHMS)}"
        raise ValueError(msg)

    validate_secret_length(secret, label="JWT signing secret")
    self.secret = secret
    self.verify_key = verify_key if verify_key is not None else secret
    self.algorithm = algorithm
    self.lifetime = lifetime
    self.subject_decoder = subject_decoder
    self.issuer = issuer
    self._denylist_store: JWTDenylistStore = denylist_store or InMemoryJWTDenylistStore()
    # Security: always derive the fingerprint HMAC key from the signing secret
    # (kept private by the strategy), never from the public verify_key.
    fingerprint_key = self.secret.encode()
    self.session_fingerprint_getter = session_fingerprint_getter or _default_session_fingerprint(fingerprint_key)
    self.session_fingerprint_claim = session_fingerprint_claim
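
One way to picture a session fingerprint getter is an HMAC over the user fields whose change should invalidate old tokens. This is a sketch under stated assumptions: `_default_session_fingerprint` is not shown on this page, and the `User` fields, the `make_fingerprint_getter` name, and the choice of hashed inputs are all hypothetical.

```python
import hashlib
import hmac
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class User:
    """Hypothetical user shape; the field names are assumptions."""

    id: str
    email: str
    hashed_password: str


def make_fingerprint_getter(key: bytes) -> Callable[[User], Optional[str]]:
    """Build a getter that HMACs security-relevant user fields."""

    def getter(user: User) -> Optional[str]:
        material = f"{user.email}\x00{user.hashed_password}".encode()
        return hmac.new(key, material, hashlib.sha256).hexdigest()

    return getter


get_fingerprint = make_fingerprint_getter(b"signing-secret-placeholder-32-by")
before = get_fingerprint(User("1", "a@example.com", "hash-v1"))
after = get_fingerprint(User("1", "a@example.com", "hash-v2"))
```

A token carrying `before` in its fingerprint claim no longer matches once the password hash changes, which is how stateless invalidation becomes possible.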

revocation_is_durable property

Return whether token revocation is backed by a shared store.

destroy_token(token, user) async

Revoke the given token by adding its jti to the denylist.

The denylist lives in the configured denylist_store, which is held in memory on the strategy instance by default. Tokens without a jti claim, or tokens that fail to decode, are ignored.

Source code in litestar_auth/authentication/strategy/jwt.py
@override
async def destroy_token(self, token: str, user: UP) -> None:
    """Revoke the given token by adding its ``jti`` to the denylist.

    The denylist lives in the configured ``denylist_store``, which is held in
    memory on the strategy instance by default. Tokens without a ``jti``
    claim, or tokens that fail to decode, are ignored.
    """
    del user

    try:
        payload = jwt.decode(
            token,
            self.verify_key,
            algorithms=[self.algorithm],
            audience=JWT_ACCESS_TOKEN_AUDIENCE,
            options={
                "verify_exp": False,
                "verify_iss": False,
            },
        )
    except InvalidTokenError:
        return

    jti = payload.get("jti")
    exp = payload.get("exp")
    if not isinstance(jti, str):
        return
    ttl_seconds = 1
    if isinstance(exp, int):
        ttl_seconds = max(exp - int(time.time()), 1)
    await self._denylist_store.deny(jti, ttl_seconds=ttl_seconds)
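
The TTL arithmetic and a process-local denylist can be sketched as follows. Only the `deny(jti, ttl_seconds=...)` call appears in the source above; the `is_denied` method, the `ExpiringDenylist` class, and the `remaining_ttl` helper are hypothetical names used for illustration.

```python
import asyncio
import time


def remaining_ttl(exp: object, now: float) -> int:
    """Seconds to keep a jti denied: until exp, never less than one second."""
    if isinstance(exp, int):
        return max(exp - int(now), 1)
    return 1


class ExpiringDenylist:
    """Toy process-local denylist; `is_denied` is an assumed method name."""

    def __init__(self) -> None:
        self._expires_at: dict[str, float] = {}

    async def deny(self, jti: str, *, ttl_seconds: int) -> None:
        self._expires_at[jti] = time.monotonic() + ttl_seconds

    async def is_denied(self, jti: str) -> bool:
        expires_at = self._expires_at.get(jti)
        if expires_at is None:
            return False
        if expires_at <= time.monotonic():
            del self._expires_at[jti]  # entry outlived the token; drop it
            return False
        return True
```

Tying the entry's lifetime to the token's remaining validity keeps the denylist from growing without bound, since an expired token is rejected on its own.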

read_token(token, user_manager) async

Decode a JWT token and load its user.

Returns:

Type Description
UP | None

The matching user, or None when the token is invalid.

Source code in litestar_auth/authentication/strategy/jwt.py
@override
async def read_token(  # noqa: PLR0911
    self,
    token: str | None,
    user_manager: UserManagerProtocol[UP, ID],
) -> UP | None:
    """Decode a JWT token and load its user.

    Returns:
        The matching user, or ``None`` when the token is invalid.
    """
    if token is None:
        return None

    payload = self._decode_verified_access_token(token)
    if payload is None:
        logger.info("JWT decode failed (expired or invalid signature)")
        return None

    if await self._is_token_denied(payload):
        logger.info("JWT denied (revoked)")
        return None

    subject = payload.get("sub")
    if not isinstance(subject, str) or not subject:
        logger.info("JWT missing or invalid 'sub' claim")
        return None

    try:
        user_id = self.subject_decoder(subject) if self.subject_decoder is not None else subject
    except ValueError:
        logger.info("JWT subject could not be decoded: %s", subject)
        return None

    if user_id is None:
        return None

    user = await user_manager.get(cast("ID", user_id))
    if user is None:
        logger.info("JWT subject references non-existent user: %s", subject)
        return None

    if not self._validate_fingerprint(payload, user):
        logger.info("JWT fingerprint mismatch for user: %s", subject)
        return None

    return user
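
The subject decoding step can be tried on its own. In this sketch `resolve_user_id` is a hypothetical helper that mirrors the try/except shown above, with `int` and `uuid.UUID` as example decoders.

```python
from typing import Any, Callable, Optional
from uuid import UUID


def resolve_user_id(
    subject: str,
    subject_decoder: Optional[Callable[[str], Any]] = None,
) -> Any:
    """Apply the optional decoder; a ValueError means the subject is rejected."""
    try:
        return subject_decoder(subject) if subject_decoder is not None else subject
    except ValueError:
        return None
```

Passing `UUID` or `int` as the decoder works because both raise `ValueError` on malformed input, which the strategy treats as an invalid token rather than an error.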

write_token(user) async

Generate a JWT token for the provided user.

Returns:

Type Description
str

The encoded JWT token string.

Source code in litestar_auth/authentication/strategy/jwt.py
@override
async def write_token(self, user: UP) -> str:
    """Generate a JWT token for the provided user.

    Returns:
        The encoded JWT token string.
    """
    issued_at = datetime.now(tz=UTC)
    payload = {
        "sub": str(user.id),
        "aud": JWT_ACCESS_TOKEN_AUDIENCE,
        "iat": issued_at,
        "nbf": issued_at,
        "exp": issued_at + self.lifetime,
        "jti": secrets.token_hex(16),
    }
    if self.issuer is not None:
        payload["iss"] = self.issuer

    fingerprint = self.session_fingerprint_getter(user)
    if fingerprint is not None:
        payload[self.session_fingerprint_claim] = fingerprint
    return jwt.encode(payload, self.secret, algorithm=self.algorithm)
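
The claim set assembled before encoding can be sketched with the standard library alone. `build_claims` is a hypothetical helper; the audience string is a placeholder for `JWT_ACCESS_TOKEN_AUDIENCE`, whose value is not shown on this page, and the 15-minute lifetime is illustrative.

```python
import secrets
from datetime import datetime, timedelta, timezone
from typing import Any, Optional


def build_claims(
    user_id: object,
    *,
    lifetime: timedelta,
    audience: str,
    issuer: Optional[str] = None,
    now: Optional[datetime] = None,
) -> dict[str, Any]:
    """Assemble the registered claims placed in the payload."""
    issued_at = now if now is not None else datetime.now(tz=timezone.utc)
    claims: dict[str, Any] = {
        "sub": str(user_id),
        "aud": audience,
        "iat": issued_at,
        "nbf": issued_at,
        "exp": issued_at + lifetime,
        "jti": secrets.token_hex(16),  # unique id, later used for revocation
    }
    if issuer is not None:
        claims["iss"] = issuer  # only present when an issuer is configured
    return claims


claims = build_claims(
    42,
    lifetime=timedelta(minutes=15),  # illustrative, not the library default
    audience="example-audience",  # placeholder audience value
    now=datetime(2024, 1, 1, tzinfo=timezone.utc),
)
```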

RedisTokenStrategy(*, redis, token_hash_secret, lifetime=DEFAULT_LIFETIME, token_bytes=DEFAULT_TOKEN_BYTES, key_prefix=DEFAULT_KEY_PREFIX, subject_decoder=None, max_scan_keys=DEFAULT_MAX_SCAN_KEYS)

Bases: Strategy[UP, ID]

Stateful strategy that stores opaque tokens in Redis with TTL.

Initialize the strategy.

Parameters:

redis (RedisClientProtocol, required): Async Redis client compatible with redis.asyncio.Redis.

token_hash_secret (str, required): High-entropy secret used for keyed token hashing (HMAC-SHA256).

lifetime (timedelta, default DEFAULT_LIFETIME): Token TTL applied to persisted keys.

token_bytes (int, default DEFAULT_TOKEN_BYTES): Number of random bytes used for token generation.

key_prefix (str, default DEFAULT_KEY_PREFIX): Prefix used to namespace token keys in Redis.

subject_decoder (Callable[[str], ID] | None, default None): Optional callable that converts the stored user id string into the identifier type expected by the user manager.

max_scan_keys (int, default DEFAULT_MAX_SCAN_KEYS): Safety cap on keys examined during scan-based fallback invalidation. Prevents runaway iteration on large keyspaces.

Raises:

Type Description
ConfigurationError

When token_hash_secret fails minimum-length requirements.

Source code in litestar_auth/authentication/strategy/redis.py
def __init__(  # noqa: PLR0913
    self,
    *,
    redis: RedisClientProtocol,
    token_hash_secret: str,
    lifetime: timedelta = DEFAULT_LIFETIME,
    token_bytes: int = DEFAULT_TOKEN_BYTES,
    key_prefix: str = DEFAULT_KEY_PREFIX,
    subject_decoder: Callable[[str], ID] | None = None,
    max_scan_keys: int = DEFAULT_MAX_SCAN_KEYS,
) -> None:
    """Initialize the strategy.

    Args:
        redis: Async Redis client compatible with ``redis.asyncio.Redis``.
        token_hash_secret: High-entropy secret used for keyed token hashing (HMAC-SHA256).
        lifetime: Token TTL applied to persisted keys.
        token_bytes: Number of random bytes used for token generation.
        key_prefix: Prefix used to namespace token keys in Redis.
        subject_decoder: Optional callable that converts the stored user id
            string into the identifier type expected by the user manager.
        max_scan_keys: Safety cap on keys examined during scan-based fallback
            invalidation.  Prevents runaway iteration on large keyspaces.

    Raises:
        ConfigurationError: When ``token_hash_secret`` fails minimum-length requirements.
    """
    _load_redis_asyncio()
    try:
        validate_secret_length(token_hash_secret, label="RedisTokenStrategy token_hash_secret")
    except ConfigurationError as exc:
        raise ConfigurationError(str(exc)) from exc

    self.redis = redis
    self._token_hash_secret = token_hash_secret.encode()
    self.lifetime = lifetime
    self.token_bytes = token_bytes
    self.key_prefix = key_prefix
    self.subject_decoder = subject_decoder
    self._max_scan_keys = max_scan_keys

destroy_token(token, user) async

Delete a persisted Redis token.

Source code in litestar_auth/authentication/strategy/redis.py
@override
async def destroy_token(self, token: str, user: UP) -> None:
    """Delete a persisted Redis token."""
    token_key = self._key(token)
    user_id = str(user.id)
    index_key = self._user_index_key(user_id)
    await self.redis.delete(token_key)
    await self.redis.srem(index_key, token_key)

invalidate_all_tokens(user) async

Delete all Redis-backed tokens associated with the given user.

This uses a per-user index to delete only the keys associated with the user, avoiding keyspace scans under the global prefix.

Backward compatibility

If the per-user index is missing, we fall back to a best-effort scan over keys matching the configured prefix and remove those whose stored subject matches user.id.

Source code in litestar_auth/authentication/strategy/redis.py
async def invalidate_all_tokens(self, user: UP) -> None:
    """Delete all Redis-backed tokens associated with the given user.

    This uses a per-user index to delete only the keys associated with the
    user, avoiding keyspace scans under the global prefix.

    Backward compatibility:
        If the per-user index is missing, we fall back to a best-effort scan
        over keys matching the configured prefix and remove those whose
        stored subject matches ``user.id``.
    """
    user_id = str(user.id)
    if await self._invalidate_via_index(user_id):
        return
    await self._invalidate_via_scan(user_id)

read_token(token, user_manager) async

Resolve a user from a Redis-backed token.

Returns:

Type Description
UP | None

The resolved user when the token exists and decodes successfully, otherwise None.

Source code in litestar_auth/authentication/strategy/redis.py
@override
async def read_token(
    self,
    token: str | None,
    user_manager: UserManagerProtocol[UP, ID],
) -> UP | None:
    """Resolve a user from a Redis-backed token.

    Returns:
        The resolved user when the token exists and decodes successfully,
        otherwise ``None``.
    """
    if token is None:
        return None

    stored_user_id = await self.redis.get(self._key(token))
    if stored_user_id is None:
        return None

    user_id_text = self._decode_user_id(stored_user_id)

    try:
        user_id = self.subject_decoder(user_id_text) if self.subject_decoder is not None else user_id_text
    except (TypeError, ValueError):
        return None

    return await user_manager.get(cast("ID", user_id))

write_token(user) async

Persist a new opaque token in Redis and return it.

Returns:

Type Description
str

Newly created opaque token string.

Source code in litestar_auth/authentication/strategy/redis.py
@override
async def write_token(self, user: UP) -> str:
    """Persist a new opaque token in Redis and return it.

    Returns:
        Newly created opaque token string.
    """
    token = secrets.token_urlsafe(self.token_bytes)
    token_key = self._key(token)
    user_id = str(user.id)
    await self.redis.setex(token_key, self._ttl_seconds, user_id)
    index_key = self._user_index_key(user_id)
    await self.redis.sadd(index_key, token_key)
    await self.redis.expire(index_key, self._ttl_seconds)
    return token

RefreshableStrategy

Bases: Protocol

Protocol for strategies that support refresh-token rotation.

Note

Refresh tokens are intentionally modeled as a separate lifecycle artifact from access tokens. In particular, Strategy.destroy_token() only targets the access token used for request authentication; refresh-token invalidation (if any) is managed by the refresh strategy itself.

rotate_refresh_token(refresh_token, user_manager) async

Consume a refresh token and return the user plus a rotated replacement.

Source code in litestar_auth/authentication/strategy/base.py
async def rotate_refresh_token(
    self,
    refresh_token: str,
    user_manager: UserManagerProtocol[UP, ID],
) -> tuple[UP, str] | None:
    """Consume a refresh token and return the user plus a rotated replacement."""

write_refresh_token(user) async

Issue a refresh token for the provided user.

Source code in litestar_auth/authentication/strategy/base.py
async def write_refresh_token(self, user: UP) -> str:
    """Issue a refresh token for the provided user."""

Strategy

Bases: ABC

Abstract base class for token storage and validation strategies.
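
A custom strategy implements read_token, write_token, and destroy_token. The sketch below uses a local `Strategy` stand-in so that it runs without the library installed; `InMemoryStrategy`, `DictUserManager`, and `User` are hypothetical, and the toy stores raw tokens, unlike the hashed storage described for the bundled database strategy.

```python
import asyncio
import secrets
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional


@dataclass
class User:
    id: int


class Strategy(ABC):
    """Local stand-in that mirrors the three abstract methods."""

    @abstractmethod
    async def read_token(self, token: Optional[str], user_manager) -> Optional[User]: ...

    @abstractmethod
    async def write_token(self, user: User) -> str: ...

    @abstractmethod
    async def destroy_token(self, token: str, user: User) -> None: ...


class DictUserManager:
    """Hypothetical manager exposing only the async get() lookup."""

    def __init__(self, users: dict[int, User]) -> None:
        self._users = users

    async def get(self, user_id: int) -> Optional[User]:
        return self._users.get(user_id)


class InMemoryStrategy(Strategy):
    """Toy strategy for illustration; tokens are kept in a plain dict."""

    def __init__(self) -> None:
        self._tokens: dict[str, int] = {}

    async def write_token(self, user: User) -> str:
        token = secrets.token_urlsafe(32)
        self._tokens[token] = user.id
        return token

    async def read_token(self, token, user_manager):
        if token is None:
            return None
        user_id = self._tokens.get(token)
        return None if user_id is None else await user_manager.get(user_id)

    async def destroy_token(self, token, user):
        self._tokens.pop(token, None)


async def demo() -> tuple:
    user = User(id=1)
    manager = DictUserManager({1: user})
    strategy = InMemoryStrategy()
    token = await strategy.write_token(user)
    before = await strategy.read_token(token, manager)
    await strategy.destroy_token(token, user)
    after = await strategy.read_token(token, manager)
    return before, after
```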

destroy_token(token, user) abstractmethod async

Invalidate a token for the provided user.

Source code in litestar_auth/authentication/strategy/base.py
@abstractmethod
async def destroy_token(self, token: str, user: UP) -> None:
    """Invalidate a token for the provided user."""

read_token(token, user_manager) abstractmethod async

Resolve a user from a token.

Source code in litestar_auth/authentication/strategy/base.py
@abstractmethod
async def read_token(self, token: str | None, user_manager: UserManagerProtocol[UP, ID]) -> UP | None:
    """Resolve a user from a token."""

write_token(user) abstractmethod async

Issue a token for the provided user.

Source code in litestar_auth/authentication/strategy/base.py
@abstractmethod
async def write_token(self, user: UP) -> str:
    """Issue a token for the provided user."""

UserManagerProtocol

Bases: Protocol

Protocol for user manager lookups used by token strategies.
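
Because the contract is a Protocol, any object with a matching async get() satisfies it without inheriting from anything. A minimal sketch, where `SupportsGet` is a local stand-in (marked runtime-checkable only so the demo can use `isinstance`; whether the library's protocol is runtime-checkable is not stated here) and `DictUserManager` is hypothetical:

```python
import asyncio
from typing import Optional, Protocol, runtime_checkable


@runtime_checkable
class SupportsGet(Protocol):
    """Local stand-in for the user-manager lookup protocol."""

    async def get(self, user_id: int) -> Optional[str]: ...


class DictUserManager:
    """Satisfies the protocol structurally; no inheritance is required."""

    def __init__(self, users: dict[int, str]) -> None:
        self._users = users

    async def get(self, user_id: int) -> Optional[str]:
        return self._users.get(user_id)


manager = DictUserManager({1: "alice"})
```

This keeps strategies decoupled from any concrete user manager, which also makes them easy to exercise in tests with a dictionary-backed fake.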

get(user_id) async

Return the user for the given identifier.

Source code in litestar_auth/authentication/strategy/base.py
async def get(self, user_id: ID) -> UP | None:
    """Return the user for the given identifier."""

import_token_orm_models()

Return the bundled token ORM models for compatibility and low-level imports.

Prefer litestar_auth.models.import_token_orm_models() for public explicit mapper registration. This module-level helper remains available for strategy-layer compatibility.

Source code in litestar_auth/authentication/strategy/db_models.py
def import_token_orm_models() -> tuple[type[AccessToken], type[RefreshToken]]:
    """Return the bundled token ORM models for compatibility and low-level imports.

    Prefer ``litestar_auth.models.import_token_orm_models()`` for public explicit mapper
    registration. This module-level helper remains available for strategy-layer compatibility.
    """
    return AccessToken, RefreshToken