Services
The business logic layer of your application. Services sit between your API endpoints and the repository, handling validation, lifecycle hooks, transactions, and complex business rules — keeping your code organized and testable.
Full CRUD with type-safe generics — sync and async.
before/after create, update, delete — sync and async.
Custom business rule validation before any operation.
Auto-convert model instances to Pydantic schemas.
Introduction
In FastKit Core's architecture, services sit between API endpoints and the repository layer. They are responsible for business logic — not for HTTP responses, not for database queries. This separation makes your code testable, reusable, and maintainable.
Services are reusable — the same UserService can be called from an API endpoint, a CLI
command, a background job, or a test. Zero coupling to the HTTP layer.
Quick Start
from fastkit_core.services import BaseCrudService
from fastkit_core.database import Repository
from models import User
from schemas import UserCreate, UserUpdate, UserResponse
class UserService(BaseCrudService[User, UserCreate, UserUpdate, UserResponse]):
def __init__(self, repository: Repository):
super().__init__(repository, response_schema=UserResponse)
def validate_create(self, data: UserCreate) -> None:
if self.exists(email=data.email):
raise ValueError("Email already exists")
def before_create(self, data: dict) -> dict:
data['password'] = hash_password(data['password'])
return data
def after_create(self, instance: User) -> None:
send_welcome_email(instance.email)
from fastkit_core.services import AsyncBaseCrudService
from fastkit_core.database import AsyncRepository
class UserService(AsyncBaseCrudService[User, UserCreate, UserUpdate, UserResponse]):
def __init__(self, repository: AsyncRepository):
super().__init__(repository, response_schema=UserResponse)
async def validate_create(self, data: UserCreate) -> None:
if await self.exists(email=data.email):
raise ValueError("Email already exists")
async def before_create(self, data: dict) -> dict:
data['password'] = await hash_password_async(data['password'])
return data
async def after_create(self, instance: User) -> None:
await send_welcome_email_async(instance.email)
from fastapi import Depends
from fastkit_core.database import get_db, Repository
from fastkit_core.http import success_response
from sqlalchemy.orm import Session
def get_user_service(session: Session = Depends(get_db)) -> UserService:
return UserService(Repository(User, session))
@app.post("/users", status_code=201)
def create_user(data: UserCreate, svc: UserService = Depends(get_user_service)):
user = svc.create(data)
return success_response(data=user.model_dump(), message="User created")
@app.get("/users")
def list_users(page: int = 1, svc: UserService = Depends(get_user_service)):
users, meta = svc.paginate(page=page, per_page=20)
return {'items': [u.model_dump() for u in users], 'pagination': meta}
BaseCrudService
BaseCrudService is a generic class with four type parameters. You only override what you
need — everything else works out of the box.
Type parameters
class MyService(BaseCrudService[
ModelType, # SQLAlchemy model → e.g. User
CreateSchemaType, # Pydantic create → e.g. UserCreate
UpdateSchemaType, # Pydantic update → e.g. UserUpdate
ResponseSchemaType # Pydantic response → e.g. UserResponse
]):
pass
All available operations
# ── Create ────────────────────────────────────
product = service.create(ProductCreate(name="Widget", price=9.99))
products = service.create_many([ProductCreate(...), ProductCreate(...)])
# ── Read ──────────────────────────────────────
product = service.find(1) # None if missing
product = service.find_or_fail(1) # raises NotFoundException
products = service.get_all()
products = service.get_all(limit=50)
products = service.filter(status='active', price__lt=100)
product = service.filter_one(sku='ABC123')
exists = service.exists(sku='ABC123') # bool
count = service.count(status='active') # int
# ── Paginate ──────────────────────────────────
products, meta = service.paginate(
page=1, per_page=20,
_order_by='-created_at',
status='active'
)
# ── Update ────────────────────────────────────
updated = service.update(1, ProductUpdate(price=12.99))
count = service.update_many(filters={'status':'pending'}, data={'status':'active'})
# ── Delete ────────────────────────────────────
service.delete(1) # soft delete if SoftDeleteMixin
service.delete(1, force=True) # always hard delete
count = service.delete_many({'status': 'inactive'})
# ── Transaction control ───────────────────────
service.commit()
service.rollback()
service.flush()
Schema auto-conversion
Services accept Pydantic schemas or plain dicts — both work transparently:
# All three work identically
service.create(ProductCreate(name="Widget", price=9.99))
service.create({"name": "Widget", "price": 9.99})
# Partial updates — only set fields are sent to the database
update = ProductUpdate(price=12.99) # name is not set
service.update(1, update) # only price column updated
AsyncBaseCrudService
Identical API to BaseCrudService — just add async/await. Use
AsyncRepository instead of Repository.
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from fastkit_core.database import get_async_db, AsyncRepository
async def get_service(session: AsyncSession = Depends(get_async_db)):
return ProductService(AsyncRepository(Product, session))
@app.post("/products", status_code=201)
async def create_product(data: ProductCreate, svc = Depends(get_service)):
product = await svc.create(data)
return success_response(data=product.model_dump())
@app.get("/products")
async def list_products(page: int = 1, svc = Depends(get_service)):
products, meta = await svc.paginate(page=page, per_page=20)
return {'items': [p.model_dump() for p in products], 'pagination': meta}
Response Schema Mapping
When you pass response_schema to super().__init__(), every service method
automatically converts model instances to your Pydantic schema before returning.
class UserResponse(BaseModel):
id: int
email: str
name: str
# password is NOT included — secure by design
model_config = {"from_attributes": True}
class UserService(BaseCrudService[User, UserCreate, UserUpdate, UserResponse]):
def __init__(self, repository: Repository):
super().__init__(repository, response_schema=UserResponse)
# Every method returns UserResponse — not the raw User model
user: UserResponse = service.create(user_data)
user: UserResponse = service.find(1)
users: list[UserResponse] = service.get_all()
Any.
Lifecycle Hooks
Override hooks to execute code at specific points in an operation. before_* hooks can
modify data. after_* hooks react to results.
All available hooks
class UserService(BaseCrudService[User, UserCreate, UserUpdate, UserResponse]):
def before_create(self, data: dict) -> dict:
data['password'] = hash_password(data['password'])
data['created_by'] = get_current_user_id()
return data # must return modified data
def after_create(self, instance: User) -> None:
send_welcome_email(instance.email)
create_user_profile(instance.id)
def before_update(self, id: int, data: dict) -> dict:
data['updated_by'] = get_current_user_id()
return data
def after_update(self, instance: User) -> None:
cache.delete(f'user:{instance.id}')
notify_watchers(instance.id)
def before_delete(self, id: int) -> None:
user = self.find_or_fail(id)
if user.has_active_subscriptions():
raise ValueError("Cannot delete user with active subscriptions")
def after_delete(self, id: int) -> None:
delete_user_files(id)
revoke_user_tokens(id)
Common hook use cases
Validation Hooks
Validation hooks run before the operation and before any lifecycle hooks. Raise
ValueError to abort the operation cleanly.
class ProductService(BaseCrudService[Product, ProductCreate, ProductUpdate, ProductResponse]):
def validate_create(self, data: ProductCreate) -> None:
# Uniqueness
if self.exists(sku=data.sku):
raise ValueError("Product with this SKU already exists")
# Business rules
if data.price <= 0:
raise ValueError("Price must be greater than zero")
if data.stock < 0:
raise ValueError("Stock cannot be negative")
def validate_update(self, id: int, data: ProductUpdate) -> None:
product = self.find_or_fail(id)
# Permission check
if not can_edit_product(product):
raise ValueError("You don't have permission to edit this product")
# Business rule
if data.price and data.price < product.cost:
raise ValueError("Price cannot be lower than cost")
Async validation
class ProductService(AsyncBaseCrudService[Product, ProductCreate, ProductUpdate, ProductResponse]):
async def validate_create(self, data: ProductCreate) -> None:
if await self.exists(sku=data.sku):
raise ValueError("SKU already exists")
# Call external services in validation
if not await external_inventory_check(data.sku):
raise ValueError("SKU not found in inventory system")
Catch ValueError from services in your endpoints and convert to HTTPException(status_code=400).
This keeps HTTP concerns in the API layer, business errors in the service layer.
SlugServiceMixin
Mix in SlugServiceMixin to get automatic slug generation with uniqueness checking — no
custom logic needed.
from fastkit_core.services import AsyncBaseCrudService, SlugServiceMixin
class ArticleService(
SlugServiceMixin,
AsyncBaseCrudService[Article, ArticleCreate, ArticleUpdate, ArticleResponse]
):
def __init__(self, repository: AsyncRepository):
super().__init__(repository, response_schema=ArticleResponse)
async def before_create(self, data: dict) -> dict:
# Generates unique slug, appends -1, -2 etc. if needed
data['slug'] = await self.async_generate_slug(data['title'])
return data
async def before_update(self, id: int, data: dict) -> dict:
if 'title' in data:
# exclude_id ensures current record isn't seen as a conflict
data['slug'] = await self.async_generate_slug(
data['title'],
exclude_id=id
)
return data
AsyncBaseCrudService.API Reference
| Hook | When | Signature |
|---|---|---|
validate_create |
Before create — first | (data: CreateSchema) → None |
before_create |
Before create — after validation | (data: dict) → dict |
after_create |
After create, after commit | (instance: Model) → None |
validate_update |
Before update — first | (id, data: UpdateSchema) → None |
before_update |
Before update — after validation | (id, data: dict) → dict |
after_update |
After update, after commit | (instance: Model) → None |
before_delete |
Before delete | (id) → None |
after_delete |
After delete, after commit | (id) → None |
All hooks have async
equivalents in AsyncBaseCrudService.
ResponseSchema | None.NotFoundException if missing.
_limit, _offset,
_order_by, _load_relations.
None.(list[ResponseSchema], meta_dict).bool.int.ResponseSchema.
list[ResponseSchema].ResponseSchema.
SoftDeleteMixin.
async def transfer_money(from_id, to_id, amount, svc: AccountService):
try:
from_acc = await svc.find_or_fail(from_id)
to_acc = await svc.find_or_fail(to_id)
await svc.update(from_id, {'balance': from_acc.balance - amount}, commit=False)
await svc.update(to_id, {'balance': to_acc.balance + amount}, commit=False)
await svc.commit() # commit both or neither
except Exception:
await svc.rollback()
raise
Complete Example
A full async Article service with slug generation, validation, lifecycle hooks, and a publish workflow:
# services.py
from fastkit_core.services import AsyncBaseCrudService, SlugServiceMixin
from fastkit_core.database import AsyncRepository
from models import Article
from schemas import ArticleCreate, ArticleUpdate, ArticleResponse
class ArticleService(
SlugServiceMixin,
AsyncBaseCrudService[Article, ArticleCreate, ArticleUpdate, ArticleResponse]
):
def __init__(self, repository: AsyncRepository):
super().__init__(repository, response_schema=ArticleResponse)
async def validate_create(self, data: ArticleCreate) -> None:
if not await author_exists(data.author_id):
raise ValueError("Author not found")
if not await category_exists(data.category_id):
raise ValueError("Category not found")
async def before_create(self, data: dict) -> dict:
data['slug'] = await self.async_generate_slug(data['title'])
return data
async def before_update(self, id: int, data: dict) -> dict:
if 'title' in data:
data['slug'] = await self.async_generate_slug(
data['title'], exclude_id=id
)
return data
async def after_create(self, instance: Article) -> None:
await search_index.add_async(instance)
async def after_update(self, instance: Article) -> None:
await cache.delete_async(f'article:{instance.id}')
await search_index.update_async(instance)
# Custom business method
async def publish(self, article_id: int) -> ArticleResponse:
article = await self.find_or_fail(article_id)
if article.status == 'published':
raise ValueError("Article is already published")
return await self.update(article_id, ArticleUpdate(status='published'))
# schemas.py
from pydantic import BaseModel, Field
from datetime import datetime
class ArticleCreate(BaseModel):
title: str = Field(min_length=3, max_length=200)
content: str = Field(min_length=100)
author_id: int
category_id: int
class ArticleUpdate(BaseModel):
title: str | None = None
content: str | None = None
category_id: int | None = None
status: str | None = None
class ArticleResponse(BaseModel):
id: int
title: str
slug: str
content: str
author_id: int
category_id: int
status: str
created_at: datetime
updated_at: datetime
model_config = {"from_attributes": True}
# main.py
from fastapi import FastAPI, Depends, HTTPException
from fastkit_core.database import get_async_db, AsyncRepository
from fastkit_core.http import success_response
app = FastAPI()
async def get_svc(session = Depends(get_async_db)) -> ArticleService:
return ArticleService(AsyncRepository(Article, session))
@app.post("/articles", status_code=201)
async def create_article(data: ArticleCreate, svc = Depends(get_svc)):
try:
article = await svc.create(data)
return success_response(data=article.model_dump(), message="Created")
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/articles")
async def list_articles(page: int = 1, status: str = 'published', svc = Depends(get_svc)):
articles, meta = await svc.paginate(page=page, per_page=20, status=status)
return {'items': [a.model_dump() for a in articles], 'pagination': meta}
@app.get("/articles/{article_id}")
async def get_article(article_id: int, svc = Depends(get_svc)):
return success_response(data=(await svc.find_or_fail(article_id)).model_dump())
@app.put("/articles/{article_id}")
async def update_article(article_id: int, data: ArticleUpdate, svc = Depends(get_svc)):
try:
updated = await svc.update(article_id, data)
return success_response(data=updated.model_dump(), message="Updated")
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@app.post("/articles/{article_id}/publish")
async def publish_article(article_id: int, svc = Depends(get_svc)):
try:
published = await svc.publish(article_id)
return success_response(data=published.model_dump(), message="Published")
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@app.delete("/articles/{article_id}", status_code=204)
async def delete_article(article_id: int, svc = Depends(get_svc)):
await svc.delete(article_id)