Skip to content

TOTP

litestar_auth.totp

Time-based one-time password helpers.

InMemoryUsedTotpCodeStore(*, clock=time.monotonic, max_entries=50000)

Async-safe in-memory replay cache for successful TOTP verifications.

Not safe for multi-process or multi-host deployments; use RedisUsedTotpCodeStore for shared storage (e.g. multi-worker or multi-pod deployments).

Store the monotonic clock and initialize cache state.

Raises:

Type Description
ValueError

If max_entries is less than 1.

Source code in litestar_auth/totp.py
def __init__(
    self,
    *,
    clock: Callable[[], float] = time.monotonic,
    max_entries: int = 50_000,
) -> None:
    """Set up an empty, size-bounded replay cache.

    Args:
        clock: Zero-argument callable returning the current time as a float;
            defaults to ``time.monotonic``.
        max_entries: Capacity limit stored on the instance; must be at least 1.

    Raises:
        ValueError: If ``max_entries`` is less than 1.
    """
    # Reject an unusable capacity before any state is attached to the instance.
    if max_entries < 1:
        raise ValueError("max_entries must be at least 1")

    # Cache state: (user_id, counter) -> expiry timestamp, guarded by an asyncio lock.
    self._entries: dict[tuple[Hashable, int], float] = {}
    self._lock = asyncio.Lock()

    # Configuration.
    self.max_entries = max_entries
    self._clock = clock

mark_used(user_id, counter, ttl_seconds) async

Store a used code pair until its TTL elapses.

Returns:

Type Description
bool

True when the pair was stored, otherwise False for a replay.

Source code in litestar_auth/totp.py
async def mark_used(self, user_id: Hashable, counter: int, ttl_seconds: float) -> bool:
    """Record a ``(user_id, counter)`` pair unless it is already cached.

    Args:
        user_id: Hashable identifier of the user who presented the code.
        counter: TOTP counter value the code matched.
        ttl_seconds: How long, in clock units, the pair stays in the cache.

    Returns:
        ``True`` when the pair was stored, otherwise ``False`` for a replay.
    """
    async with self._lock:
        current_time = self._clock()
        self._prune(current_time)

        pair = (user_id, counter)
        already_seen = pair in self._entries
        if not already_seen:
            at_capacity = len(self._entries) >= self.max_entries
            if at_capacity:
                # NOTE(review): this second prune uses the same timestamp as the one
                # above and looks redundant - confirm against _prune before removing.
                self._prune(current_time)
                self._evict_oldest_until_below_cap()
            # The stored value is the absolute expiry time of the pair.
            self._entries[pair] = current_time + ttl_seconds

        return not already_seen

RedisUsedTotpCodeStore(*, redis, key_prefix=DEFAULT_TOTP_USED_KEY_PREFIX)

Redis-backed replay store for TOTP codes; safe for multi-worker and multi-pod deployments.

Store the Redis client and key prefix.

Parameters:

Name Type Description Default
redis RedisUsedTotpCodeStoreClient

Async Redis client supporting set(name, value, nx=True, px=ttl_ms) (e.g. redis.asyncio.Redis).

required
key_prefix str

Prefix for replay keys; keys are {key_prefix}{user_id}:{counter}.

DEFAULT_TOTP_USED_KEY_PREFIX
Source code in litestar_auth/totp.py
def __init__(
    self,
    *,
    redis: RedisUsedTotpCodeStoreClient,
    key_prefix: str = DEFAULT_TOTP_USED_KEY_PREFIX,
) -> None:
    """Bind the Redis client and the key prefix used for replay keys.

    Args:
        redis: Async Redis client supporting ``set(name, value, nx=True, px=ttl_ms)``
            (e.g. ``redis.asyncio.Redis``).
        key_prefix: Prefix for replay keys; keys are ``{key_prefix}{user_id}:{counter}``.
    """
    # Runs before any state is stored; presumably checks that redis.asyncio
    # is importable - TODO confirm against the helper.
    _load_redis_asyncio()

    self._key_prefix = key_prefix
    self._redis = redis

mark_used(user_id, counter, ttl_seconds) async

Atomically record a used (user_id, counter) pair via SET key 1 NX PX ttl_ms.

Returns:

Type Description
bool

True when the pair was newly stored, False for a replay.

Source code in litestar_auth/totp.py
async def mark_used(self, user_id: Hashable, counter: int, ttl_seconds: float) -> bool:
    """Atomically record a used (user_id, counter) pair via SET key 1 NX PX ttl_ms.

    The TTL is converted to whole milliseconds and clamped to a minimum of
    1 ms, so a sub-millisecond or non-positive ``ttl_seconds`` still records
    the pair instead of sending an invalid ``PX`` value.

    Args:
        user_id: Hashable identifier of the user who presented the code.
        counter: TOTP counter value the code matched.
        ttl_seconds: Lifetime of the replay key in seconds.

    Returns:
        ``True`` when the pair was newly stored, ``False`` for a replay.
    """
    key = self._key(user_id, counter)
    # Redis rejects non-positive PX values; int() truncation would otherwise
    # yield 0 for TTLs below one millisecond.
    ttl_ms = max(1, int(ttl_seconds * 1000))
    result = await self._redis.set(key, "1", nx=True, px=ttl_ms)
    # Anything other than an explicit True (e.g. None when NX fails) is a replay.
    return result is True

RedisUsedTotpCodeStoreClient

Bases: Protocol

Minimal Redis client interface for TOTP replay store (SET key value NX PX ttl_ms).

set(name, value, *, nx=False, px=None) async

Set a key; with nx=True only set if not exists; px is TTL in milliseconds.

Returns:

Type Description
bool | None

True when the key was set, False when nx=True and key already existed.

Source code in litestar_auth/totp.py
async def set(
    self,
    name: str,
    value: str,
    *,
    nx: bool = False,
    px: int | None = None,
) -> bool | None:
    """Set a key; with ``nx=True`` only set if not exists; ``px`` is TTL in milliseconds.

    Args:
        name: Key to write.
        value: Value to store under ``name``.
        nx: When ``True``, write only if the key does not already exist.
        px: Time-to-live in milliseconds, or ``None`` for no expiry argument.

    Returns:
        ``True`` when the key was set. When ``nx=True`` and the key already
        existed the result is ``False`` or ``None``, as the annotation allows;
        ``RedisUsedTotpCodeStore.mark_used`` treats anything other than
        ``True`` as "not stored".
    """

SecurityWarning

Bases: UserWarning

Warning emitted for security-sensitive insecure defaults (TOTP, plugin startup, etc.).

UsedTotpCodeStore

Bases: Protocol

Persistence for used TOTP codes keyed by user and counter.

mark_used(user_id, counter, ttl_seconds) async

Atomically record a (user_id, counter) pair when unused.

Returns:

Type Description
bool

True when the pair was newly stored, otherwise False.

Source code in litestar_auth/totp.py
async def mark_used(self, user_id: Hashable, counter: int, ttl_seconds: float) -> bool:
    """Atomically record a `(user_id, counter)` pair when unused.

    Args:
        user_id: Hashable identifier of the user who presented the code.
        counter: TOTP counter value the code matched.
        ttl_seconds: How long the pair should be remembered, in seconds.

    Returns:
        ``True`` when the pair was newly stored, otherwise ``False``.
    """

generate_totp_secret(algorithm=TOTP_ALGORITHM)

Generate a base32-encoded TOTP secret sized to the algorithm's HMAC output.

Per RFC 4226 Section 4, the shared secret length should match the HMAC output length: 20 bytes for SHA-1, 32 bytes for SHA-256, 64 bytes for SHA-512. The SECRET_BYTES constant is retained for backward compatibility but is no longer the sole source of truth.

Parameters:

Name Type Description Default
algorithm TotpAlgorithm

TOTP hash algorithm; determines secret byte length.

TOTP_ALGORITHM

Returns:

Type Description
str

A random base32 secret without RFC padding.

Source code in litestar_auth/totp.py
def generate_totp_secret(algorithm: TotpAlgorithm = TOTP_ALGORITHM) -> str:
    """Create a random base32 TOTP secret whose size follows the hash algorithm.

    The byte length is looked up per algorithm so that it matches the HMAC
    output length (documented as 20 bytes for SHA-1, 32 for SHA-256 and 64
    for SHA-512).  ``SECRET_BYTES`` remains the fallback for algorithms
    without an entry in the lookup table.

    Args:
        algorithm: TOTP hash algorithm; determines secret byte length.

    Returns:
        A random base32 secret without RFC padding.
    """
    byte_count = _SECRET_BYTES_BY_ALGORITHM.get(algorithm, SECRET_BYTES)
    encoded = base64.b32encode(secrets.token_bytes(byte_count))
    # Drop the trailing "=" padding characters from the encoded secret.
    return encoded.decode("ascii").rstrip("=")

generate_totp_uri(secret, email, issuer, *, algorithm=TOTP_ALGORITHM)

Build an otpauth URI suitable for QR-code generation.

Returns:

Type Description
str

An otpauth:// URI for authenticator apps.

Source code in litestar_auth/totp.py
def generate_totp_uri(
    secret: str,
    email: str,
    issuer: str,
    *,
    algorithm: TotpAlgorithm = TOTP_ALGORITHM,
) -> str:
    """Build an otpauth URI suitable for QR-code generation.

    The label and every query value are percent-encoded: spaces become
    ``%20`` (never ``+``) and ``/`` is escaped, so an issuer such as
    ``"ACME Co"`` or an address containing ``/`` cannot alter the URI
    structure or be shown with literal ``+`` signs.

    Args:
        secret: Base32-encoded shared secret.
        email: Account name shown in the authenticator app.
        issuer: Service name; used both as label prefix and ``issuer`` parameter.
        algorithm: TOTP hash algorithm advertised in the URI.

    Returns:
        An ``otpauth://`` URI for authenticator apps.
    """
    # safe="" also escapes "/", which quote() leaves untouched by default and
    # which would otherwise add path segments to the URI.
    label = quote(f"{issuer}:{email}", safe="")
    query_params: dict[str, str] = {
        "secret": secret,
        "issuer": issuer,
        "digits": str(TOTP_DIGITS),
        "period": str(TIME_STEP_SECONDS),
        "algorithm": algorithm,
    }
    # urlencode() defaults to quote_plus ("+" for spaces); quote_via=quote
    # emits "%20" instead.
    query = urlencode(query_params, quote_via=quote)
    return f"otpauth://totp/{label}?{query}"

verify_totp(secret, code, *, algorithm=TOTP_ALGORITHM)

Validate a TOTP code for the current time window only.

Returns:

Type Description
bool

True when the code matches the current time step, otherwise False.

Source code in litestar_auth/totp.py
def verify_totp(secret: str, code: str, *, algorithm: TotpAlgorithm = TOTP_ALGORITHM) -> bool:
    """Check a TOTP code against the current time window only.

    Args:
        secret: Shared TOTP secret.
        code: Code supplied by the user.
        algorithm: TOTP hash algorithm to verify with.

    Returns:
        ``True`` when the code matches the current time step, otherwise ``False``.
    """
    # The helper yields None when the code does not match.
    matched_counter = _verify_totp_counter(secret, code, algorithm=algorithm)
    return matched_counter is not None

verify_totp_with_store(secret, code, *, user_id, used_tokens_store=None, algorithm=TOTP_ALGORITHM, require_replay_protection=True) async

Validate a TOTP code and optionally reject same-window replays.

Returns:

Type Description
bool

True when the code is valid and has not already been used for user_id.

Raises:

Type Description
ConfigurationError

If require_replay_protection=True and no replay store is configured outside testing mode.

Source code in litestar_auth/totp.py
async def verify_totp_with_store(  # noqa: PLR0913
    secret: str,
    code: str,
    *,
    user_id: Hashable,
    used_tokens_store: UsedTotpCodeStore | None = None,
    algorithm: TotpAlgorithm = TOTP_ALGORITHM,
    require_replay_protection: bool = True,
) -> bool:
    """Verify a TOTP code and, when a store is given, refuse codes already used.

    Args:
        secret: Shared TOTP secret.
        code: Code supplied by the user.
        user_id: Identifier the used-code record is keyed on.
        used_tokens_store: Replay store; ``None`` disables replay protection.
        algorithm: TOTP hash algorithm to verify with.
        require_replay_protection: When ``True``, a missing store is an error
            outside testing mode.

    Returns:
        ``True`` when the code is valid and has not already been used for ``user_id``.

    Raises:
        ConfigurationError: If ``require_replay_protection=True`` and no replay store is configured
            outside testing mode.
    """
    matched_counter = _verify_totp_counter(secret, code, algorithm=algorithm)
    if matched_counter is None:
        logger.warning("TOTP verification failed.", extra={"event": "totp_failed", "user_id": str(user_id)})
        return False

    if used_tokens_store is not None:
        # A falsy result means the (user_id, counter) pair was seen before.
        newly_recorded = await used_tokens_store.mark_used(user_id, matched_counter, USED_TOTP_CODE_TTL_SECONDS)
        if newly_recorded:
            return True
        logger.warning("TOTP replay detected.", extra={"event": "totp_replay", "user_id": str(user_id)})
        return False

    # No store from here on: either refuse to run or accept with a warning.
    if require_replay_protection and not is_testing():
        raise ConfigurationError("TOTP replay protection is required in production. Configure a UsedTotpCodeStore.")

    warnings.warn(
        "TOTP replay protection is DISABLED because used_tokens_store=None.",
        SecurityWarning,
        stacklevel=2,
    )
    return True

litestar_auth.totp_flow

TOTP login-flow orchestration for pending-token issue and verification.

InvalidTotpCodeError

Bases: Exception

Raised when a TOTP code cannot complete the pending login flow.

InvalidTotpPendingTokenError

Bases: Exception

Raised when a pending TOTP token is invalid or expired.

PendingTotpLogin(user, pending_jti, expires_at) dataclass

Decoded pending-login state required to finish a TOTP handshake.

TotpFlowUserManagerProtocol

Bases: Protocol

User-manager behavior required by TOTP login-flow orchestration.

get(user_id) async

Return the user for the given identifier.

Source code in litestar_auth/totp_flow.py
async def get(self, user_id: ID) -> UP | None:
    """Return the user for the given identifier.

    Args:
        user_id: Identifier of the user to load.

    Returns:
        The matching user, or ``None`` (presumably when no such user
        exists - confirm against the implementing user manager).
    """

read_totp_secret(secret) async

Return a plain-text TOTP secret from storage.

Source code in litestar_auth/totp_flow.py
async def read_totp_secret(self, secret: str | None) -> str | None:
    """Return a plain-text TOTP secret from storage.

    Args:
        secret: Stored secret value; the login flow passes ``user.totp_secret``.

    Returns:
        The plain-text secret, or ``None``. ``issue_pending_token`` treats a
        ``None`` result as "no TOTP challenge needed".
    """

TotpLoginFlowService(*, user_manager, totp_pending_secret, totp_pending_lifetime=_DEFAULT_PENDING_TOKEN_LIFETIME, totp_algorithm='SHA256', require_replay_protection=True, used_tokens_store=None, pending_jti_store=None, id_parser=None)

Issue and verify pending TOTP login challenges.

Bind the dependencies used by the pending-login handshake.

Source code in litestar_auth/totp_flow.py
def __init__(  # noqa: PLR0913
    self,
    *,
    user_manager: TotpFlowUserManagerProtocol[UP, ID],
    totp_pending_secret: str,
    totp_pending_lifetime: timedelta = _DEFAULT_PENDING_TOKEN_LIFETIME,
    totp_algorithm: TotpAlgorithm = "SHA256",
    require_replay_protection: bool = True,
    used_tokens_store: UsedTotpCodeStore | None = None,
    pending_jti_store: JWTDenylistStore | None = None,
    id_parser: Callable[[str], ID] | None = None,
) -> None:
    """Store the collaborators and settings for the pending-login handshake.

    Args:
        user_manager: Source of users and plain-text TOTP secrets.
        totp_pending_secret: Key used to sign pending-login JWTs.
        totp_pending_lifetime: Validity period added to the issue time of a pending token.
        totp_algorithm: Hash algorithm forwarded to TOTP verification.
        require_replay_protection: Forwarded to TOTP verification.
        used_tokens_store: Replay store forwarded to TOTP verification.
        pending_jti_store: Denylist store for pending-token JTIs.
        id_parser: Optional callable turning a string into a user identifier.
    """
    # Pending-token settings.
    self._totp_pending_secret = totp_pending_secret
    self._totp_pending_lifetime = totp_pending_lifetime
    self._pending_jti_store = pending_jti_store

    # TOTP verification settings.
    self._totp_algorithm = totp_algorithm
    self._require_replay_protection = require_replay_protection
    self._used_tokens_store = used_tokens_store

    # User lookup.
    self._user_manager = user_manager
    self._id_parser = id_parser

authenticate_pending_login(*, pending_token, code, validate_user=None) async

Validate a pending-login token plus TOTP code and return the resolved user.

Returns:

Type Description
UP

The user resolved from the verified pending-login challenge.

Raises:

Type Description
InvalidTotpCodeError

If the TOTP code is invalid or TOTP is not enabled.

Source code in litestar_auth/totp_flow.py
async def authenticate_pending_login(
    self,
    *,
    pending_token: str,
    code: str,
    validate_user: PendingUserValidator[UP] | None = None,
) -> UP:
    """Complete a pending login by checking the token and the TOTP code.

    Args:
        pending_token: Pending-login token issued earlier in the flow.
        code: TOTP code supplied by the user.
        validate_user: Optional async hook awaited with the resolved user
            before the code is checked.

    Returns:
        The user resolved from the verified pending-login challenge.

    Raises:
        InvalidTotpCodeError: If the TOTP code is invalid or TOTP is not enabled.
    """
    pending_login = await self._resolve_pending_login(pending_token)
    user = pending_login.user

    if validate_user is not None:
        await validate_user(user)

    secret = await self._user_manager.read_totp_secret(user.totp_secret)
    # A missing or empty secret means there is nothing to verify against.
    if not secret:
        raise InvalidTotpCodeError

    code_accepted = await verify_totp_with_store(
        secret,
        code,
        user_id=user.id,
        used_tokens_store=self._used_tokens_store,
        algorithm=self._totp_algorithm,
        require_replay_protection=self._require_replay_protection,
    )
    if not code_accepted:
        raise InvalidTotpCodeError

    # Runs only after the code was accepted.
    await self._deny_pending_login(pending_login)
    return user

issue_pending_token(user) async

Return a pending-login JWT when the user has TOTP enabled.

Source code in litestar_auth/totp_flow.py
async def issue_pending_token(self, user: UP) -> str | None:
    """Issue a pending-login JWT for a user who has a TOTP secret.

    Args:
        user: User who has just passed the first authentication step.

    Returns:
        An HS256-signed pending-login token, or ``None`` when the user
        manager reports no TOTP secret for the user.
    """
    stored_secret = await self._user_manager.read_totp_secret(user.totp_secret)
    if stored_secret is None:
        return None

    now = datetime.now(tz=UTC)
    expires_at = now + self._totp_pending_lifetime
    claims = {
        "sub": str(user.id),
        "aud": TOTP_PENDING_AUDIENCE,
        "iat": now,
        "nbf": now,
        "exp": expires_at,
        # Random per-token identifier.
        "jti": secrets.token_hex(16),
    }
    return jwt.encode(claims, self._totp_pending_secret, algorithm="HS256")