Skip to content

TOTP

Time-based one-time passwords in litestar-auth split into a low-level crypto and replay layer and a login-flow orchestration layer.

litestar_auth.totp holds the primitives: generating secrets and otpauth URIs (generate_totp_secret, generate_totp_uri), verifying a code against a stored secret (verify_totp, verify_totp_with_store), the UsedTotpCodeStore protocol used to reject replay of successfully verified codes, and the TotpEnrollmentStore protocol used to keep pending enrollment secrets server-side. TOTP helpers support SHA256 and SHA512 algorithms. Built-in replay stores are InMemoryUsedTotpCodeStore (async-safe, single-process) and RedisUsedTotpCodeStore (shared Redis SET … NX semantics for multi-worker deployments). Built-in enrollment stores are InMemoryTotpEnrollmentStore (tests/single-process only) and RedisTotpEnrollmentStore (shared latest-only, single-use pending enrollment state). Choose stores that match your durability and scaling needs; production setups typically wire Redis through plugin configuration (see TOTP guide).

litestar_auth.totp_flow builds on those primitives for pending-login challenges: TotpLoginFlowService issues short-lived pending JWTs and finishes login after a valid TOTP code, using verify_totp_with_store and optional UsedTotpCodeStore / denylist wiring. Construct it with TotpLoginFlowConfig so pending-token signing, replay stores, JTI denylisting, ID parsing, and testing posture are declared as one typed configuration object. That path complements the HTTP controllers enrollment and verify routes.

Enrollment is intentionally two-phase: first enable (receive secret, otpauth material, and a short-lived enrollment token while the secret is kept in TotpEnrollmentStore, not in the JWT), then confirm with a valid code so the secret is stored—mirroring the route flow documented in TOTP (two-factor authentication). Verification (during login or disable flows) checks the current code and relies on replay protection when configured.

Generated recovery codes are 28 lowercase hex characters (112 bits). They are returned only from confirm-enable or regenerate responses and are stored as HMAC lookup digests mapped to Argon2 hashes by the manager/store surface.

Persisted secret encryption

Persisted user-row TOTP secrets are owned by BaseUserManager, not by the low-level litestar_auth.totp primitives. Configure UserManagerSecurity.totp_secret_keyring with FernetKeyringConfig(active_key_id=..., keys=...) for production. Stored non-null values use the fernet:v1:<key_id>:<ciphertext> envelope, and plaintext persisted rows fail closed.

Rotation is intentionally explicit. BaseUserManager.totp_secret_requires_reencrypt(value) checks whether one stored value uses a non-active configured key id, and BaseUserManager.reencrypt_totp_secret_for_storage(value) rewrites that one value with the active key. Operators must scan and update their own persisted rows, verify that no value still requires rotation, and then retire old key ids. Legacy unversioned Fernet rows need an explicit old-key migration path because the stored value has no key id.

Replay store contract (UsedTotpCodeStore and UsedTotpMarkResult)

Call verify_totp_with_store(secret, code, replay=TotpReplayProtection(...)) when a verified code must also be recorded in a replay store. TotpReplayProtection groups the user id, optional UsedTotpCodeStore, production replay requirement, and explicit testing override for that one verification attempt.

Custom implementations of UsedTotpCodeStore must implement mark_used(user_id, counter, ttl_seconds) and return UsedTotpMarkResult, not a bare boolean. The result tells callers whether the (user_id, counter) pair was newly recorded and, when it was not, why verification should fail:

stored rejected_as_replay Meaning
True (ignored) The pair was newly recorded; verification succeeds when the cryptographic check already passed.
False True The pair was already present—replay of a successfully verified code in the TTL window.
False False The store rejected the insert for a non-replay reason and verification fails closed. The built-in InMemoryUsedTotpCodeStore uses this when capacity is exhausted (no expired entries left to prune); RedisUsedTotpCodeStore does not use this path (a missed SET NX implies an existing key, i.e. replay).

verify_totp_with_store() uses that contract: on stored=False it logs totp_replay when rejected_as_replay=True, and totp_replay_store_capacity when rejected_as_replay=False, so operators can tell true replay from fail-closed store pressure. Authentication still returns False in both cases.

litestar_auth.totp

Time-based one-time password helpers.

SecurityWarning

Bases: UserWarning

Warning emitted for security-sensitive insecure defaults (TOTP, plugin startup, etc.).

TotpRecoveryCodeUserManager

Bases: Protocol

User-manager behavior required to verify and consume TOTP recovery codes.

recovery_code_lookup_secret property

Return the HMAC lookup key for recovery-code verification.

consume_recovery_code_by_lookup(user, lookup_hex) async

Atomically consume the active recovery-code entry for lookup_hex.

Concurrent callers presenting the same recovery code MUST observe exactly one success and N-1 failures.

Source code in litestar_auth/totp.py
async def consume_recovery_code_by_lookup(self, user: UP, lookup_hex: str) -> bool:
    """Atomically consume the active recovery-code entry for ``lookup_hex``.

    Concurrent callers presenting the same recovery code MUST observe
    exactly one success and N-1 failures.
    """

find_recovery_code_hash_by_lookup(user, lookup_hex) async

Return the Argon2 hash matching lookup_hex, if active.

Source code in litestar_auth/totp.py
async def find_recovery_code_hash_by_lookup(self, user: UP, lookup_hex: str) -> str | None:
    """Return the Argon2 hash matching ``lookup_hex``, if active."""

build_recovery_code_index(codes, *, lookup_secret, password_helper=None)

Build a keyed lookup index for TOTP recovery-code hashes.

Returns:

Type Description
dict[str, str]

Mapping of HMAC-SHA-256 lookup hex digests to Argon2 hashes.

Source code in litestar_auth/totp.py
def build_recovery_code_index(
    codes: tuple[str, ...],
    *,
    lookup_secret: bytes,
    password_helper: PasswordHelper | None = None,
) -> dict[str, str]:
    """Build a keyed lookup index for TOTP recovery-code hashes.

    Returns:
        Mapping of HMAC-SHA-256 lookup hex digests to Argon2 hashes.
    """
    helper = password_helper or PasswordHelper.from_defaults()
    return {
        _recovery_code_lookup_hex(code, lookup_secret=lookup_secret): helper.hash(code.casefold()) for code in codes
    }

generate_totp_recovery_codes(*, count=DEFAULT_TOTP_RECOVERY_CODE_COUNT)

Return distinct plaintext recovery codes for a TOTP enrollment.

Returns:

Type Description
tuple[str, ...]

A tuple of unique 112-bit hex recovery codes.

Raises:

Type Description
ValueError

If count is negative.

Source code in litestar_auth/totp.py
def generate_totp_recovery_codes(*, count: int = DEFAULT_TOTP_RECOVERY_CODE_COUNT) -> tuple[str, ...]:
    """Return distinct plaintext recovery codes for a TOTP enrollment.

    Returns:
        A tuple of unique 112-bit hex recovery codes.

    Raises:
        ValueError: If ``count`` is negative.
    """
    if count < 0:
        msg = "Recovery-code count cannot be negative."
        raise ValueError(msg)

    codes: set[str] = set()
    while len(codes) < count:
        codes.add(secrets.token_hex(TOTP_RECOVERY_CODE_HEX_BYTES))
    return tuple(codes)

generate_totp_secret(algorithm=TOTP_ALGORITHM)

Generate a base32-encoded TOTP secret sized to the algorithm's HMAC output.

Per RFC 4226 Section 4, the shared secret length should match the HMAC output length: 32 bytes for SHA-256 or 64 bytes for SHA-512.

Parameters:

Name Type Description Default
algorithm TotpAlgorithm

TOTP hash algorithm; determines secret byte length.

TOTP_ALGORITHM

Returns:

Type Description
str

A random base32 secret without RFC padding.

Source code in litestar_auth/totp.py
def generate_totp_secret(algorithm: TotpAlgorithm = TOTP_ALGORITHM) -> str:
    """Generate a base32-encoded TOTP secret sized to the algorithm's HMAC output.

    Per RFC 4226 Section 4, the shared secret length should match the HMAC
    output length: 32 bytes for SHA-256 or 64 bytes for SHA-512.

    Args:
        algorithm: TOTP hash algorithm; determines secret byte length.

    Returns:
        A random base32 secret without RFC padding.
    """
    secret_bytes = _SECRET_BYTES_BY_ALGORITHM[_validate_totp_algorithm(algorithm)]
    random_bytes = secrets.token_bytes(secret_bytes)
    return base64.b32encode(random_bytes).decode("ascii").rstrip("=")

generate_totp_uri(secret, email, issuer, *, algorithm=TOTP_ALGORITHM)

Build an otpauth URI suitable for QR-code generation.

Returns:

Type Description
str

An otpauth:// URI for authenticator apps.

Source code in litestar_auth/totp.py
def generate_totp_uri(
    secret: str,
    email: str,
    issuer: str,
    *,
    algorithm: TotpAlgorithm = TOTP_ALGORITHM,
) -> str:
    """Build an otpauth URI suitable for QR-code generation.

    Returns:
        An ``otpauth://`` URI for authenticator apps.
    """
    algorithm = _validate_totp_algorithm(algorithm)
    label = quote(f"{issuer}:{email}")
    query_params: dict[str, str] = {
        "secret": secret,
        "issuer": issuer,
        "digits": str(TOTP_DIGITS),
        "period": str(TIME_STEP_SECONDS),
        "algorithm": algorithm,
    }
    query = urlencode(query_params)
    return f"otpauth://totp/{label}?{query}"

verify_totp(secret, code, *, algorithm=TOTP_ALGORITHM)

Validate a TOTP code for the current time window only.

Returns:

Type Description
bool

True when the code matches the current time step, otherwise False.

Source code in litestar_auth/totp.py
def verify_totp(secret: str, code: str, *, algorithm: TotpAlgorithm = TOTP_ALGORITHM) -> bool:
    """Validate a TOTP code for the current time window only.

    Returns:
        ``True`` when the code matches the current time step, otherwise ``False``.
    """
    return _verify_totp_counter(secret, code, algorithm=algorithm) is not None

verify_totp_with_store(secret, code, *, replay, algorithm=TOTP_ALGORITHM) async

Validate a TOTP code and optionally reject same-window replays.

Returns:

Type Description
bool

True when the code is valid and has not already been used for replay.user_id.

Raises:

Type Description
ConfigurationError

If replay protection is required and no replay store is configured outside testing mode.

Source code in litestar_auth/totp.py
async def verify_totp_with_store(
    secret: str,
    code: str,
    *,
    replay: TotpReplayProtection,
    algorithm: TotpAlgorithm = TOTP_ALGORITHM,
) -> bool:
    """Validate a TOTP code and optionally reject same-window replays.

    Returns:
        ``True`` when the code is valid and has not already been used for ``replay.user_id``.

    Raises:
        ConfigurationError: If replay protection is required and no replay store is configured
            outside testing mode.
    """
    counter = _verify_totp_counter(secret, code, algorithm=algorithm)
    if counter is None:
        logger.warning("TOTP verification failed.", extra={"event": "totp_failed", "user_id": str(replay.user_id)})
        return False

    if replay.used_tokens_store is None:
        if replay.require_replay_protection and not replay.unsafe_testing:
            msg = "TOTP replay protection is required in production. Configure a UsedTotpCodeStore."
            raise ConfigurationError(msg)
        warnings.warn(
            "TOTP replay protection is DISABLED because used_tokens_store=None.",
            SecurityWarning,
            stacklevel=2,
        )
        return True

    mark_result = await replay.used_tokens_store.mark_used(replay.user_id, counter, USED_TOTP_CODE_TTL_SECONDS)
    if mark_result.stored:
        return True
    if mark_result.rejected_as_replay:
        logger.warning("TOTP replay detected.", extra={"event": "totp_replay", "user_id": str(replay.user_id)})
    else:
        logger.warning(
            "TOTP used-code store rejected verification under capacity pressure (fail closed).",
            extra={"event": "totp_replay_store_capacity", "user_id": str(replay.user_id)},
        )
    return False

litestar_auth.totp_flow

TOTP login-flow orchestration for pending-token issue and verification.

CompletedTotpLogin(user, used_recovery_code) dataclass

Verified pending-login result plus the factor type used.

InvalidTotpCodeError

Bases: Exception

Raised when a TOTP code cannot complete the pending login flow.

InvalidTotpPendingTokenError

Bases: Exception

Raised when a pending TOTP token is invalid or expired.

PendingTotpClientBinding(client_ip_fingerprint, user_agent_fingerprint) dataclass

Fingerprints binding a pending TOTP token to the issuing client.

PendingTotpLogin(user, pending_jti, expires_at) dataclass

Decoded pending-login state required to finish a TOTP handshake.

TotpFlowUserManagerProtocol

Bases: Protocol

User-manager behavior required by TOTP login-flow orchestration.

recovery_code_lookup_secret property

Return the HMAC lookup key for recovery-code verification.

consume_recovery_code_by_lookup(user, lookup_hex) async

Atomically consume a matched recovery-code lookup entry.

Source code in litestar_auth/totp_flow.py
async def consume_recovery_code_by_lookup(self, user: UP, lookup_hex: str) -> bool:
    """Atomically consume a matched recovery-code lookup entry."""

find_recovery_code_hash_by_lookup(user, lookup_hex) async

Return the active recovery-code hash matching lookup_hex.

Source code in litestar_auth/totp_flow.py
async def find_recovery_code_hash_by_lookup(self, user: UP, lookup_hex: str) -> str | None:
    """Return the active recovery-code hash matching ``lookup_hex``."""

get(user_id) async

Return the user for the given identifier.

Source code in litestar_auth/totp_flow.py
async def get(self, user_id: ID) -> UP | None:
    """Return the user for the given identifier."""

read_totp_secret(secret) async

Return a plain-text TOTP secret from storage.

Source code in litestar_auth/totp_flow.py
async def read_totp_secret(self, secret: str | None) -> str | None:
    """Return a plain-text TOTP secret from storage."""

TotpLoginFlowConfig(totp_pending_secret, totp_pending_lifetime=_DEFAULT_PENDING_TOKEN_LIFETIME, totp_algorithm='SHA256', require_replay_protection=True, used_tokens_store=None, pending_jti_store=None, id_parser=None, require_client_binding=True, unsafe_testing=False) dataclass

Configuration for pending-login TOTP issue and verification.

TotpLoginFlowService(*, user_manager, config)

Issue and verify pending TOTP login challenges.

Pending-login JWTs are single-use through JTI denial and, by default, carry hashed client-IP and User-Agent fingerprints. Verification accepts a current TOTP code first, then falls back to a one-time recovery code without changing the public wrong-code response shape.

Bind the dependencies used by the pending-login handshake.

Source code in litestar_auth/totp_flow.py
def __init__(
    self,
    *,
    user_manager: TotpFlowUserManagerProtocol[UP, ID],
    config: TotpLoginFlowConfig[ID],
) -> None:
    """Bind the dependencies used by the pending-login handshake."""
    self._user_manager = user_manager
    self._totp_pending_secret = config.totp_pending_secret
    self._totp_pending_lifetime = config.totp_pending_lifetime
    self._totp_algorithm = config.totp_algorithm
    self._require_replay_protection = config.require_replay_protection
    self._used_tokens_store = config.used_tokens_store
    self._pending_jti_store = config.pending_jti_store
    self._id_parser = config.id_parser
    self._require_client_binding = config.require_client_binding
    self._unsafe_testing = config.unsafe_testing
    self._password_helper = PasswordHelper.from_defaults()
    self._pending_jti_warning_state = _PendingJtiWarningState()

authenticate_pending_login(*, pending_token, code, client_binding=None, validate_user=None) async

Validate a pending-login token plus TOTP or recovery code.

Returns:

Type Description
UP

The user resolved from the verified pending-login challenge.

Invalid pending-token failures propagate as InvalidTotpPendingTokenError from token resolution before any code fallback runs.

Source code in litestar_auth/totp_flow.py
async def authenticate_pending_login(
    self,
    *,
    pending_token: str,
    code: str,
    client_binding: PendingTotpClientBinding | None = None,
    validate_user: PendingUserValidator[UP] | None = None,
) -> UP:
    """Validate a pending-login token plus TOTP or recovery code.

    Returns:
        The user resolved from the verified pending-login challenge.

    Invalid pending-token failures propagate as
    ``InvalidTotpPendingTokenError`` from token resolution before any code
    fallback runs.
    """
    completed = await self.authenticate_pending_login_with_method(
        pending_token=pending_token,
        code=code,
        client_binding=client_binding,
        validate_user=validate_user,
    )
    return completed.user

authenticate_pending_login_with_method(*, pending_token, code, client_binding=None, validate_user=None) async

Validate a pending-login token and return whether a recovery code was used.

Returns:

Type Description
CompletedTotpLogin[UP]

Completed login result with user and factor-type metadata.

Raises:

Type Description
InvalidTotpCodeError

If the TOTP/recovery code is invalid, already consumed, or TOTP is not enabled.

Source code in litestar_auth/totp_flow.py
async def authenticate_pending_login_with_method(
    self,
    *,
    pending_token: str,
    code: str,
    client_binding: PendingTotpClientBinding | None = None,
    validate_user: PendingUserValidator[UP] | None = None,
) -> CompletedTotpLogin[UP]:
    """Validate a pending-login token and return whether a recovery code was used.

    Returns:
        Completed login result with user and factor-type metadata.

    Raises:
        InvalidTotpCodeError: If the TOTP/recovery code is invalid,
            already consumed, or TOTP is not enabled.
    """
    pending_login = await self._resolve_pending_login(pending_token, client_binding=client_binding)
    if validate_user is not None:
        await validate_user(pending_login.user)
    secret = await self._user_manager.read_totp_secret(pending_login.user.totp_secret)
    if not secret:
        raise InvalidTotpCodeError
    totp_verified = await verify_totp_with_store(
        secret,
        code,
        replay=TotpReplayProtection(
            user_id=pending_login.user.id,
            used_tokens_store=self._used_tokens_store,
            require_replay_protection=self._require_replay_protection,
            unsafe_testing=self._unsafe_testing,
        ),
        algorithm=self._totp_algorithm,
    )
    recovery_code_verified = False
    if not totp_verified:
        recovery_code_verified = await _consume_matching_recovery_code(
            self._user_manager,
            pending_login.user,
            code,
            password_helper=self._password_helper,
        )
    if not totp_verified and not recovery_code_verified:
        raise InvalidTotpCodeError
    await self._deny_pending_login(pending_login)
    return CompletedTotpLogin(user=pending_login.user, used_recovery_code=recovery_code_verified)

issue_pending_token(user, *, client_binding=None) async

Return a pending-login JWT when the user has TOTP enabled.

When client binding is required, client_binding supplies the hashed cip and uaf claims that /verify must match.

Raises:

Type Description
InvalidTotpPendingTokenError

If client binding is required but unavailable.

Source code in litestar_auth/totp_flow.py
async def issue_pending_token(
    self,
    user: UP,
    *,
    client_binding: PendingTotpClientBinding | None = None,
) -> str | None:
    """Return a pending-login JWT when the user has TOTP enabled.

    When client binding is required, ``client_binding`` supplies the hashed
    ``cip`` and ``uaf`` claims that `/verify` must match.

    Raises:
        InvalidTotpPendingTokenError: If client binding is required but unavailable.
    """
    if await self._user_manager.read_totp_secret(user.totp_secret) is None:
        return None
    issued_at = datetime.now(tz=UTC)
    payload = {
        "sub": str(user.id),
        "aud": TOTP_PENDING_AUDIENCE,
        "iat": issued_at,
        "nbf": issued_at,
        "exp": issued_at + self._totp_pending_lifetime,
        "jti": secrets.token_hex(16),
    }
    if self._require_client_binding:
        if client_binding is None:
            raise InvalidTotpPendingTokenError
        payload[_CLIENT_IP_FINGERPRINT_CLAIM] = client_binding.client_ip_fingerprint
        payload[_USER_AGENT_FINGERPRINT_CLAIM] = client_binding.user_agent_fingerprint
    return jwt.encode(payload, self._totp_pending_secret, algorithm="HS256", headers=jwt_encode_headers())

build_pending_totp_client_binding(request, *, pending_secret, trusted_proxy=False, trusted_headers=('X-Forwarded-For',))

Return keyed client-IP and User-Agent fingerprints for a TOTP pending token.

Source code in litestar_auth/totp_flow.py
def build_pending_totp_client_binding(
    request: Request[Any, Any, Any],
    *,
    pending_secret: str,
    trusted_proxy: bool = False,
    trusted_headers: tuple[str, ...] = ("X-Forwarded-For",),
) -> PendingTotpClientBinding:
    """Return keyed client-IP and User-Agent fingerprints for a TOTP pending token."""
    client_ip = _client_host(request, trusted_proxy=trusted_proxy, trusted_headers=trusted_headers)
    user_agent = (request.headers.get("User-Agent") or request.headers.get("user-agent") or "")[
        :_USER_AGENT_FINGERPRINT_MAX_BYTES
    ]
    key = pending_secret.encode()
    return PendingTotpClientBinding(
        client_ip_fingerprint=_fingerprint_client_binding_value(client_ip, key=key),
        user_agent_fingerprint=_fingerprint_client_binding_value(user_agent, key=key),
    )