Skip to content

API Reference

The root package __all__ is the single source for stable root-level exports. Prefer from fast_healthchecks import Probe, run_probe, healthcheck_shutdown, HealthCheckReport, HealthCheckResult, Check, FunctionConfig and the exception hierarchy (HealthCheckError, HealthCheckTimeoutError, HealthCheckSSRFError). Check classes (e.g. RedisHealthCheck) and other configs are available from fast_healthchecks.checks or their submodules (e.g. fast_healthchecks.checks.redis). See fast_healthchecks.__all__ and fast_healthchecks.checks.__all__.

Config types (e.g. RedisConfig, UrlConfig) in fast_healthchecks.checks.configs are part of the supported API for passing config=... to check constructors. The to_dict() methods on check classes are for internal test use only and are not part of the supported public API; do not rely on them in production code.

Secrets and redaction: Check and config to_dict(redact_secrets=True) redacts credential-style keys (same set as in fast_healthchecks.utils). The structured logging layer used by run_probe does not log config or secrets; see fast_healthchecks.logging and the run_probe docs.

Exception hierarchy (public)

The following exceptions are part of the public API and are documented for callers who want to handle them explicitly. Existing code that catches asyncio.TimeoutError or ValueError continues to work, because the new types subclass those.

  • HealthCheckError — Base for health-check-related exceptions.
  • HealthCheckTimeoutError — Raised when a probe or check run times out. Subclass of HealthCheckError and asyncio.TimeoutError.
  • HealthCheckSSRFError — Raised when URL/host SSRF validation fails (e.g. validate_url_ssrf, validate_host_ssrf_async). Subclass of HealthCheckError and ValueError. See SSRF documentation for behaviour and edge cases.

Structured Error Reporting with HealthError

For programmatic error handling and detailed diagnostics, use the HealthError dataclass which provides structured error information:

Machine-readable error details for failed health checks.

Attributes:

Name Type Description
code str

Machine-readable error code (e.g., "PROBE_TIMEOUT", "CHECK_EXCEPTION").

message str

Human-readable error message describing what went wrong.

duration_ms int

Time in milliseconds that the probe took to execute.

timeout_ms int | None

Timeout limit in milliseconds that was applied to the probe.

meta dict[str, object]

Additional metadata about the error (e.g., exception type, URL). Secrets are automatically redacted from this field.

Source code in fast_healthchecks/models.py
@dataclass(frozen=True)
class HealthError:
    """Machine-readable error details for failed health checks.

    Attributes:
        code: Machine-readable error code (e.g., "PROBE_TIMEOUT", "CHECK_EXCEPTION").
        message: Human-readable error message describing what went wrong.
        duration_ms: Time in milliseconds that the probe took to execute.
        timeout_ms: Timeout limit in milliseconds that was applied to the probe.
        meta: Additional metadata about the error (e.g., exception type, URL).
            Secrets are automatically redacted from this field.
    """

    code: str
    message: str
    duration_ms: int = 0
    timeout_ms: int | None = None
    meta: dict[str, object] = field(default_factory=dict)

Error Codes:

Code Description
CHECK_TIMEOUT Overall check execution exceeded timeout
PROBE_TIMEOUT Individual probe exceeded its timeout
CHECK_EXCEPTION Check raised an unexpected exception
DEPENDENCY_UNHEALTHY A dependency was marked unhealthy

Troubleshooting: Legacy Exception Mapping

The following table maps legacy exception classes to their corresponding ErrorCode values:

Legacy Exception ErrorCode Notes
HealthCheckTimeoutError CHECK_TIMEOUT Overall check execution timeout
asyncio.TimeoutError CHECK_TIMEOUT Caught at runner level
HealthCheckSSRFError CHECK_EXCEPTION SSRF validation failure
HealthCheckError CHECK_EXCEPTION General check failure
Other Exception CHECK_EXCEPTION Unexpected errors

Automatic mapping: The map_exception_to_health_error() function automatically converts legacy exceptions to HealthError with the appropriate error code:

from fast_healthchecks.errors import map_exception_to_health_error

# These are equivalent:
result.error = map_exception_to_health_error(exc)
result.error = HealthError(code="CHECK_EXCEPTION", message=str(exc), ...)

Migration guide:

  1. Replace except HealthCheckTimeoutError with if result.error.code == "CHECK_TIMEOUT"
  2. Replace except HealthCheckError with if result.error.code == "CHECK_EXCEPTION"
  3. Access structured error info via result.error.code, result.error.message, result.error.meta

Migration from legacy exceptions:

The old exception-based approach (HealthCheckError, HealthCheckTimeoutError) is still supported for backward compatibility. However, the new HealthError model provides more structured information:

# Old way (still works)
try:
    await runner.run(probe)
except asyncio.TimeoutError as e:
    logger.error(f"Timeout: {e}")

# New way (recommended) - use map_exception_to_health_error
from fast_healthchecks.errors import map_exception_to_health_error

result = await runner.run(probe)
if not result.healthy and result.error:
    # Structured error information
    print(f"Error code: {result.error.code}")      # e.g., "PROBE_TIMEOUT"
    print(f"Message: {result.error.message}")      # Human-readable message
    print(f"Duration: {result.error.duration_ms}ms")
    print(f"Meta: {result.error.meta}")            # Additional context

The map_exception_to_health_error function in fast_healthchecks.errors converts exceptions to structured HealthError instances with official error codes.

Models for healthchecks.

HealthCheckError

Bases: Exception

Base exception for health-check-related failures.

Raised or used as a base for timeouts, SSRF validation, and other health-check errors. Subclasses preserve the original exception type (e.g. HealthCheckTimeoutError is also an asyncio.TimeoutError) so existing code that catches TimeoutError or ValueError continues to work.

Source code in fast_healthchecks/models.py
class HealthCheckError(Exception):
    """Base exception for health-check-related failures.

    Raised or used as a base for timeouts, SSRF validation, and other
    health-check errors. Subclasses preserve the original exception type
    (e.g. HealthCheckTimeoutError is also an asyncio.TimeoutError) so
    existing code that catches TimeoutError or ValueError continues to work.
    """

HealthCheckReport dataclass

Report of healthchecks.

Attributes:

Name Type Description
results list[HealthCheckResult]

List of healthcheck results.

allow_partial_failure bool

If True, report is healthy when at least one check passes.

Source code in fast_healthchecks/models.py
@dataclass(frozen=True)
class HealthCheckReport:
    """Report of healthchecks.

    Attributes:
        results: List of healthcheck results.
        allow_partial_failure: If True, report is healthy when at least one check passes.
    """

    results: list[HealthCheckResult]
    allow_partial_failure: bool = False

    def __str__(self) -> str:
        """Return a string representation of the report."""
        return "\n".join(str(result) for result in self.results)

    @property
    def healthy(self) -> bool:
        """Return whether all healthchecks passed (or allowed partial failure)."""
        if self.allow_partial_failure:
            return any(result.healthy for result in self.results)
        return all(result.healthy for result in self.results)

healthy property

Return whether all healthchecks passed (or allowed partial failure).

__str__()

Return a string representation of the report.

Source code in fast_healthchecks/models.py
def __str__(self) -> str:
    """Return a string representation of the report."""
    return "\n".join(str(result) for result in self.results)

HealthCheckResult dataclass

Result of a healthcheck.

Attributes:

Name Type Description
name str

Name of the healthcheck.

healthy bool

Whether the healthcheck passed.

error HealthError | None

Structured error details if the healthcheck failed.

Source code in fast_healthchecks/models.py
@dataclass(frozen=True, init=False)
class HealthCheckResult:
    """Result of a healthcheck.

    Attributes:
        name: Name of the healthcheck.
        healthy: Whether the healthcheck passed.
        error: Structured error details if the healthcheck failed.
    """

    name: str
    healthy: bool
    error: HealthError | None = None

    def __init__(
        self,
        name: str,
        healthy: bool,  # noqa: FBT001
        error: HealthError | None = None,
        *,
        error_details: str | None = None,
    ) -> None:
        """Create a health check result.

        The ``error_details`` keyword is accepted for backward compatibility
        and is converted into ``error.message``.

        Raises:
            ValueError: If both ``error`` and ``error_details`` are provided.
        """
        if error is not None and error_details is not None:
            msg = "Provide either error or error_details, not both"
            raise ValueError(msg)

        resolved_error = error
        if resolved_error is None and error_details is not None:
            resolved_error = HealthError(code="CHECK_EXCEPTION", message=error_details)

        object.__setattr__(self, "name", name)
        object.__setattr__(self, "healthy", healthy)
        object.__setattr__(self, "error", resolved_error)

    @property
    def error_details(self) -> str | None:
        """Backward-compatible error message accessor."""
        if self.error is None:
            return None
        return self.error.message

    def __str__(self) -> str:
        """Return a string representation of the result."""
        return f"{self.name}: {'healthy' if self.healthy else 'unhealthy'}"

error_details property

Backward-compatible error message accessor.

__init__(name, healthy, error=None, *, error_details=None)

Create a health check result.

The error_details keyword is accepted for backward compatibility and is converted into error.message.

Raises:

Type Description
ValueError

If both error and error_details are provided.

Source code in fast_healthchecks/models.py
def __init__(
    self,
    name: str,
    healthy: bool,  # noqa: FBT001
    error: HealthError | None = None,
    *,
    error_details: str | None = None,
) -> None:
    """Create a health check result.

    The ``error_details`` keyword is accepted for backward compatibility
    and is converted into ``error.message``.

    Raises:
        ValueError: If both ``error`` and ``error_details`` are provided.
    """
    if error is not None and error_details is not None:
        msg = "Provide either error or error_details, not both"
        raise ValueError(msg)

    resolved_error = error
    if resolved_error is None and error_details is not None:
        resolved_error = HealthError(code="CHECK_EXCEPTION", message=error_details)

    object.__setattr__(self, "name", name)
    object.__setattr__(self, "healthy", healthy)
    object.__setattr__(self, "error", resolved_error)

__str__()

Return a string representation of the result.

Source code in fast_healthchecks/models.py
def __str__(self) -> str:
    """Return a string representation of the result."""
    return f"{self.name}: {'healthy' if self.healthy else 'unhealthy'}"

HealthCheckSSRFError

Bases: HealthCheckError, ValueError

Raised when URL or host validation fails (SSRF / block_private_hosts).

Subclass of both HealthCheckError and ValueError so that except ValueError still catches it.

Source code in fast_healthchecks/models.py
class HealthCheckSSRFError(HealthCheckError, ValueError):
    """Raised when URL or host validation fails (SSRF / block_private_hosts).

    Subclass of both HealthCheckError and ValueError so that
    ``except ValueError`` still catches it.
    """

HealthCheckTimeoutError

Bases: HealthCheckError, TimeoutError

Raised when a probe or check run exceeds its timeout.

Subclass of both HealthCheckError and asyncio.TimeoutError so that except asyncio.TimeoutError or except TimeoutError still catch it.

Source code in fast_healthchecks/models.py
class HealthCheckTimeoutError(HealthCheckError, asyncio.TimeoutError):
    """Raised when a probe or check run exceeds its timeout.

    Subclass of both HealthCheckError and asyncio.TimeoutError so that
    ``except asyncio.TimeoutError`` or ``except TimeoutError`` still catch it.
    """

    code: str

    def __init__(self, message: str = "Probe timed out", *, code: str = "PROBE_TIMEOUT") -> None:
        """Create timeout error with machine-readable timeout code."""
        super().__init__(message)
        self.code = code

__init__(message='Probe timed out', *, code='PROBE_TIMEOUT')

Create timeout error with machine-readable timeout code.

Source code in fast_healthchecks/models.py
def __init__(self, message: str = "Probe timed out", *, code: str = "PROBE_TIMEOUT") -> None:
    """Create timeout error with machine-readable timeout code."""
    super().__init__(message)
    self.code = code

HealthError dataclass

Machine-readable error details for failed health checks.

Attributes:

Name Type Description
code str

Machine-readable error code (e.g., "PROBE_TIMEOUT", "CHECK_EXCEPTION").

message str

Human-readable error message describing what went wrong.

duration_ms int

Time in milliseconds that the probe took to execute.

timeout_ms int | None

Timeout limit in milliseconds that was applied to the probe.

meta dict[str, object]

Additional metadata about the error (e.g., exception type, URL). Secrets are automatically redacted from this field.

Source code in fast_healthchecks/models.py
@dataclass(frozen=True)
class HealthError:
    """Machine-readable error details for failed health checks.

    Attributes:
        code: Machine-readable error code (e.g., "PROBE_TIMEOUT", "CHECK_EXCEPTION").
        message: Human-readable error message describing what went wrong.
        duration_ms: Time in milliseconds that the probe took to execute.
        timeout_ms: Timeout limit in milliseconds that was applied to the probe.
        meta: Additional metadata about the error (e.g., exception type, URL).
            Secrets are automatically redacted from this field.
    """

    code: str
    message: str
    duration_ms: int = 0
    timeout_ms: int | None = None
    meta: dict[str, object] = field(default_factory=dict)

Execution primitives and policy models for healthchecks.

ProbeRunner dataclass

Immutable runner that executes probes according to RunPolicy.

Use as an async context manager to ensure proper resource cleanup:

async with ProbeRunner(policy) as runner:
    report = await runner.run(probe)

Attributes:

Name Type Description
policy RunPolicy

RunPolicy instance controlling execution behavior.

_probes list[Probe]

Internal list of probes that have been registered with this runner.

_probe_ids set[int]

Internal set of probe object IDs for deduplication.

Source code in fast_healthchecks/execution.py
@dataclass(frozen=True)
class ProbeRunner:
    """Immutable runner that executes probes according to RunPolicy.

    Use as an async context manager to ensure proper resource cleanup:

        async with ProbeRunner(policy) as runner:
            report = await runner.run(probe)

    Attributes:
        policy: RunPolicy instance controlling execution behavior.
        _probes: Internal list of probes that have been registered with this runner.
        _probe_ids: Internal set of probe object IDs for deduplication.
    """

    policy: RunPolicy = field(default_factory=RunPolicy)
    _probes: list[Probe] = field(default_factory=list, init=False, repr=False, compare=False)
    _probe_ids: set[int] = field(default_factory=set, init=False, repr=False, compare=False)

    async def __aenter__(self) -> ProbeRunner:  # noqa: PYI034
        """Return self for async context-manager usage."""
        return self

    async def __aexit__(
        self,
        _exc_type: type[BaseException] | None,
        _exc: BaseException | None,
        _tb: TracebackType | None,
    ) -> None:
        """Always close managed probes on context exit."""
        await self.close()

    async def run(self, probe: Probe) -> HealthCheckReport:
        """Run probe checks and return a report.

        Args:
            probe: Probe with checks to execute.

        Returns:
            HealthCheckReport for the probe run.
        """
        probe_id = id(probe)
        if probe_id not in self._probe_ids:
            self._probe_ids.add(probe_id)
            self._probes.append(probe)

        timeout = None
        if self.policy.probe_timeout_ms is not None:
            timeout = self.policy.probe_timeout_ms / 1000

        on_check_start = _noop_on_check_start if self.policy.execution == "sequential" else None

        report = await run_probe(
            probe,
            timeout=timeout,
            on_check_start=on_check_start,
            on_timeout_return_failure=self.policy.mode == "reporting",
        )

        if self.policy.health_evaluation == "partial_allowed":
            return HealthCheckReport(results=report.results, allow_partial_failure=True)
        return report

    async def close(self) -> None:
        """Close resources for probes executed by this runner."""
        await close_probes(self._probes)

__aenter__() async

Return self for async context-manager usage.

Source code in fast_healthchecks/execution.py
async def __aenter__(self) -> ProbeRunner:  # noqa: PYI034
    """Return self for async context-manager usage."""
    return self

__aexit__(_exc_type, _exc, _tb) async

Always close managed probes on context exit.

Source code in fast_healthchecks/execution.py
async def __aexit__(
    self,
    _exc_type: type[BaseException] | None,
    _exc: BaseException | None,
    _tb: TracebackType | None,
) -> None:
    """Always close managed probes on context exit."""
    await self.close()

close() async

Close resources for probes executed by this runner.

Source code in fast_healthchecks/execution.py
async def close(self) -> None:
    """Close resources for probes executed by this runner."""
    await close_probes(self._probes)

run(probe) async

Run probe checks and return a report.

Parameters:

Name Type Description Default
probe Probe

Probe with checks to execute.

required

Returns:

Type Description
HealthCheckReport

HealthCheckReport for the probe run.

Source code in fast_healthchecks/execution.py
async def run(self, probe: Probe) -> HealthCheckReport:
    """Run probe checks and return a report.

    Args:
        probe: Probe with checks to execute.

    Returns:
        HealthCheckReport for the probe run.
    """
    probe_id = id(probe)
    if probe_id not in self._probe_ids:
        self._probe_ids.add(probe_id)
        self._probes.append(probe)

    timeout = None
    if self.policy.probe_timeout_ms is not None:
        timeout = self.policy.probe_timeout_ms / 1000

    on_check_start = _noop_on_check_start if self.policy.execution == "sequential" else None

    report = await run_probe(
        probe,
        timeout=timeout,
        on_check_start=on_check_start,
        on_timeout_return_failure=self.policy.mode == "reporting",
    )

    if self.policy.health_evaluation == "partial_allowed":
        return HealthCheckReport(results=report.results, allow_partial_failure=True)
    return report

RunPolicy dataclass

Immutable policy controlling probe execution behavior.

Attributes:

Name Type Description
mode RunMode

Controls how probe failures affect overall health status. - "strict": Any failure marks the health check as failed - "reporting": Failures are reported but don't fail the check

execution ExecutionMode

Controls how probes are executed. - "parallel": All probes run concurrently - "sequential": Probes run one at a time

probe_timeout_ms int | None

Optional timeout in milliseconds for each probe. If None, probes use their default timeout.

health_evaluation HealthEvaluationMode

Controls evaluation strategy. - "all_required": All probes must pass for overall health - "partial_allowed": Some probes can fail without failing overall

Source code in fast_healthchecks/execution.py
@dataclass(frozen=True)
class RunPolicy:
    """Immutable policy controlling probe execution behavior.

    Attributes:
        mode: Controls how probe failures affect overall health status.
            - "strict": Any failure marks the health check as failed
            - "reporting": Failures are reported but don't fail the check
        execution: Controls how probes are executed.
            - "parallel": All probes run concurrently
            - "sequential": Probes run one at a time
        probe_timeout_ms: Optional timeout in milliseconds for each probe.
            If None, probes use their default timeout.
        health_evaluation: Controls evaluation strategy.
            - "all_required": All probes must pass for overall health
            - "partial_allowed": Some probes can fail without failing overall
    """

    mode: RunMode = "strict"
    execution: ExecutionMode = "parallel"
    probe_timeout_ms: int | None = None
    health_evaluation: HealthEvaluationMode = "all_required"

    def __post_init__(self) -> None:
        """Validate policy values.

        Raises:
            ValueError: If mode/execution/health_evaluation is invalid or timeout is non-positive.
        """
        if self.mode not in _VALID_RUN_MODES:
            msg = f"Invalid run mode: {self.mode}"
            raise ValueError(msg)

        if self.execution not in _VALID_EXECUTION_MODES:
            msg = f"Invalid execution mode: {self.execution}"
            raise ValueError(msg)

        if self.health_evaluation not in _VALID_HEALTH_EVALUATIONS:
            msg = f"Invalid health evaluation mode: {self.health_evaluation}"
            raise ValueError(msg)

        if self.probe_timeout_ms is not None and self.probe_timeout_ms <= 0:
            msg = "probe_timeout_ms must be > 0 when provided"
            raise ValueError(msg)

__post_init__()

Validate policy values.

Raises:

Type Description
ValueError

If mode/execution/health_evaluation is invalid or timeout is non-positive.

Source code in fast_healthchecks/execution.py
def __post_init__(self) -> None:
    """Validate policy values.

    Raises:
        ValueError: If mode/execution/health_evaluation is invalid or timeout is non-positive.
    """
    if self.mode not in _VALID_RUN_MODES:
        msg = f"Invalid run mode: {self.mode}"
        raise ValueError(msg)

    if self.execution not in _VALID_EXECUTION_MODES:
        msg = f"Invalid execution mode: {self.execution}"
        raise ValueError(msg)

    if self.health_evaluation not in _VALID_HEALTH_EVALUATIONS:
        msg = f"Invalid health evaluation mode: {self.health_evaluation}"
        raise ValueError(msg)

    if self.probe_timeout_ms is not None and self.probe_timeout_ms <= 0:
        msg = "probe_timeout_ms must be > 0 when provided"
        raise ValueError(msg)

DSN NewTypes for type hints only.

These types annotate DSN strings (e.g. AmqpDsn, RedisDsn) but are not used at runtime by check classes. Each HealthCheckDSN subclass implements its own parse_dsn() and validate_dsn(); dsn.py provides no parsing or validation logic. Use these types to annotate configuration or function parameters.

Checks

Immutable configuration dataclasses for health checks.

Encapsulates connection parameters to avoid long parameter lists (PLR0913) and centralize serialization for to_dict().

FunctionConfig dataclass

Configuration for function health check.

Source code in fast_healthchecks/checks/configs.py
@dataclass(frozen=True)
class FunctionConfig:
    """Configuration for function health check."""

    args: tuple[object, ...] = ()
    kwargs: dict[str, object] | None = None
    timeout: float = DEFAULT_HC_TIMEOUT

    def to_dict(self) -> dict[str, Any]:
        """Return config as a dict for serialization."""
        d = asdict(self)
        d["args"] = list(d.get("args") or ())
        d["kwargs"] = dict(d["kwargs"]) if d.get("kwargs") else {}
        return d

to_dict()

Return config as a dict for serialization.

Source code in fast_healthchecks/checks/configs.py
def to_dict(self) -> dict[str, Any]:
    """Return config as a dict for serialization."""
    d = asdict(self)
    d["args"] = list(d.get("args") or ())
    d["kwargs"] = dict(d["kwargs"]) if d.get("kwargs") else {}
    return d

KafkaConfig dataclass

Configuration for Kafka health check.

Source code in fast_healthchecks/checks/configs.py
@dataclass(frozen=True)
class KafkaConfig:
    """Configuration for Kafka health check."""

    bootstrap_servers: str = "localhost:9092"
    ssl_context: _ssl.SSLContext | None = None
    security_protocol: SecurityProtocol = "PLAINTEXT"
    sasl_mechanism: SaslMechanism = "PLAIN"
    sasl_plain_username: str | None = None
    sasl_plain_password: str | None = None
    timeout: float = DEFAULT_HC_TIMEOUT

    def __post_init__(self) -> None:
        """Validate security_protocol and sasl_mechanism.

        Raises:
            ValueError: If security_protocol or sasl_mechanism is invalid.
        """
        if self.security_protocol not in VALID_SECURITY_PROTOCOLS:
            msg = f"Invalid security protocol: {self.security_protocol}"
            raise ValueError(msg)
        if self.sasl_mechanism not in VALID_SASL_MECHANISMS:
            msg = f"Invalid SASL mechanism: {self.sasl_mechanism}"
            raise ValueError(msg)

    def to_dict(self) -> dict[str, Any]:
        """Return config as a dict for serialization."""
        # ssl_context is not deepcopy-safe; build dict manually
        return {
            "bootstrap_servers": self.bootstrap_servers,
            "ssl_context": self.ssl_context,
            "security_protocol": self.security_protocol,
            "sasl_mechanism": self.sasl_mechanism,
            "sasl_plain_username": self.sasl_plain_username,
            "sasl_plain_password": self.sasl_plain_password,
            "timeout": self.timeout,
        }

__post_init__()

Validate security_protocol and sasl_mechanism.

Raises:

Type Description
ValueError

If security_protocol or sasl_mechanism is invalid.

Source code in fast_healthchecks/checks/configs.py
def __post_init__(self) -> None:
    """Validate security_protocol and sasl_mechanism.

    Raises:
        ValueError: If security_protocol or sasl_mechanism is invalid.
    """
    if self.security_protocol not in VALID_SECURITY_PROTOCOLS:
        msg = f"Invalid security protocol: {self.security_protocol}"
        raise ValueError(msg)
    if self.sasl_mechanism not in VALID_SASL_MECHANISMS:
        msg = f"Invalid SASL mechanism: {self.sasl_mechanism}"
        raise ValueError(msg)

to_dict()

Return config as a dict for serialization.

Source code in fast_healthchecks/checks/configs.py
def to_dict(self) -> dict[str, Any]:
    """Return config as a dict for serialization."""
    # ssl_context is not deepcopy-safe; build dict manually
    return {
        "bootstrap_servers": self.bootstrap_servers,
        "ssl_context": self.ssl_context,
        "security_protocol": self.security_protocol,
        "sasl_mechanism": self.sasl_mechanism,
        "sasl_plain_username": self.sasl_plain_username,
        "sasl_plain_password": self.sasl_plain_password,
        "timeout": self.timeout,
    }

MongoConfig dataclass

Configuration for MongoDB health check.

Source code in fast_healthchecks/checks/configs.py
@dataclass(frozen=True)
class MongoConfig:
    """Configuration for MongoDB health check."""

    hosts: str | list[str] = "localhost"
    port: int | None = 27017
    user: str | None = None
    password: str | None = None
    database: str | None = None
    auth_source: str = "admin"
    timeout: float = DEFAULT_HC_TIMEOUT

    def to_dict(self) -> dict[str, Any]:
        """Return config as a dict for serialization."""
        return asdict(self)

to_dict()

Return config as a dict for serialization.

Source code in fast_healthchecks/checks/configs.py
def to_dict(self) -> dict[str, Any]:
    """Return config as a dict for serialization."""
    return asdict(self)

OpenSearchConfig dataclass

Configuration for OpenSearch health check.

Source code in fast_healthchecks/checks/configs.py
@dataclass(frozen=True)
class OpenSearchConfig:
    """Configuration for OpenSearch health check."""

    hosts: list[str] = field(default_factory=lambda: ["localhost:9200"])
    http_auth: tuple[str, str] | None = None
    use_ssl: bool = False
    verify_certs: bool = False
    ssl_show_warn: bool = False
    ca_certs: str | None = None
    timeout: float = DEFAULT_HC_TIMEOUT

    def to_dict(self) -> dict[str, Any]:
        """Return config as a dict for serialization."""
        return asdict(self)

to_dict()

Return config as a dict for serialization.

Source code in fast_healthchecks/checks/configs.py
def to_dict(self) -> dict[str, Any]:
    """Return config as a dict for serialization."""
    return asdict(self)

PostgresAsyncPGConfig dataclass

Configuration for PostgreSQL health check (asyncpg driver).

Source code in fast_healthchecks/checks/configs.py
@dataclass(frozen=True)
class PostgresAsyncPGConfig:
    """Configuration for PostgreSQL health check (asyncpg driver)."""

    host: str = "localhost"
    port: int = 5432
    user: str | None = None
    password: str | None = None
    database: str | None = None
    ssl: _ssl.SSLContext | None = None
    direct_tls: bool = False
    timeout: float = DEFAULT_HC_TIMEOUT

    def to_dict(self) -> dict[str, Any]:
        """Return config as a dict for serialization."""
        # ssl is not serializable; include as None in dict for consistent keys
        return {
            "host": self.host,
            "port": self.port,
            "user": self.user,
            "password": self.password,
            "database": self.database,
            "ssl": self.ssl,
            "direct_tls": self.direct_tls,
            "timeout": self.timeout,
        }

to_dict()

Return config as a dict for serialization.

Source code in fast_healthchecks/checks/configs.py
def to_dict(self) -> dict[str, Any]:
    """Return config as a dict for serialization."""
    # ssl is not serializable; include as None in dict for consistent keys
    return {
        "host": self.host,
        "port": self.port,
        "user": self.user,
        "password": self.password,
        "database": self.database,
        "ssl": self.ssl,
        "direct_tls": self.direct_tls,
        "timeout": self.timeout,
    }

PostgresPsycopgConfig dataclass

Configuration for PostgreSQL health check (psycopg driver).

Source code in fast_healthchecks/checks/configs.py
@dataclass(frozen=True)
class PostgresPsycopgConfig:
    """Configuration for PostgreSQL health check (psycopg driver)."""

    host: str = "localhost"
    port: int = 5432
    user: str | None = None
    password: str | None = None
    database: str | None = None
    sslmode: str | None = None
    sslcert: str | None = None
    sslkey: str | None = None
    sslrootcert: str | None = None
    timeout: float = DEFAULT_HC_TIMEOUT

    def to_dict(self) -> dict[str, Any]:
        """Return config as a dict for serialization."""
        return asdict(self)

to_dict()

Return config as a dict for serialization.

Source code in fast_healthchecks/checks/configs.py
def to_dict(self) -> dict[str, Any]:
    """Return config as a dict for serialization."""
    return asdict(self)

RabbitMQConfig dataclass

Configuration for RabbitMQ health check.

Security: The default user and password ("guest") match RabbitMQ's default credentials and are intended for local development only. Do not use these defaults in production or on non-local brokers; set explicit credentials or use a secrets manager. See SECURITY.md.

Source code in fast_healthchecks/checks/configs.py
@dataclass(frozen=True)
class RabbitMQConfig:
    """Configuration for RabbitMQ health check.

    **Security:** The default ``user`` and ``password`` (``"guest"``) match
    RabbitMQ's default credentials and are intended for local development only.
    Do not use these defaults in production or on non-local brokers; set
    explicit credentials or use a secrets manager. See SECURITY.md.
    """

    host: str = "localhost"
    port: int = 5672
    user: str = "guest"
    password: str = "guest"  # noqa: S105
    vhost: str = "/"
    secure: bool = False
    timeout: float = DEFAULT_HC_TIMEOUT

    def to_dict(self) -> dict[str, Any]:
        """Return config as a dict for serialization."""
        return asdict(self)

to_dict()

Return config as a dict for serialization.

Source code in fast_healthchecks/checks/configs.py
def to_dict(self) -> dict[str, Any]:
    """Return config as a dict for serialization."""
    return asdict(self)

RedisConfig dataclass

Configuration for Redis health check.

Source code in fast_healthchecks/checks/configs.py
@dataclass(frozen=True)
class RedisConfig:
    """Configuration for Redis health check."""

    host: str = "localhost"
    port: int = 6379
    database: int | str = 0
    user: str | None = None
    password: str | None = None
    ssl: bool = False
    ssl_ca_certs: str | None = None
    timeout: float = DEFAULT_HC_TIMEOUT

    def to_dict(self) -> dict[str, Any]:
        """Return config as a dict for serialization."""
        return asdict(self)

to_dict()

Return config as a dict for serialization.

Source code in fast_healthchecks/checks/configs.py
def to_dict(self) -> dict[str, Any]:
    """Return config as a dict for serialization."""
    return asdict(self)

UrlConfig dataclass

Configuration for URL health check.

Use only trusted URLs from application configuration; do not pass user-controlled input to avoid SSRF. Validation and behaviour are provided by :func:~fast_healthchecks.utils.validate_url_ssrf and :func:~fast_healthchecks.utils.validate_host_ssrf_async. See the SSRF documentation in the docs.

Source code in fast_healthchecks/checks/configs.py
@dataclass(frozen=True)
class UrlConfig:
    """Configuration for URL health check.

    Use only trusted URLs from application configuration; do not pass
    user-controlled input to avoid SSRF. Validation and behaviour are
    provided by :func:`~fast_healthchecks.utils.validate_url_ssrf` and
    :func:`~fast_healthchecks.utils.validate_host_ssrf_async`. See the
    SSRF documentation in the docs.
    """

    url: str = ""
    username: str | None = None
    password: str | None = None
    verify_ssl: bool = True
    follow_redirects: bool = True
    timeout: float = DEFAULT_HC_TIMEOUT
    block_private_hosts: bool = False

    def to_dict(self) -> dict[str, Any]:
        """Return config as a dict for serialization."""
        return asdict(self)

to_dict()

Return config as a dict for serialization.

Source code in fast_healthchecks/checks/configs.py
def to_dict(self) -> dict[str, Any]:
    """Return config as a dict for serialization."""
    return asdict(self)

Type aliases for health checks.

HealthCheck

Bases: Protocol[T_co]

Base class for health checks.

Source code in fast_healthchecks/checks/_base.py
class HealthCheck(Protocol[T_co]):
    """Base class for health checks."""

    async def __call__(self) -> T_co: ...

HealthCheckDSN

Bases: ConfigDictMixin, HealthCheck[T_co], Generic[T_co, T_parsed]

Base class for health checks that can be created from a DSN.

Contract: subclasses must define _allowed_schemes(), _default_name(), parse_dsn(), and _from_parsed_dsn(). The check stores its display name in _name (used in HealthCheckResult and error reporting). DSN validation uses validate_dsn(); fast_healthchecks.dsn NewTypes are typing-only, not runtime.

Type parameters: T_co is the result type (e.g. HealthCheckResult); T_parsed is the type returned by parse_dsn() and accepted by _from_parsed_dsn().

Source code in fast_healthchecks/checks/_base.py
class HealthCheckDSN(ConfigDictMixin, HealthCheck[T_co], Generic[T_co, T_parsed]):
    """Base class for health checks that can be created from a DSN.

    Contract: subclasses must define _allowed_schemes(), _default_name(),
    parse_dsn(), and _from_parsed_dsn(). The check stores its display name in
    _name (used in HealthCheckResult and error reporting). DSN validation uses
    validate_dsn(); fast_healthchecks.dsn NewTypes are typing-only, not runtime.

    Type parameters: T_co is the result type (e.g. HealthCheckResult);
    T_parsed is the type returned by parse_dsn() and accepted by _from_parsed_dsn().
    """

    @classmethod
    @abstractmethod
    def _allowed_schemes(cls) -> tuple[str, ...]:
        """Return DSN schemes allowed for this check (e.g. ('redis', 'rediss'))."""
        raise NotImplementedError  # pragma: no cover

    @classmethod
    @abstractmethod
    def _default_name(cls) -> str:
        """Return the default check name for from_dsn (e.g. 'Redis')."""
        raise NotImplementedError  # pragma: no cover

    @classmethod
    @abstractmethod
    def parse_dsn(cls, dsn: str) -> T_parsed:
        """Parse the DSN string. Subclasses must implement."""
        raise NotImplementedError  # pragma: no cover

    @classmethod
    @abstractmethod
    def _from_parsed_dsn(
        cls,
        parsed: T_parsed,
        *,
        name: str = "Service",
        timeout: float = DEFAULT_HC_TIMEOUT,
        **kwargs: object,
    ) -> HealthCheckDSN[T_co, T_parsed]:
        """Create a check instance from parsed DSN."""
        raise NotImplementedError  # pragma: no cover

    @classmethod
    def from_dsn(
        cls,
        dsn: str,
        *,
        name: str | None = None,
        timeout: float = DEFAULT_HC_TIMEOUT,
        **kwargs: object,
    ) -> HealthCheckDSN[T_co, T_parsed]:
        """Create a check instance from a DSN string.

        Returns:
            HealthCheckDSN: Configured check instance.
        """
        if name is None:
            name = cls._default_name()
        dsn = cls.validate_dsn(dsn, allowed_schemes=cls._allowed_schemes())
        parsed = cls.parse_dsn(dsn)
        return cls._from_parsed_dsn(parsed, name=name, timeout=timeout, **kwargs)

    @classmethod
    def validate_dsn(cls, dsn: str, *, allowed_schemes: tuple[str, ...]) -> str:
        """Validate the DSN has an allowed scheme.

        Allows compound schemes (e.g. postgresql+asyncpg) when the base
        part before '+' is in allowed_schemes. Scheme comparison is case-insensitive.

        Returns:
            str: The DSN string (stripped of leading/trailing whitespace).

        Raises:
            TypeError: If dsn is not a string.
            ValueError: If DSN is empty or scheme is not in allowed_schemes.
        """
        if not isinstance(dsn, str):
            msg = f"DSN must be str, got {type(dsn).__name__!r}"
            raise TypeError(msg) from None

        dsn = dsn.strip()
        if not dsn:
            msg = "DSN cannot be empty"
            raise ValueError(msg) from None

        if not allowed_schemes:
            msg = "allowed_schemes cannot be empty"
            raise ValueError(msg) from None

        parsed = urlsplit(dsn)
        scheme = (parsed.scheme or "").lower()
        base_scheme = scheme.split("+", 1)[0] if "+" in scheme else scheme

        allowed_set = frozenset(s.lower() for s in allowed_schemes)
        if scheme not in allowed_set and base_scheme not in allowed_set:
            schemes_str = ", ".join(sorted(allowed_set))
            msg = f"DSN scheme must be one of {schemes_str} (or compound e.g. postgresql+driver), got {scheme!r}"
            raise ValueError(msg) from None

        return dsn

from_dsn(dsn, *, name=None, timeout=DEFAULT_HC_TIMEOUT, **kwargs) classmethod

Create a check instance from a DSN string.

Returns:

Name Type Description
HealthCheckDSN HealthCheckDSN[T_co, T_parsed]

Configured check instance.

Source code in fast_healthchecks/checks/_base.py
@classmethod
def from_dsn(
    cls,
    dsn: str,
    *,
    name: str | None = None,
    timeout: float = DEFAULT_HC_TIMEOUT,
    **kwargs: object,
) -> HealthCheckDSN[T_co, T_parsed]:
    """Create a check instance from a DSN string.

    Returns:
        HealthCheckDSN: Configured check instance.
    """
    if name is None:
        name = cls._default_name()
    dsn = cls.validate_dsn(dsn, allowed_schemes=cls._allowed_schemes())
    parsed = cls.parse_dsn(dsn)
    return cls._from_parsed_dsn(parsed, name=name, timeout=timeout, **kwargs)

parse_dsn(dsn) abstractmethod classmethod

Parse the DSN string. Subclasses must implement.

Source code in fast_healthchecks/checks/_base.py
@classmethod
@abstractmethod
def parse_dsn(cls, dsn: str) -> T_parsed:
    """Parse the DSN string. Subclasses must implement."""
    raise NotImplementedError  # pragma: no cover

validate_dsn(dsn, *, allowed_schemes) classmethod

Validate the DSN has an allowed scheme.

Allows compound schemes (e.g. postgresql+asyncpg) when the base part before '+' is in allowed_schemes. Scheme comparison is case-insensitive.

Returns:

Name Type Description
str str

The DSN string (stripped of leading/trailing whitespace).

Raises:

Type Description
TypeError

If dsn is not a string.

ValueError

If DSN is empty or scheme is not in allowed_schemes.

Source code in fast_healthchecks/checks/_base.py
@classmethod
def validate_dsn(cls, dsn: str, *, allowed_schemes: tuple[str, ...]) -> str:
    """Validate the DSN has an allowed scheme.

    Allows compound schemes (e.g. postgresql+asyncpg) when the base
    part before '+' is in allowed_schemes. Scheme comparison is case-insensitive.

    Returns:
        str: The DSN string (stripped of leading/trailing whitespace).

    Raises:
        TypeError: If dsn is not a string.
        ValueError: If DSN is empty or scheme is not in allowed_schemes.
    """
    if not isinstance(dsn, str):
        msg = f"DSN must be str, got {type(dsn).__name__!r}"
        raise TypeError(msg) from None

    dsn = dsn.strip()
    if not dsn:
        msg = "DSN cannot be empty"
        raise ValueError(msg) from None

    if not allowed_schemes:
        msg = "allowed_schemes cannot be empty"
        raise ValueError(msg) from None

    parsed = urlsplit(dsn)
    scheme = (parsed.scheme or "").lower()
    base_scheme = scheme.split("+", 1)[0] if "+" in scheme else scheme

    allowed_set = frozenset(s.lower() for s in allowed_schemes)
    if scheme not in allowed_set and base_scheme not in allowed_set:
        schemes_str = ", ".join(sorted(allowed_set))
        msg = f"DSN scheme must be one of {schemes_str} (or compound e.g. postgresql+driver), got {scheme!r}"
        raise ValueError(msg) from None

    return dsn

Health check that runs a user-provided callable (sync or async).

FunctionHealthCheck runs the callable each time the check is executed; sync functions are run in a thread pool via run_in_executor.

FunctionHealthCheck

Bases: ConfigDictMixin, HealthCheck[HealthCheckResult]

Health check that runs a callable (sync or async) each time it is executed.

Synchronous functions are run via loop.run_in_executor(executor, ...). The default executor is None (shared thread pool). Long-running blocking sync checks can exhaust the pool; pass a dedicated :class:Executor if needed.

Source code in fast_healthchecks/checks/function.py
@final
class FunctionHealthCheck(ConfigDictMixin, HealthCheck[HealthCheckResult]):
    """Health check that runs a callable (sync or async) each time it is executed.

    Synchronous functions are run via ``loop.run_in_executor(executor, ...)``.
    The default executor is ``None`` (shared thread pool). Long-running blocking
    sync checks can exhaust the pool; pass a dedicated :class:`Executor` if needed.
    """

    __slots__ = ("_config", "_executor", "_func", "_name")

    _config: FunctionConfig
    _func: Callable[..., Any]
    _executor: Executor | None
    _name: str

    def __init__(
        self,
        *,
        config: FunctionConfig | None = None,
        func: Callable[..., Any] | None = None,
        name: str = "Function",
        executor: Executor | None = None,
        **kwargs: Any,  # noqa: ANN401
    ) -> None:
        """Initialize the FunctionHealthCheck.

        Args:
            config: Config (args, kwargs, timeout). If None, built from kwargs.
            func: The function to perform the health check on (required if config is None).
            name: The name of the health check.
            executor: Executor for sync functions. Defaults to None (thread pool).
            **kwargs: Passed to FunctionConfig when config is None (args, kwargs, timeout).

        Raises:
            TypeError: When func is not provided.
        """
        if config is None:
            kwargs_copy = dict(kwargs)
            func = kwargs_copy.pop("func", func)
            executor = kwargs_copy.pop("executor", executor)
            if func is None:
                msg = "func is required when config is not provided"
                raise TypeError(msg)
            config = FunctionConfig(**kwargs_copy)
        elif func is None:
            msg = "func is required"
            raise TypeError(msg)
        self._config = config
        self._func = func
        self._executor = executor
        self._name = name

    @healthcheck_safe(invalidate_on_error=False)
    async def __call__(self) -> HealthCheckResult:
        """Perform the health check on the function.

        Sync functions run in the given executor (default: shared thread pool).

        Returns:
            HealthCheckResult: The result of the health check.
        """
        c = self._config
        args = c.args or ()
        kwargs = dict(c.kwargs) if c.kwargs else {}
        task: asyncio.Future[Any]
        if inspect.iscoroutinefunction(self._func):
            task = self._func(*args, **kwargs)
        else:
            loop = asyncio.get_running_loop()
            task = loop.run_in_executor(
                self._executor,
                functools.partial(self._func, *args, **kwargs),
            )
        result = await asyncio.wait_for(task, timeout=c.timeout)
        healthy = bool(result) if isinstance(result, bool) else True
        if healthy:
            return HealthCheckResult(name=self._name, healthy=True)
        return result_unhealthy_dependency(name=self._name, message="Function returned unhealthy status")

__call__() async

Perform the health check on the function.

Sync functions run in the given executor (default: shared thread pool).

Returns:

Name Type Description
HealthCheckResult HealthCheckResult

The result of the health check.

Source code in fast_healthchecks/checks/function.py
@healthcheck_safe(invalidate_on_error=False)
async def __call__(self) -> HealthCheckResult:
    """Perform the health check on the function.

    Sync functions run in the given executor (default: shared thread pool).

    Returns:
        HealthCheckResult: The result of the health check.
    """
    c = self._config
    args = c.args or ()
    kwargs = dict(c.kwargs) if c.kwargs else {}
    task: asyncio.Future[Any]
    if inspect.iscoroutinefunction(self._func):
        task = self._func(*args, **kwargs)
    else:
        loop = asyncio.get_running_loop()
        task = loop.run_in_executor(
            self._executor,
            functools.partial(self._func, *args, **kwargs),
        )
    result = await asyncio.wait_for(task, timeout=c.timeout)
    healthy = bool(result) if isinstance(result, bool) else True
    if healthy:
        return HealthCheckResult(name=self._name, healthy=True)
    return result_unhealthy_dependency(name=self._name, message="Function returned unhealthy status")

__init__(*, config=None, func=None, name='Function', executor=None, **kwargs)

Initialize the FunctionHealthCheck.

Parameters:

Name Type Description Default
config FunctionConfig | None

Config (args, kwargs, timeout). If None, built from kwargs.

None
func Callable[..., Any] | None

The function to perform the health check on (required if config is None).

None
name str

The name of the health check.

'Function'
executor Executor | None

Executor for sync functions. Defaults to None (thread pool).

None
**kwargs Any

Passed to FunctionConfig when config is None (args, kwargs, timeout).

{}

Raises:

Type Description
TypeError

When func is not provided.

Source code in fast_healthchecks/checks/function.py
def __init__(
    self,
    *,
    config: FunctionConfig | None = None,
    func: Callable[..., Any] | None = None,
    name: str = "Function",
    executor: Executor | None = None,
    **kwargs: Any,  # noqa: ANN401
) -> None:
    """Initialize the FunctionHealthCheck.

    Args:
        config: Config (args, kwargs, timeout). If None, built from kwargs.
        func: The function to perform the health check on (required if config is None).
        name: The name of the health check.
        executor: Executor for sync functions. Defaults to None (thread pool).
        **kwargs: Passed to FunctionConfig when config is None (args, kwargs, timeout).

    Raises:
        TypeError: When func is not provided.
    """
    if config is None:
        kwargs_copy = dict(kwargs)
        func = kwargs_copy.pop("func", func)
        executor = kwargs_copy.pop("executor", executor)
        if func is None:
            msg = "func is required when config is not provided"
            raise TypeError(msg)
        config = FunctionConfig(**kwargs_copy)
    elif func is None:
        msg = "func is required"
        raise TypeError(msg)
    self._config = config
    self._func = func
    self._executor = executor
    self._name = name

This module provides a health check class for Redis.

Classes:

Name Description
RedisHealthCheck

A class to perform health checks on Redis.

Usage

The RedisHealthCheck class can be used to perform health checks on Redis by calling it.

Example

health_check = RedisHealthCheck( host="localhost", port=6379, ) result = await health_check() print(result.healthy)

RedisHealthCheck

Bases: ClientCachingMixin['Redis'], HealthCheckDSN[HealthCheckResult, RedisParseDsnResult]

A class to perform health checks on Redis.

Attributes:

Name Type Description
_database

The database to connect to.

_host

The host to connect to.

_name str

The name of the health check.

_password str

The password to authenticate with.

_port str

The port to connect to.

_timeout str

The timeout for the connection.

_user str

The user to authenticate with.

_ssl str

Whether to use SSL or not.

_ssl_ca_certs str

The path to the CA certificate.

Source code in fast_healthchecks/checks/redis.py
@final
class RedisHealthCheck(ClientCachingMixin["Redis"], HealthCheckDSN[HealthCheckResult, RedisParseDsnResult]):
    """A class to perform health checks on Redis.

    Attributes:
        _database: The database to connect to.
        _host: The host to connect to.
        _name: The name of the health check.
        _password: The password to authenticate with.
        _port: The port to connect to.
        _timeout: The timeout for the connection.
        _user: The user to authenticate with.
        _ssl: Whether to use SSL or not.
        _ssl_ca_certs: The path to the CA certificate.
    """

    __slots__ = (*_CLIENT_CACHING_SLOTS, "_config", "_name")

    _config: RedisConfig
    _name: str
    _client: Redis | None
    _client_loop: asyncio.AbstractEventLoop | None

    def __init__(
        self,
        *,
        config: RedisConfig | None = None,
        name: str = "Redis",
        close_client_fn: Callable[[Redis], Awaitable[None]] = _close_redis_client,
        **kwargs: Any,  # noqa: ANN401
    ) -> None:
        """Initialize the RedisHealthCheck class.

        Args:
            config: Connection config. If None, built from kwargs (host, port, database, etc.).
            name: The name of the health check.
            close_client_fn: Callable to close the cached client. Defaults to the
                standard Redis aclose.
            **kwargs: Passed to RedisConfig when config is None (host, port, database,
                user, password, ssl, ssl_ca_certs, timeout).
        """
        if config is None:
            config = RedisConfig(**kwargs)
        self._config = config
        self._name = name
        super().__init__(close_client_fn=close_client_fn)

    def _create_client(self) -> Redis:
        c = self._config
        return Redis(
            host=c.host,
            port=c.port,
            db=c.database,
            username=c.user,
            password=c.password,
            socket_timeout=c.timeout,
            single_connection_client=True,
            ssl=c.ssl,
            ssl_ca_certs=c.ssl_ca_certs,
        )

    @classmethod
    def _allowed_schemes(cls) -> tuple[str, ...]:
        return ("redis", "rediss")

    @classmethod
    def _default_name(cls) -> str:
        return "Redis"

    @classmethod
    def parse_dsn(cls, dsn: str) -> RedisParseDsnResult:
        """Parse the DSN and return the results.

        Args:
            dsn: The DSN to parse.

        Returns:
            RedisParseDsnResult: The results of parsing the DSN.
        """
        parse_result: ConnectKwargs = parse_url(str(dsn))
        scheme = urlsplit(dsn).scheme.lower()
        return {"parse_result": parse_result, "scheme": scheme}

    @classmethod
    def _from_parsed_dsn(
        cls,
        parsed: RedisParseDsnResult,
        *,
        name: str = "Redis",
        timeout: float = DEFAULT_HC_TIMEOUT,
        **_kwargs: object,
    ) -> RedisHealthCheck:
        parse_result = parsed["parse_result"]
        scheme = parsed.get("scheme", "")
        ssl_ca_certs: str | None = parse_result.get("ssl_ca_certs")
        ssl = parse_result.get("ssl", False) or bool(ssl_ca_certs) or (scheme == "rediss")
        config = RedisConfig(
            host=parse_result.get("host", "localhost"),
            port=parse_result.get("port", 6379),
            database=parse_result.get("db", 0),
            user=parse_result.get("username"),
            password=parse_result.get("password"),
            ssl=ssl,
            ssl_ca_certs=ssl_ca_certs,
            timeout=timeout,
        )
        return cls(config=config, name=name)

    @healthcheck_safe(invalidate_on_error=True)
    async def __call__(self) -> HealthCheckResult:
        """Perform a health check on Redis.

        Returns:
            HealthCheckResult: The result of the health check.
        """
        redis = await self._ensure_client()
        ping_result = redis.ping()
        healthy = bool(await ping_result) if asyncio.iscoroutine(ping_result) else bool(ping_result)
        if healthy:
            return HealthCheckResult(name=self._name, healthy=True)
        return result_unhealthy_dependency(name=self._name, message="Redis ping returned unhealthy status")

__call__() async

Perform a health check on Redis.

Returns:

Name Type Description
HealthCheckResult HealthCheckResult

The result of the health check.

Source code in fast_healthchecks/checks/redis.py
@healthcheck_safe(invalidate_on_error=True)
async def __call__(self) -> HealthCheckResult:
    """Perform a health check on Redis.

    Returns:
        HealthCheckResult: The result of the health check.
    """
    redis = await self._ensure_client()
    ping_result = redis.ping()
    healthy = bool(await ping_result) if asyncio.iscoroutine(ping_result) else bool(ping_result)
    if healthy:
        return HealthCheckResult(name=self._name, healthy=True)
    return result_unhealthy_dependency(name=self._name, message="Redis ping returned unhealthy status")

__init__(*, config=None, name='Redis', close_client_fn=_close_redis_client, **kwargs)

Initialize the RedisHealthCheck class.

Parameters:

Name Type Description Default
config RedisConfig | None

Connection config. If None, built from kwargs (host, port, database, etc.).

None
name str

The name of the health check.

'Redis'
close_client_fn Callable[[Redis], Awaitable[None]]

Callable to close the cached client. Defaults to the standard Redis aclose.

_close_redis_client
**kwargs Any

Passed to RedisConfig when config is None (host, port, database, user, password, ssl, ssl_ca_certs, timeout).

{}
Source code in fast_healthchecks/checks/redis.py
def __init__(
    self,
    *,
    config: RedisConfig | None = None,
    name: str = "Redis",
    close_client_fn: Callable[[Redis], Awaitable[None]] = _close_redis_client,
    **kwargs: Any,  # noqa: ANN401
) -> None:
    """Initialize the RedisHealthCheck class.

    Args:
        config: Connection config. If None, built from kwargs (host, port, database, etc.).
        name: The name of the health check.
        close_client_fn: Callable to close the cached client. Defaults to the
            standard Redis aclose.
        **kwargs: Passed to RedisConfig when config is None (host, port, database,
            user, password, ssl, ssl_ca_certs, timeout).
    """
    if config is None:
        config = RedisConfig(**kwargs)
    self._config = config
    self._name = name
    super().__init__(close_client_fn=close_client_fn)

parse_dsn(dsn) classmethod

Parse the DSN and return the results.

Parameters:

Name Type Description Default
dsn str

The DSN to parse.

required

Returns:

Name Type Description
RedisParseDsnResult RedisParseDsnResult

The results of parsing the DSN.

Source code in fast_healthchecks/checks/redis.py
@classmethod
def parse_dsn(cls, dsn: str) -> RedisParseDsnResult:
    """Parse the DSN and return the results.

    Args:
        dsn: The DSN to parse.

    Returns:
        RedisParseDsnResult: The results of parsing the DSN.
    """
    parse_result: ConnectKwargs = parse_url(str(dsn))
    scheme = urlsplit(dsn).scheme.lower()
    return {"parse_result": parse_result, "scheme": scheme}

This module provides a health check class for Kafka.

Classes:

Name Description
KafkaHealthCheck

A class to perform health checks on Kafka.

Usage

The KafkaHealthCheck class can be used to perform health checks on Kafka by calling it.

Example

health_check = KafkaHealthCheck( bootstrap_servers="localhost:9092", security_protocol="PLAINTEXT", ) result = await health_check() print(result.healthy)

KafkaHealthCheck

Bases: ClientCachingMixin['AIOKafkaAdminClient'], HealthCheckDSN[HealthCheckResult, KafkaParseDsnResult]

A class to perform health checks on Kafka.

Attributes:

Name Type Description
_bootstrap_servers

The Kafka bootstrap servers.

_name str

The name of the health check.

_sasl_mechanism str

The SASL mechanism to use.

_sasl_plain_password str

The SASL plain password.

_sasl_plain_username str

The SASL plain username.

_security_protocol str

The security protocol to use.

_ssl_context str

The SSL context to use.

_timeout str

The timeout for the health check.

Source code in fast_healthchecks/checks/kafka.py
@final
class KafkaHealthCheck(
    ClientCachingMixin["AIOKafkaAdminClient"],
    HealthCheckDSN[HealthCheckResult, KafkaParseDsnResult],
):
    """A class to perform health checks on Kafka.

    Attributes:
        _bootstrap_servers: The Kafka bootstrap servers.
        _name: The name of the health check.
        _sasl_mechanism: The SASL mechanism to use.
        _sasl_plain_password: The SASL plain password.
        _sasl_plain_username: The SASL plain username.
        _security_protocol: The security protocol to use.
        _ssl_context: The SSL context to use.
        _timeout: The timeout for the health check.
    """

    __slots__ = (*_CLIENT_CACHING_SLOTS, "_config", "_name")

    _config: KafkaConfig
    _name: str
    _client: AIOKafkaAdminClient | None
    _client_loop: asyncio.AbstractEventLoop | None

    def __init__(
        self,
        *,
        config: KafkaConfig | None = None,
        name: str = "Kafka",
        close_client_fn: Callable[[AIOKafkaAdminClient], Awaitable[None]] = _close_kafka_client,
        **kwargs: Any,  # noqa: ANN401
    ) -> None:
        """Initialize the KafkaHealthCheck.

        Args:
            config: Connection config. If None, built from kwargs (bootstrap_servers, etc.).
            name: The name of the health check.
            close_client_fn: Callable to close the cached client.
            **kwargs: Passed to KafkaConfig when config is None.
        """
        if config is None:
            config = KafkaConfig(**kwargs)
        self._config = config
        self._name = name
        super().__init__(close_client_fn=close_client_fn)

    def _create_client(self) -> AIOKafkaAdminClient:
        c = self._config
        return AIOKafkaAdminClient(
            bootstrap_servers=c.bootstrap_servers,
            client_id="fast_healthchecks",
            request_timeout_ms=int(c.timeout * 1000),
            ssl_context=c.ssl_context,
            security_protocol=c.security_protocol,
            sasl_mechanism=c.sasl_mechanism,
            sasl_plain_username=c.sasl_plain_username,
            sasl_plain_password=c.sasl_plain_password,
        )

    @classmethod
    def _allowed_schemes(cls) -> tuple[str, ...]:
        return ("kafka", "kafkas")

    @classmethod
    def _default_name(cls) -> str:
        return "Kafka"

    @classmethod
    def parse_dsn(cls, dsn: str) -> KafkaParseDsnResult:
        """Parse the Kafka DSN and return the results.

        Scheme ``kafkas`` implies SSL (SASL_SSL when credentials present).
        Scheme ``kafka`` implies PLAINTEXT (SASL_PLAINTEXT when credentials present).
        Kwargs to from_dsn override DSN-derived values.

        Args:
            dsn: The DSN to parse.

        Returns:
            KafkaParseDsnResult: The results of parsing the DSN.

        Raises:
            ValueError: If bootstrap servers are missing.
        """
        parsed = urlsplit(dsn)
        scheme = (parsed.scheme or "kafka").lower()
        netloc = parsed.netloc
        sasl_plain_username: str | None = None
        sasl_plain_password: str | None = None
        if "@" in netloc:
            userinfo, hosts = netloc.rsplit("@", 1)
            netloc = hosts
            if ":" in userinfo:
                username, password = userinfo.split(":", 1)
                sasl_plain_username = unquote(username) or None
                sasl_plain_password = unquote(password) or None
            else:
                sasl_plain_username = unquote(userinfo) or None

        bootstrap_servers = netloc or parsed.path.lstrip("/")
        if not bootstrap_servers:
            msg = "Kafka DSN must include bootstrap servers"
            raise ValueError(msg) from None

        if scheme == "kafkas":
            security_protocol = "SASL_SSL" if (sasl_plain_username or sasl_plain_password) else "SSL"
        else:
            security_protocol = "SASL_PLAINTEXT" if (sasl_plain_username or sasl_plain_password) else "PLAINTEXT"

        return {
            "bootstrap_servers": bootstrap_servers,
            "sasl_plain_username": sasl_plain_username,
            "sasl_plain_password": sasl_plain_password,
            "security_protocol": security_protocol,
        }

    @classmethod
    def _from_parsed_dsn(
        cls,
        parsed: KafkaParseDsnResult,
        *,
        name: str = "Kafka",
        timeout: float = DEFAULT_HC_TIMEOUT,
        **kwargs: object,
    ) -> KafkaHealthCheck:
        config = KafkaConfig(
            bootstrap_servers=parsed["bootstrap_servers"],
            ssl_context=cast("ssl.SSLContext | None", kwargs.get("ssl_context")),
            security_protocol=cast(
                "SecurityProtocol",
                kwargs.get("security_protocol", parsed["security_protocol"]) or "PLAINTEXT",
            ),
            sasl_mechanism=cast("SaslMechanism", kwargs.get("sasl_mechanism", "PLAIN")),
            sasl_plain_username=parsed["sasl_plain_username"],
            sasl_plain_password=parsed["sasl_plain_password"],
            timeout=timeout,
        )
        return cls(config=config, name=name)

    @healthcheck_safe(invalidate_on_error=True)
    async def __call__(self) -> HealthCheckResult:
        """Perform the health check on Kafka.

        Returns:
            HealthCheckResult: The result of the health check.
        """
        client = await self._ensure_client()
        if not getattr(client, "_started", False):  # pragma: no branch
            await client.start()
            with contextlib.suppress(AttributeError):
                client._started = True  # noqa: SLF001
        await client.list_topics()
        return HealthCheckResult(name=self._name, healthy=True)

__call__() async

Perform the health check on Kafka.

Returns:

Name Type Description
HealthCheckResult HealthCheckResult

The result of the health check.

Source code in fast_healthchecks/checks/kafka.py
@healthcheck_safe(invalidate_on_error=True)
async def __call__(self) -> HealthCheckResult:
    """Perform the health check on Kafka.

    Returns:
        HealthCheckResult: The result of the health check.
    """
    client = await self._ensure_client()
    if not getattr(client, "_started", False):  # pragma: no branch
        await client.start()
        with contextlib.suppress(AttributeError):
            client._started = True  # noqa: SLF001
    await client.list_topics()
    return HealthCheckResult(name=self._name, healthy=True)

__init__(*, config=None, name='Kafka', close_client_fn=_close_kafka_client, **kwargs)

Initialize the KafkaHealthCheck.

Parameters:

Name Type Description Default
config KafkaConfig | None

Connection config. If None, built from kwargs (bootstrap_servers, etc.).

None
name str

The name of the health check.

'Kafka'
close_client_fn Callable[[AIOKafkaAdminClient], Awaitable[None]]

Callable to close the cached client.

_close_kafka_client
**kwargs Any

Passed to KafkaConfig when config is None.

{}
Source code in fast_healthchecks/checks/kafka.py
def __init__(
    self,
    *,
    config: KafkaConfig | None = None,
    name: str = "Kafka",
    close_client_fn: Callable[[AIOKafkaAdminClient], Awaitable[None]] = _close_kafka_client,
    **kwargs: Any,  # noqa: ANN401
) -> None:
    """Initialize the KafkaHealthCheck.

    Args:
        config: Connection config. If None, built from kwargs (bootstrap_servers, etc.).
        name: The name of the health check.
        close_client_fn: Callable to close the cached client.
        **kwargs: Passed to KafkaConfig when config is None.
    """
    if config is None:
        config = KafkaConfig(**kwargs)
    self._config = config
    self._name = name
    super().__init__(close_client_fn=close_client_fn)

parse_dsn(dsn) classmethod

Parse the Kafka DSN and return the results.

Scheme kafkas implies SSL (SASL_SSL when credentials present). Scheme kafka implies PLAINTEXT (SASL_PLAINTEXT when credentials present). Kwargs to from_dsn override DSN-derived values.

Parameters:

Name Type Description Default
dsn str

The DSN to parse.

required

Returns:

Name Type Description
KafkaParseDsnResult KafkaParseDsnResult

The results of parsing the DSN.

Raises:

Type Description
ValueError

If bootstrap servers are missing.

Source code in fast_healthchecks/checks/kafka.py
@classmethod
def parse_dsn(cls, dsn: str) -> KafkaParseDsnResult:
    """Parse the Kafka DSN and return the results.

    Scheme ``kafkas`` implies SSL (SASL_SSL when credentials present).
    Scheme ``kafka`` implies PLAINTEXT (SASL_PLAINTEXT when credentials present).
    Kwargs to from_dsn override DSN-derived values.

    Args:
        dsn: The DSN to parse.

    Returns:
        KafkaParseDsnResult: The results of parsing the DSN.

    Raises:
        ValueError: If bootstrap servers are missing.
    """
    parsed = urlsplit(dsn)
    scheme = (parsed.scheme or "kafka").lower()
    netloc = parsed.netloc
    sasl_plain_username: str | None = None
    sasl_plain_password: str | None = None
    if "@" in netloc:
        userinfo, hosts = netloc.rsplit("@", 1)
        netloc = hosts
        if ":" in userinfo:
            username, password = userinfo.split(":", 1)
            sasl_plain_username = unquote(username) or None
            sasl_plain_password = unquote(password) or None
        else:
            sasl_plain_username = unquote(userinfo) or None

    bootstrap_servers = netloc or parsed.path.lstrip("/")
    if not bootstrap_servers:
        msg = "Kafka DSN must include bootstrap servers"
        raise ValueError(msg) from None

    if scheme == "kafkas":
        security_protocol = "SASL_SSL" if (sasl_plain_username or sasl_plain_password) else "SSL"
    else:
        security_protocol = "SASL_PLAINTEXT" if (sasl_plain_username or sasl_plain_password) else "PLAINTEXT"

    return {
        "bootstrap_servers": bootstrap_servers,
        "sasl_plain_username": sasl_plain_username,
        "sasl_plain_password": sasl_plain_password,
        "security_protocol": security_protocol,
    }

This module provides a health check class for MongoDB.

Classes:

Name Description
MongoHealthCheck

A class to perform health checks on MongoDB.

Usage

The MongoHealthCheck class can be used to perform health checks on MongoDB by calling it.

Example

health_check = MongoHealthCheck( hosts=["host1:27017", "host2:27017"], # or hosts="localhost", port=27017, user="myuser", password="mypassword", database="mydatabase" ) result = await health_check() print(result.healthy)

MongoHealthCheck

Bases: ClientCachingMixin['AsyncIOMotorClient[dict[str, Any]]'], HealthCheckDSN[HealthCheckResult, MongoParseDsnResult]

A class to perform health checks on MongoDB.

Attributes:

Name Type Description
_auth_source

The MongoDB authentication source.

_database

The MongoDB database to use.

_hosts

The MongoDB host or a list of hosts.

_name str

The name of the health check.

_password str

The MongoDB password.

_port str

The MongoDB port.

_timeout str

The timeout for the health check.

_user str

The MongoDB user.

Source code in fast_healthchecks/checks/mongo.py
@final
class MongoHealthCheck(
    ClientCachingMixin["AsyncIOMotorClient[dict[str, Any]]"],
    HealthCheckDSN[HealthCheckResult, MongoParseDsnResult],
):
    """A class to perform health checks on MongoDB.

    Attributes:
        _auth_source: The MongoDB authentication source.
        _database: The MongoDB database to use.
        _hosts: The MongoDB host or a list of hosts.
        _name: The name of the health check.
        _password: The MongoDB password.
        _port: The MongoDB port.
        _timeout: The timeout for the health check.
        _user: The MongoDB user.
    """

    __slots__ = (*_CLIENT_CACHING_SLOTS, "_config", "_name")

    _config: MongoConfig
    _name: str
    _client: AsyncIOMotorClient[dict[str, Any]] | None
    _client_loop: asyncio.AbstractEventLoop | None

    def __init__(
        self,
        *,
        config: MongoConfig | None = None,
        name: str = "MongoDB",
        close_client_fn: Callable[
            [AsyncIOMotorClient[dict[str, Any]]],
            Awaitable[None],
        ] = _close_mongo_client,
        **kwargs: Any,  # noqa: ANN401
    ) -> None:
        """Initialize the MongoHealthCheck.

        Args:
            config: Connection config. If None, built from kwargs (hosts, port, etc.).
            name: The name of the health check.
            close_client_fn: Callable to close the cached client.
            **kwargs: Passed to MongoConfig when config is None.
        """
        if config is None:
            config = MongoConfig(**kwargs)
        self._config = config
        self._name = name
        super().__init__(close_client_fn=close_client_fn)

    def _create_client(self) -> AsyncIOMotorClient[dict[str, Any]]:
        c = self._config
        return AsyncIOMotorClient(
            host=c.hosts,
            port=c.port,
            username=c.user,
            password=c.password,
            authSource=c.auth_source,
            serverSelectionTimeoutMS=int(c.timeout * 1000),
        )

    @classmethod
    def _allowed_schemes(cls) -> tuple[str, ...]:
        return ("mongodb", "mongodb+srv")

    @classmethod
    def _default_name(cls) -> str:
        return "MongoDB"

    @classmethod
    def parse_dsn(cls, dsn: str) -> MongoParseDsnResult:
        """Parse the DSN and return the results.

        Args:
            dsn: The DSN to parse.

        Returns:
            MongoParseDsnResult: The results of parsing the DSN.
        """
        parse_result = urlsplit(dsn)
        query = parse_query_string(parse_result.query)
        return {"parse_result": parse_result, "authSource": query.get("authSource", "admin")}

    @classmethod
    def _from_parsed_dsn(
        cls,
        parsed: MongoParseDsnResult,
        *,
        name: str = "MongoDB",
        timeout: float = DEFAULT_HC_TIMEOUT,
        **_kwargs: object,
    ) -> MongoHealthCheck:
        parse_result = parsed["parse_result"]
        hosts: str | list[str]
        port: int | None
        if "," in parse_result.netloc:
            hosts = parse_result.netloc.split("@")[-1].split(",")
            port = None
        else:
            hosts = parse_result.hostname or "localhost"
            port = parse_result.port or 27017
        config = MongoConfig(
            hosts=hosts,
            port=port,
            user=parse_result.username,
            password=parse_result.password,
            database=parse_result.path.lstrip("/") or None,
            auth_source=parsed["authSource"],
            timeout=timeout,
        )
        return cls(config=config, name=name)

    @healthcheck_safe(invalidate_on_error=True)
    async def __call__(self) -> HealthCheckResult:
        """Perform the health check on MongoDB.

        Returns:
            HealthCheckResult: The result of the health check.
        """
        client = await self._ensure_client()
        database = client[self._config.database] if self._config.database else client[self._config.auth_source]
        res = await database.command("ping")
        ok_raw = res.get("ok")
        ok_value = ok_raw if isinstance(ok_raw, (bool, int, float)) else 0
        if int(ok_value) == 1:
            return HealthCheckResult(name=self._name, healthy=True)
        return result_unhealthy_dependency(
            name=self._name,
            message="MongoDB ping returned unhealthy status",
            meta={"ok": ok_raw if isinstance(ok_raw, (bool, int, float, str)) else str(ok_raw)},
        )

__call__() async

Perform the health check on MongoDB.

Returns:

Name Type Description
HealthCheckResult HealthCheckResult

The result of the health check.

Source code in fast_healthchecks/checks/mongo.py
@healthcheck_safe(invalidate_on_error=True)
async def __call__(self) -> HealthCheckResult:
    """Perform the health check on MongoDB.

    Returns:
        HealthCheckResult: The result of the health check.
    """
    client = await self._ensure_client()
    database = client[self._config.database] if self._config.database else client[self._config.auth_source]
    res = await database.command("ping")
    ok_raw = res.get("ok")
    ok_value = ok_raw if isinstance(ok_raw, (bool, int, float)) else 0
    if int(ok_value) == 1:
        return HealthCheckResult(name=self._name, healthy=True)
    return result_unhealthy_dependency(
        name=self._name,
        message="MongoDB ping returned unhealthy status",
        meta={"ok": ok_raw if isinstance(ok_raw, (bool, int, float, str)) else str(ok_raw)},
    )

__init__(*, config=None, name='MongoDB', close_client_fn=_close_mongo_client, **kwargs)

Initialize the MongoHealthCheck.

Parameters:

Name Type Description Default
config MongoConfig | None

Connection config. If None, built from kwargs (hosts, port, etc.).

None
name str

The name of the health check.

'MongoDB'
close_client_fn Callable[[AsyncIOMotorClient[dict[str, Any]]], Awaitable[None]]

Callable to close the cached client.

_close_mongo_client
**kwargs Any

Passed to MongoConfig when config is None.

{}
Source code in fast_healthchecks/checks/mongo.py
def __init__(
    self,
    *,
    config: MongoConfig | None = None,
    name: str = "MongoDB",
    close_client_fn: Callable[
        [AsyncIOMotorClient[dict[str, Any]]],
        Awaitable[None],
    ] = _close_mongo_client,
    **kwargs: Any,  # noqa: ANN401
) -> None:
    """Initialize the MongoHealthCheck.

    Args:
        config: Connection config. If None, built from kwargs (hosts, port, etc.).
        name: The name of the health check.
        close_client_fn: Callable to close the cached client.
        **kwargs: Passed to MongoConfig when config is None.
    """
    if config is None:
        config = MongoConfig(**kwargs)
    self._config = config
    self._name = name
    super().__init__(close_client_fn=close_client_fn)

parse_dsn(dsn) classmethod

Parse the DSN and return the results.

Parameters:

Name Type Description Default
dsn str

The DSN to parse.

required

Returns:

Name Type Description
MongoParseDsnResult MongoParseDsnResult

The results of parsing the DSN.

Source code in fast_healthchecks/checks/mongo.py
@classmethod
def parse_dsn(cls, dsn: str) -> MongoParseDsnResult:
    """Parse the DSN and return the results.

    Args:
        dsn: The DSN to parse.

    Returns:
        MongoParseDsnResult: The results of parsing the DSN.
    """
    parse_result = urlsplit(dsn)
    query = parse_query_string(parse_result.query)
    return {"parse_result": parse_result, "authSource": query.get("authSource", "admin")}

This module provides a health check class for OpenSearch.

Classes:

Name Description
OpenSearchHealthCheck

A class to perform health checks on OpenSearch.

Usage

The OpenSearchHealthCheck class can be used to perform health checks on OpenSearch by calling it.

Example

health_check = OpenSearchHealthCheck( hosts=["localhost:9200"], http_auth=("username", "password"), use_ssl=True, verify_certs=True, ssl_show_warn=False, ca_certs="/path/to/ca.pem", ) result = await health_check() print(result.healthy)

OpenSearchHealthCheck

Bases: ClientCachingMixin['AsyncOpenSearch'], HealthCheckDSN[HealthCheckResult, OpenSearchParseDsnResult]

A class to perform health checks on OpenSearch.

Attributes:

Name Type Description
_hosts

The OpenSearch hosts.

_name str

The name of the health check.

_http_auth str

The HTTP authentication.

_use_ssl str

Whether to use SSL or not.

_verify_certs str

Whether to verify certificates or not.

_ssl_show_warn str

Whether to show SSL warnings or not.

_ca_certs str

The CA certificates.

_timeout str

The timeout for the health check.

Source code in fast_healthchecks/checks/opensearch.py
@final
class OpenSearchHealthCheck(
    ClientCachingMixin["AsyncOpenSearch"],
    HealthCheckDSN[HealthCheckResult, OpenSearchParseDsnResult],
):
    """A class to perform health checks on OpenSearch.

    Attributes:
        _hosts: The OpenSearch hosts.
        _name: The name of the health check.
        _http_auth: The HTTP authentication.
        _use_ssl: Whether to use SSL or not.
        _verify_certs: Whether to verify certificates or not.
        _ssl_show_warn: Whether to show SSL warnings or not.
        _ca_certs: The CA certificates.
        _timeout: The timeout for the health check.
    """

    __slots__ = (*_CLIENT_CACHING_SLOTS, "_config", "_name")

    _config: OpenSearchConfig
    _name: str
    _client: AsyncOpenSearch | None
    _client_loop: asyncio.AbstractEventLoop | None

    def __init__(
        self,
        *,
        config: OpenSearchConfig | None = None,
        name: str = "OpenSearch",
        close_client_fn: Callable[[AsyncOpenSearch], Awaitable[None]] = _close_opensearch_client,
        **kwargs: Any,  # noqa: ANN401
    ) -> None:
        """Initialize the OpenSearchHealthCheck.

        Args:
            config: Connection config. If None, built from kwargs (hosts, http_auth, etc.).
            name: The name of the health check.
            close_client_fn: Callable to close the cached client.
            **kwargs: Passed to OpenSearchConfig when config is None.
        """
        if config is None:
            config = OpenSearchConfig(**kwargs)
        self._config = config
        self._name = name
        super().__init__(close_client_fn=close_client_fn)

    def _create_client(self) -> AsyncOpenSearch:
        c = self._config
        return AsyncOpenSearch(
            hosts=c.hosts,
            http_auth=c.http_auth,
            use_ssl=c.use_ssl,
            verify_certs=c.verify_certs,
            ssl_show_warn=c.ssl_show_warn,
            ca_certs=c.ca_certs,
            timeout=c.timeout,
        )

    @classmethod
    def _allowed_schemes(cls) -> tuple[str, ...]:
        return ("http", "https")

    @classmethod
    def _default_name(cls) -> str:
        return "OpenSearch"

    @classmethod
    def parse_dsn(cls, dsn: str) -> OpenSearchParseDsnResult:
        """Parse the OpenSearch DSN and return the results.

        Args:
            dsn: The DSN to parse.

        Returns:
            OpenSearchParseDsnResult: The results of parsing the DSN.

        Raises:
            ValueError: If DSN has missing host.
        """
        parsed = urlsplit(dsn)
        if not parsed.hostname:
            msg = "OpenSearch DSN must include a host"
            raise ValueError(msg) from None

        http_auth: tuple[str, str] | None = None
        if parsed.username is not None:
            http_auth = (unquote(parsed.username), unquote(parsed.password or ""))

        port = parsed.port or (443 if parsed.scheme == "https" else 9200)
        return {
            "hosts": [f"{parsed.hostname}:{port}"],
            "http_auth": http_auth,
            "use_ssl": parsed.scheme == "https",
        }

    @classmethod
    def _from_parsed_dsn(
        cls,
        parsed: OpenSearchParseDsnResult,
        *,
        name: str = "OpenSearch",
        timeout: float = DEFAULT_HC_TIMEOUT,
        **kwargs: object,
    ) -> OpenSearchHealthCheck:
        config = OpenSearchConfig(
            hosts=parsed["hosts"],
            http_auth=parsed["http_auth"],
            use_ssl=parsed["use_ssl"],
            verify_certs=cast("bool", kwargs.get("verify_certs", False)),
            ssl_show_warn=cast("bool", kwargs.get("ssl_show_warn", False)),
            ca_certs=cast("str | None", kwargs.get("ca_certs")),
            timeout=timeout,
        )
        return cls(config=config, name=name)

    @healthcheck_safe(invalidate_on_error=True)
    async def __call__(self) -> HealthCheckResult:
        """Perform the health check on OpenSearch.

        Returns:
            HealthCheckResult: The result of the health check.
        """
        client = await self._ensure_client()
        await client.info()
        return HealthCheckResult(name=self._name, healthy=True)

__call__() async

Perform the health check on OpenSearch.

Returns:

Name Type Description
HealthCheckResult HealthCheckResult

The result of the health check.

Source code in fast_healthchecks/checks/opensearch.py
@healthcheck_safe(invalidate_on_error=True)
async def __call__(self) -> HealthCheckResult:
    """Perform the health check on OpenSearch.

    Returns:
        HealthCheckResult: The result of the health check.
    """
    client = await self._ensure_client()
    await client.info()
    return HealthCheckResult(name=self._name, healthy=True)

__init__(*, config=None, name='OpenSearch', close_client_fn=_close_opensearch_client, **kwargs)

Initialize the OpenSearchHealthCheck.

Parameters:

Name Type Description Default
config OpenSearchConfig | None

Connection config. If None, built from kwargs (hosts, http_auth, etc.).

None
name str

The name of the health check.

'OpenSearch'
close_client_fn Callable[[AsyncOpenSearch], Awaitable[None]]

Callable to close the cached client.

_close_opensearch_client
**kwargs Any

Passed to OpenSearchConfig when config is None.

{}
Source code in fast_healthchecks/checks/opensearch.py
def __init__(
    self,
    *,
    config: OpenSearchConfig | None = None,
    name: str = "OpenSearch",
    close_client_fn: Callable[[AsyncOpenSearch], Awaitable[None]] = _close_opensearch_client,
    **kwargs: Any,  # noqa: ANN401
) -> None:
    """Initialize the OpenSearchHealthCheck.

    Args:
        config: Connection config. If None, built from kwargs (hosts, http_auth, etc.).
        name: The name of the health check.
        close_client_fn: Callable to close the cached client.
        **kwargs: Passed to OpenSearchConfig when config is None.
    """
    if config is None:
        config = OpenSearchConfig(**kwargs)
    self._config = config
    self._name = name
    super().__init__(close_client_fn=close_client_fn)

parse_dsn(dsn) classmethod

Parse the OpenSearch DSN and return the results.

Parameters:

Name Type Description Default
dsn str

The DSN to parse.

required

Returns:

Name Type Description
OpenSearchParseDsnResult OpenSearchParseDsnResult

The results of parsing the DSN.

Raises:

Type Description
ValueError

If DSN has missing host.

Source code in fast_healthchecks/checks/opensearch.py
@classmethod
def parse_dsn(cls, dsn: str) -> OpenSearchParseDsnResult:
    """Parse the OpenSearch DSN and return the results.

    Args:
        dsn: The DSN to parse.

    Returns:
        OpenSearchParseDsnResult: The results of parsing the DSN.

    Raises:
        ValueError: If DSN has missing host.
    """
    parsed = urlsplit(dsn)
    if not parsed.hostname:
        msg = "OpenSearch DSN must include a host"
        raise ValueError(msg) from None

    http_auth: tuple[str, str] | None = None
    if parsed.username is not None:
        http_auth = (unquote(parsed.username), unquote(parsed.password or ""))

    port = parsed.port or (443 if parsed.scheme == "https" else 9200)
    return {
        "hosts": [f"{parsed.hostname}:{port}"],
        "http_auth": http_auth,
        "use_ssl": parsed.scheme == "https",
    }

This module provides a health check class for RabbitMQ.

Classes:

Name Description
RabbitMQHealthCheck

A class to perform health checks on RabbitMQ.

Security: When using DSN or config without a password (or with default credentials), the library falls back to user="guest" and password="guest". These defaults are for local development only; do not use them in production or on non-local brokers. See SECURITY.md and :class:RabbitMQConfig docstring.

Usage

The RabbitMQHealthCheck class can be used to perform health checks on RabbitMQ by calling it.

Example

health_check = RabbitMQHealthCheck( host="localhost", port=5672, username="guest", password="guest", ) result = await health_check() print(result.healthy)

RabbitMQHealthCheck

Bases: ClientCachingMixin['AbstractRobustConnection'], HealthCheckDSN[HealthCheckResult, RabbitMQParseDsnResult]

A class to perform health checks on RabbitMQ.

Uses ClientCachingMixin to reuse a single connection instead of opening a new one on every check.

Attributes:

Name Type Description
_host

The RabbitMQ host.

_name str

The name of the health check.

_password str

The RabbitMQ password.

_port str

The RabbitMQ port.

_secure str

Whether to use a secure connection.

_timeout str

The timeout for the health check.

_user str

The RabbitMQ user.

_vhost str

The RabbitMQ virtual host.

Source code in fast_healthchecks/checks/rabbitmq.py
@final
class RabbitMQHealthCheck(
    ClientCachingMixin["AbstractRobustConnection"],
    HealthCheckDSN[HealthCheckResult, RabbitMQParseDsnResult],
):
    """A class to perform health checks on RabbitMQ.

    Uses ClientCachingMixin to reuse a single connection instead of opening
    a new one on every check.

    Attributes:
        _host: The RabbitMQ host.
        _name: The name of the health check.
        _password: The RabbitMQ password.
        _port: The RabbitMQ port.
        _secure: Whether to use a secure connection.
        _timeout: The timeout for the health check.
        _user: The RabbitMQ user.
        _vhost: The RabbitMQ virtual host.
    """

    __slots__ = (*_CLIENT_CACHING_SLOTS, "_config", "_name")

    _config: RabbitMQConfig
    _name: str
    _client: AbstractRobustConnection | None
    _client_loop: asyncio.AbstractEventLoop | None

    def __init__(
        self,
        *,
        config: RabbitMQConfig | None = None,
        name: str = "RabbitMQ",
        close_client_fn: Callable[[AbstractRobustConnection], Awaitable[None]] = _close_rabbitmq_client,
        **kwargs: Any,  # noqa: ANN401
    ) -> None:
        """Initialize the RabbitMQHealthCheck.

        Args:
            config: Connection config. If None, built from kwargs (host, user, password, etc.).
            name: The name of the health check.
            close_client_fn: Callable to close the cached connection.
            **kwargs: Passed to RabbitMQConfig when config is None.
        """
        if config is None:
            config = RabbitMQConfig(**kwargs)
        self._config = config
        self._name = name
        super().__init__(close_client_fn=close_client_fn)

    def _create_client(self) -> Awaitable[AbstractRobustConnection]:
        c = self._config
        return aio_pika.connect_robust(
            host=c.host,
            port=c.port,
            login=c.user,
            password=c.password,
            ssl=c.secure,
            virtualhost=c.vhost,
            timeout=c.timeout,
        )

    @classmethod
    def _allowed_schemes(cls) -> tuple[str, ...]:
        return ("amqp", "amqps")

    @classmethod
    def _default_name(cls) -> str:
        return "RabbitMQ"

    @classmethod
    def parse_dsn(cls, dsn: str) -> RabbitMQParseDsnResult:
        """Parse the DSN and return the results.

        Args:
            dsn: The DSN to parse.

        Returns:
            RabbitMQParseDsnResult: The results of parsing the DSN.
        """
        parse_result = urlsplit(dsn)
        return {"parse_result": parse_result}

    @classmethod
    def _from_parsed_dsn(
        cls,
        parsed: RabbitMQParseDsnResult,
        *,
        name: str = "RabbitMQ",
        timeout: float = DEFAULT_HC_TIMEOUT,
        **_kwargs: object,
    ) -> RabbitMQHealthCheck:
        parse_result = parsed["parse_result"]
        # Default "guest"/"guest" is development-only; see SECURITY.md and RabbitMQConfig.
        config = RabbitMQConfig(
            host=parse_result.hostname or "localhost",
            user=parse_result.username or "guest",
            password=parse_result.password or "guest",
            port=parse_result.port or 5672,
            vhost=parse_result.path.lstrip("/") or "/",
            secure=parse_result.scheme == "amqps",
            timeout=timeout,
        )
        return cls(config=config, name=name)

    @healthcheck_safe(invalidate_on_error=True)
    async def __call__(self) -> HealthCheckResult:
        """Perform the health check on RabbitMQ.

        ClientCachingMixin handles connection persistence; _ensure_client
        validates the connection via aio-pika's robust logic.

        Returns:
            HealthCheckResult: The result of the health check.
        """
        _ = await self._ensure_client()
        return HealthCheckResult(name=self._name, healthy=True)

__call__() async

Perform the health check on RabbitMQ.

ClientCachingMixin handles connection persistence; _ensure_client validates the connection via aio-pika's robust logic.

Returns:

Name Type Description
HealthCheckResult HealthCheckResult

The result of the health check.

Source code in fast_healthchecks/checks/rabbitmq.py
@healthcheck_safe(invalidate_on_error=True)
async def __call__(self) -> HealthCheckResult:
    """Perform the health check on RabbitMQ.

    ClientCachingMixin handles connection persistence; _ensure_client
    validates the connection via aio-pika's robust logic.

    Returns:
        HealthCheckResult: The result of the health check.
    """
    _ = await self._ensure_client()
    return HealthCheckResult(name=self._name, healthy=True)

__init__(*, config=None, name='RabbitMQ', close_client_fn=_close_rabbitmq_client, **kwargs)

Initialize the RabbitMQHealthCheck.

Parameters:

Name Type Description Default
config RabbitMQConfig | None

Connection config. If None, built from kwargs (host, user, password, etc.).

None
name str

The name of the health check.

'RabbitMQ'
close_client_fn Callable[[AbstractRobustConnection], Awaitable[None]]

Callable to close the cached connection.

_close_rabbitmq_client
**kwargs Any

Passed to RabbitMQConfig when config is None.

{}
Source code in fast_healthchecks/checks/rabbitmq.py
def __init__(
    self,
    *,
    config: RabbitMQConfig | None = None,
    name: str = "RabbitMQ",
    close_client_fn: Callable[[AbstractRobustConnection], Awaitable[None]] = _close_rabbitmq_client,
    **kwargs: Any,  # noqa: ANN401
) -> None:
    """Initialize the RabbitMQHealthCheck.

    Args:
        config: Connection config. If None, built from kwargs (host, user, password, etc.).
        name: The name of the health check.
        close_client_fn: Callable to close the cached connection.
        **kwargs: Passed to RabbitMQConfig when config is None.
    """
    if config is None:
        config = RabbitMQConfig(**kwargs)
    self._config = config
    self._name = name
    super().__init__(close_client_fn=close_client_fn)

parse_dsn(dsn) classmethod

Parse the DSN and return the results.

Parameters:

Name Type Description Default
dsn str

The DSN to parse.

required

Returns:

Name Type Description
RabbitMQParseDsnResult RabbitMQParseDsnResult

The results of parsing the DSN.

Source code in fast_healthchecks/checks/rabbitmq.py
@classmethod
def parse_dsn(cls, dsn: str) -> RabbitMQParseDsnResult:
    """Parse the DSN and return the results.

    Args:
        dsn: The DSN to parse.

    Returns:
        RabbitMQParseDsnResult: The results of parsing the DSN.
    """
    parse_result = urlsplit(dsn)
    return {"parse_result": parse_result}

Health check that performs an HTTP GET to a URL.

UrlHealthCheck caches an httpx AsyncClient and supports optional basic auth, SSL verification, and SSRF protection (block_private_hosts).

UrlHealthCheck

Bases: ClientCachingMixin['AsyncClient'], ConfigDictMixin, HealthCheck[HealthCheckResult]

Health check that performs an HTTP GET to a configurable URL.

Supports basic auth, custom timeout, SSL verification, and optional SSRF protection via block_private_hosts (see config).

Source code in fast_healthchecks/checks/url.py
@final
class UrlHealthCheck(ClientCachingMixin["AsyncClient"], ConfigDictMixin, HealthCheck[HealthCheckResult]):
    """Health check that performs an HTTP GET to a configurable URL.

    Supports basic auth, custom timeout, SSL verification, and optional
    SSRF protection via ``block_private_hosts`` (see config).
    """

    __slots__ = (*_CLIENT_CACHING_SLOTS, "_config", "_name")

    _config: UrlConfig
    _name: str
    _client: AsyncClient | None
    _client_loop: asyncio.AbstractEventLoop | None

    @property
    def _auth(self) -> BasicAuth | None:
        c = self._config
        return BasicAuth(c.username, c.password or "") if c.username else None

    @property
    def _transport(self) -> AsyncHTTPTransport:
        return AsyncHTTPTransport(verify=self._config.verify_ssl)

    @property
    def _block_private_hosts(self) -> bool:
        return self._config.block_private_hosts

    def __init__(
        self,
        *,
        config: UrlConfig | None = None,
        name: str = "HTTP",
        close_client_fn: Callable[[AsyncClient], Awaitable[None]] = _close_url_client,
        **kwargs: Any,  # noqa: ANN401
    ) -> None:
        """Initialize the health check.

        Warning:
            Pass only trusted URLs from application configuration. Do not use
            user-controlled input for ``url`` to avoid SSRF.

        Args:
            config: Connection config. If None, built from kwargs (url, username, etc.).
            name: The name of the health check.
            close_client_fn: Callable to close the cached client.
            **kwargs: Passed to UrlConfig when config is None (url required).
        """
        if config is None:
            kwargs = dict(kwargs)
            if "url" in kwargs:
                kwargs["url"] = str(kwargs["url"])
            config = UrlConfig(**kwargs)
        validate_url_ssrf(config.url, block_private_hosts=config.block_private_hosts)
        self._config = config
        self._name = name
        super().__init__(close_client_fn=close_client_fn)

    def _create_client(self) -> AsyncClient:
        c = self._config
        transport = AsyncHTTPTransport(verify=c.verify_ssl)
        return AsyncClient(
            auth=self._auth,
            timeout=c.timeout,
            transport=transport,
            follow_redirects=c.follow_redirects,
        )

    @healthcheck_safe(invalidate_on_error=True)
    async def __call__(self) -> HealthCheckResult:
        """Perform the health check.

        When block_private_hosts is True, resolves the URL host before the request
        and rejects if it resolves to loopback/private (SSRF/DNS rebinding protection).

        Returns:
            HealthCheckResult: Result with healthy=True if response is success.
        """
        if self._config.block_private_hosts:
            parsed = urlparse(self._config.url)
            host = parsed.hostname or ""
            await validate_host_ssrf_async(host)
        client = await self._ensure_client()
        response: Response = await client.get(self._config.url)
        if response.status_code >= HTTPStatus.INTERNAL_SERVER_ERROR or (
            self._config.username and response.status_code in {HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN}
        ):
            response.raise_for_status()
        if response.is_success:
            return HealthCheckResult(name=self._name, healthy=True)
        return result_unhealthy_dependency(
            name=self._name,
            message=f"HTTP dependency returned status {response.status_code}",
            meta={"status_code": response.status_code},
        )

__call__() async

Perform the health check.

When block_private_hosts is True, resolves the URL host before the request and rejects if it resolves to loopback/private (SSRF/DNS rebinding protection).

Returns:

Name Type Description
HealthCheckResult HealthCheckResult

Result with healthy=True if response is success.

Source code in fast_healthchecks/checks/url.py
@healthcheck_safe(invalidate_on_error=True)
async def __call__(self) -> HealthCheckResult:
    """Perform the health check.

    When block_private_hosts is True, resolves the URL host before the request
    and rejects if it resolves to loopback/private (SSRF/DNS rebinding protection).

    Returns:
        HealthCheckResult: Result with healthy=True if response is success.
    """
    if self._config.block_private_hosts:
        parsed = urlparse(self._config.url)
        host = parsed.hostname or ""
        await validate_host_ssrf_async(host)
    client = await self._ensure_client()
    response: Response = await client.get(self._config.url)
    if response.status_code >= HTTPStatus.INTERNAL_SERVER_ERROR or (
        self._config.username and response.status_code in {HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN}
    ):
        response.raise_for_status()
    if response.is_success:
        return HealthCheckResult(name=self._name, healthy=True)
    return result_unhealthy_dependency(
        name=self._name,
        message=f"HTTP dependency returned status {response.status_code}",
        meta={"status_code": response.status_code},
    )

__init__(*, config=None, name='HTTP', close_client_fn=_close_url_client, **kwargs)

Initialize the health check.

Warning

Pass only trusted URLs from application configuration. Do not use user-controlled input for url to avoid SSRF.

Parameters:

Name Type Description Default
config UrlConfig | None

Connection config. If None, built from kwargs (url, username, etc.).

None
name str

The name of the health check.

'HTTP'
close_client_fn Callable[[AsyncClient], Awaitable[None]]

Callable to close the cached client.

_close_url_client
**kwargs Any

Passed to UrlConfig when config is None (url required).

{}
Source code in fast_healthchecks/checks/url.py
def __init__(
    self,
    *,
    config: UrlConfig | None = None,
    name: str = "HTTP",
    close_client_fn: Callable[[AsyncClient], Awaitable[None]] = _close_url_client,
    **kwargs: Any,  # noqa: ANN401
) -> None:
    """Initialize the health check.

    Warning:
        Pass only trusted URLs from application configuration. Do not use
        user-controlled input for ``url`` to avoid SSRF.

    Args:
        config: Connection config. If None, built from kwargs (url, username, etc.).
        name: The name of the health check.
        close_client_fn: Callable to close the cached client.
        **kwargs: Passed to UrlConfig when config is None (url required).
    """
    if config is None:
        kwargs = dict(kwargs)
        if "url" in kwargs:
            kwargs["url"] = str(kwargs["url"])
        config = UrlConfig(**kwargs)
    validate_url_ssrf(config.url, block_private_hosts=config.block_private_hosts)
    self._config = config
    self._name = name
    super().__init__(close_client_fn=close_client_fn)

This module provides a health check class for PostgreSQL using asyncpg.

Classes:

Name Description
PostgreSQLAsyncPGHealthCheck

A class to perform health checks on a PostgreSQL database using asyncpg.

Usage

The PostgreSQLAsyncPGHealthCheck class can be used to perform health checks on a PostgreSQL database by connecting to the database and executing a simple query.

Example

health_check = PostgreSQLAsyncPGHealthCheck( host="localhost", port=5432, user="username", password="password", database="dbname" )

or

health_check = PostgreSQLAsyncPGHealthCheck.from_dsn( "postgresql://username:password@localhost:5432/dbname", ) result = await health_check() print(result.healthy)

PostgreSQLAsyncPGHealthCheck

Bases: BasePostgreSQLHealthCheck[HealthCheckResult]

Health check class for PostgreSQL using asyncpg.

Attributes:

Name Type Description
_name str

The name of the health check.

_host str

The hostname of the PostgreSQL server.

_port str

The port number of the PostgreSQL server.

_user str

The username for authentication.

_password str

The password for authentication.

_database str

The database name.

_ssl str

The SSL context for secure connections.

_direct_tls str

Whether to use direct TLS.

_timeout str

The timeout for the connection.

Source code in fast_healthchecks/checks/postgresql/asyncpg.py
class PostgreSQLAsyncPGHealthCheck(BasePostgreSQLHealthCheck[HealthCheckResult]):
    """Health check class for PostgreSQL using asyncpg.

    Attributes:
        _name: The name of the health check.
        _host: The hostname of the PostgreSQL server.
        _port: The port number of the PostgreSQL server.
        _user: The username for authentication.
        _password: The password for authentication.
        _database: The database name.
        _ssl: The SSL context for secure connections.
        _direct_tls: Whether to use direct TLS.
        _timeout: The timeout for the connection.
    """

    __slots__ = ("_config", "_name")

    _config: PostgresAsyncPGConfig
    _name: str

    def __init__(
        self,
        *,
        config: PostgresAsyncPGConfig | None = None,
        name: str = "PostgreSQL",
        **kwargs: Any,  # noqa: ANN401
    ) -> None:
        """Initialize the PostgreSQLAsyncPGHealthCheck.

        Args:
            config: Connection config. If None, built from kwargs (host, port, user, etc.).
            name: The name of the health check.
            **kwargs: Passed to PostgresAsyncPGConfig when config is None.
        """
        if config is None:
            config = PostgresAsyncPGConfig(**kwargs)
        self._config = config
        self._name = name

    @classmethod
    def _from_parsed_dsn(
        cls,
        parsed: "PostgresParseDsnResult",  # noqa: UP037
        *,
        name: str = "PostgreSQL",
        timeout: float = DEFAULT_HC_TIMEOUT,
        **_kwargs: object,
    ) -> PostgreSQLAsyncPGHealthCheck:
        parse_result = parsed["parse_result"]
        sslctx = parsed["sslctx"]
        config = PostgresAsyncPGConfig(
            host=parse_result.hostname or "localhost",
            port=parse_result.port or 5432,
            user=parse_result.username,
            password=parse_result.password,
            database=parse_result.path.lstrip("/"),
            ssl=sslctx,
            direct_tls=parsed["direct_tls"],
            timeout=timeout,
        )
        return cls(config=config, name=name)

    @healthcheck_safe(invalidate_on_error=False)
    async def __call__(self) -> HealthCheckResult:
        """Perform the health check.

        Returns:
            HealthCheckResult: The result of the health check.
        """
        c = self._config
        connection: Connection | None = None
        try:
            connection = await asyncpg.connect(
                host=c.host,
                port=c.port,
                user=c.user,
                password=c.password,
                database=c.database,
                timeout=c.timeout,
                ssl=c.ssl,
                direct_tls=c.direct_tls,
            )
            async with connection.transaction(readonly=True):
                healthy: bool = bool(await connection.fetchval("SELECT 1"))
                if healthy:
                    return HealthCheckResult(name=self._name, healthy=True)
                return result_unhealthy_dependency(
                    name=self._name,
                    message="PostgreSQL check query returned unhealthy status",
                )
        finally:
            if connection is not None and not connection.is_closed():
                await connection.close(timeout=c.timeout)

__call__() async

Perform the health check.

Returns:

Name Type Description
HealthCheckResult HealthCheckResult

The result of the health check.

Source code in fast_healthchecks/checks/postgresql/asyncpg.py
@healthcheck_safe(invalidate_on_error=False)
async def __call__(self) -> HealthCheckResult:
    """Perform the health check.

    Returns:
        HealthCheckResult: The result of the health check.
    """
    c = self._config
    connection: Connection | None = None
    try:
        connection = await asyncpg.connect(
            host=c.host,
            port=c.port,
            user=c.user,
            password=c.password,
            database=c.database,
            timeout=c.timeout,
            ssl=c.ssl,
            direct_tls=c.direct_tls,
        )
        async with connection.transaction(readonly=True):
            healthy: bool = bool(await connection.fetchval("SELECT 1"))
            if healthy:
                return HealthCheckResult(name=self._name, healthy=True)
            return result_unhealthy_dependency(
                name=self._name,
                message="PostgreSQL check query returned unhealthy status",
            )
    finally:
        if connection is not None and not connection.is_closed():
            await connection.close(timeout=c.timeout)

__init__(*, config=None, name='PostgreSQL', **kwargs)

Initialize the PostgreSQLAsyncPGHealthCheck.

Parameters:

Name Type Description Default
config PostgresAsyncPGConfig | None

Connection config. If None, built from kwargs (host, port, user, etc.).

None
name str

The name of the health check.

'PostgreSQL'
**kwargs Any

Passed to PostgresAsyncPGConfig when config is None.

{}
Source code in fast_healthchecks/checks/postgresql/asyncpg.py
def __init__(
    self,
    *,
    config: PostgresAsyncPGConfig | None = None,
    name: str = "PostgreSQL",
    **kwargs: Any,  # noqa: ANN401
) -> None:
    """Initialize the PostgreSQLAsyncPGHealthCheck.

    Args:
        config: Connection config. If None, built from kwargs (host, port, user, etc.).
        name: The name of the health check.
        **kwargs: Passed to PostgresAsyncPGConfig when config is None.
    """
    if config is None:
        config = PostgresAsyncPGConfig(**kwargs)
    self._config = config
    self._name = name

This module provides a health check for PostgreSQL using psycopg.

Classes:

Name Description
PostgreSQLPsycopgHealthCheck

A class for health checking PostgreSQL using psycopg.

Usage

The PostgreSQLPsycopgHealthCheck class can be used to perform health checks on a PostgreSQL database by connecting to the database and executing a simple query.

Example

health_check = PostgreSQLPsycopgHealthCheck( host="localhost", port=5432, user="username", password="password", database="dbname" )

or

health_check = PostgreSQLPsycopgHealthCheck.from_dsn( "postgresql://username:password@localhost:5432/dbname", ) result = await health_check() print(result.healthy)

PostgreSQLPsycopgHealthCheck

Bases: BasePostgreSQLHealthCheck[HealthCheckResult]

Health check class for PostgreSQL using psycopg.

Attributes:

Name Type Description
_name str

The name of the health check.

_host str

The hostname of the PostgreSQL server.

_port str

The port number of the PostgreSQL server.

_user str

The username for authentication.

_password str

The password for authentication.

_database str

The database name.

_sslmode str

The SSL mode to use for the connection.

_sslcert str

The path to the SSL certificate file.

_sslkey str

The path to the SSL key file.

_sslrootcert str

The path to the SSL root certificate file.

_timeout str

The timeout for the health check.

Source code in fast_healthchecks/checks/postgresql/psycopg.py
class PostgreSQLPsycopgHealthCheck(BasePostgreSQLHealthCheck[HealthCheckResult]):
    """Health check class for PostgreSQL using psycopg.

    Attributes:
        _name: The name of the health check.
        _host: The hostname of the PostgreSQL server.
        _port: The port number of the PostgreSQL server.
        _user: The username for authentication.
        _password: The password for authentication.
        _database: The database name.
        _sslmode: The SSL mode to use for the connection.
        _sslcert: The path to the SSL certificate file.
        _sslkey: The path to the SSL key file.
        _sslrootcert: The path to the SSL root certificate file.
        _timeout: The timeout for the health check.
    """

    __slots__ = ("_config", "_name")

    _config: PostgresPsycopgConfig
    _name: str

    def __init__(
        self,
        *,
        config: PostgresPsycopgConfig | None = None,
        name: str = "PostgreSQL",
        **kwargs: Any,  # noqa: ANN401
    ) -> None:
        """Initialize the PostgreSQLPsycopgHealthCheck.

        Args:
            config: Connection config. If None, built from kwargs (host, port, user, etc.).
            name: The name of the health check.
            **kwargs: Passed to PostgresPsycopgConfig when config is None.
        """
        if config is None:
            config = PostgresPsycopgConfig(**kwargs)
        self._config = config
        self._name = name

    @classmethod
    def _from_parsed_dsn(
        cls,
        parsed: "PostgresParseDsnResult",  # noqa: UP037
        *,
        name: str = "PostgreSQL",
        timeout: float = DEFAULT_HC_TIMEOUT,
        **_kwargs: object,
    ) -> PostgreSQLPsycopgHealthCheck:
        parse_result = parsed["parse_result"]
        config = PostgresPsycopgConfig(
            host=parse_result.hostname or "localhost",
            port=parse_result.port or 5432,
            user=parse_result.username,
            password=parse_result.password,
            database=parse_result.path.lstrip("/"),
            sslmode=parsed["sslmode"],
            sslcert=parsed["sslcert"],
            sslkey=parsed["sslkey"],
            sslrootcert=parsed["sslrootcert"],
            timeout=timeout,
        )
        return cls(config=config, name=name)

    @healthcheck_safe(invalidate_on_error=False)
    async def __call__(self) -> HealthCheckResult:
        """Perform the health check.

        Returns:
            HealthCheckResult: The result of the health check.
        """
        c = self._config
        connection: AsyncConnection | None = None
        try:
            connection = await psycopg.AsyncConnection.connect(
                host=c.host,
                port=c.port,
                user=c.user,
                password=c.password,
                dbname=c.database,
                sslmode=c.sslmode,
                sslcert=c.sslcert,
                sslkey=c.sslkey,
                sslrootcert=c.sslrootcert,
                connect_timeout=int(c.timeout),
            )
            async with connection.cursor() as cursor:
                await cursor.execute("SELECT 1")
                healthy: bool = bool(await cursor.fetchone())
                if healthy:
                    return HealthCheckResult(name=self._name, healthy=True)
                return result_unhealthy_dependency(
                    name=self._name,
                    message="PostgreSQL check query returned unhealthy status",
                )
        finally:
            if connection is not None and not connection.closed:
                await connection.cancel_safe(timeout=c.timeout)
                await connection.close()

__call__() async

Perform the health check.

Returns:

Name Type Description
HealthCheckResult HealthCheckResult

The result of the health check.

Source code in fast_healthchecks/checks/postgresql/psycopg.py
@healthcheck_safe(invalidate_on_error=False)
async def __call__(self) -> HealthCheckResult:
    """Perform the health check.

    Returns:
        HealthCheckResult: The result of the health check.
    """
    c = self._config
    connection: AsyncConnection | None = None
    try:
        connection = await psycopg.AsyncConnection.connect(
            host=c.host,
            port=c.port,
            user=c.user,
            password=c.password,
            dbname=c.database,
            sslmode=c.sslmode,
            sslcert=c.sslcert,
            sslkey=c.sslkey,
            sslrootcert=c.sslrootcert,
            connect_timeout=int(c.timeout),
        )
        async with connection.cursor() as cursor:
            await cursor.execute("SELECT 1")
            healthy: bool = bool(await cursor.fetchone())
            if healthy:
                return HealthCheckResult(name=self._name, healthy=True)
            return result_unhealthy_dependency(
                name=self._name,
                message="PostgreSQL check query returned unhealthy status",
            )
    finally:
        if connection is not None and not connection.closed:
            await connection.cancel_safe(timeout=c.timeout)
            await connection.close()

__init__(*, config=None, name='PostgreSQL', **kwargs)

Initialize the PostgreSQLPsycopgHealthCheck.

Parameters:

Name Type Description Default
config PostgresPsycopgConfig | None

Connection config. If None, built from kwargs (host, port, user, etc.).

None
name str

The name of the health check.

'PostgreSQL'
**kwargs Any

Passed to PostgresPsycopgConfig when config is None.

{}
Source code in fast_healthchecks/checks/postgresql/psycopg.py
def __init__(
    self,
    *,
    config: PostgresPsycopgConfig | None = None,
    name: str = "PostgreSQL",
    **kwargs: Any,  # noqa: ANN401
) -> None:
    """Initialize the PostgreSQLPsycopgHealthCheck.

    Args:
        config: Connection config. If None, built from kwargs (host, port, user, etc.).
        name: The name of the health check.
        **kwargs: Passed to PostgresPsycopgConfig when config is None.
    """
    if config is None:
        config = PostgresPsycopgConfig(**kwargs)
    self._config = config
    self._name = name

Integrations

Base for FastAPI, FastStream, and Litestar integrations.

Provides Probe, run_probe(), healthcheck_shutdown(), and helpers to build health routes. Framework-specific routers use these to expose liveness/readiness.

Probe

Bases: NamedTuple

A probe is a collection of health checks that can be run together.

Attributes:

Name Type Description
name str

The name of the probe.

checks Sequence[Check]

A sequence of health checks to run.

summary str | None

A summary of the probe. If not provided, a default summary will be generated.

default_check_timeout_ms int | None

Default per-check timeout (ms) when check timeout is not set.

Source code in fast_healthchecks/integrations/base.py
class Probe(NamedTuple):
    """A probe is a collection of health checks that can be run together.

    Attributes:
        name: The name of the probe.
        checks: A sequence of health checks to run.
        summary: A summary of the probe. If not provided, a default summary will be generated.
        default_check_timeout_ms: Default per-check timeout (ms) when check timeout is not set.
    """

    name: str
    checks: Sequence[Check]
    summary: str | None = None
    default_check_timeout_ms: int | None = None

    @property
    def endpoint_summary(self) -> str:
        """Return a summary for the endpoint.

        If a summary is provided, it will be used. Otherwise, a default summary will be generated.
        """
        if self.summary:
            return self.summary
        title = re.sub(
            pattern=r"[^a-z0-9]+",
            repl=" ",
            string=self.name.lower().capitalize(),
            flags=re.IGNORECASE,
        )
        return f"{title} probe"

endpoint_summary property

Return a summary for the endpoint.

If a summary is provided, it will be used. Otherwise, a default summary will be generated.

ProbeAsgi

An ASGI probe.

Parameters:

Name Type Description Default
probe Probe

The probe to run.

required
options ProbeRouteOptions | None

Route options (handlers, status codes, debug, timeout). When None, defaults from build_probe_route_options() are used.

None
Source code in fast_healthchecks/integrations/base.py
class ProbeAsgi:
    """An ASGI probe.

    Args:
        probe: The probe to run.
        options: Route options (handlers, status codes, debug, timeout).
            When None, defaults from build_probe_route_options() are used.
    """

    __slots__ = (
        "_debug",
        "_exclude_fields",
        "_failure_handler",
        "_failure_status",
        "_probe",
        "_runner",
        "_success_handler",
        "_success_status",
        "_timeout",
    )

    _probe: Probe
    _success_handler: HandlerType
    _failure_handler: HandlerType
    _success_status: int
    _failure_status: int
    _debug: bool
    _exclude_fields: set[str]
    _timeout: float | None

    def __init__(
        self,
        probe: Probe,
        *,
        options: ProbeRouteOptions | None = None,
        runner: ProbeRunner | None = None,
    ) -> None:
        """Initialize the ASGI probe.

        Args:
            probe: The probe to run.
            options: Route options (handlers, status codes, debug, timeout).
                When None, defaults from build_probe_route_options() are used.
            runner: Optional ProbeRunner. When None, uses an internal reporting-mode runner
                with the configured timeout.
        """
        if options is None:
            options = build_probe_route_options()
        params = options.to_route_params()
        self._probe = probe
        self._success_handler = params.success_handler
        self._failure_handler = params.failure_handler
        self._success_status = params.success_status
        self._failure_status = params.failure_status
        self._debug = params.debug
        self._timeout = params.timeout
        self._exclude_fields = {"allow_partial_failure", "error_details"} if not params.debug else set()
        self._runner = runner or _build_default_runner(timeout=params.timeout)

    async def __call__(self) -> tuple[bytes, dict[str, str] | None, int]:
        """Run the probe via run_probe (unified execution and timeout handling).

        Returns:
            A tuple containing the response body, headers, and status code.
        """
        report = await self._runner.run(self._probe)
        return await map_report_to_asgi_http_response(
            report,
            debug=self._debug,
            exclude_fields=self._exclude_fields,
            success_status=self._success_status,
            failure_status=self._failure_status,
            success_handler=self._success_handler,
            failure_handler=self._failure_handler,
        )

__call__() async

Run the probe via run_probe (unified execution and timeout handling).

Returns:

Type Description
tuple[bytes, dict[str, str] | None, int]

A tuple containing the response body, headers, and status code.

Source code in fast_healthchecks/integrations/base.py
async def __call__(self) -> tuple[bytes, dict[str, str] | None, int]:
    """Run the probe via run_probe (unified execution and timeout handling).

    Returns:
        A tuple containing the response body, headers, and status code.
    """
    report = await self._runner.run(self._probe)
    return await map_report_to_asgi_http_response(
        report,
        debug=self._debug,
        exclude_fields=self._exclude_fields,
        success_status=self._success_status,
        failure_status=self._failure_status,
        success_handler=self._success_handler,
        failure_handler=self._failure_handler,
    )

__init__(probe, *, options=None, runner=None)

Initialize the ASGI probe.

Parameters:

Name Type Description Default
probe Probe

The probe to run.

required
options ProbeRouteOptions | None

Route options (handlers, status codes, debug, timeout). When None, defaults from build_probe_route_options() are used.

None
runner ProbeRunner | None

Optional ProbeRunner. When None, uses an internal reporting-mode runner with the configured timeout.

None
Source code in fast_healthchecks/integrations/base.py
def __init__(
    self,
    probe: Probe,
    *,
    options: ProbeRouteOptions | None = None,
    runner: ProbeRunner | None = None,
) -> None:
    """Initialize the ASGI probe.

    Args:
        probe: The probe to run.
        options: Route options (handlers, status codes, debug, timeout).
            When None, defaults from build_probe_route_options() are used.
        runner: Optional ProbeRunner. When None, uses an internal reporting-mode runner
            with the configured timeout.
    """
    if options is None:
        options = build_probe_route_options()
    params = options.to_route_params()
    self._probe = probe
    self._success_handler = params.success_handler
    self._failure_handler = params.failure_handler
    self._success_status = params.success_status
    self._failure_status = params.failure_status
    self._debug = params.debug
    self._timeout = params.timeout
    self._exclude_fields = {"allow_partial_failure", "error_details"} if not params.debug else set()
    self._runner = runner or _build_default_runner(timeout=params.timeout)

ProbeRouteOptions

Bases: NamedTuple

Options for probe routes. Combines handler params and path prefix.

Source code in fast_healthchecks/integrations/base.py
class ProbeRouteOptions(NamedTuple):
    """Options for probe routes. Combines handler params and path prefix."""

    success_handler: HandlerType
    failure_handler: HandlerType
    success_status: int
    failure_status: int
    debug: bool
    timeout: float | None
    prefix: str

    def to_route_params(self) -> ProbeRouteParams:
        """Return ProbeRouteParams for create_probe_route_handler."""
        return ProbeRouteParams(
            success_handler=self.success_handler,
            failure_handler=self.failure_handler,
            success_status=self.success_status,
            failure_status=self.failure_status,
            debug=self.debug,
            timeout=self.timeout,
        )

to_route_params()

Return ProbeRouteParams for create_probe_route_handler.

Source code in fast_healthchecks/integrations/base.py
def to_route_params(self) -> ProbeRouteParams:
    """Return ProbeRouteParams for create_probe_route_handler."""
    return ProbeRouteParams(
        success_handler=self.success_handler,
        failure_handler=self.failure_handler,
        success_status=self.success_status,
        failure_status=self.failure_status,
        debug=self.debug,
        timeout=self.timeout,
    )

ProbeRouteParams

Bases: NamedTuple

Parameters for probe route handlers. Used by framework integrations.

Source code in fast_healthchecks/integrations/base.py
class ProbeRouteParams(NamedTuple):
    """Parameters for probe route handlers. Used by framework integrations."""

    success_handler: HandlerType
    failure_handler: HandlerType
    success_status: int
    failure_status: int
    debug: bool
    timeout: float | None

    def to_options(self, prefix: str = "/health") -> ProbeRouteOptions:
        """Return ProbeRouteOptions with the given prefix."""
        return ProbeRouteOptions(
            success_handler=self.success_handler,
            failure_handler=self.failure_handler,
            success_status=self.success_status,
            failure_status=self.failure_status,
            debug=self.debug,
            timeout=self.timeout,
            prefix=prefix,
        )

to_options(prefix='/health')

Return ProbeRouteOptions with the given prefix.

Source code in fast_healthchecks/integrations/base.py
def to_options(self, prefix: str = "/health") -> ProbeRouteOptions:
    """Return ProbeRouteOptions with the given prefix."""
    return ProbeRouteOptions(
        success_handler=self.success_handler,
        failure_handler=self.failure_handler,
        success_status=self.success_status,
        failure_status=self.failure_status,
        debug=self.debug,
        timeout=self.timeout,
        prefix=prefix,
    )

build_health_routes(probes, add_route, *, options=None)

Build health route entries for framework integrations.

Used by Litestar and FastStream health() functions. When options is None, uses build_probe_route_options() defaults.

Parameters:

Name Type Description Default
probes Iterable[Probe]

Probes to build routes for.

required
add_route Callable[[Probe, ProbeRouteOptions], _T]

Callback (probe, options) -> route entry for the framework.

required
options ProbeRouteOptions | None

Route options. When None, defaults from build_probe_route_options().

None

Returns:

Type Description
list[_T]

List of route entries produced by add_route for each probe.

Source code in fast_healthchecks/integrations/base.py
def build_health_routes(
    probes: Iterable[Probe],
    add_route: Callable[[Probe, ProbeRouteOptions], _T],
    *,
    options: ProbeRouteOptions | None = None,
) -> list[_T]:
    """Build health route entries for framework integrations.

    Used by Litestar and FastStream health() functions. When options is None,
    uses build_probe_route_options() defaults.

    Args:
        probes: Probes to build routes for.
        add_route: Callback (probe, options) -> route entry for the framework.
        options: Route options. When None, defaults from build_probe_route_options().

    Returns:
        List of route entries produced by add_route for each probe.
    """
    if options is None:
        options = build_probe_route_options()
    return _build_health_routes(probes, add_route=add_route, options=options)

build_probe_route_options(*, success_handler=default_handler, failure_handler=default_handler, success_status=HTTPStatus.NO_CONTENT, failure_status=HTTPStatus.SERVICE_UNAVAILABLE, debug=False, prefix='/health', timeout=None)

Build ProbeRouteOptions with defaults. Used by health() and _add_probe_route.

Parameters:

Name Type Description Default
success_handler HandlerType

Handler for healthy responses. Receives ProbeAsgiResponse.

default_handler
failure_handler HandlerType

Handler for unhealthy responses. Same signature.

default_handler
success_status int

HTTP status for healthy (default 204 No Content).

NO_CONTENT
failure_status int

HTTP status for unhealthy (default 503).

SERVICE_UNAVAILABLE
debug bool

Include check details in responses.

False
prefix str

URL prefix for probe routes (e.g. "/health").

'/health'
timeout float | None

Max seconds for all checks; on exceed returns failure. None = no limit.

None

Returns:

Type Description
ProbeRouteOptions

ProbeRouteOptions for use with HealthcheckRouter or health().

Source code in fast_healthchecks/integrations/base.py
def build_probe_route_options(  # noqa: PLR0913
    *,
    success_handler: HandlerType = default_handler,
    failure_handler: HandlerType = default_handler,
    success_status: int = HTTPStatus.NO_CONTENT,
    failure_status: int = HTTPStatus.SERVICE_UNAVAILABLE,
    debug: bool = False,
    prefix: str = "/health",
    timeout: float | None = None,
) -> ProbeRouteOptions:
    """Build ProbeRouteOptions with defaults. Used by health() and _add_probe_route.

    Args:
        success_handler: Handler for healthy responses. Receives ProbeAsgiResponse.
        failure_handler: Handler for unhealthy responses. Same signature.
        success_status: HTTP status for healthy (default 204 No Content).
        failure_status: HTTP status for unhealthy (default 503).
        debug: Include check details in responses.
        prefix: URL prefix for probe routes (e.g. "/health").
        timeout: Max seconds for all checks; on exceed returns failure. None = no limit.

    Returns:
        ProbeRouteOptions for use with HealthcheckRouter or health().
    """
    return ProbeRouteOptions(
        success_handler=success_handler,
        failure_handler=failure_handler,
        success_status=success_status,
        failure_status=failure_status,
        debug=debug,
        timeout=timeout,
        prefix=prefix,
    )

close_probes(probes) async

Close resources owned by checks in the given probes.

Calls aclose() on each check that has it (e.g. checks with cached clients). Ignores exceptions so one failure does not block others. After closing, yields to the event loop a few times so that any transport/socket cleanup callbacks (e.g. from aiohttp connector) can run before the caller's context is torn down (avoids unclosed-resource warnings in tests).

Parameters:

Name Type Description Default
probes Iterable[Probe]

Probes whose checks should be closed.

required
Source code in fast_healthchecks/integrations/base.py
async def close_probes(probes: Iterable[Probe]) -> None:
    """Close resources owned by checks in the given probes.

    Calls ``aclose()`` on each check that has it (e.g. checks with cached
    clients). Ignores exceptions so one failure does not block others.
    After closing, yields to the event loop a few times so that any
    transport/socket cleanup callbacks (e.g. from aiohttp connector) can run
    before the caller's context is torn down (avoids unclosed-resource
    warnings in tests).

    Args:
        probes: Probes whose checks should be closed.
    """
    for probe in probes:
        for check in probe.checks:
            aclose = getattr(check, "aclose", None)
            if callable(aclose):
                with contextlib.suppress(Exception):
                    maybe_awaitable = aclose()
                    if isinstance(maybe_awaitable, Awaitable):
                        await maybe_awaitable
    # aiohttp (opensearch-py) may schedule cleanup across multiple loop turns.
    # Yield a few times so transport/socket finalizers run before teardown.
    await asyncio.sleep(0.1)

create_probe_route_handler(probe, params, *, response_factory, runner=None)

Create an async handler for a probe route.

Framework integrations use this with their response_factory to build the handler, then register it (FastAPI add_api_route, FastStream/Litestar return).

Parameters:

Name Type Description Default
probe Probe

The probe to run when the route is called.

required
params ProbeRouteParams

Route params (handlers, status codes, etc.).

required
response_factory Callable[[bytes, dict[str, str], int], _T]

Called with (body, headers, status_code); returns framework response.

required
runner ProbeRunner | None

Optional ProbeRunner used for probe execution.

None

Returns:

Type Description
Callable[[], Awaitable[_T]]

Async callable that runs the probe and returns the framework response.

Source code in fast_healthchecks/integrations/base.py
def create_probe_route_handler(
    probe: Probe,
    params: ProbeRouteParams,
    *,
    response_factory: Callable[[bytes, dict[str, str], int], _T],
    runner: ProbeRunner | None = None,
) -> Callable[[], Awaitable[_T]]:
    """Create an async handler for a probe route.

    Framework integrations use this with their response_factory to build
    the handler, then register it (FastAPI add_api_route, FastStream/Litestar return).

    Args:
        probe: The probe to run when the route is called.
        params: Route params (handlers, status codes, etc.).
        response_factory: Called with (body, headers, status_code); returns framework response.
        runner: Optional ProbeRunner used for probe execution.

    Returns:
        Async callable that runs the probe and returns the framework response.
    """
    probe_asgi = make_probe_asgi(probe, options=params.to_options(), runner=runner)

    async def handler() -> _T:
        content, headers, status_code = await probe_asgi()
        return response_factory(content, headers or {}, status_code)

    return handler

default_handler(response) async

Default handler for health check route.

Returns a minimal body {"status": "healthy"|"unhealthy"} for responses that require content (e.g. 503). Returns None for 204 No Content.

Parameters:

Name Type Description Default
response ProbeAsgiResponse

The response from the probe.

required

Returns:

Type Description
dict[str, Any] | None

Minimal status dict, or None for no response body.

Source code in fast_healthchecks/integrations/base.py
async def default_handler(response: ProbeAsgiResponse) -> dict[str, Any] | None:
    """Default handler for health check route.

    Returns a minimal body ``{"status": "healthy"|"unhealthy"}`` for responses
    that require content (e.g. 503). Returns ``None`` for 204 No Content.

    Args:
        response: The response from the probe.

    Returns:
        Minimal status dict, or None for no response body.
    """
    await asyncio.sleep(0)
    return {"status": "healthy" if response.healthy else "unhealthy"}

healthcheck_shutdown(probes)

Return an async shutdown callback that closes the given probes' checks.

Use this with framework lifespan/shutdown hooks (e.g. Litestar on_shutdown, FastStream shutdown) so that health check resources are closed on app shutdown.

Parameters:

Name Type Description Default
probes Iterable[Probe]

The same probes passed to your health routes.

required

Returns:

Type Description
Callable[[], Awaitable[None]]

An async callable with no arguments that closes all checks with aclose().

Source code in fast_healthchecks/integrations/base.py
def healthcheck_shutdown(probes: Iterable[Probe]) -> Callable[[], Awaitable[None]]:
    """Return an async shutdown callback that closes the given probes' checks.

    Use this with framework lifespan/shutdown hooks (e.g. Litestar ``on_shutdown``,
    FastStream shutdown) so that health check resources are closed on app shutdown.

    Args:
        probes: The same probes passed to your health routes.

    Returns:
        An async callable with no arguments that closes all checks with ``aclose()``.
    """

    async def _shutdown() -> None:
        await close_probes(probes)

    return _shutdown

make_probe_asgi(probe, *, options=None, runner=None)

Create an ASGI probe from a probe.

Parameters:

Name Type Description Default
probe Probe

The probe to create the ASGI probe from.

required
options ProbeRouteOptions | None

Route options. When None, defaults from build_probe_route_options().

None
runner ProbeRunner | None

Optional ProbeRunner. When None, uses an internal reporting-mode runner.

None

Returns:

Type Description
Callable[[], Awaitable[tuple[bytes, dict[str, str] | None, int]]]

An ASGI probe.

Source code in fast_healthchecks/integrations/base.py
def make_probe_asgi(
    probe: Probe,
    *,
    options: ProbeRouteOptions | None = None,
    runner: ProbeRunner | None = None,
) -> Callable[[], Awaitable[tuple[bytes, dict[str, str] | None, int]]]:
    """Create an ASGI probe from a probe.

    Args:
        probe: The probe to create the ASGI probe from.
        options: Route options. When None, defaults from build_probe_route_options().
        runner: Optional ProbeRunner. When None, uses an internal reporting-mode runner.

    Returns:
        An ASGI probe.
    """
    return ProbeAsgi(probe, options=options, runner=runner)

probe_path_suffix(probe)

Return the path suffix for a probe (name without leading slash).

Source code in fast_healthchecks/integrations/base.py
def probe_path_suffix(probe: Probe) -> str:
    """Return the path suffix for a probe (name without leading slash)."""
    return probe.name.removeprefix("/")

probe_route_path(probe, prefix='/health')

Return the route path for a probe given a prefix.

Source code in fast_healthchecks/integrations/base.py
def probe_route_path(probe: Probe, prefix: str = "/health") -> str:
    """Return the route path for a probe given a prefix."""
    return f"{prefix.removesuffix('/')}/{probe_path_suffix(probe)}"

run_probe(probe, *, timeout=None, on_check_start=None, on_check_end=None, on_timeout_return_failure=False) async

Run a probe and return the health check report.

Can be used without ASGI (CLI, cron, tests). ProbeAsgi uses this with on_timeout_return_failure=True so timeout behavior is unified.

When on_check_start or on_check_end are provided, checks run sequentially (for ordering guarantees). Otherwise they run in parallel.

Cleanup and cancellation: On cancellation or timeout, run_probe does not close cached clients (checks with aclose). The caller must call healthcheck_shutdown(probes) or close_probes(probes) to close them. There are no dangling background tasks after run_probe returns or raises. See lifecycle and run-probe docs for cleanup paths X and Y.

Timeout semantics (probe-level only): When timeout is exceeded, all pending checks are cancelled. Mode A (on_timeout_return_failure=False): raise TimeoutError, no report. Mode B (on_timeout_return_failure=True): return HealthCheckReport with failed results for timed-out checks. ProbeAsgi uses Mode B. See docs run-probe.md for full semantics.

Parameters:

Name Type Description Default
probe Probe

The probe to run.

required
timeout float | None

Maximum seconds for all checks. Raises asyncio.TimeoutError if exceeded unless on_timeout_return_failure is True.

None
on_check_start OnCheckStart | None

Optional callback before each check runs. Receives (check, index).

None
on_check_end OnCheckEnd | None

Optional callback after each check completes. Receives (check, index, result).

None
on_timeout_return_failure bool

If True, on timeout return a report with failed results instead of raising TimeoutError.

False

Returns:

Type Description
HealthCheckReport

HealthCheckReport with results from all checks.

Raises:

Type Description
HealthCheckTimeoutError

When timeout is exceeded and on_timeout_return_failure is False. (Subclass of asyncio.TimeoutError; existing except TimeoutError still works.)

Source code in fast_healthchecks/integrations/base.py
async def run_probe(
    probe: Probe,
    *,
    timeout: float | None = None,
    on_check_start: OnCheckStart | None = None,
    on_check_end: OnCheckEnd | None = None,
    on_timeout_return_failure: bool = False,
) -> HealthCheckReport:
    """Run a probe and return the health check report.

    Can be used without ASGI (CLI, cron, tests). ProbeAsgi uses this with
    on_timeout_return_failure=True so timeout behavior is unified.

    When ``on_check_start`` or ``on_check_end`` are provided, checks run
    sequentially (for ordering guarantees). Otherwise they run in parallel.

    **Cleanup and cancellation:** On cancellation or timeout, run_probe does not
    close cached clients (checks with ``aclose``). The caller must call
    ``healthcheck_shutdown(probes)`` or ``close_probes(probes)`` to close them.
    There are no dangling background tasks after run_probe returns or raises.
    See lifecycle and run-probe docs for cleanup paths X and Y.

    **Timeout semantics (probe-level only):** When ``timeout`` is exceeded,
    all pending checks are cancelled. Mode A (on_timeout_return_failure=False):
    raise TimeoutError, no report. Mode B (on_timeout_return_failure=True):
    return HealthCheckReport with failed results for timed-out checks.
    ProbeAsgi uses Mode B. See docs run-probe.md for full semantics.

    Args:
        probe: The probe to run.
        timeout: Maximum seconds for all checks. Raises asyncio.TimeoutError if exceeded
            unless on_timeout_return_failure is True.
        on_check_start: Optional callback before each check runs. Receives (check, index).
        on_check_end: Optional callback after each check completes. Receives (check, index, result).
        on_timeout_return_failure: If True, on timeout return a report with failed results
            instead of raising TimeoutError.

    Returns:
        HealthCheckReport with results from all checks.

    Raises:
        HealthCheckTimeoutError: When timeout is exceeded and on_timeout_return_failure is False.
            (Subclass of asyncio.TimeoutError; existing ``except TimeoutError`` still works.)
    """
    get_probe_logger().log(
        logging.INFO,
        "probe_start",
        probe=probe.name,
        checks_count=len(probe.checks),
    )
    try:
        if on_check_start is None and on_check_end is None:
            results = await _gather_check_results(
                probe,
                timeout=timeout,
                on_timeout_return_failure=on_timeout_return_failure,
            )
        else:

            async def _run_with_hooks() -> list[HealthCheckResult]:
                out: list[HealthCheckResult] = []
                for i, check in enumerate(probe.checks):
                    if on_check_start is not None:
                        await on_check_start(check, i)
                    result = await _run_check_safe(check, i)
                    if on_check_end is not None:
                        await on_check_end(check, i, result)
                    out.append(result)
                return out

            try:
                if timeout is not None:
                    results = await asyncio.wait_for(_run_with_hooks(), timeout=timeout)
                else:
                    results = await _run_with_hooks()
            except asyncio.TimeoutError:
                if on_timeout_return_failure:
                    results = [
                        HealthCheckResult(
                            name=_get_check_name(check, i),
                            healthy=False,
                            error=map_exception_to_health_error(
                                HealthCheckTimeoutError("Probe timed out"),
                                code=PROBE_TIMEOUT,
                            ),
                        )
                        for i, check in enumerate(probe.checks)
                    ]
                else:
                    raise HealthCheckTimeoutError(code=PROBE_TIMEOUT) from None

        report = HealthCheckReport(
            results=results,
        )
        get_probe_logger().log(
            logging.INFO,
            "probe_end",
            probe=probe.name,
            healthy=report.healthy,
            results_summary=[(r.name, r.healthy) for r in results],
        )
        return report
    finally:
        # Cleanup of cached clients is not done here; caller must call
        # healthcheck_shutdown(probes) or close_probes(probes). See lifecycle docs.
        pass

FastAPI integration for health checks.

HealthcheckRouter

Bases: APIRouter

A router for health checks.

Parameters:

Name Type Description Default
*probes Probe

Probes to run (e.g. liveness, readiness, startup).

()
options ProbeRouteOptions | None

Route options. When None, uses build_probe_route_options() defaults.

None

To close health check resources (e.g. cached clients) on app shutdown, call await router.close() from your FastAPI lifespan, or use healthcheck_shutdown(probes) and call the returned callback.

Source code in fast_healthchecks/integrations/fastapi.py
class HealthcheckRouter(APIRouter):
    """A router for health checks.

    Args:
        *probes: Probes to run (e.g. liveness, readiness, startup).
        options: Route options. When None, uses build_probe_route_options() defaults.

    To close health check resources (e.g. cached clients) on app shutdown,
    call ``await router.close()`` from your FastAPI lifespan, or use
    ``healthcheck_shutdown(probes)`` and call the returned callback.
    """

    def __init__(
        self,
        *probes: Probe,
        options: ProbeRouteOptions | None = None,
        runner: ProbeRunner | None = None,
    ) -> None:
        """Initialize the router."""
        if options is None:
            options = build_probe_route_options()
        super().__init__(prefix=options.prefix.removesuffix("/"), tags=["Healthchecks"])
        self._healthcheck_probes: list[Probe] = list(probes)
        self._runner = runner
        for probe in probes:
            self._add_probe_route(probe, options=options)

    def _add_probe_route(self, probe: Probe, *, options: ProbeRouteOptions) -> None:
        params = options.to_route_params()
        handle_request = create_probe_route_handler(
            probe,
            params,
            response_factory=lambda c, h, s: Response(content=c, status_code=s, headers=h),
            runner=self._runner,
        )

        self.add_api_route(
            path=f"/{probe_path_suffix(probe)}",
            endpoint=handle_request,
            status_code=options.success_status,
            summary=probe.endpoint_summary,
            include_in_schema=options.debug,
            response_model=None,
            response_class=Response,
        )

    async def close(self) -> None:
        """Close resources owned by this router's health check probes.

        Call this from your FastAPI lifespan shutdown (e.g. after ``yield``
        in an ``@asynccontextmanager`` lifespan) so cached clients are closed.
        """
        await close_probes(self._healthcheck_probes)

__init__(*probes, options=None, runner=None)

Initialize the router.

Source code in fast_healthchecks/integrations/fastapi.py
def __init__(
    self,
    *probes: Probe,
    options: ProbeRouteOptions | None = None,
    runner: ProbeRunner | None = None,
) -> None:
    """Initialize the router."""
    if options is None:
        options = build_probe_route_options()
    super().__init__(prefix=options.prefix.removesuffix("/"), tags=["Healthchecks"])
    self._healthcheck_probes: list[Probe] = list(probes)
    self._runner = runner
    for probe in probes:
        self._add_probe_route(probe, options=options)

close() async

Close resources owned by this router's health check probes.

Call this from your FastAPI lifespan shutdown (e.g. after yield in an @asynccontextmanager lifespan) so cached clients are closed.

Source code in fast_healthchecks/integrations/fastapi.py
async def close(self) -> None:
    """Close resources owned by this router's health check probes.

    Call this from your FastAPI lifespan shutdown (e.g. after ``yield``
    in an ``@asynccontextmanager`` lifespan) so cached clients are closed.
    """
    await close_probes(self._healthcheck_probes)

healthcheck_shutdown(probes)

Return an async shutdown callback that closes the given probes' checks.

Use this with framework lifespan/shutdown hooks (e.g. Litestar on_shutdown, FastStream shutdown) so that health check resources are closed on app shutdown.

Parameters:

Name Type Description Default
probes Iterable[Probe]

The same probes passed to your health routes.

required

Returns:

Type Description
Callable[[], Awaitable[None]]

An async callable with no arguments that closes all checks with aclose().

Source code in fast_healthchecks/integrations/base.py
def healthcheck_shutdown(probes: Iterable[Probe]) -> Callable[[], Awaitable[None]]:
    """Return an async shutdown callback that closes the given probes' checks.

    Use this with framework lifespan/shutdown hooks (e.g. Litestar ``on_shutdown``,
    FastStream shutdown) so that health check resources are closed on app shutdown.

    Args:
        probes: The same probes passed to your health routes.

    Returns:
        An async callable with no arguments that closes all checks with ``aclose()``.
    """

    async def _shutdown() -> None:
        await close_probes(probes)

    return _shutdown

FastStream integration for health checks.

health(*probes, options=None, runner=None)

Make list of routes for healthchecks.

Returns:

Type Description
Iterable[tuple[str, ASGIApp]]

Iterable[tuple[str, ASGIApp]]: Generated healthcheck routes.

To close health check resources on app shutdown, pass the same probes to healthcheck_shutdown(probes) and register the returned callback with your FastStream app's shutdown hooks (e.g. @app.on_shutdown).

Source code in fast_healthchecks/integrations/faststream.py
def health(
    *probes: Probe,
    options: ProbeRouteOptions | None = None,
    runner: ProbeRunner | None = None,
) -> Iterable[tuple[str, ASGIApp]]:
    """Make list of routes for healthchecks.

    Returns:
        Iterable[tuple[str, ASGIApp]]: Generated healthcheck routes.

    To close health check resources on app shutdown, pass the same probes
    to ``healthcheck_shutdown(probes)`` and register the returned callback
    with your FastStream app's shutdown hooks (e.g. ``@app.on_shutdown``).
    """
    return build_health_routes(
        probes,
        add_route=lambda probe, route_options: _add_probe_route(probe, route_options, runner=runner),
        options=options,
    )

healthcheck_shutdown(probes)

Return an async shutdown callback that closes the given probes' checks.

Use this with framework lifespan/shutdown hooks (e.g. Litestar on_shutdown, FastStream shutdown) so that health check resources are closed on app shutdown.

Parameters:

Name Type Description Default
probes Iterable[Probe]

The same probes passed to your health routes.

required

Returns:

Type Description
Callable[[], Awaitable[None]]

An async callable with no arguments that closes all checks with aclose().

Source code in fast_healthchecks/integrations/base.py
def healthcheck_shutdown(probes: Iterable[Probe]) -> Callable[[], Awaitable[None]]:
    """Return an async shutdown callback that closes the given probes' checks.

    Use this with framework lifespan/shutdown hooks (e.g. Litestar ``on_shutdown``,
    FastStream shutdown) so that health check resources are closed on app shutdown.

    Args:
        probes: The same probes passed to your health routes.

    Returns:
        An async callable with no arguments that closes all checks with ``aclose()``.
    """

    async def _shutdown() -> None:
        await close_probes(probes)

    return _shutdown

Litestar integration for health checks.

health(*probes, options=None, runner=None)

Make list of routes for healthchecks.

Returns:

Type Description
Iterable[HTTPRouteHandler]

Iterable[HTTPRouteHandler]: Generated healthcheck route handlers.

To close health check resources on app shutdown, pass the same probes to healthcheck_shutdown(probes) and add the returned callback to Litestar's on_shutdown list.

Source code in fast_healthchecks/integrations/litestar.py
def health(
    *probes: Probe,
    options: ProbeRouteOptions | None = None,
    runner: ProbeRunner | None = None,
) -> Iterable[HTTPRouteHandler]:
    """Make list of routes for healthchecks.

    Returns:
        Iterable[HTTPRouteHandler]: Generated healthcheck route handlers.

    To close health check resources on app shutdown, pass the same probes
    to ``healthcheck_shutdown(probes)`` and add the returned callback to
    Litestar's ``on_shutdown`` list.
    """
    return build_health_routes(
        probes,
        add_route=lambda probe, route_options: _add_probe_route(probe, route_options, runner=runner),
        options=options,
    )

healthcheck_shutdown(probes)

Return an async shutdown callback that closes the given probes' checks.

Use this with framework lifespan/shutdown hooks (e.g. Litestar on_shutdown, FastStream shutdown) so that health check resources are closed on app shutdown.

Parameters:

Name Type Description Default
probes Iterable[Probe]

The same probes passed to your health routes.

required

Returns:

Type Description
Callable[[], Awaitable[None]]

An async callable with no arguments that closes all checks with aclose().

Source code in fast_healthchecks/integrations/base.py
def healthcheck_shutdown(probes: Iterable[Probe]) -> Callable[[], Awaitable[None]]:
    """Return an async shutdown callback that closes the given probes' checks.

    Use this with framework lifespan/shutdown hooks (e.g. Litestar ``on_shutdown``,
    FastStream shutdown) so that health check resources are closed on app shutdown.

    Args:
        probes: The same probes passed to your health routes.

    Returns:
        An async callable with no arguments that closes all checks with ``aclose()``.
    """

    async def _shutdown() -> None:
        await close_probes(probes)

    return _shutdown

Utility functions for fast-healthchecks.

maybe_redact(data, *, redact_secrets)

Return data with secrets redacted if requested, otherwise return as-is.

Source code in fast_healthchecks/utils.py
def maybe_redact(data: dict[str, Any], *, redact_secrets: bool) -> dict[str, Any]:
    """Return data with secrets redacted if requested, otherwise return as-is."""
    return redact_secrets_in_dict(data) if redact_secrets else data

parse_query_string(query)

Parse a URL query string into a dictionary.

Keys and values are URL-decoded (unquoted). Pairs without '=' are stored with an empty value. Values containing '=' are preserved.

Parameters:

Name Type Description Default
query str

The query string (e.g. 'key1=value1&key2=value2').

required

Returns:

Type Description
dict[str, str]

A dictionary of key-value pairs.

Source code in fast_healthchecks/utils.py
def parse_query_string(query: str) -> dict[str, str]:
    """Parse a URL query string into a dictionary.

    Keys and values are URL-decoded (unquoted). Pairs without '=' are stored
    with an empty value. Values containing '=' are preserved.

    Args:
        query: The query string (e.g. 'key1=value1&key2=value2').

    Returns:
        A dictionary of key-value pairs.
    """
    if not query:
        return {}
    result: dict[str, str] = {}
    for part in query.split("&"):
        kv = part.split("=", 1)
        key = unquote(kv[0]) if kv[0] else ""
        value = unquote(kv[1]) if len(kv) > 1 else ""
        result[key] = value
    return result

redact_secrets_in_dict(data)

Return a copy of data with secret-like keys redacted recursively.

Source code in fast_healthchecks/utils.py
def redact_secrets_in_dict(data: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of data with secret-like keys redacted recursively."""
    return {key: (REDACT_PLACEHOLDER if key in _SECRET_KEYS else _redact_value(value)) for key, value in data.items()}

validate_host_ssrf_async(host) async

Resolve host to IP(s) and reject if any are loopback/private/reserved (SSRF/DNS rebinding).

Call this before making the request when block_private_hosts=True, so that hostnames that resolve to private IPs (e.g. internal DNS or DNS rebinding) are rejected.

Parameters:

Name Type Description Default
host str

The hostname to resolve and validate.

required

Raises:

Type Description
HealthCheckSSRFError

If any resolved IP is loopback, private, or reserved.

Source code in fast_healthchecks/utils.py
async def validate_host_ssrf_async(host: str) -> None:
    """Resolve host to IP(s) and reject if any are loopback/private/reserved (SSRF/DNS rebinding).

    Call this before making the request when block_private_hosts=True, so that
    hostnames that resolve to private IPs (e.g. internal DNS or DNS rebinding)
    are rejected.

    Args:
        host: The hostname to resolve and validate.

    Raises:
        HealthCheckSSRFError: If any resolved IP is loopback, private, or reserved.
    """
    host = (host or "").strip()
    if not host:
        return
    if host.lower() in {"localhost", "localhost.", "localhost6", "localhost6.localdomain6"}:
        msg = "URL host must not be localhost when block_private_hosts=True"
        raise HealthCheckSSRFError(msg)
    try:
        loop = asyncio.get_running_loop()
        infos = await loop.run_in_executor(None, lambda: socket.getaddrinfo(host, None))
    except OSError:
        # Resolution failed; let the subsequent request fail or handle
        return
    for _family, _type, _proto, _canon, sockaddr in infos:
        if not sockaddr:
            continue
        ip_str = sockaddr[0] if isinstance(sockaddr, (list, tuple)) else getattr(sockaddr, "host", None)
        if not isinstance(ip_str, str) or not ip_str:
            continue
        addr = _parse_ip_safe(ip_str)
        if addr is None:
            continue
        if addr.is_loopback or addr.is_private or addr.is_reserved:
            msg = "URL host must not resolve to loopback or private when block_private_hosts=True"
            raise HealthCheckSSRFError(msg)

validate_url_ssrf(url, *, allowed_schemes=frozenset({'http', 'https'}), block_private_hosts=False)

Validate URL for SSRF-sensitive use (e.g. healthchecks from config).

Parameters:

Name Type Description Default
url str

The URL string to validate.

required
allowed_schemes frozenset[str]

Schemes permitted (default http, https).

frozenset({'http', 'https'})
block_private_hosts bool

If True, reject localhost and private IP ranges.

False

Raises:

Type Description
HealthCheckSSRFError

If scheme is not allowed or host is in blocked range.

Source code in fast_healthchecks/utils.py
def validate_url_ssrf(
    url: str,
    *,
    allowed_schemes: frozenset[str] = frozenset({"http", "https"}),
    block_private_hosts: bool = False,
) -> None:
    """Validate URL for SSRF-sensitive use (e.g. healthchecks from config).

    Args:
        url: The URL string to validate.
        allowed_schemes: Schemes permitted (default http, https).
        block_private_hosts: If True, reject localhost and private IP ranges.

    Raises:
        HealthCheckSSRFError: If scheme is not allowed or host is in blocked range.
    """
    parsed = urlparse(url)
    scheme = (parsed.scheme or "").lower()
    if scheme not in allowed_schemes:
        msg = f"URL scheme must be one of {sorted(allowed_schemes)}, got {scheme!r}"
        raise HealthCheckSSRFError(msg)
    if not block_private_hosts:
        return
    host = (parsed.hostname or "").strip()
    if not host:
        return
    if host.lower() in {"localhost", "localhost.", "localhost6", "localhost6.localdomain6"}:
        msg = "URL host must not be localhost when block_private_hosts=True"
        raise HealthCheckSSRFError(msg)
    addr = _parse_ip_safe(host)
    if addr is None:
        return
    if addr.is_loopback or addr.is_private or addr.is_reserved:
        msg = "URL host must not be loopback or private when block_private_hosts=True"
        raise HealthCheckSSRFError(msg)