Events

A lightweight, in-process signal system for decoupling side effects from business logic. Define signals, connect receivers, fire and forget — with built-in error isolation so a failing listener never breaks the request that triggered it.

Signal / Receiver

Django-inspired pattern — named signals, multiple receivers, async and sync.

Error isolation

Receiver exceptions are caught and logged — never propagated to the sender.

Async & sync receivers

Both coroutine and plain functions work as receivers — dispatched automatically.

Backend-agnostic

InProcessBackend today — broker backends (Redis, RabbitMQ) planned for 0.5.0.

Introduction

Side effects — sending a welcome email after registration, clearing a cache after an update, logging an audit trail after a delete — tend to accumulate inside service methods until they become unreadable. The signal system solves this by decoupling the action from its consequences.

A service fires a signal. It has no knowledge of what happens next. Receivers — defined in separate listener files — subscribe to that signal and handle the side effect. The service stays clean. The listeners are independently testable and composable.

Service: fires signal
  -> Signal: dispatches to all receivers
       -> Receiver A: send email
       -> Receiver B: clear cache
       -> Receiver C: log audit

The 0.4.0 event system uses InProcessBackend: all signals are dispatched within the same process, and send() awaits every receiver before it returns. An async broker backend (Redis Streams, RabbitMQ) is planned for 0.5.0. Using dict, dataclass, or Pydantic models as payloads now ensures a smooth migration when that happens.
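
To make "awaits every receiver before it returns" concrete, here is a minimal sketch of in-process dispatch. It is not FastKit's implementation: `dispatch` and both receivers are hypothetical names, and the real InProcessBackend may differ in detail.

```python
import asyncio
import inspect
from typing import Any, Callable


async def dispatch(receivers: list[Callable], payload: Any) -> list[str]:
    """Call every receiver in order and return only when all have finished."""
    finished = []
    for receiver in receivers:
        result = receiver(payload)
        # Coroutine functions return an awaitable; plain functions have already run.
        if inspect.isawaitable(result):
            await result
        finished.append(receiver.__name__)
    return finished


calls: list[str] = []


async def send_email(payload: dict) -> None:
    await asyncio.sleep(0)  # stand-in for real I/O
    calls.append(f"email:{payload['email']}")


def log_audit(payload: dict) -> None:
    calls.append(f"audit:{payload['id']}")


finished = asyncio.run(dispatch([send_email, log_audit], {"id": 1, "email": "a@b.com"}))
```

By the time `dispatch` returns, both side effects have run, which is why a slow receiver adds directly to the latency of the code that sent the signal.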

Quick Example

python
# users/signals.py
from fastkit_core.events import Signal

user_created = Signal('user.created')
user_updated = Signal('user.updated')
user_deleted = Signal('user.deleted')
python
# users/listeners.py
from .signals import user_created, user_deleted
from fastkit_core.cache import cache

@user_created.connect
async def send_welcome_email(payload: dict) -> None:
    await email_service.send_welcome(payload['email'])

@user_created.connect
async def invalidate_user_cache(payload: dict) -> None:
    await cache.invalidate('users:*')

@user_deleted.connect
async def cleanup_user_files(payload: dict) -> None:
    await storage.delete_user_files(payload['id'])
python
# users/services.py
from .signals import user_created, user_deleted

class UserService(AsyncBaseCrudService[...]):

    async def after_create(self, instance: User) -> None:
        # Service knows nothing about email or cache — just fires the signal
        await user_created.send({'id': instance.id, 'email': instance.email})

    async def after_delete(self, id: int) -> None:
        await user_deleted.send({'id': id})

Defining Signals

A Signal is a named channel. Create one signal instance per event type and keep them in a dedicated signals.py file per module. The name is used for logging and debugging — use a descriptive dot-separated string.

python
from fastkit_core.events import Signal

# Convention: module.event_name
user_created   = Signal('user.created')
user_updated   = Signal('user.updated')
user_deleted   = Signal('user.deleted')
order_placed   = Signal('order.placed')
payment_failed = Signal('payment.failed')

Each Signal instance is independent — receivers connect to a specific instance, not to the name string. Two signals with the same name but different Python objects do not share receivers.

Keep signals defined at module level — one signals.py per application module. Import the same object wherever you need to connect or send. Creating a new Signal('user.created') in a different file produces a separate, disconnected signal.
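
The effect of instance-based registration can be shown with a toy class. `ToySignal` is a hypothetical stand-in, assumed only to keep its receivers on the instance as described above; it is not the library's Signal.

```python
class ToySignal:
    """Hypothetical stand-in: receivers live on the instance, not on the name."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.receivers: list = []

    def connect(self, receiver):
        self.receivers.append(receiver)
        return receiver


# signals.py would define this once ...
user_created = ToySignal("user.created")
# ... and constructing it again elsewhere yields a different object with the same name.
accidental_copy = ToySignal("user.created")


@user_created.connect
def send_welcome_email(payload: dict) -> None:
    print("welcome", payload["email"])


print(len(user_created.receivers))     # receivers registered on the shared object
print(len(accidental_copy.receivers))  # the copy never saw the connect call
```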

Connecting Receivers

A receiver is any callable — async or sync — that accepts the signal payload as its first argument. There are three ways to connect one.

Decorator (recommended)

signal.connect used as a decorator registers the function and returns it unchanged, so the function remains normally callable:

python
from .signals import user_created

@user_created.connect
async def send_welcome_email(payload: dict) -> None:
    await email_service.send_welcome(payload['email'])

@user_created.connect
def log_new_user(payload: dict) -> None:   # sync receivers also work
    logger.info("New user: %s", payload['email'])
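
Why the decorated function stays callable can be seen in a small sketch of a registering decorator. The module-level `registry` and this `connect` function are hypothetical and only illustrate the "register, then return unchanged" behaviour described above.

```python
from typing import Callable

registry: list[Callable] = []


def connect(receiver: Callable) -> Callable:
    """Register the receiver, then hand the very same function back."""
    registry.append(receiver)
    return receiver


@connect
def log_new_user(payload: dict) -> str:
    return f"New user: {payload['email']}"


# The decorated name still refers to the original function, so it can be
# called directly (for example from a unit test) without sending a signal.
direct_result = log_new_user({"email": "alice@example.com"})
```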

Explicit connect / disconnect

For dynamic registration at runtime:

python
async def my_receiver(payload: dict) -> None:
    print(payload)

user_created.connect(my_receiver)

# Later — remove the receiver
user_created.disconnect(my_receiver)

Context manager — connected_to

Temporarily connect a receiver for the duration of a block. The receiver is automatically disconnected on exit, even if an exception is raised. Primarily useful in tests:

python
received = []

async def capture(payload):
    received.append(payload)

with user_created.connected_to(capture):
    await user_created.send({'id': 1, 'email': 'a@b.com'})

# capture is disconnected here automatically
assert len(received) == 1
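
The "disconnected on exit, even if an exception is raised" guarantee is the usual try/finally pattern. The sketch below builds it with `contextlib.contextmanager` around a plain list; the names are hypothetical and the library's own implementation is not shown in this document.

```python
from contextlib import contextmanager
from typing import Callable, Iterator

receivers: list[Callable] = []


@contextmanager
def connected_to(receiver: Callable) -> Iterator[None]:
    """Connect on enter; always disconnect on exit."""
    receivers.append(receiver)
    try:
        yield
    finally:
        # Runs on normal exit and when the block raises.
        receivers.remove(receiver)


def capture(payload: dict) -> None:
    pass


inside: list[int] = []
try:
    with connected_to(capture):
        inside.append(len(receivers))  # receiver is connected inside the block
        raise RuntimeError("test body failed")
except RuntimeError:
    pass
```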

Receiver registration order

Receivers are dispatched in the order they were connected. Duplicate connections are silently ignored — the same callable can only be connected once per signal.
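
A minimal sketch of these two rules, using a hypothetical `ReceiverList` class rather than the library's internals:

```python
from typing import Callable


class ReceiverList:
    """Hypothetical registry: keeps connection order and ignores duplicates."""

    def __init__(self) -> None:
        self._receivers: list[Callable] = []

    def connect(self, receiver: Callable) -> Callable:
        # Plain functions compare by identity, so only the exact same callable is skipped.
        if receiver not in self._receivers:
            self._receivers.append(receiver)
        return receiver

    def names(self) -> list[str]:
        return [r.__name__ for r in self._receivers]


def send_welcome_email(payload: dict) -> None:
    pass


def create_default_settings(payload: dict) -> None:
    pass


registry = ReceiverList()
registry.connect(send_welcome_email)
registry.connect(create_default_settings)
registry.connect(send_welcome_email)  # duplicate: ignored, position unchanged

order = registry.names()
```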

Sending Signals

send() is always async — even with InProcessBackend — to ensure forward compatibility when broker backends are introduced in 0.5.0.

python
# Send with a payload
errors = await user_created.send({'id': 1, 'email': 'alice@example.com'})

# Send with no payload
errors = await order_shipped.send()

# Inspect failures — empty list means all receivers succeeded
if errors:
    logger.warning("%d receiver(s) failed for signal 'user.created'", len(errors))

# Check whether a signal has any receivers before sending
if user_created:
    await user_created.send(payload)

send() always dispatches to all receivers and returns a list of any exceptions that were raised. A receiver failure never stops other receivers or propagates to the caller — see Error Isolation for details.

Payloads

A payload is the data passed from the sender to all receivers. FastKit Core accepts any Python object but warns at runtime when the payload type would not survive serialization to a message broker.

Recommended payload types

python
from pydantic import BaseModel
from dataclasses import dataclass

# ✅ dict — simple and always works
await user_created.send({'id': 1, 'email': 'alice@example.com'})

# ✅ Pydantic model — typed, validated, broker-ready
class UserCreatedPayload(BaseModel):
    id:    int
    email: str
    name:  str

await user_created.send(UserCreatedPayload(id=1, email='alice@example.com', name='Alice'))

# ✅ dataclass — also broker-ready
@dataclass
class OrderPlacedPayload:
    order_id: int
    total:    float

await order_placed.send(OrderPlacedPayload(order_id=42, total=99.99))

# ⚠️ SQLAlchemy ORM instance — works now but triggers a UserWarning
await user_created.send(user_orm_instance)

Serialization warning

When a payload is not a dict, Pydantic model, or dataclass, FastKit emits a UserWarning at send time. This warning exists as a forward-compatibility reminder — broker backends introduced in 0.5.0 will require serializable payloads. The signal still fires normally.
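
A check of this kind could look roughly like the sketch below. The function names are hypothetical, Pydantic models are detected by duck typing (a callable `model_dump`, as on Pydantic v2 models) so the sketch runs without Pydantic installed, and treating None as acceptable is an assumption rather than something stated above.

```python
import dataclasses
import warnings
from typing import Any


def is_broker_ready(payload: Any) -> bool:
    """Assumed rule: None, dict, dataclass instance, or Pydantic-style model."""
    if payload is None or isinstance(payload, dict):
        return True
    if dataclasses.is_dataclass(payload) and not isinstance(payload, type):
        return True
    return callable(getattr(payload, "model_dump", None))


def warn_if_not_serializable(payload: Any) -> None:
    if not is_broker_ready(payload):
        warnings.warn(
            f"Payload of type {type(payload).__name__} may not be serializable "
            "by broker backends",
            UserWarning,
            stacklevel=2,
        )


@dataclasses.dataclass
class OrderPlacedPayload:
    order_id: int
    total: float


class OrmLikeUser:
    """Hypothetical stand-in for an ORM instance."""

    def __init__(self, id: int) -> None:
        self.id = id


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    warn_if_not_serializable({"id": 1})
    warn_if_not_serializable(OrderPlacedPayload(order_id=42, total=99.99))
    warn_if_not_serializable(OrmLikeUser(1))  # the only call that warns
```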

Payload type              | Works now        | Broker-ready (0.5.0)
dict                      | ✓                | ✓
Pydantic BaseModel        | ✓                | ✓
dataclass                 | ✓                | ✓
SQLAlchemy model instance | ✓ (with warning) | ✗
Arbitrary object          | ✓ (with warning) | ✗
None                      | ✓                |
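
To illustrate why the first rows are considered broker-ready, the sketch below round-trips a dataclass payload through JSON and shows an arbitrary object failing the same step. JSON as the wire format is an assumption made for illustration; the document does not say how the 0.5.0 backends will serialize payloads, and `to_wire` is a hypothetical helper.

```python
import dataclasses
import json
from typing import Any


def to_wire(payload: Any) -> str:
    """Encode a payload as JSON; raises TypeError for objects json cannot handle."""
    if dataclasses.is_dataclass(payload) and not isinstance(payload, type):
        payload = dataclasses.asdict(payload)
    return json.dumps(payload)


@dataclasses.dataclass
class OrderPlacedPayload:
    order_id: int
    total: float


class OrmLikeUser:
    """Hypothetical stand-in for an ORM instance."""

    def __init__(self, id: int) -> None:
        self.id = id


# A dataclass survives the trip and can be rebuilt on the other side.
wire = to_wire(OrderPlacedPayload(order_id=42, total=99.99))
restored = OrderPlacedPayload(**json.loads(wire))

# An arbitrary object does not.
try:
    to_wire(OrmLikeUser(1))
    orm_serializable = True
except TypeError:
    orm_serializable = False
```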

Error Isolation

A receiver that raises an exception does not affect other receivers or the code that called send(). The InProcessBackend catches every exception, logs it with full traceback via the standard logging module, and continues to the next receiver.

python
@user_created.connect
async def send_email(payload: dict) -> None:
    raise ConnectionError("SMTP server down")   # ← this fails

@user_created.connect
async def invalidate_cache(payload: dict) -> None:
    await cache.invalidate('users:*')            # ← this still runs

# send() returns the list of exceptions, does not raise
errors = await user_created.send({'id': 1, 'email': 'alice@example.com'})
# errors == [ConnectionError("SMTP server down")]
# cache was still invalidated
# the route handler that triggered this never saw an exception

The returned list[Exception] lets the caller inspect failures if needed. In most cases you can discard it — the logging output is sufficient. This design choice leaves retry logic and dead-letter handling as the responsibility of 0.5.0 broker backends.
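
The catch, log, and continue behaviour can be sketched in a few lines. `send_isolated` is a hypothetical function written for illustration, not the InProcessBackend source; it assumes only the behaviour described in this section.

```python
import asyncio
import inspect
import logging
from typing import Any, Callable

logger = logging.getLogger("signals.sketch")


async def send_isolated(receivers: list[Callable], payload: Any) -> list[Exception]:
    """Run every receiver; collect exceptions instead of raising them."""
    errors: list[Exception] = []
    for receiver in receivers:
        try:
            result = receiver(payload)
            if inspect.isawaitable(result):
                await result
        except Exception as exc:
            # logger.exception includes the traceback of the active exception.
            logger.exception("Receiver %s failed", receiver.__name__)
            errors.append(exc)
    return errors


cache_invalidated: list[int] = []


async def send_email(payload: dict) -> None:
    raise ConnectionError("SMTP server down")


def invalidate_cache(payload: dict) -> None:
    cache_invalidated.append(payload["id"])


errors = asyncio.run(send_isolated([send_email, invalidate_cache], {"id": 1}))
```

The failing receiver ends up in `errors`, the second receiver still runs, and the caller never sees an exception.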

Because receiver failures are isolated, signals are best suited to side effects that can be allowed to fail under degraded conditions without failing the main action: cache invalidation, email notifications, search index updates. For operations that must succeed atomically with the main action, do the work directly in the lifecycle hook instead of sending a signal.

Service Layer Pattern

The recommended structure is to fire signals from service lifecycle hooks. This keeps route handlers completely ignorant of side effects and makes the service independently testable with or without listeners connected.

python
# users/signals.py
from fastkit_core.events import Signal

user_created = Signal('user.created')
user_updated = Signal('user.updated')
user_deleted = Signal('user.deleted')
python
# users/listeners.py
from .signals import user_created, user_updated, user_deleted
from fastkit_core.cache import cache

@user_created.connect
async def send_welcome_email(payload: dict) -> None:
    await email_service.send(
        to=payload['email'],
        template='welcome',
        context={'name': payload['name']}
    )

@user_created.connect
async def create_default_settings(payload: dict) -> None:
    await settings_service.create_defaults(user_id=payload['id'])

@user_updated.connect
async def invalidate_user_cache(payload: dict) -> None:
    await cache.delete(f"users:{payload['id']}")

@user_deleted.connect
async def cleanup_user_data(payload: dict) -> None:
    await cache.delete(f"users:{payload['id']}")
    await storage.delete_user_files(payload['id'])
python
# users/service.py
from fastkit_core.services import AsyncBaseCrudService
from .signals import user_created, user_updated, user_deleted
from models import User
from schemas import UserCreate, UserUpdate, UserResponse

class UserService(AsyncBaseCrudService[User, UserCreate, UserUpdate, UserResponse]):

    def __init__(self, repository):
        super().__init__(repository, response_schema=UserResponse)

    async def before_create(self, data: dict) -> dict:
        data['password'] = await hash_password(data['password'])
        return data

    async def after_create(self, instance: User) -> None:
        await user_created.send({
            'id':    instance.id,
            'email': instance.email,
            'name':  instance.full_name,
        })

    async def after_update(self, instance: User) -> None:
        await user_updated.send({'id': instance.id})

    async def after_delete(self, id: int) -> None:
        await user_deleted.send({'id': id})

Import listeners.py at application startup so that receivers are registered before any request arrives. The import itself triggers the @signal.connect decorators.

python
# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI

# ← Import listeners at startup — this registers all receivers
import users.listeners
import orders.listeners

@asynccontextmanager
async def lifespan(app: FastAPI):
    yield

app = FastAPI(lifespan=lifespan)

If listeners.py is never imported, the @signal.connect decorators never execute and the receivers are never registered. Signals will fire but no receiver will respond — no error, just silence.
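
One possible guard against this silent failure is a startup check built on the documented truthiness of a signal (bool(signal) is True once a receiver is connected). The sketch uses a hypothetical `ToySignal` stand-in and a hypothetical `signals_without_receivers` helper so that it runs on its own.

```python
import logging
from typing import Callable, Iterable

logger = logging.getLogger("startup.sketch")


class ToySignal:
    """Hypothetical stand-in: truthy once at least one receiver is connected."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._receivers: list[Callable] = []

    def connect(self, receiver: Callable) -> Callable:
        self._receivers.append(receiver)
        return receiver

    def __bool__(self) -> bool:
        return bool(self._receivers)


def signals_without_receivers(signals: Iterable[ToySignal]) -> list[str]:
    """Return (and log) the names of signals that nobody listens to."""
    missing = [s.name for s in signals if not s]
    for name in missing:
        logger.warning("Signal %r has no receivers; was its listeners module imported?", name)
    return missing


user_created = ToySignal("user.created")
order_placed = ToySignal("order.placed")


@user_created.connect
def send_welcome_email(payload: dict) -> None:
    pass


missing = signals_without_receivers([user_created, order_placed])
```

In an application, a check like this could run inside the lifespan function before `yield`, so a forgotten import shows up in the startup log.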

Testing

The connected_to() context manager and disconnect() make signals straightforward to test in isolation.

Assert a signal was sent

python
import pytest
from schemas import UserCreate
from users.signals import user_created

@pytest.mark.asyncio
async def test_create_user_fires_signal(user_service):
    received = []

    async def capture(payload):
        received.append(payload)

    with user_created.connected_to(capture):
        await user_service.create(UserCreate(
            email='alice@example.com',
            name='Alice',
            password='Secret123!'
        ))

    assert len(received) == 1
    assert received[0]['email'] == 'alice@example.com'

Test a receiver in isolation

python
import pytest
from users.listeners import send_welcome_email

@pytest.mark.asyncio
async def test_send_welcome_email(mock_email_service):
    # Call the receiver function directly — no signal involved
    await send_welcome_email({'email': 'alice@example.com', 'name': 'Alice'})

    mock_email_service.send.assert_called_once_with(
        to='alice@example.com',
        template='welcome',
        context={'name': 'Alice'}
    )

Isolating tests from production listeners

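Receivers connected with connected_to() exist only for the duration of the block, and production receivers are registered only when listeners.py is imported, so tests that never import listeners.py will not trigger production side effects. If your test suite does import listeners globally (e.g. via conftest), disconnect them per test: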

python
import pytest
from users.signals import user_created
from users.listeners import send_welcome_email

@pytest.fixture(autouse=True)
def disconnect_email_listener():
    """Prevent real emails during tests."""
    user_created.disconnect(send_welcome_email)
    yield
    user_created.connect(send_welcome_email)  # restore after test

Custom Backend

Extend BaseSignalBackend to implement a custom dispatch strategy. The most common use case is a test backend that records dispatched signals:

python
from fastkit_core.events import BaseSignalBackend
from collections import defaultdict
from typing import Any, Callable

class RecordingBackend(BaseSignalBackend):
    """Records dispatched signals without calling any receivers — useful in tests."""

    def __init__(self):
        self._receivers: dict[str, list[Callable]] = defaultdict(list)
        self.dispatched: list[tuple[str, Any]] = []

    async def send(self, signal_name: str, payload: Any, **kwargs) -> list[Exception]:
        self.dispatched.append((signal_name, payload))
        return []

    def connect(self, signal_name: str, receiver: Callable) -> None:
        self._receivers[signal_name].append(receiver)

    def disconnect(self, signal_name: str, receiver: Callable) -> None:
        try:
            self._receivers[signal_name].remove(receiver)
        except ValueError:
            pass

    def receivers(self, signal_name: str) -> list[Callable]:
        return list(self._receivers.get(signal_name, []))

Inject a custom backend by patching the module-level singleton directly:

python
import pytest
import fastkit_core.events.signal as signal_module

@pytest.fixture(autouse=True)
def recording_backend():
    backend = RecordingBackend()
    signal_module._backend_instance = backend
    yield backend
    signal_module._backend_instance = None

API Reference

Signal

Signal(name)
Create a new signal with the given name. Uses the module-level InProcessBackend singleton.
connect(receiver)
Register a receiver. Usable as a decorator. Returns the receiver unchanged. Duplicate connections are silently ignored.
disconnect(receiver)
Unregister a receiver. No error if the receiver was not connected.
send(payload, **kwargs)
async — dispatch to all receivers. Returns list[Exception] from failed receivers. Never raises.
connected_to(receiver)
Context manager. Connects receiver on enter, disconnects on exit (even on exception).
receivers
Property. Returns list[Callable] of all currently connected receivers.
__bool__
bool(signal) is True when the signal has at least one receiver connected.

InProcessBackend

send(signal_name, payload, **kwargs)
async — dispatches to all receivers. Async receivers are awaited, sync receivers are called directly. Exceptions are caught, logged, and appended to the returned list.
connect(signal_name, receiver)
Register receiver. Duplicate ignored.
disconnect(signal_name, receiver)
Unregister receiver. No error if not found.
receivers(signal_name)
Returns a copy of the receiver list for the given signal.

BaseSignalBackend

send(signal_name, payload, **kwargs) → list[Exception]
Abstract. Must never propagate receiver exceptions — catch, log, and return them.
connect(signal_name, receiver)
Abstract. Register a receiver for the given signal name.
disconnect(signal_name, receiver)
Abstract. Unregister a receiver.
receivers(signal_name) → list[Callable]
Abstract. Return all receivers for the given signal name.

Complete Example

A full e-commerce order flow demonstrating signals across two modules:

python
# orders/signals.py
from fastkit_core.events import Signal

order_placed   = Signal('order.placed')
order_shipped  = Signal('order.shipped')
order_canceled = Signal('order.canceled')
python
# orders/listeners.py
from .signals import order_placed, order_shipped, order_canceled
from fastkit_core.cache import cache
import logging

logger = logging.getLogger(__name__)

@order_placed.connect
async def send_order_confirmation(payload: dict) -> None:
    await email_service.send(
        to=payload['customer_email'],
        template='order_confirmation',
        context={'order_id': payload['order_id'], 'total': payload['total']}
    )

@order_placed.connect
async def reserve_inventory(payload: dict) -> None:
    for item in payload['items']:
        await inventory_service.reserve(item['sku'], item['quantity'])

@order_placed.connect
async def invalidate_order_cache(payload: dict) -> None:
    await cache.invalidate('orders:*')

@order_shipped.connect
async def send_tracking_email(payload: dict) -> None:
    await email_service.send(
        to=payload['customer_email'],
        template='order_shipped',
        context={'tracking_number': payload['tracking_number']}
    )

@order_canceled.connect
async def release_inventory(payload: dict) -> None:
    for item in payload['items']:
        await inventory_service.release(item['sku'], item['quantity'])

@order_canceled.connect
async def process_refund(payload: dict) -> None:
    if payload.get('payment_id'):
        await payment_service.refund(payload['payment_id'], payload['total'])
python
# orders/service.py
from fastkit_core.services import AsyncBaseCrudService
from .signals import order_placed, order_shipped, order_canceled
from models import Order
from schemas import OrderCreate, OrderUpdate, OrderResponse

class OrderService(AsyncBaseCrudService[Order, OrderCreate, OrderUpdate, OrderResponse]):

    def __init__(self, repository):
        super().__init__(repository, response_schema=OrderResponse)

    async def after_create(self, instance: Order) -> None:
        await order_placed.send({
            'order_id':       instance.id,
            'customer_email': instance.customer_email,
            'total':          instance.total,
            'items':          [{'sku': i.sku, 'quantity': i.quantity}
                               for i in instance.items],
        })

    async def ship(self, order_id: int, tracking_number: str) -> OrderResponse:
        order = await self.update(order_id, OrderUpdate(
            status='shipped',
            tracking_number=tracking_number
        ))
        await order_shipped.send({
            'order_id':        order_id,
            'customer_email':  order.customer_email,
            'tracking_number': tracking_number,
        })
        return order

    async def cancel(self, order_id: int) -> OrderResponse:
        order = await self.find_or_fail(order_id)
        updated = await self.update(order_id, OrderUpdate(status='canceled'))
        await order_canceled.send({
            'order_id':   order_id,
            'payment_id': order.payment_id,
            'total':      order.total,
            'items':      [{'sku': i.sku, 'quantity': i.quantity}
                           for i in order.items],
        })
        return updated
python
# tests/test_order_service.py
import pytest
from orders.signals import order_placed, order_canceled
from schemas import OrderCreate

@pytest.mark.asyncio
async def test_place_order_fires_signal(order_service):
    received = []

    with order_placed.connected_to(lambda p: received.append(p)):
        await order_service.create(OrderCreate(
            customer_email='alice@example.com',
            items=[{'sku': 'WIDGET-1', 'quantity': 2}],
        ))

    assert len(received) == 1
    assert received[0]['customer_email'] == 'alice@example.com'
    assert received[0]['total'] > 0

@pytest.mark.asyncio
async def test_cancel_fires_signal_with_items(order_service, existing_order):
    received = []

    with order_canceled.connected_to(lambda p: received.append(p)):
        await order_service.cancel(existing_order.id)

    assert received[0]['order_id'] == existing_order.id
    assert len(received[0]['items']) > 0