The FastAPI Lifespan Pattern — Clean Background Services
When you're running background services in a FastAPI app — pollers, watchers, cleanup loops — you need a clean way to start them up, keep them running, and shut them down gracefully. Here's the pattern I use in production.
The Pattern
FastAPI lifespan()
├── STARTUP (before yield)
│ ├── Initialize database pool
│ ├── Seed data if empty
│ ├── Launch background tasks via asyncio.create_task()
│ │ ├── ESPN NBA poller (schedule + score loops)
│ │ ├── ESPN MLB poller (schedule + score loops)
│ │ ├── Deposit watchers (tOWAI, OWAI)
│ │ ├── Withdrawal cleanup loop
│ │ └── Embedding backfill
│ ├── Store service references on app.state
│ └── Register tasks in _background_tasks dict
│
├── yield ← app serves requests
│
└── SHUTDOWN (after yield)
├── service.stop() on each poller
├── watcher.stop() on each deposit watcher
└── Close database pool
Everything lives in one function. No scattered on_startup / on_shutdown hooks to keep in sync.
Why This Works Well
1. Single ownership point
Every background service is created, started, and torn down in one place — the lifespan() function. You open one file, you see the full picture. Nothing is hidden in some decorator on a random module.
2. Graceful degradation via feature flags
Each service checks an env var before starting (NBA_AUTO_IMPORT_ENABLED, SOLANA_RPC_URL, etc.). If the var is unset, the service is skipped with a log message. The app still boots and serves requests — you don't need ESPN or Solana configured just to work on the frontend.
This is huge for developer experience. Clone the repo, run the server, and the API works. Background services are opt-in.
3. Error isolation
Each service launch is wrapped in try/except. If the NBA service fails to start, the MLB service and deposit watchers still run. A single broken service doesn't crash the whole app.
try:
nba_service = NBAService(db_pool)
asyncio.create_task(nba_service.start())
app.state.nba_service = nba_service
logger.info("NBA service started")
except Exception as e:
logger.error(f"NBA service failed to start: {e}")
# App continues running without NBA4. Admin endpoint access via app.state
Services are stored on app.state.nba_service / app.state.mlb_service so admin endpoints can call into them (e.g., trigger a manual import) without global variables or dependency injection frameworks.
5. Cooperative shutdown
Services use a _running = False flag pattern. The lifespan calls service.stop() which flips the flag, and each service's while self._running loop exits on the next iteration. No forced cancellation or orphaned tasks.
The ESPN Service Architecture
Each sports service follows the same internal pattern:
class NBAService:
def __init__(self, db_pool):
self._espn = ESPNNBAClient()
self._running = False
async def start(self):
self._running = True
await asyncio.gather(
self._schedule_loop(),
self._score_loop(),
)
def stop(self):
self._running = FalseDual-Loop Design
The schedule and score loops have different polling intervals because they serve different purposes:
- Schedule loop (every 6 hours): Game schedules don't change often. Fetches upcoming games, creates prediction markets, and is idempotent (dedup by ESPN game ID via a
sports_importstable). - Score loop (60s live / 15min idle): Needs to be fast during live games to resolve markets promptly. Uses adaptive polling — checks for final scores and auto-settles markets.
Both loops run inside asyncio.gather() so they share the service's lifetime.
Stateless HTTP Client
The ESPN client creates a fresh httpx.AsyncClient for each request rather than holding a persistent connection:
async def get_games_for_date(self, target_date: date) -> list[NBAGame]:
async with httpx.AsyncClient(timeout=self._timeout) as client:
resp = await client.get(SCOREBOARD_URL, params={"dates": date_str})
resp.raise_for_status()
data = resp.json()
# parse and returnThis is intentional — ESPN's public API has no auth, requests are infrequent (minutes apart), and stateless clients have zero cleanup concerns. The service layer owns the lifecycle; the HTTP client is just a function with a timeout.
Parallel Multi-Date Fetching
For bulk operations (admin import ranges, metadata resync), the client parallelizes:
async def get_games_for_dates(self, dates: list[date]) -> list[NBAGame]:
results = await asyncio.gather(
*[self.get_games_for_date(d) for d in dates]
)
return [game for games in results for game in games]Staggered Startup
Services that hit the same external API (Solana RPC) are staggered to avoid rate limits:
# tOWAI watcher starts immediately
asyncio.create_task(towai_watcher.start())
# OWAI watcher starts 5s later
async def _start_owai_delayed():
await asyncio.sleep(5)
await owai_watcher.start()
asyncio.create_task(_start_owai_delayed())Simple, effective, and obvious when you read the code.
Health Observability
The _background_tasks registry tracks what's running. The /health and /debug/status endpoints report on it, so you can see at a glance which services are active in production vs. skipped.
Adding a New Background Service
The checklist for adding a new service to this pattern:
- Create a service class with
start()/stop()and a_runningflag - Add an env var gate in
lifespan() - Wrap the launch in
try/except - Store on
app.stateif admin endpoints need access - Register in
_background_tasksfor health checks - Call
stop()in the shutdown block (afteryield)
Six steps. All in one file. No framework magic.
The Takeaway
The lifespan pattern gives you a single, readable orchestration point for all your background services. Feature flags make services opt-in. Error isolation means one failure doesn't cascade. Cooperative shutdown means no orphaned tasks.
It's not clever. It's not abstract. It's just a generator function with startup before the yield and cleanup after it. And that simplicity is exactly why it works.
Get vibe coding tips in your inbox
Build logs, tool reviews, and the prompts that actually work. No spam.