Skip to content

Usage

How you mount health checks depends on the framework:

  • FastAPI: Use HealthcheckRouter with one or more Probe instances and pass it to app.include_router().
  • FastStream: Use the health() function from fast_healthchecks.integrations.faststream with your probes and options; it returns the routes to register with the app.
  • Litestar: Use the health() function from fast_healthchecks.integrations.litestar with your probes and options; it returns the routes to register with the app.

Create the health check endpoints dynamically from different conditions. Each condition is a callable, and a condition can even have dependencies of its own:

"""Probe and check factory helpers for example apps."""

import asyncio
import os
import time
from pathlib import Path
from typing import Any

from dotenv import load_dotenv

from fast_healthchecks.checks.function import FunctionHealthCheck
from fast_healthchecks.checks.kafka import KafkaHealthCheck
from fast_healthchecks.checks.mongo import MongoHealthCheck
from fast_healthchecks.checks.opensearch import OpenSearchHealthCheck
from fast_healthchecks.checks.postgresql.asyncpg import PostgreSQLAsyncPGHealthCheck
from fast_healthchecks.checks.postgresql.psycopg import PostgreSQLPsycopgHealthCheck
from fast_healthchecks.checks.rabbitmq import RabbitMQHealthCheck
from fast_healthchecks.checks.redis import RedisHealthCheck
from fast_healthchecks.checks.types import Check
from fast_healthchecks.checks.url import UrlHealthCheck
from fast_healthchecks.integrations.base import ProbeAsgiResponse

# Load variables from the ".env" file located one directory above this
# module's directory; the return value is deliberately discarded.
_ = load_dotenv(Path(__file__).parent.parent / ".env")


def sync_dummy_check() -> bool:
    """Block the calling thread for a short, fixed delay and report success.

    Serves as a minimal synchronous check for the example apps.

    Returns:
        Always ``True``.
    """
    pause_seconds = 0.1
    time.sleep(pause_seconds)
    return True


async def async_dummy_check() -> bool:
    """Await a short, fixed delay without blocking the loop and report success.

    Serves as a minimal asynchronous check for the example apps.

    Returns:
        Always ``True``.
    """
    pause_seconds = 0.1
    await asyncio.sleep(pause_seconds)
    return True


async def async_dummy_check_fail() -> bool:
    """Yield to the event loop once, then fail unconditionally.

    Serves as a check that always errors, for exercising failure paths.

    Raises:
        ValueError: On every call, with the message ``"Failed"``.
    """
    await asyncio.sleep(0)  # zero-delay sleep before failing
    failure_message = "Failed"
    # ``from None`` suppresses any implicit exception context on the error.
    raise ValueError(failure_message) from None


def get_liveness_checks() -> list[Check]:
    """Build a fresh list of liveness checks.

    A new list with a new check object is created on every call, so each app
    can own its own set (avoids shared clients across event loops).

    Returns:
        A one-element list holding a ``FunctionHealthCheck`` that wraps
        ``sync_dummy_check``.
    """
    sync_check = FunctionHealthCheck(func=sync_dummy_check, name="Sync dummy")
    return [sync_check]


def get_readiness_checks() -> list[Check]:
    """Build a fresh list of readiness checks for the backing services.

    Connection settings are read from the environment on every call:
    ``KAFKA_BOOTSTRAP_SERVERS``, ``MONGO_DSN``, ``OPENSEARCH_HOSTS``
    (comma-separated), ``POSTGRES_DSN``, ``RABBITMQ_DSN`` and ``REDIS_DSN``.
    A new list with new check objects is created each time, so each app can
    own its own set (avoids shared clients across event loops).

    Returns:
        The checks in this order: Kafka, Mongo, OpenSearch, PostgreSQL
        (asyncpg), PostgreSQL (psycopg), RabbitMQ, Redis, and a URL check.

    Raises:
        KeyError: If one of the environment variables above is not set.
    """
    env = os.environ
    checks: list[Check] = []
    # Entries are appended one by one so that environment lookups and check
    # construction happen in list order.
    checks.append(KafkaHealthCheck(bootstrap_servers=env["KAFKA_BOOTSTRAP_SERVERS"], name="Kafka"))
    checks.append(MongoHealthCheck.from_dsn(env["MONGO_DSN"], name="Mongo"))
    checks.append(OpenSearchHealthCheck(hosts=env["OPENSEARCH_HOSTS"].split(","), name="OpenSearch"))
    # Both PostgreSQL checks (asyncpg and psycopg) use the same DSN variable.
    checks.append(PostgreSQLAsyncPGHealthCheck.from_dsn(env["POSTGRES_DSN"], name="PostgreSQL asyncpg"))
    checks.append(PostgreSQLPsycopgHealthCheck.from_dsn(env["POSTGRES_DSN"], name="PostgreSQL psycopg"))
    checks.append(RabbitMQHealthCheck.from_dsn(env["RABBITMQ_DSN"], name="RabbitMQ"))
    checks.append(RedisHealthCheck.from_dsn(env["REDIS_DSN"], name="Redis"))
    checks.append(UrlHealthCheck(url="https://httpbingo.org/status/200", name="URL 200"))
    return checks


def get_startup_checks() -> list[Check]:
    """Build a fresh list of startup checks.

    A new list with a new check object is created on every call, so each app
    can own its own set (avoids shared clients across event loops).

    Returns:
        A one-element list holding a ``FunctionHealthCheck`` that wraps
        ``async_dummy_check``.
    """
    async_check = FunctionHealthCheck(func=async_dummy_check, name="Async dummy")
    return [async_check]


def get_readiness_checks_success() -> list[Check]:
    """Build a fresh list of readiness checks for success-path tests.

    Returns:
        A one-element list holding a ``FunctionHealthCheck`` that wraps
        ``async_dummy_check``.
    """
    passing_check = FunctionHealthCheck(func=async_dummy_check, name="Async dummy")
    return [passing_check]


def get_readiness_checks_fail() -> list[Check]:
    """Build a fresh list of readiness checks for failure-path tests.

    Returns:
        A one-element list holding a ``FunctionHealthCheck`` that wraps
        ``async_dummy_check_fail``, which raises ``ValueError`` when run.
    """
    failing_check = FunctionHealthCheck(func=async_dummy_check_fail, name="Async dummy fail")
    return [failing_check]


# Module-level check lists, each built once at import time by calling the
# corresponding factory function defined in this module.
# NOTE: get_readiness_checks() reads its settings via os.environ[...], so
# importing this module raises KeyError if any of those variables is unset.
LIVENESS_CHECKS: list[Check] = get_liveness_checks()
READINESS_CHECKS: list[Check] = get_readiness_checks()
STARTUP_CHECKS: list[Check] = get_startup_checks()
READINESS_CHECKS_SUCCESS: list[Check] = get_readiness_checks_success()
READINESS_CHECKS_FAIL: list[Check] = get_readiness_checks_fail()


async def custom_handler(response: ProbeAsgiResponse) -> dict[str, Any] | None:
    """Pass a probe response's data through unchanged.

    Args:
        response: The probe response whose ``data`` attribute is returned.

    Returns:
        The value of ``response.data``.
    """
    await asyncio.sleep(0)  # zero-delay sleep before reading the data
    payload = response.data
    return payload
"""Example app for fast-healthchecks."""

from __future__ import annotations

from contextlib import asynccontextmanager
from typing import TYPE_CHECKING

from fastapi import FastAPI, status

if TYPE_CHECKING:
    from collections.abc import AsyncIterator

from examples.probes import (
    custom_handler,
    get_liveness_checks,
    get_readiness_checks,
    get_readiness_checks_fail,
    get_readiness_checks_success,
    get_startup_checks,
)
from fast_healthchecks.integrations.base import Probe, build_probe_route_options
from fast_healthchecks.integrations.fastapi import HealthcheckRouter

# Router for the integration app: one probe each for liveness, readiness and
# startup, each given a fresh check list from the probe factory helpers.
router_integration = HealthcheckRouter(
    Probe(
        name="liveness",
        checks=get_liveness_checks(),
    ),
    Probe(
        name="readiness",
        checks=get_readiness_checks(),
    ),
    Probe(
        name="startup",
        checks=get_startup_checks(),
    ),
    # Route options: debug=True and the "/health" prefix.
    options=build_probe_route_options(debug=True, prefix="/health"),
)


@asynccontextmanager
async def lifespan_integration(_app: FastAPI) -> AsyncIterator[None]:
    """Lifespan for integration app: close healthcheck router on shutdown.

    Args:
        _app: The FastAPI application (unused).

    Yields:
        None, while the application is running.
    """
    try:
        yield
    finally:
        # Close the router even when the lifespan exits via an exception or
        # cancellation; code placed after a bare ``yield`` would be skipped.
        await router_integration.close()


# Integration app: uses the integration lifespan and mounts its router.
app_integration = FastAPI(lifespan=lifespan_integration)
app_integration.include_router(router_integration)

# Router for the success app: only the readiness probe has checks (the
# success-path set); liveness and startup are given empty check lists.
router_success = HealthcheckRouter(
    Probe(
        name="liveness",
        checks=[],
    ),
    Probe(
        name="readiness",
        checks=get_readiness_checks_success(),
    ),
    Probe(
        name="startup",
        checks=[],
    ),
    # Route options: debug=True and the "/health" prefix.
    options=build_probe_route_options(debug=True, prefix="/health"),
)


@asynccontextmanager
async def lifespan_success(_app: FastAPI) -> AsyncIterator[None]:
    """Lifespan for success app: close healthcheck router on shutdown.

    Args:
        _app: The FastAPI application (unused).

    Yields:
        None, while the application is running.
    """
    try:
        yield
    finally:
        # Close the router even when the lifespan exits via an exception or
        # cancellation; code placed after a bare ``yield`` would be skipped.
        await router_success.close()


# Success app: uses the success lifespan and mounts its router.
app_success = FastAPI(lifespan=lifespan_success)
app_success.include_router(router_success)

# Router for the fail app: only the readiness probe has checks (the
# failure-path set); liveness and startup are given empty check lists.
router_fail = HealthcheckRouter(
    Probe(
        name="liveness",
        checks=[],
    ),
    Probe(
        name="readiness",
        checks=get_readiness_checks_fail(),
    ),
    Probe(
        name="startup",
        checks=[],
    ),
    # Route options: debug=True and the "/health" prefix.
    options=build_probe_route_options(debug=True, prefix="/health"),
)


@asynccontextmanager
async def lifespan_fail(_app: FastAPI) -> AsyncIterator[None]:
    """Lifespan for fail app: close healthcheck router on shutdown.

    Args:
        _app: The FastAPI application (unused).

    Yields:
        None, while the application is running.
    """
    try:
        yield
    finally:
        # Close the router even when the lifespan exits via an exception or
        # cancellation; code placed after a bare ``yield`` would be skipped.
        await router_fail.close()


# Fail app: uses the fail lifespan and mounts its router.
app_fail = FastAPI(lifespan=lifespan_fail)
app_fail.include_router(router_fail)

# Router for the custom app: each probe carries a summary string, and only
# the readiness probe has checks (the success-path set).
router_custom = HealthcheckRouter(
    Probe(
        name="liveness",
        checks=[],
        summary="Check if the application is alive",
    ),
    Probe(
        name="readiness",
        checks=get_readiness_checks_success(),
        summary="Check if the application is ready",
    ),
    Probe(
        name="startup",
        checks=[],
        summary="Check if the application is starting up",
    ),
    # The same custom_handler is passed for both success and failure, with
    # explicit 200/503 status codes and the "/custom_health" prefix.
    options=build_probe_route_options(
        success_handler=custom_handler,
        failure_handler=custom_handler,
        success_status=status.HTTP_200_OK,
        failure_status=status.HTTP_503_SERVICE_UNAVAILABLE,
        debug=True,
        prefix="/custom_health",
    ),
)


@asynccontextmanager
async def lifespan_custom(_app: FastAPI) -> AsyncIterator[None]:
    """Lifespan for custom app: close healthcheck router on shutdown.

    Args:
        _app: The FastAPI application (unused).

    Yields:
        None, while the application is running.
    """
    try:
        yield
    finally:
        # Close the router even when the lifespan exits via an exception or
        # cancellation; code placed after a bare ``yield`` would be skipped.
        await router_custom.close()


# Custom app: uses the custom lifespan and mounts the custom-handler router.
app_custom = FastAPI(lifespan=lifespan_custom)
app_custom.include_router(router_custom)
"""Example app for fast-healthchecks."""

import os
from http import HTTPStatus

from faststream.asgi import AsgiFastStream
from faststream.kafka import KafkaBroker

from examples.probes import (
    LIVENESS_CHECKS,
    READINESS_CHECKS,
    READINESS_CHECKS_FAIL,
    READINESS_CHECKS_SUCCESS,
    STARTUP_CHECKS,
    custom_handler,
)
from fast_healthchecks.integrations.base import Probe, build_probe_route_options
from fast_healthchecks.integrations.faststream import health, healthcheck_shutdown

# Kafka broker passed to every AsgiFastStream app in this module. Bootstrap
# servers come from KAFKA_BOOTSTRAP_SERVERS (comma-separated), falling back
# to two localhost addresses when the variable is unset.
broker = KafkaBroker(
    os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9094,localhost:9095").split(","),
)

# Integration app: probes use the module-level check lists imported from
# examples.probes.
_probes_integration = (
    Probe(name="liveness", checks=LIVENESS_CHECKS),
    Probe(name="readiness", checks=READINESS_CHECKS),
    Probe(name="startup", checks=STARTUP_CHECKS),
)
app_integration = AsgiFastStream(
    broker,
    # health() returns the routes for the given probes; they are unpacked
    # into the app's ASGI route list.
    asgi_routes=[
        *health(
            *_probes_integration,
            options=build_probe_route_options(debug=False, prefix="/health"),
        ),
    ],
    # The same probes are handed to healthcheck_shutdown for the shutdown hook.
    on_shutdown=[healthcheck_shutdown(_probes_integration)],
)

# Success app: only the readiness probe has checks (the success-path list);
# liveness and startup are given empty check lists.
_probes_success = (
    Probe(name="liveness", checks=[]),
    Probe(name="readiness", checks=READINESS_CHECKS_SUCCESS),
    Probe(name="startup", checks=[]),
)
app_success = AsgiFastStream(
    broker,
    asgi_routes=[
        *health(
            *_probes_success,
            options=build_probe_route_options(debug=False, prefix="/health"),
        ),
    ],
    # The same probes are handed to healthcheck_shutdown for the shutdown hook.
    on_shutdown=[healthcheck_shutdown(_probes_success)],
)

# Fail app: only the readiness probe has checks (the failure-path list);
# liveness and startup are given empty check lists.
_probes_fail = (
    Probe(name="liveness", checks=[]),
    Probe(name="readiness", checks=READINESS_CHECKS_FAIL),
    Probe(name="startup", checks=[]),
)
app_fail = AsgiFastStream(
    broker,
    asgi_routes=[
        *health(
            *_probes_fail,
            options=build_probe_route_options(debug=False, prefix="/health"),
        ),
    ],
    # The same probes are handed to healthcheck_shutdown for the shutdown hook.
    on_shutdown=[healthcheck_shutdown(_probes_fail)],
)

# Custom app: each probe carries a summary string, and only the readiness
# probe has checks.
# NOTE(review): READINESS_CHECKS_SUCCESS is the same module-level list that
# _probes_success uses, so both apps reference the same check objects —
# confirm this sharing is intended.
_probes_custom = (
    Probe(
        name="liveness",
        checks=[],
        summary="Check if the application is alive",
    ),
    Probe(
        name="readiness",
        checks=READINESS_CHECKS_SUCCESS,
        summary="Check if the application is ready",
    ),
    Probe(
        name="startup",
        checks=[],
        summary="Check if the application is starting up",
    ),
)
app_custom = AsgiFastStream(
    broker,
    asgi_routes=[
        *health(
            *_probes_custom,
            # The same custom_handler is passed for both success and failure,
            # with explicit OK / SERVICE_UNAVAILABLE statuses and the
            # "/custom_health" prefix.
            options=build_probe_route_options(
                success_handler=custom_handler,
                failure_handler=custom_handler,
                success_status=HTTPStatus.OK,
                failure_status=HTTPStatus.SERVICE_UNAVAILABLE,
                debug=True,
                prefix="/custom_health",
            ),
        ),
    ],
    on_shutdown=[healthcheck_shutdown(_probes_custom)],
)
"""Example app for fast-healthchecks."""

from litestar import Litestar
from litestar.status_codes import HTTP_200_OK, HTTP_503_SERVICE_UNAVAILABLE

from examples.probes import (
    custom_handler,
    get_liveness_checks,
    get_readiness_checks,
    get_readiness_checks_fail,
    get_readiness_checks_success,
    get_startup_checks,
)
from fast_healthchecks.integrations.base import Probe, build_probe_route_options
from fast_healthchecks.integrations.litestar import health, healthcheck_shutdown

# Integration app: each probe gets a fresh check list from the probe factory
# helpers.
_probes_integration = [
    Probe(name="liveness", checks=get_liveness_checks()),
    Probe(name="readiness", checks=get_readiness_checks()),
    Probe(name="startup", checks=get_startup_checks()),
]
app_integration = Litestar(
    # health() returns the routes for the given probes; they are unpacked
    # into the app's route handler list.
    route_handlers=[
        *health(
            *_probes_integration,
            options=build_probe_route_options(debug=False, prefix="/health"),
        ),
    ],
    # The same probes are handed to healthcheck_shutdown for the shutdown hook.
    on_shutdown=[healthcheck_shutdown(_probes_integration)],
)
app_integration.debug = True  # for integration tests (verbose errors/schema)

# Success app: only the readiness probe has checks (the success-path set);
# liveness and startup are given empty check lists.
_probes_success = [
    Probe(name="liveness", checks=[]),
    Probe(name="readiness", checks=get_readiness_checks_success()),
    Probe(name="startup", checks=[]),
]
app_success = Litestar(
    route_handlers=[
        *health(
            *_probes_success,
            options=build_probe_route_options(debug=False, prefix="/health"),
        ),
    ],
    # The same probes are handed to healthcheck_shutdown for the shutdown hook.
    on_shutdown=[healthcheck_shutdown(_probes_success)],
)
app_success.debug = True  # for unit/integration tests (verbose errors/schema)

# Fail app: only the readiness probe has checks (the failure-path set);
# liveness and startup are given empty check lists.
_probes_fail = [
    Probe(name="liveness", checks=[]),
    Probe(name="readiness", checks=get_readiness_checks_fail()),
    Probe(name="startup", checks=[]),
]
app_fail = Litestar(
    route_handlers=[
        *health(
            *_probes_fail,
            options=build_probe_route_options(debug=False, prefix="/health"),
        ),
    ],
    # The same probes are handed to healthcheck_shutdown for the shutdown hook.
    on_shutdown=[healthcheck_shutdown(_probes_fail)],
)

# Custom app: each probe carries a summary string, and only the readiness
# probe has checks (a fresh success-path set).
_probes_custom = [
    Probe(
        name="liveness",
        checks=[],
        summary="Check if the application is alive",
    ),
    Probe(
        name="readiness",
        checks=get_readiness_checks_success(),
        summary="Check if the application is ready",
    ),
    Probe(
        name="startup",
        checks=[],
        summary="Check if the application is starting up",
    ),
]
app_custom = Litestar(
    route_handlers=[
        *health(
            *_probes_custom,
            # The same custom_handler is passed for both success and failure,
            # with explicit 200/503 status codes and the "/custom_health"
            # prefix.
            options=build_probe_route_options(
                success_handler=custom_handler,
                failure_handler=custom_handler,
                success_status=HTTP_200_OK,
                failure_status=HTTP_503_SERVICE_UNAVAILABLE,
                debug=True,
                prefix="/custom_health",
            ),
        ),
    ],
    on_shutdown=[healthcheck_shutdown(_probes_custom)],
)

You can find examples for each framework here: