Database
Built on SQLAlchemy 2.0+ — base models, rich mixins, Django-style filtering, repository pattern, async support, read replicas, and the unique TranslatableMixin for multi-language models.
Introduction
FastKit Core's database module provides a powerful, production-ready foundation for working with databases. It adds useful patterns and features while staying close to SQLAlchemy's flexibility — you're never locked into an abstraction you can't escape.
- Rich mixins: IntId, UUID, Timestamps, SoftDelete, Slug, Publishable — compose what you need.
- Repository pattern: Django-style filters, pagination, eager loading, transactions.
- Async: Full async support with complete API parity.
- TranslatableMixin: Built-in multi-language model support — unique to FastKit.
- Read replicas: Automatic read/write splitting for horizontal scaling.
- Supported databases: PostgreSQL, MySQL, MariaDB, MSSQL, Oracle, SQLite.
Quick Start
Define a model
from fastkit_core.database import Base, IntIdMixin, TimestampMixin, SoftDeleteMixin
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import String
class User(Base, IntIdMixin, TimestampMixin, SoftDeleteMixin):
# __tablename__ is auto-generated as 'users'
username: Mapped[str] = mapped_column(String(50), unique=True)
email: Mapped[str] = mapped_column(String(255), unique=True)
full_name: Mapped[str] = mapped_column(String(200))
Initialize the database
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastkit_core.database import init_database, shutdown_database, get_db_manager
from fastkit_core.config import ConfigManager
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Set up the database when the app starts and shut it down when it stops."""
    config = ConfigManager()
    init_database(config)
    # Create tables for the models registered on Base.
    Base.metadata.create_all(get_db_manager().engine)
    try:
        yield
    finally:
        # Always release database resources, even if the app exits with an error.
        shutdown_database()
app = FastAPI(lifespan=lifespan)
Use in endpoints
from fastapi import Depends
from fastkit_core.database import Repository, get_db
from sqlalchemy.orm import Session
@app.get("/users")
def list_users(session: Session = Depends(get_db)):
repo = Repository(User, session)
return [u.to_dict() for u in repo.get_all(limit=20)]
Base Models
Base
The foundation for all models. Provides auto table name generation and serialization helpers.
from fastkit_core.database import Base
class Product(Base):
name: Mapped[str] = mapped_column(String(200))
price: Mapped[float]
# Auto table names:
# User → users
# UserProfile → user_profiles
# Category → categories
# Serialization
product.to_dict() # {'id':1, 'name':'Widget', 'price':9.99}
product.to_dict(exclude=['price']) # {'id':1, 'name':'Widget'}
product.to_dict(include_relationships=True) # includes related objects
product.update_from_dict({'name': 'New'})
product.update_from_dict(data, allow_only=['name', 'price']) # safe update
BaseWithTimestamps
Convenience base that bundles Base + TimestampMixin:
from fastkit_core.database import BaseWithTimestamps
class Article(BaseWithTimestamps):
title: Mapped[str]
content: Mapped[str]
# Automatically has: id, created_at, updated_at
Mixins
Compose your models from focused mixins. Each adds a single responsibility — use only what you need.
| Mixin | Adds | Use when |
|---|---|---|
| IntIdMixin | Auto-increment integer id | Most models — simple and efficient |
| UUIDMixin | UUID v4 id | Distributed systems, public-facing IDs |
| TimestampMixin | created_at, updated_at | Almost always — track record age |
| SoftDeleteMixin | deleted_at, is_deleted | Need to recover deleted records |
| SlugMixin | slug field (unique, indexed) | URLs, human-readable identifiers |
| PublishableMixin | published_at, publish workflow | Content with draft/publish states |
| TranslatableMixin | Multi-language fields | i18n content — see dedicated section |
from fastkit_core.database import Base, IntIdMixin
class User(Base, IntIdMixin):
name: Mapped[str]
# Adds: id: Mapped[int] — auto-incrementing primary key
from fastkit_core.database import Base, UUIDMixin
class User(Base, UUIDMixin):
name: Mapped[str]
# Adds: id: Mapped[UUID] — UUID v4, auto-generated
user = User(name="Alice")
print(user.id) # UUID('550e8400-e29b-41d4-a716-446655440000')
Use UUIDMixin for distributed systems, public-facing IDs (non-sequential = harder to guess), or when merging databases across regions.
from fastkit_core.database import Base, IntIdMixin, TimestampMixin
class Post(Base, IntIdMixin, TimestampMixin):
title: Mapped[str]
# Adds: created_at, updated_at — both managed automatically
post = Post(title="Hello")
session.add(post)  # a new object must be added to the session before commit can persist it
session.commit()
print(post.created_at)  # 2025-01-10 10:30:00
print(post.updated_at)  # 2025-01-10 10:30:00
post.title = "Hello World"
session.commit()
print(post.updated_at)  # 2025-01-10 10:35:00 ← auto-updated!
from fastkit_core.database import Base, IntIdMixin, SoftDeleteMixin
class Post(Base, IntIdMixin, SoftDeleteMixin):
title: Mapped[str]
# Adds: deleted_at: Mapped[datetime | None] — None means active
# Soft delete (marks, doesn't remove)
post.soft_delete()
print(post.is_deleted) # True
print(post.deleted_at) # 2025-01-10 10:40:00
# Restore
post.restore()
print(post.is_deleted) # False
# Query scopes
Post.active(session).all() # only non-deleted
Post.deleted(session).all() # only deleted
Post.with_deleted(session).all() # all including deleted
from fastkit_core.database import Base, IntIdMixin, PublishableMixin
from datetime import datetime, timedelta, timezone
class Article(Base, IntIdMixin, PublishableMixin):
title: Mapped[str]
# Adds: published_at: Mapped[datetime | None]
article = Article(title="News")
print(article.is_draft) # True
print(article.is_published) # False
article.publish() # publish now
article.unpublish() # back to draft
future = datetime.now(timezone.utc) + timedelta(days=7)
article.schedule(future) # schedule for future
print(article.is_scheduled) # True
# Query scopes
Article.published(session).all() # published only
Article.drafts(session).all() # drafts only
Article.scheduled(session).all() # scheduled only
Session Management
Configuration
# config/database.py
CONNECTIONS = {
'default': {
'driver': 'postgresql',
'host': 'localhost',
'port': 5432,
'database': 'myapp',
'username': 'user',
'password': 'secret',
'pool_size': 5,
'max_overflow': 10,
},
# Read replica — auto-used by get_read_db / read_session()
'read_replica_1': {
'driver': 'postgresql',
'host': 'replica1.example.com',
'database': 'myapp',
'username': 'readonly',
'password': 'secret',
},
# Direct URL
'analytics': {
'url': 'postgresql://user:pass@analytics-db:5432/analytics'
},
# SQLite
'local': {
'driver': 'sqlite',
'database': 'dev.db'
}
}
Init with read replicas
init_database(
    config,
    connection_name='default',
    # Each replica name should match a key defined in CONNECTIONS.
    read_replicas=['read_replica_1'],
)
Using sessions
from fastkit_core.database import get_db_manager
db = get_db_manager()
# Write — auto-commits on success, rolls back on error
with db.session() as session:
session.add(User(name="John"))
# Read — routes to replica if configured
with db.read_session() as session:
users = session.query(User).all()
from fastkit_core.database import get_db, get_read_db
# Write endpoint — primary db
@app.post("/users")
def create_user(data: dict, session: Session = Depends(get_db)):
user = User(**data)
session.add(user)
session.commit()
return user.to_dict()
# Read endpoint — replica if configured
@app.get("/users")
def list_users(session: Session = Depends(get_read_db)):
    """Return all users as plain dicts, reading via the get_read_db session."""
    # Serialize with to_dict() so the response shape matches the other endpoints.
    return [user.to_dict() for user in session.query(User).all()]
from fastkit_core.database import health_check_all
health = health_check_all()
# {
# 'default': {
# 'primary': True,
# 'read_replica': True,
# }
# }
Repository Pattern
The Repository class provides a clean abstraction over database operations — keeping SQL out of your business logic.
from fastkit_core.database import Repository
repo = Repository(User, session)
# ── Create ────────────────────────────────────
user = repo.create({'name': 'Alice', 'email': 'alice@example.com'})
users = repo.create_many([{'name': 'Bob'}, {'name': 'Carol'}])
user = repo.create({'name': 'Dave'}, commit=False) # manual commit
# ── Read ──────────────────────────────────────
user = repo.get(1) # by ID, None if missing
user = repo.get_or_404(1) # raises NotFoundException if missing
users = repo.get_all() # all records
users = repo.get_all(limit=100)
user = repo.first(email='a@b.com') # first match
exists = repo.exists(email='a@b.com')
total = repo.count(status='active')
# ── Update ────────────────────────────────────
user = repo.update(1, {'name': 'Alice Smith'})
count = repo.update_many(filters={'status': 'pending'}, data={'status': 'active'})
# ── Delete ────────────────────────────────────
repo.delete(1) # soft delete if SoftDeleteMixin
repo.delete(1, force=True) # hard delete always
count = repo.delete_many({'status': 'inactive'})
# ── Transaction control ───────────────────────
repo.commit()
repo.rollback()
repo.flush()
Filtering
Django-style filter operators using double-underscore syntax:
# Simple equality
users = repo.filter(status='active')
# Comparisons
adults = repo.filter(age__gte=18, age__lt=65)
# String patterns
users = repo.filter(email__ilike='%@gmail.com')
users = repo.filter(name__startswith='J')
users = repo.filter(name__contains='doe')
# Lists
users = repo.filter(status__in=['active', 'pending'])
users = repo.filter(role__not_in=['banned', 'deleted'])
# NULL checks
users = repo.filter(verified_at__is_null=True)
users = repo.filter(email__is_not_null=True)
# Range
products = repo.filter(price__between=(10, 100))
# Ordering, limit, offset
users = repo.filter(
status='active',
age__gte=18,
_order_by='-created_at', # '-' prefix = DESC
_limit=20,
_offset=40
)
| Operator | SQL equivalent | Example |
|---|---|---|
| eq (default) | = value | status='active' |
| ne | != value | status__ne='banned' |
| lt, lte | <, <= | age__lt=18 |
| gt, gte | >, >= | price__gte=100 |
| in, not_in | IN, NOT IN | role__in=['admin','user'] |
| like, ilike | LIKE, ILIKE | email__ilike='%@gmail%' |
| is_null, is_not_null | IS NULL, IS NOT NULL | deleted_at__is_null=True |
| between | BETWEEN a AND b | price__between=(10,100) |
| startswith, endswith, contains | LIKE 'x%' etc. | name__startswith='J' |
Pagination
users, meta = repo.paginate(
page=2,
per_page=20,
_order_by='-created_at',
status='active',
age__gte=18
)
print(meta)
# {
# 'page': 2,
# 'per_page': 20,
# 'total': 150,
# 'total_pages': 8,
# 'has_next': True,
# 'has_prev': True
# }
Eager Loading
Prevent N+1 query problems by loading related objects up front in a small, fixed number of queries instead of one query per row:
from sqlalchemy.orm import selectinload
# ❌ Without eager loading — N+1 problem
users = repo.get_all() # 1 query
for user in users:
print(user.posts) # +1 query per user!
# ✅ With eager loading — 2 queries total
users = repo.get_all(load_relations=[selectinload(User.posts)])
for user in users:
print(user.posts) # already loaded
# Multiple relationships
invoice = repo.get(
invoice_id,
load_relations=[
selectinload(Invoice.client),
selectinload(Invoice.items),
selectinload(Invoice.payments),
]
)
# Nested relationships
invoices = repo.get_all(load_relations=[
selectinload(Invoice.items)
.selectinload(InvoiceItem.product)
.selectinload(Product.category)
])
# Works with all read methods
repo.get(id, load_relations=[...])
repo.get_all(load_relations=[...])
repo.filter(_load_relations=[...], **filters)
repo.paginate(page=1, per_page=20, _load_relations=[...])
100 invoices without eager loading = 101 queries, ~5000ms. With eager loading = 2 queries, ~100ms. That's a 50× improvement.
Async Support
AsyncRepository has the same API as Repository — just add await:
from fastkit_core.database import AsyncRepository, get_async_db, get_async_read_db
from sqlalchemy.ext.asyncio import AsyncSession
# Read endpoint using replica
@app.get("/users")
async def list_users(session: AsyncSession = Depends(get_async_read_db)):
repo = AsyncRepository(User, session)
users, meta = await repo.paginate(page=1, per_page=20, status='active')
return {'items': [u.to_dict() for u in users], 'pagination': meta}
# Write endpoint
@app.post("/users")
async def create_user(data: UserCreate, session: AsyncSession = Depends(get_async_db)):
repo = AsyncRepository(User, session)
user = await repo.create(data.model_dump())
return user.to_dict()
# All methods are awaitable
user = await repo.get(1)
user = await repo.get_or_404(1)
users = await repo.get_all(limit=100)
users = await repo.filter(status='active', _order_by='-created_at')
exists = await repo.exists(email='a@b.com')
count = await repo.count(status='active')
user = await repo.update(1, {'name': 'New'})
await repo.delete(1)
TranslatableMixin
Store and retrieve multi-language content directly in your models — no separate translation tables needed.
from fastkit_core.database import Base, IntIdMixin, TranslatableMixin
from sqlalchemy import JSON
class Article(Base, IntIdMixin, TranslatableMixin):
__translatable__ = ['title', 'content'] # fields to translate
__fallback_locale__ = 'en' # fallback if locale missing
title: Mapped[dict] = mapped_column(JSON)
content: Mapped[dict] = mapped_column(JSON)
status: Mapped[str] = mapped_column(String(20), default='draft')
# Store translations
article.title = {"en": "Hello World", "es": "Hola Mundo", "fr": "Bonjour le Monde"}
# Get for a specific locale
article.set_locale('es')
print(article.title) # "Hola Mundo"
print(article.get_translation('title')) # "Hola Mundo"
# Get with fallback
article.get_translation('title', locale='de', fallback=True) # falls back to 'en'
# Serialization with locale
article.to_dict(locale='es') # all translatable fields in Spanish
# Validation
errors = article.validate_translations(required_locales=['en', 'es'])
# {'title': ['es translation missing'], 'content': []}
# Global locale (e.g. set from request header)
Article.set_global_locale('fr')
print(article.title) # "Bonjour le Monde"
Middleware integration
from fastkit_core.database import set_locale_from_request
@app.middleware("http")
async def locale_middleware(request: Request, call_next):
locale = request.headers.get('Accept-Language', 'en')[:2]
set_locale_from_request(locale)
return await call_next(request)
API Reference
Repository
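| Method | Returns / notes |
|---|---|
| get(id) | The record, or None if missing. |
| get_or_404(id) | The record; raises NotFoundException if missing. |
| filter(**filters) | List of matching records. Supports _limit, _offset, _order_by, _load_relations. |
| paginate(page, per_page, **filters) | (list, meta_dict). Meta includes total, total_pages, has_next, has_prev. |
| first(**filters) | The first matching record, or None. |
| exists(**filters) | bool. |
| count(**filters) | int. |
| delete(id) | Soft delete if the model uses SoftDeleteMixin, otherwise hard delete. force=True always hard-deletes. |
TranslatableMixin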
Complete Example
A full multi-language Article API combining mixins, repository, TranslatableMixin and locale middleware:
# models.py
from fastkit_core.database import (
Base, IntIdMixin, TimestampMixin, SoftDeleteMixin, TranslatableMixin
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy import String, JSON, ForeignKey
class User(Base, IntIdMixin, TimestampMixin):
__tablename__ = 'users'
username: Mapped[str] = mapped_column(String(50), unique=True)
email: Mapped[str] = mapped_column(String(255), unique=True)
articles: Mapped[list["Article"]] = relationship(back_populates="author")
class Article(Base, IntIdMixin, TimestampMixin, SoftDeleteMixin, TranslatableMixin):
__tablename__ = 'articles'
__translatable__ = ['title', 'content']
__fallback_locale__ = 'en'
title: Mapped[dict] = mapped_column(JSON)
content: Mapped[dict] = mapped_column(JSON)
status: Mapped[str] = mapped_column(String(20), default='draft')
author_id: Mapped[int] = mapped_column(ForeignKey('users.id'))
author: Mapped["User"] = relationship(back_populates="articles")
# main.py
from fastapi import FastAPI, Depends, Request
from fastkit_core.database import (
init_database, get_db, Repository, set_locale_from_request
)
from fastkit_core.config import ConfigManager
from sqlalchemy.orm import Session, selectinload
config = ConfigManager(modules=['database'])
init_database(config)
app = FastAPI()
@app.middleware("http")
async def locale_middleware(request: Request, call_next):
locale = request.headers.get('Accept-Language', 'en')[:2]
set_locale_from_request(locale)
return await call_next(request)
def get_repo(session: Session = Depends(get_db)) -> Repository:
return Repository(Article, session)
@app.post("/articles", status_code=201)
def create_article(data: dict, repo = Depends(get_repo)):
return repo.create(data).to_dict()
@app.get("/articles")
def list_articles(page: int = 1, repo = Depends(get_repo)):
articles, meta = repo.paginate(
page=page, per_page=20,
status='published',
_order_by='-created_at',
_load_relations=[selectinload(Article.author)]
)
return {'items': [a.to_dict() for a in articles], 'pagination': meta}
@app.get("/articles/{article_id}")
def get_article(article_id: int, repo = Depends(get_repo)):
article = repo.get_or_404(
article_id,
load_relations=[selectinload(Article.author)]
)
return article.to_dict(include_relationships=True)
@app.delete("/articles/{article_id}", status_code=204)
def delete_article(article_id: int, repo = Depends(get_repo)):
repo.delete(article_id) # soft delete