Caching
A unified, backend-agnostic caching layer with zero required dependencies. InMemoryBackend works out of the box — swap to RedisBackend in production without changing a line of application code.
InMemory for dev & tests. Redis for production. Same API, swappable via config.
Per-entry and global default TTL on all backends.
Cache async function results with a single line. Static or dynamic keys.
Wildcard * invalidation — clear entire namespaces at once.
Introduction
Caching is one of the most frequently re-implemented pieces of infrastructure in every FastAPI project. Without a standard solution, teams either skip it entirely or each implement their own wrapper. FastKit Core standardizes it with a clean, async-first API that works identically across backends.
The caching layer is built around a singleton CacheManager — initialize it once at application startup, then import the cache proxy anywhere in your code:
from fastkit_core.cache import cache
# Use anywhere after setup_cache() is called
await cache.set('users:all', users, ttl=300)
data = await cache.get('users:all')
await cache.invalidate('users:*')
Setup
Configuration
# config/cache.py
CACHE = {
'DEFAULT': {
'driver': 'memory',
'ttl': 300, # default TTL in seconds (5 minutes)
}
}
InMemoryBackend requires no extra dependencies and is perfect for development, single-process apps, and testing. Data is stored in the application process and lost on restart.
# config/cache.py
import os
CACHE = {
'DEFAULT': {
'driver': 'redis',
'host': os.getenv('REDIS_HOST', 'localhost'),
'port': int(os.getenv('REDIS_PORT', 6379)),
'db': int(os.getenv('REDIS_DB', 0)),
'ttl': 300,
}
}
Redis requires the redis package: pip install fastkit-core[redis]
Initialize at startup
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastkit_core.cache import setup_cache
from fastkit_core.config import ConfigManager
@asynccontextmanager
async def lifespan(app: FastAPI):
config = ConfigManager(modules=['cache'])
setup_cache(config) # initialize once at startup
yield
app = FastAPI(lifespan=lifespan)
After setup_cache() is called, the cache proxy is available globally. Calling cache.get() before setup raises a RuntimeError with a clear message.
Basic Usage
from fastkit_core.cache import cache
# Set a value (uses default TTL from config)
await cache.set('users:all', users_list)
# Set with explicit TTL (seconds)
await cache.set('users:all', users_list, ttl=600)
# Set without expiry
await cache.set('config:flags', feature_flags, ttl=None)
# Get a value — returns None if missing or expired
data = await cache.get('users:all')
if data is None:
data = await fetch_from_db()
await cache.set('users:all', data)
# Check existence without fetching
exists = await cache.has('users:all')
# Delete a single key
await cache.delete('users:1')
# Clear all keys
await cache.clear()
Key naming convention
Use colon-separated namespaces for keys — this makes pattern-based invalidation effective:
# ✅ Namespaced keys
'users:all'
'users:1'
'users:1:profile'
'products:category:electronics'
'session:abc123'
# Invalidate entire namespace at once
await cache.invalidate('users:*') # clears all user cache
await cache.invalidate('products:*') # clears all product cache
Backends
| Backend | Dependency | Persistence | Multi-process | Best for |
|---|---|---|---|---|
InMemoryBackend | None (default) | No — lost on restart | No — per-process | Development, single-process, tests |
RedisBackend | redis optional extra | Yes — configurable | Yes — shared | Production, multi-worker, distributed |
InMemoryBackend details
Entries are stored as (value, expires_at) tuples in a plain Python dict. Expiry is checked lazily on get() — expired entries are removed during read. Pattern invalidation uses fnmatch from the standard library, supporting * as a wildcard.
RedisBackend details
Uses redis.asyncio — fully non-blocking, does not interfere with FastAPI’s event loop. TTL is delegated to Redis natively via the ex= parameter, so expiry is handled by the Redis server with no overhead in the application layer.
Switching backends is a one-line config change — no application code changes required. This makes it straightforward to use InMemoryBackend in development and CI, and RedisBackend in production.
TTL
TTL (time-to-live) controls how long a cache entry remains valid. FastKit Core applies TTL at three levels, with the most specific taking precedence:
# Level 1: Config default — applies when no TTL is passed
CACHE = {'DEFAULT': {'driver': 'memory', 'ttl': 300}}
# Level 2: Per-call TTL — overrides config default
await cache.set('key', value, ttl=60) # expires in 60s
# Level 3: No expiry — pass ttl=None explicitly
await cache.set('key', value, ttl=None) # never expires
# @cached decorator — always specify TTL explicitly
@cached(ttl=120, key='my:key')
async def expensive_function(): ...
| TTL value | Behavior |
|---|---|
ttl=300 | Expires in 300 seconds (5 minutes) |
ttl=None (call-site) | Falls back to config default TTL |
Config ttl: None | Entries never expire by default |
Invalidation
Cache invalidation — knowing when to clear data — is the hardest part of caching. FastKit Core supports two strategies:
Exact key deletion
# Delete a specific entry
await cache.delete('users:1')
await cache.delete(f'users:{user_id}')
Pattern-based invalidation
Deletes all keys matching a pattern. Supports * as a wildcard:
# Clear everything in the users namespace
await cache.invalidate('users:*')
# Clear all product categories
await cache.invalidate('products:category:*')
# Clear everything (same as clear())
await cache.invalidate('*')
Invalidation in write operations
The most common pattern — invalidate related cache after a write:
class UserService(AsyncBaseCrudService[...]):
async def after_create(self, instance: User) -> None:
# New user invalidates the list cache
await cache.invalidate('users:*')
async def after_update(self, instance: User) -> None:
# Update invalidates both the individual entry and lists
await cache.delete(f'users:{instance.id}')
await cache.invalidate('users:list:*')
async def after_delete(self, id: int) -> None:
await cache.delete(f'users:{id}')
await cache.invalidate('users:*')
@cached Decorator
Cache the return value of an async function with a single decorator. On cache hit the function body is never executed — the cached result is returned directly.
Static key
from fastkit_core.cache import cached
@cached(ttl=300, key='users:all')
async def get_all_users() -> list[UserResponse]:
# Only called on cache miss — result cached for 300s
return await user_service.get_all()
Dynamic key with lambda
For functions with arguments, use a lambda that receives the same arguments as the function:
@cached(ttl=60, key=lambda user_id: f'user:{user_id}')
async def get_user(user_id: int) -> UserResponse:
return await user_service.find_or_fail(user_id)
# Different users get independent cache entries
await get_user(1) # cache key: 'user:1'
await get_user(42) # cache key: 'user:42'
# Multiple arguments
@cached(ttl=120, key=lambda uid, org: f'user:{uid}:org:{org}')
async def get_user_for_org(uid: int, org: int) -> UserResponse:
...
How it works
# Equivalent manual implementation
async def get_user(user_id: int) -> UserResponse:
cache_key = f'user:{user_id}'
cached_value = await cache.get(cache_key)
if cached_value is not None:
return cached_value # cache hit — DB not touched
result = await user_service.find_or_fail(user_id)
await cache.set(cache_key, result, ttl=60)
return result
@cached only works on async functions — applying it to a sync function raises TypeError immediately at decoration time, before any request is processed.
Service Layer Pattern
The recommended pattern — cache in the service layer, invalidate in lifecycle hooks. This keeps caching logic co-located with business logic and out of route handlers.
from fastkit_core.cache import cache
class UserService(AsyncBaseCrudService[User, UserCreate, UserUpdate, UserResponse]):
async def get_all_active(self) -> list[UserResponse]:
cache_key = 'users:active'
cached = await cache.get(cache_key)
if cached is not None:
return cached
users = await self.filter(status='active', _order_by='-created_at')
await cache.set(cache_key, users, ttl=300)
return users
async def find_by_id(self, user_id: int) -> UserResponse:
cache_key = f'users:{user_id}'
cached = await cache.get(cache_key)
if cached is not None:
return cached
user = await self.find_or_fail(user_id)
await cache.set(cache_key, user, ttl=600)
return user
from fastkit_core.cache import cached
class UserService(AsyncBaseCrudService[User, UserCreate, UserUpdate, UserResponse]):
@cached(ttl=300, key='users:active')
async def get_all_active(self) -> list[UserResponse]:
return await self.filter(status='active', _order_by='-created_at')
@cached(ttl=600, key=lambda self, user_id: f'users:{user_id}')
async def find_by_id(self, user_id: int) -> UserResponse:
return await self.find_or_fail(user_id)
from fastkit_core.cache import cache
class UserService(AsyncBaseCrudService[User, UserCreate, UserUpdate, UserResponse]):
async def after_create(self, instance: User) -> None:
# New user invalidates list caches
await cache.invalidate('users:*')
async def after_update(self, instance: User) -> None:
# Update invalidates the specific user entry and all lists
await cache.delete(f'users:{instance.id}')
await cache.invalidate('users:active')
async def after_delete(self, id: int) -> None:
await cache.delete(f'users:{id}')
await cache.invalidate('users:*')
Custom Backend
Extend AbstractCacheBackend to implement a custom backend — for example, a file-system cache, Memcached, or a mock for testing:
from fastkit_core.cache import AbstractCacheBackend
from typing import Any
class MockCacheBackend(AbstractCacheBackend):
"""In-memory backend that records all operations — useful in tests."""
def __init__(self):
self._store: dict = {}
self.calls: list[tuple] = []
async def get(self, key: str) -> Any | None:
self.calls.append(('get', key))
return self._store.get(key)
async def set(self, key: str, data: Any, ttl: int | None = None) -> None:
self.calls.append(('set', key, ttl))
self._store[key] = data
async def delete(self, key: str) -> None:
self._store.pop(key, None)
async def invalidate(self, pattern: str) -> None:
import fnmatch
for k in list(self._store):
if fnmatch.fnmatch(k, pattern):
del self._store[k]
async def has(self, key: str) -> bool:
return key in self._store
async def clear(self) -> None:
self._store.clear()
self.calls.clear()
To use a custom backend, bypass setup_cache() and inject directly:
from fastkit_core.cache import CacheManager, get_cache
import fastkit_core.cache.manager as cache_module
# In tests — inject mock backend directly
mock_backend = MockCacheBackend()
manager = CacheManager.__new__(CacheManager)
manager._backend_instance = mock_backend
cache_module._cache_instance = manager
API Reference
Setup functions
ConfigManager. Call once at application startup.CacheManager singleton. Raises RuntimeError if called before setup_cache().get_cache(). Import and use directly without explicit get_cache() calls.CacheManager / AbstractCacheBackend
async — returns cached value or None if missing or expired.async — store value. ttl in seconds, None uses config default.async — delete a single exact key. No error if key doesn’t exist.async — delete all keys matching the pattern. Supports * wildcard.async — returns bool. Respects TTL — expired entries return False.async — remove all keys from the cache.@cached decorator
int — required. Cache lifetime in seconds.str | Callable[..., str] — required. Static string or lambda receiving the same arguments as the decorated function.Backends
get(). default_ttl=None means no expiry.pip install fastkit-core[redis].Complete Example
A full user API with caching across all layers:
# config/cache.py
import os
CACHE = {
'DEFAULT': {
'driver': os.getenv('CACHE_DRIVER', 'memory'), # 'redis' in production
'ttl': 300,
# Redis — only used when driver='redis'
'host': os.getenv('REDIS_HOST', 'localhost'),
'port': int(os.getenv('REDIS_PORT', 6379)),
'db': int(os.getenv('REDIS_DB', 0)),
}
}
# services.py
from fastkit_core.services import AsyncBaseCrudService
from fastkit_core.database import AsyncRepository
from fastkit_core.cache import cache, cached
from models import User
from schemas import UserCreate, UserUpdate, UserResponse
class UserService(AsyncBaseCrudService[User, UserCreate, UserUpdate, UserResponse]):
def __init__(self, repository: AsyncRepository):
super().__init__(repository, response_schema=UserResponse)
# ── Cached reads ──────────────────────────────────────────────────────
@cached(ttl=300, key='users:all')
async def get_all_users(self) -> list[UserResponse]:
return await self.get_all(_order_by='-created_at')
@cached(ttl=600, key=lambda self, user_id: f'users:{user_id}')
async def get_user(self, user_id: int) -> UserResponse:
return await self.find_or_fail(user_id)
@cached(ttl=120, key=lambda self, status: f'users:status:{status}')
async def get_by_status(self, status: str) -> list[UserResponse]:
return await self.filter(status=status, _order_by='name')
# ── Invalidation hooks ────────────────────────────────────────────────
async def after_create(self, instance: User) -> None:
await cache.invalidate('users:*')
async def after_update(self, instance: User) -> None:
await cache.delete(f'users:{instance.id}')
await cache.invalidate('users:all')
await cache.invalidate(f'users:status:{instance.status}')
async def after_delete(self, id: int) -> None:
await cache.delete(f'users:{id}')
await cache.invalidate('users:*')
# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from fastkit_core.cache import setup_cache
from fastkit_core.database import get_async_db, AsyncRepository
from fastkit_core.config import ConfigManager
from fastkit_core.http import success_response, register_exception_handlers
@asynccontextmanager
async def lifespan(app: FastAPI):
config = ConfigManager(modules=['cache', 'database'])
setup_cache(config)
yield
app = FastAPI(lifespan=lifespan)
register_exception_handlers(app)
async def get_service(session: AsyncSession = Depends(get_async_db)) -> UserService:
return UserService(AsyncRepository(User, session))
@app.get("/users")
async def list_users(svc: UserService = Depends(get_service)):
users = await svc.get_all_users() # cached
return success_response(data=[u.model_dump() for u in users])
@app.get("/users/{user_id}")
async def get_user(user_id: int, svc: UserService = Depends(get_service)):
user = await svc.get_user(user_id) # cached per user_id
return success_response(data=user.model_dump())
@app.post("/users", status_code=201)
async def create_user(data: UserCreate, svc: UserService = Depends(get_service)):
user = await svc.create(data) # after_create invalidates cache
return success_response(data=user.model_dump(), message="User created")
@app.put("/users/{user_id}")
async def update_user(user_id: int, data: UserUpdate, svc: UserService = Depends(get_service)):
user = await svc.update(user_id, data) # after_update invalidates cache
return success_response(data=user.model_dump(), message="User updated")
@app.delete("/users/{user_id}", status_code=204)
async def delete_user(user_id: int, svc: UserService = Depends(get_service)):
await svc.delete(user_id) # after_delete invalidates cache