Usage¶
How you mount health checks depends on the framework:
- FastAPI: Use
HealthcheckRouterwith one or moreProbeinstances and pass it toapp.include_router(). - FastStream: Use the
health()function fromfast_healthchecks.integrations.faststreamwith your probes and options; it returns the routes to register with the app. - Litestar: Use the
health()function fromfast_healthchecks.integrations.litestarwith your probes and options; it returns the routes to register with the app.
Create the health check endpoint dynamically using different conditions. Each condition is a callable, and you can even have dependencies inside it:
"""Probe and check factory helpers for example apps."""
import asyncio
import os
import time
from pathlib import Path
from typing import Any
from dotenv import load_dotenv
from fast_healthchecks.checks.function import FunctionHealthCheck
from fast_healthchecks.checks.kafka import KafkaHealthCheck
from fast_healthchecks.checks.mongo import MongoHealthCheck
from fast_healthchecks.checks.opensearch import OpenSearchHealthCheck
from fast_healthchecks.checks.postgresql.asyncpg import PostgreSQLAsyncPGHealthCheck
from fast_healthchecks.checks.postgresql.psycopg import PostgreSQLPsycopgHealthCheck
from fast_healthchecks.checks.rabbitmq import RabbitMQHealthCheck
from fast_healthchecks.checks.redis import RedisHealthCheck
from fast_healthchecks.checks.types import Check
from fast_healthchecks.checks.url import UrlHealthCheck
from fast_healthchecks.integrations.base import ProbeAsgiResponse
_ = load_dotenv(Path(__file__).parent.parent / ".env")
def sync_dummy_check() -> bool:
"""Run a synchronous dummy check that sleeps briefly and returns True.
Returns:
True.
"""
time.sleep(0.1)
return True
async def async_dummy_check() -> bool:
"""Run an async dummy check that sleeps briefly and returns True.
Returns:
True.
"""
await asyncio.sleep(0.1)
return True
async def async_dummy_check_fail() -> bool:
"""Run an async dummy check that raises ValueError.
Raises:
ValueError: Always.
"""
await asyncio.sleep(0)
msg = "Failed"
raise ValueError(msg) from None
def get_liveness_checks() -> list[Check]:
"""Return new check instances (one set per app to avoid shared clients across event loops)."""
return [
FunctionHealthCheck(func=sync_dummy_check, name="Sync dummy"),
]
def get_readiness_checks() -> list[Check]:
"""Return new check instances (one set per app to avoid shared clients across event loops)."""
return [
KafkaHealthCheck(
bootstrap_servers=os.environ["KAFKA_BOOTSTRAP_SERVERS"],
name="Kafka",
),
MongoHealthCheck.from_dsn(os.environ["MONGO_DSN"], name="Mongo"),
OpenSearchHealthCheck(hosts=os.environ["OPENSEARCH_HOSTS"].split(","), name="OpenSearch"),
PostgreSQLAsyncPGHealthCheck.from_dsn(os.environ["POSTGRES_DSN"], name="PostgreSQL asyncpg"),
PostgreSQLPsycopgHealthCheck.from_dsn(os.environ["POSTGRES_DSN"], name="PostgreSQL psycopg"),
RabbitMQHealthCheck.from_dsn(os.environ["RABBITMQ_DSN"], name="RabbitMQ"),
RedisHealthCheck.from_dsn(os.environ["REDIS_DSN"], name="Redis"),
UrlHealthCheck(url="https://httpbingo.org/status/200", name="URL 200"),
]
def get_startup_checks() -> list[Check]:
"""Return new check instances (one set per app to avoid shared clients across event loops)."""
return [
FunctionHealthCheck(func=async_dummy_check, name="Async dummy"),
]
def get_readiness_checks_success() -> list[Check]:
"""Return new check instances for success-path tests."""
return [
FunctionHealthCheck(func=async_dummy_check, name="Async dummy"),
]
def get_readiness_checks_fail() -> list[Check]:
"""Return new check instances for failure-path tests."""
return [
FunctionHealthCheck(func=async_dummy_check_fail, name="Async dummy fail"),
]
LIVENESS_CHECKS: list[Check] = get_liveness_checks()
READINESS_CHECKS: list[Check] = get_readiness_checks()
STARTUP_CHECKS: list[Check] = get_startup_checks()
READINESS_CHECKS_SUCCESS: list[Check] = get_readiness_checks_success()
READINESS_CHECKS_FAIL: list[Check] = get_readiness_checks_fail()
async def custom_handler(response: ProbeAsgiResponse) -> dict[str, Any] | None:
"""Custom handler for probes.
Returns:
Any: Probe response payload.
"""
await asyncio.sleep(0)
return response.data
"""Example app for fast-healthchecks."""
from __future__ import annotations
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING
from fastapi import FastAPI, status
if TYPE_CHECKING:
from collections.abc import AsyncIterator
from examples.probes import (
custom_handler,
get_liveness_checks,
get_readiness_checks,
get_readiness_checks_fail,
get_readiness_checks_success,
get_startup_checks,
)
from fast_healthchecks.integrations.base import Probe, build_probe_route_options
from fast_healthchecks.integrations.fastapi import HealthcheckRouter
router_integration = HealthcheckRouter(
Probe(
name="liveness",
checks=get_liveness_checks(),
),
Probe(
name="readiness",
checks=get_readiness_checks(),
),
Probe(
name="startup",
checks=get_startup_checks(),
),
options=build_probe_route_options(debug=True, prefix="/health"),
)
@asynccontextmanager
async def lifespan_integration(_app: FastAPI) -> AsyncIterator[None]:
"""Lifespan for integration app: close healthcheck router on shutdown."""
yield
await router_integration.close()
app_integration = FastAPI(lifespan=lifespan_integration)
app_integration.include_router(router_integration)
router_success = HealthcheckRouter(
Probe(
name="liveness",
checks=[],
),
Probe(
name="readiness",
checks=get_readiness_checks_success(),
),
Probe(
name="startup",
checks=[],
),
options=build_probe_route_options(debug=True, prefix="/health"),
)
@asynccontextmanager
async def lifespan_success(_app: FastAPI) -> AsyncIterator[None]:
"""Lifespan for success app: close healthcheck router on shutdown."""
yield
await router_success.close()
app_success = FastAPI(lifespan=lifespan_success)
app_success.include_router(router_success)
router_fail = HealthcheckRouter(
Probe(
name="liveness",
checks=[],
),
Probe(
name="readiness",
checks=get_readiness_checks_fail(),
),
Probe(
name="startup",
checks=[],
),
options=build_probe_route_options(debug=True, prefix="/health"),
)
@asynccontextmanager
async def lifespan_fail(_app: FastAPI) -> AsyncIterator[None]:
"""Lifespan for fail app: close healthcheck router on shutdown."""
yield
await router_fail.close()
app_fail = FastAPI(lifespan=lifespan_fail)
app_fail.include_router(router_fail)
router_custom = HealthcheckRouter(
Probe(
name="liveness",
checks=[],
summary="Check if the application is alive",
),
Probe(
name="readiness",
checks=get_readiness_checks_success(),
summary="Check if the application is ready",
),
Probe(
name="startup",
checks=[],
summary="Check if the application is starting up",
),
options=build_probe_route_options(
success_handler=custom_handler,
failure_handler=custom_handler,
success_status=status.HTTP_200_OK,
failure_status=status.HTTP_503_SERVICE_UNAVAILABLE,
debug=True,
prefix="/custom_health",
),
)
@asynccontextmanager
async def lifespan_custom(_app: FastAPI) -> AsyncIterator[None]:
"""Lifespan for custom app: close healthcheck router on shutdown."""
yield
await router_custom.close()
app_custom = FastAPI(lifespan=lifespan_custom)
app_custom.include_router(router_custom)
"""Example app for fast-healthchecks."""
import os
from http import HTTPStatus
from faststream.asgi import AsgiFastStream
from faststream.kafka import KafkaBroker
from examples.probes import (
LIVENESS_CHECKS,
READINESS_CHECKS,
READINESS_CHECKS_FAIL,
READINESS_CHECKS_SUCCESS,
STARTUP_CHECKS,
custom_handler,
)
from fast_healthchecks.integrations.base import Probe, build_probe_route_options
from fast_healthchecks.integrations.faststream import health, healthcheck_shutdown
broker = KafkaBroker(
os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9094,localhost:9095").split(","),
)
_probes_integration = (
Probe(name="liveness", checks=LIVENESS_CHECKS),
Probe(name="readiness", checks=READINESS_CHECKS),
Probe(name="startup", checks=STARTUP_CHECKS),
)
app_integration = AsgiFastStream(
broker,
asgi_routes=[
*health(
*_probes_integration,
options=build_probe_route_options(debug=False, prefix="/health"),
),
],
on_shutdown=[healthcheck_shutdown(_probes_integration)],
)
_probes_success = (
Probe(name="liveness", checks=[]),
Probe(name="readiness", checks=READINESS_CHECKS_SUCCESS),
Probe(name="startup", checks=[]),
)
app_success = AsgiFastStream(
broker,
asgi_routes=[
*health(
*_probes_success,
options=build_probe_route_options(debug=False, prefix="/health"),
),
],
on_shutdown=[healthcheck_shutdown(_probes_success)],
)
_probes_fail = (
Probe(name="liveness", checks=[]),
Probe(name="readiness", checks=READINESS_CHECKS_FAIL),
Probe(name="startup", checks=[]),
)
app_fail = AsgiFastStream(
broker,
asgi_routes=[
*health(
*_probes_fail,
options=build_probe_route_options(debug=False, prefix="/health"),
),
],
on_shutdown=[healthcheck_shutdown(_probes_fail)],
)
_probes_custom = (
Probe(
name="liveness",
checks=[],
summary="Check if the application is alive",
),
Probe(
name="readiness",
checks=READINESS_CHECKS_SUCCESS,
summary="Check if the application is ready",
),
Probe(
name="startup",
checks=[],
summary="Check if the application is starting up",
),
)
app_custom = AsgiFastStream(
broker,
asgi_routes=[
*health(
*_probes_custom,
options=build_probe_route_options(
success_handler=custom_handler,
failure_handler=custom_handler,
success_status=HTTPStatus.OK,
failure_status=HTTPStatus.SERVICE_UNAVAILABLE,
debug=True,
prefix="/custom_health",
),
),
],
on_shutdown=[healthcheck_shutdown(_probes_custom)],
)
"""Example app for fast-healthchecks."""
from litestar import Litestar
from litestar.status_codes import HTTP_200_OK, HTTP_503_SERVICE_UNAVAILABLE
from examples.probes import (
custom_handler,
get_liveness_checks,
get_readiness_checks,
get_readiness_checks_fail,
get_readiness_checks_success,
get_startup_checks,
)
from fast_healthchecks.integrations.base import Probe, build_probe_route_options
from fast_healthchecks.integrations.litestar import health, healthcheck_shutdown
_probes_integration = [
Probe(name="liveness", checks=get_liveness_checks()),
Probe(name="readiness", checks=get_readiness_checks()),
Probe(name="startup", checks=get_startup_checks()),
]
app_integration = Litestar(
route_handlers=[
*health(
*_probes_integration,
options=build_probe_route_options(debug=False, prefix="/health"),
),
],
on_shutdown=[healthcheck_shutdown(_probes_integration)],
)
app_integration.debug = True # for integration tests (verbose errors/schema)
_probes_success = [
Probe(name="liveness", checks=[]),
Probe(name="readiness", checks=get_readiness_checks_success()),
Probe(name="startup", checks=[]),
]
app_success = Litestar(
route_handlers=[
*health(
*_probes_success,
options=build_probe_route_options(debug=False, prefix="/health"),
),
],
on_shutdown=[healthcheck_shutdown(_probes_success)],
)
app_success.debug = True # for unit/integration tests (verbose errors/schema)
_probes_fail = [
Probe(name="liveness", checks=[]),
Probe(name="readiness", checks=get_readiness_checks_fail()),
Probe(name="startup", checks=[]),
]
app_fail = Litestar(
route_handlers=[
*health(
*_probes_fail,
options=build_probe_route_options(debug=False, prefix="/health"),
),
],
on_shutdown=[healthcheck_shutdown(_probes_fail)],
)
_probes_custom = [
Probe(
name="liveness",
checks=[],
summary="Check if the application is alive",
),
Probe(
name="readiness",
checks=get_readiness_checks_success(),
summary="Check if the application is ready",
),
Probe(
name="startup",
checks=[],
summary="Check if the application is starting up",
),
]
app_custom = Litestar(
route_handlers=[
*health(
*_probes_custom,
options=build_probe_route_options(
success_handler=custom_handler,
failure_handler=custom_handler,
success_status=HTTP_200_OK,
failure_status=HTTP_503_SERVICE_UNAVAILABLE,
debug=True,
prefix="/custom_health",
),
),
],
on_shutdown=[healthcheck_shutdown(_probes_custom)],
)
You can find examples for each framework here: