Docs / fastkit-core / Services

Services

The business logic layer of your application. Services sit between your API endpoints and the repository, handling validation, lifecycle hooks, transactions, and complex business rules — keeping your code organized and testable.

BaseCrudService

Full CRUD with type-safe generics — sync and async.

Lifecycle Hooks

before/after create, update, delete — sync and async.

Validation Hooks

Custom business rule validation before any operation.

Response Mapping

Auto-convert model instances to Pydantic schemas.

Introduction

In FastKit Core's architecture, services sit between API endpoints and the repository layer. They are responsible for business logic — not for HTTP responses, not for database queries. This separation makes your code testable, reusable, and maintainable.

API Endpoint
HTTP layer
Service
Business logic
Repository
Data access
Database
SQLAlchemy

Services are reusable — the same UserService can be called from an API endpoint, a CLI command, a background job, or a test. Zero coupling to the HTTP layer.

Quick Start

python
from fastkit_core.services import BaseCrudService
from fastkit_core.database import Repository
from models import User
from schemas import UserCreate, UserUpdate, UserResponse

class UserService(BaseCrudService[User, UserCreate, UserUpdate, UserResponse]):

    def __init__(self, repository: Repository):
        super().__init__(repository, response_schema=UserResponse)

    def validate_create(self, data: UserCreate) -> None:
        if self.exists(email=data.email):
            raise ValueError("Email already exists")

    def before_create(self, data: dict) -> dict:
        data['password'] = hash_password(data['password'])
        return data

    def after_create(self, instance: User) -> None:
        send_welcome_email(instance.email)
python
from fastkit_core.services import AsyncBaseCrudService
from fastkit_core.database import AsyncRepository

class UserService(AsyncBaseCrudService[User, UserCreate, UserUpdate, UserResponse]):

    def __init__(self, repository: AsyncRepository):
        super().__init__(repository, response_schema=UserResponse)

    async def validate_create(self, data: UserCreate) -> None:
        if await self.exists(email=data.email):
            raise ValueError("Email already exists")

    async def before_create(self, data: dict) -> dict:
        data['password'] = await hash_password_async(data['password'])
        return data

    async def after_create(self, instance: User) -> None:
        await send_welcome_email_async(instance.email)
python
from fastapi import Depends
from fastkit_core.database import get_db, Repository
from fastkit_core.http import success_response
from sqlalchemy.orm import Session

def get_user_service(session: Session = Depends(get_db)) -> UserService:
    return UserService(Repository(User, session))

@app.post("/users", status_code=201)
def create_user(data: UserCreate, svc: UserService = Depends(get_user_service)):
    user = svc.create(data)
    return success_response(data=user.model_dump(), message="User created")

@app.get("/users")
def list_users(page: int = 1, svc: UserService = Depends(get_user_service)):
    users, meta = svc.paginate(page=page, per_page=20)
    return {'items': [u.model_dump() for u in users], 'pagination': meta}

BaseCrudService

BaseCrudService is a generic class with four type parameters. You only override what you need — everything else works out of the box.

Type parameters

python
class MyService(BaseCrudService[
    ModelType,          # SQLAlchemy model     → e.g. User
    CreateSchemaType,   # Pydantic create       → e.g. UserCreate
    UpdateSchemaType,   # Pydantic update       → e.g. UserUpdate
    ResponseSchemaType  # Pydantic response     → e.g. UserResponse
]):
    pass

All available operations

python
# ── Create ────────────────────────────────────
product = service.create(ProductCreate(name="Widget", price=9.99))
products = service.create_many([ProductCreate(...), ProductCreate(...)])

# ── Read ──────────────────────────────────────
product  = service.find(1)                      # None if missing
product  = service.find_or_fail(1)              # raises NotFoundException
products = service.get_all()
products = service.get_all(limit=50)
products = service.filter(status='active', price__lt=100)
product  = service.filter_one(sku='ABC123')
exists   = service.exists(sku='ABC123')         # bool
count    = service.count(status='active')       # int

# ── Paginate ──────────────────────────────────
products, meta = service.paginate(
    page=1, per_page=20,
    _order_by='-created_at',
    status='active'
)

# ── Update ────────────────────────────────────
updated = service.update(1, ProductUpdate(price=12.99))
count   = service.update_many(filters={'status':'pending'}, data={'status':'active'})

# ── Delete ────────────────────────────────────
service.delete(1)                  # soft delete if SoftDeleteMixin
service.delete(1, force=True)      # always hard delete
count = service.delete_many({'status': 'inactive'})

# ── Transaction control ───────────────────────
service.commit()
service.rollback()
service.flush()

Schema auto-conversion

Services accept Pydantic schemas or plain dicts — both work transparently:

python
# Both forms work identically — a schema instance or a plain dict
service.create(ProductCreate(name="Widget", price=9.99))
service.create({"name": "Widget", "price": 9.99})

# Partial updates — only set fields are sent to the database
update = ProductUpdate(price=12.99)  # name is not set
service.update(1, update)            # only price column updated

AsyncBaseCrudService

Identical API to BaseCrudService — just add async/await. Use AsyncRepository instead of Repository.

python
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from fastkit_core.database import get_async_db, AsyncRepository
from fastkit_core.http import success_response

async def get_service(session: AsyncSession = Depends(get_async_db)):
    """Build a ProductService over an AsyncRepository using the injected session."""
    return ProductService(AsyncRepository(Product, session))

@app.post("/products", status_code=201)
async def create_product(data: ProductCreate, svc = Depends(get_service)):
    """Create a product through the service and return it via success_response."""
    product = await svc.create(data)
    return success_response(data=product.model_dump())

@app.get("/products")
async def list_products(page: int = 1, svc = Depends(get_service)):
    """Return one page (20 items) of products together with pagination metadata."""
    products, meta = await svc.paginate(page=page, per_page=20)
    return {'items': [p.model_dump() for p in products], 'pagination': meta}

Response Schema Mapping

When you pass response_schema to super().__init__(), every service method that returns records automatically converts model instances to your Pydantic schema before returning them.

python
class UserResponse(BaseModel):
    id:    int
    email: str
    name:  str
    # password is NOT included — secure by design
    model_config = {"from_attributes": True}

class UserService(BaseCrudService[User, UserCreate, UserUpdate, UserResponse]):
    def __init__(self, repository: Repository):
        super().__init__(repository, response_schema=UserResponse)

# Record-returning methods yield UserResponse — not the raw User model
created: UserResponse = service.create(user_data)
found: UserResponse | None = service.find(1)   # None when no record matches
users: list[UserResponse] = service.get_all()
Security
Sensitive fields such as passwords or tokens are left out of the response because the response schema does not declare them — they never reach the endpoint.
Consistency
Every endpoint using the same service returns the same response shape.
Type safety
IDEs and mypy know the exact return type — no more Any.
OpenAPI docs
FastAPI generates accurate response schemas automatically.

Lifecycle Hooks

Override hooks to execute code at specific points in an operation. before_* hooks can modify data. after_* hooks react to results.

validate_create(data)
before_create(data) → modified data
repository.create(data)
after_create(instance)
return ResponseSchema

All available hooks

python
class UserService(BaseCrudService[User, UserCreate, UserUpdate, UserResponse]):
    """Example service that overrides every create/update/delete lifecycle hook."""

    def before_create(self, data: dict) -> dict:
        """Hash the password and record the creating user before the save."""
        data['password']   = hash_password(data['password'])
        data['created_by'] = get_current_user_id()
        return data  # must return modified data

    def after_create(self, instance: User) -> None:
        """Send the welcome email and create the profile for the new user."""
        send_welcome_email(instance.email)
        create_user_profile(instance.id)

    def before_update(self, id: int, data: dict) -> dict:
        """Record which user performed the update; returns the modified data."""
        data['updated_by'] = get_current_user_id()
        return data

    def after_update(self, instance: User) -> None:
        """Drop the cached entry for the user and notify watchers."""
        cache.delete(f'user:{instance.id}')
        notify_watchers(instance.id)

    def before_delete(self, id: int) -> None:
        """Refuse to delete a user who still has active subscriptions.

        Raises:
            ValueError: If the user has active subscriptions.
        """
        # NOTE(review): with response_schema configured, find_or_fail returns
        # the response schema rather than the model, so
        # has_active_subscriptions() must exist on that object — confirm.
        user = self.find_or_fail(id)
        if user.has_active_subscriptions():
            raise ValueError("Cannot delete user with active subscriptions")

    def after_delete(self, id: int) -> None:
        """Clean up the deleted user's files and revoke their tokens."""
        delete_user_files(id)
        revoke_user_tokens(id)

Common hook use cases

before_create
Hash passwords · Set defaults · Add audit fields · Generate slugs / tokens
after_create
Send emails · Create related records · Update search index · Log activity
before_update
Update timestamps · Validate ownership · Regenerate slug on title change
after_update
Clear caches · Send notifications · Sync external systems
before_delete
Check dependencies · Validate business rules · Archive data
after_delete
Delete related records · Clean up files · Revoke tokens

Validation Hooks

Validation hooks run before the operation and before any lifecycle hooks. Raise ValueError to abort the operation cleanly.

python
class ProductService(BaseCrudService[Product, ProductCreate, ProductUpdate, ProductResponse]):
    """Example service showing validation hooks for create and update."""

    def validate_create(self, data: ProductCreate) -> None:
        """Reject duplicate SKUs and invalid price or stock values.

        Raises:
            ValueError: If the SKU already exists, the price is not
                positive, or the stock is negative.
        """
        # Uniqueness
        if self.exists(sku=data.sku):
            raise ValueError("Product with this SKU already exists")

        # Business rules
        if data.price <= 0:
            raise ValueError("Price must be greater than zero")

        if data.stock < 0:
            raise ValueError("Stock cannot be negative")

    def validate_update(self, id: int, data: ProductUpdate) -> None:
        """Check edit permission and the price-vs-cost rule before an update.

        Raises:
            NotFoundException: If no product with this id exists.
            ValueError: If the caller may not edit the product, or the new
                price is below the product's cost.
        """
        # NOTE(review): with response_schema configured, find_or_fail returns
        # the response schema, so `cost` must be declared on ProductResponse
        # for the check below to work — confirm.
        product = self.find_or_fail(id)

        # Permission check
        if not can_edit_product(product):
            raise ValueError("You don't have permission to edit this product")

        # Business rule. Compare against None rather than relying on
        # truthiness: a price of 0 is falsy and would skip this check.
        if data.price is not None and data.price < product.cost:
            raise ValueError("Price cannot be lower than cost")

Async validation

python
class ProductService(AsyncBaseCrudService[Product, ProductCreate, ProductUpdate, ProductResponse]):

    async def validate_create(self, data: ProductCreate) -> None:
        if await self.exists(sku=data.sku):
            raise ValueError("SKU already exists")

        # Call external services in validation
        if not await external_inventory_check(data.sku):
            raise ValueError("SKU not found in inventory system")

Catch ValueError from services in your endpoints and convert it to HTTPException(status_code=400). This keeps HTTP concerns in the API layer and business errors in the service layer.

SlugServiceMixin

Mix in SlugServiceMixin to get automatic slug generation with uniqueness checking — no custom logic needed.

python
from fastkit_core.services import AsyncBaseCrudService, SlugServiceMixin

class ArticleService(
    SlugServiceMixin,
    AsyncBaseCrudService[Article, ArticleCreate, ArticleUpdate, ArticleResponse]
):
    def __init__(self, repository: AsyncRepository):
        super().__init__(repository, response_schema=ArticleResponse)

    async def before_create(self, data: dict) -> dict:
        # Generates unique slug, appends -1, -2 etc. if needed
        data['slug'] = await self.async_generate_slug(data['title'])
        return data

    async def before_update(self, id: int, data: dict) -> dict:
        if 'title' in data:
            # exclude_id ensures current record isn't seen as a conflict
            data['slug'] = await self.async_generate_slug(
                data['title'],
                exclude_id=id
            )
        return data
slugify(text, separator, max_length)
Static method. Converts text to URL-safe slug string.
generate_slug(text, slug_field, exclude_id, ...)
Sync. Generates unique slug, auto-increments suffix on collision.
async_generate_slug(text, slug_field, exclude_id, ...)
Async version — use inside AsyncBaseCrudService.

API Reference

Hook | When | Signature
validate_create | Before create — first | (data: CreateSchema) → None
before_create | Before create — after validation | (data: dict) → dict
after_create | After create, after commit | (instance: Model) → None
validate_update | Before update — first | (id, data: UpdateSchema) → None
before_update | Before update — after validation | (id, data: dict) → dict
after_update | After update, after commit | (instance: Model) → None
before_delete | Before delete | (id) → None
after_delete | After delete, after commit | (id) → None

All hooks have async equivalents in AsyncBaseCrudService.

find(id, load_relations)
Find by PK. Returns ResponseSchema | None.
find_or_fail(id, load_relations)
Find by PK. Raises NotFoundException if missing.
get_all(limit, load_relations)
All records. Optional limit.
filter(**filters)
Django-style filters. Supports _limit, _offset, _order_by, _load_relations.
filter_one(**filters)
First matching record or None.
paginate(page, per_page, **filters)
Returns (list[ResponseSchema], meta_dict).
exists(**filters)
Returns bool.
count(**filters)
Returns int.
create(data, commit)
Runs validate → before → save → after. Returns ResponseSchema.
create_many(data_list, commit)
Bulk create. Returns list[ResponseSchema].
update(id, data, commit)
Runs validate → before → save → after. Returns updated ResponseSchema.
update_many(filters, data, commit)
Bulk update. Returns affected row count.
delete(id, commit, force)
Runs before → delete → after. Soft deletes if model has SoftDeleteMixin.
delete_many(filters, commit)
Bulk delete. Returns affected row count.
python
async def transfer_money(from_id, to_id, amount, svc: AccountService):
    """Move `amount` between two accounts as a single transaction.

    Both balance updates are issued with commit=False and committed together
    by one svc.commit() call. Any exception triggers svc.rollback() and is
    then re-raised to the caller.
    """
    # NOTE(review): amount is not validated here (sign, sufficient funds) —
    # presumably enforced elsewhere; confirm before reusing this pattern.
    try:
        # Load both accounts up front; find_or_fail raises if either is missing
        from_acc = await svc.find_or_fail(from_id)
        to_acc   = await svc.find_or_fail(to_id)

        # commit=False defers the commit so neither update is committed alone
        await svc.update(from_id, {'balance': from_acc.balance - amount}, commit=False)
        await svc.update(to_id,   {'balance': to_acc.balance + amount},   commit=False)

        await svc.commit()   # commit both or neither
    except Exception:
        # Discard the pending changes, then surface the original error
        await svc.rollback()
        raise
commit()
Manually commit the current transaction.
rollback()
Roll back the current transaction.
flush()
Flush pending changes to the DB without committing.

Complete Example

A full async Article service with slug generation, validation, lifecycle hooks, and a publish workflow:

python
# services.py
from fastkit_core.services import AsyncBaseCrudService, SlugServiceMixin
from fastkit_core.database import AsyncRepository
from models import Article
from schemas import ArticleCreate, ArticleUpdate, ArticleResponse

class ArticleService(
    SlugServiceMixin,
    AsyncBaseCrudService[Article, ArticleCreate, ArticleUpdate, ArticleResponse]
):
    def __init__(self, repository: AsyncRepository):
        super().__init__(repository, response_schema=ArticleResponse)

    async def validate_create(self, data: ArticleCreate) -> None:
        if not await author_exists(data.author_id):
            raise ValueError("Author not found")
        if not await category_exists(data.category_id):
            raise ValueError("Category not found")

    async def before_create(self, data: dict) -> dict:
        data['slug'] = await self.async_generate_slug(data['title'])
        return data

    async def before_update(self, id: int, data: dict) -> dict:
        if 'title' in data:
            data['slug'] = await self.async_generate_slug(
                data['title'], exclude_id=id
            )
        return data

    async def after_create(self, instance: Article) -> None:
        await search_index.add_async(instance)

    async def after_update(self, instance: Article) -> None:
        await cache.delete_async(f'article:{instance.id}')
        await search_index.update_async(instance)

    # Custom business method
    async def publish(self, article_id: int) -> ArticleResponse:
        article = await self.find_or_fail(article_id)
        if article.status == 'published':
            raise ValueError("Article is already published")
        return await self.update(article_id, ArticleUpdate(status='published'))
python
# schemas.py
from pydantic import BaseModel, Field
from datetime import datetime

class ArticleCreate(BaseModel):
    title:       str = Field(min_length=3, max_length=200)
    content:     str = Field(min_length=100)
    author_id:   int
    category_id: int

class ArticleUpdate(BaseModel):
    title:       str | None = None
    content:     str | None = None
    category_id: int | None = None
    status:      str | None = None

class ArticleResponse(BaseModel):
    id:          int
    title:       str
    slug:        str
    content:     str
    author_id:   int
    category_id: int
    status:      str
    created_at:  datetime
    updated_at:  datetime
    model_config = {"from_attributes": True}
python
# main.py
from fastapi import FastAPI, Depends, HTTPException
from fastkit_core.database import get_async_db, AsyncRepository
from fastkit_core.http import success_response

# Project modules from the snippets above (services.py, schemas.py) and models.py
from models import Article
from schemas import ArticleCreate, ArticleUpdate
from services import ArticleService

app = FastAPI()

async def get_svc(session = Depends(get_async_db)) -> ArticleService:
    return ArticleService(AsyncRepository(Article, session))

@app.post("/articles", status_code=201)
async def create_article(data: ArticleCreate, svc = Depends(get_svc)):
    try:
        article = await svc.create(data)
        return success_response(data=article.model_dump(), message="Created")
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.get("/articles")
async def list_articles(page: int = 1, status: str = 'published', svc = Depends(get_svc)):
    articles, meta = await svc.paginate(page=page, per_page=20, status=status)
    return {'items': [a.model_dump() for a in articles], 'pagination': meta}

@app.get("/articles/{article_id}")
async def get_article(article_id: int, svc = Depends(get_svc)):
    return success_response(data=(await svc.find_or_fail(article_id)).model_dump())

@app.put("/articles/{article_id}")
async def update_article(article_id: int, data: ArticleUpdate, svc = Depends(get_svc)):
    try:
        updated = await svc.update(article_id, data)
        return success_response(data=updated.model_dump(), message="Updated")
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.post("/articles/{article_id}/publish")
async def publish_article(article_id: int, svc = Depends(get_svc)):
    try:
        published = await svc.publish(article_id)
        return success_response(data=published.model_dump(), message="Published")
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.delete("/articles/{article_id}", status_code=204)
async def delete_article(article_id: int, svc = Depends(get_svc)):
    await svc.delete(article_id)