Events
A lightweight, in-process signal system for decoupling side effects from business logic. Define signals, connect receivers, fire and forget — with built-in error isolation so a failing listener never breaks the request that triggered it.
Django-inspired pattern — named signals, multiple receivers, async and sync.
Receiver exceptions are caught and logged — never propagated to the sender.
Both coroutine and plain functions work as receivers — dispatched automatically.
InProcessBackend today — broker backends (Redis, RabbitMQ) planned for 0.5.0.
Introduction
Side effects — sending a welcome email after registration, clearing a cache after an update, logging an audit trail after a delete — tend to accumulate inside service methods until they become unreadable. The signal system solves this by decoupling the action from its consequences.
A service fires a signal. It has no knowledge of what happens next. Receivers — defined in separate listener files — subscribe to that signal and handle the side effect. The service stays clean. The listeners are independently testable and composable.
The 0.4.0 event system uses InProcessBackend: every signal is dispatched inside the current process, and send() awaits all receivers before it returns. An async broker backend (Redis Streams, RabbitMQ) is planned for 0.5.0. Using dict, dataclass, or Pydantic model payloads now should keep that migration smooth, because those types can be serialized for a broker.
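To illustrate why plain-data payloads migrate cleanly, here is a minimal sketch of a dataclass payload surviving a JSON round trip. The JSON wire format and the helper names to_wire and from_wire are assumptions made for illustration; the source does not say how 0.5.0 broker backends will serialize payloads.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class UserCreatedPayload:  # hypothetical payload type, for illustration only
    id: int
    email: str


def to_wire(payload: UserCreatedPayload) -> str:
    """Serialize the payload as a broker might (assumption: JSON)."""
    return json.dumps(asdict(payload))


def from_wire(raw: str) -> UserCreatedPayload:
    """Rebuild the payload on the consuming side."""
    return UserCreatedPayload(**json.loads(raw))


original = UserCreatedPayload(id=1, email="alice@example.com")
restored = from_wire(to_wire(original))
assert restored == original  # plain data survives the round trip unchanged
```

An ORM instance would not pass through json.dumps like this, which is the kind of payload the serialization warning is meant to flag.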
Quick Example
# users/signals.py
from fastkit_core.events import Signal

user_created = Signal('user.created')
user_updated = Signal('user.updated')
user_deleted = Signal('user.deleted')

# users/listeners.py
from .signals import user_created, user_deleted
from fastkit_core.cache import cache

@user_created.connect
async def send_welcome_email(payload: dict) -> None:
    await email_service.send_welcome(payload['email'])

@user_created.connect
async def invalidate_user_cache(payload: dict) -> None:
    await cache.invalidate('users:*')

@user_deleted.connect
async def cleanup_user_files(payload: dict) -> None:
    await storage.delete_user_files(payload['id'])

# users/services.py
from .signals import user_created, user_deleted

class UserService(AsyncBaseCrudService[...]):
    async def after_create(self, instance: User) -> None:
        # The service knows nothing about email or cache; it only fires the signal
        await user_created.send({'id': instance.id, 'email': instance.email})

    async def after_delete(self, id: int) -> None:
        await user_deleted.send({'id': id})
Defining Signals
A Signal is a named channel. Create one signal instance per event type and keep them in a dedicated signals.py file per module. The name is used for logging and debugging — use a descriptive dot-separated string.
from fastkit_core.events import Signal
# Convention: module.event_name
user_created = Signal('user.created')
user_updated = Signal('user.updated')
user_deleted = Signal('user.deleted')
order_placed = Signal('order.placed')
payment_failed = Signal('payment.failed')
Each Signal instance is independent — receivers connect to a specific instance, not to the name string. Two signals with the same name but different Python objects do not share receivers.
Keep signals defined at module level — one signals.py per application module. Import the same object wherever you need to connect or send. Creating a new Signal('user.created') in a different file produces a separate, disconnected signal.
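The pitfall described above can be sketched with a small stand-in class. This Signal is a simplified model written for illustration, not the fastkit_core implementation; it only assumes that receivers are tracked per instance, as the text states.

```python
from typing import Any, Callable


class Signal:
    """Stand-in for illustration; not the fastkit_core implementation."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._receivers: list[Callable[[Any], Any]] = []

    def connect(self, receiver: Callable[[Any], Any]) -> Callable[[Any], Any]:
        self._receivers.append(receiver)
        return receiver

    def __bool__(self) -> bool:
        return bool(self._receivers)


signal_a = Signal("user.created")
signal_b = Signal("user.created")  # same name, different object


@signal_a.connect
def on_created(payload: dict) -> None:
    print("created", payload)


has_receivers_a = bool(signal_a)  # True: on_created is connected here
has_receivers_b = bool(signal_b)  # False: nothing was connected to this object
```

Sending on signal_b would reach no receiver, even though both objects carry the name 'user.created'.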
Connecting Receivers
A receiver is any callable — async or sync — that accepts the signal payload as its first argument. There are three ways to connect one.
Decorator (recommended)
signal.connect used as a decorator registers the function and returns it unchanged, so the function remains normally callable:
from .signals import user_created

@user_created.connect
async def send_welcome_email(payload: dict) -> None:
    await email_service.send_welcome(payload['email'])

@user_created.connect
def log_new_user(payload: dict) -> None:  # sync receivers also work
    logger.info("New user: %s", payload['email'])
Explicit connect / disconnect
For dynamic registration at runtime:
async def my_receiver(payload: dict) -> None:
    print(payload)

user_created.connect(my_receiver)

# Later: remove the receiver
user_created.disconnect(my_receiver)
Context manager — connected_to
Temporarily connect a receiver for the duration of a block. The receiver is automatically disconnected on exit, even if an exception is raised. Primarily useful in tests:
received = []

async def capture(payload):
    received.append(payload)

with user_created.connected_to(capture):
    await user_created.send({'id': 1, 'email': 'a@b.com'})

# capture is disconnected here automatically
assert len(received) == 1
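One way such a context manager could be built is with contextlib.contextmanager and a try/finally block. The sketch below uses a stand-in Signal class written for illustration (not the fastkit_core implementation) and shows that the receiver is removed even when the block raises.

```python
import asyncio
import inspect
from contextlib import contextmanager
from typing import Any, Callable, Iterator


class Signal:
    """Stand-in for illustration; not the fastkit_core implementation."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._receivers: list[Callable[[Any], Any]] = []

    def connect(self, receiver: Callable[[Any], Any]) -> Callable[[Any], Any]:
        if receiver not in self._receivers:
            self._receivers.append(receiver)
        return receiver

    def disconnect(self, receiver: Callable[[Any], Any]) -> None:
        if receiver in self._receivers:
            self._receivers.remove(receiver)

    @contextmanager
    def connected_to(self, receiver: Callable[[Any], Any]) -> Iterator[None]:
        self.connect(receiver)
        try:
            yield
        finally:
            # Runs on normal exit and when the block raises.
            self.disconnect(receiver)

    async def send(self, payload: Any = None) -> None:
        for receiver in list(self._receivers):
            result = receiver(payload)
            if inspect.isawaitable(result):
                await result


async def demo() -> tuple[list, bool]:
    signal = Signal("user.created")
    received: list = []

    async def capture(payload: Any) -> None:
        received.append(payload)

    try:
        with signal.connected_to(capture):
            await signal.send({"id": 1})
            raise RuntimeError("boom")  # simulate a failing test body
    except RuntimeError:
        pass

    await signal.send({"id": 2})  # capture is gone, so nothing is recorded
    return received, capture in signal._receivers


received, still_connected = asyncio.run(demo())
```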
Receiver registration order
Receivers are dispatched in the order they were connected. Duplicate connections are silently ignored — the same callable can only be connected once per signal.
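The two rules above can be modelled with a list plus a membership check. ReceiverRegistry is a hypothetical helper written for this illustration; it is not part of fastkit_core.

```python
from typing import Any, Callable


class ReceiverRegistry:
    """Hypothetical helper modelling the ordering rules; not fastkit_core code."""

    def __init__(self) -> None:
        self._receivers: list[Callable[[Any], Any]] = []

    def connect(self, receiver: Callable[[Any], Any]) -> Callable[[Any], Any]:
        # A list preserves connection order; the membership check drops duplicates.
        if receiver not in self._receivers:
            self._receivers.append(receiver)
        return receiver

    def dispatch(self, payload: Any) -> None:
        for receiver in self._receivers:
            receiver(payload)


calls: list[str] = []


def first(payload: Any) -> None:
    calls.append("first")


def second(payload: Any) -> None:
    calls.append("second")


registry = ReceiverRegistry()
registry.connect(first)
registry.connect(second)
registry.connect(first)  # duplicate, silently ignored
registry.dispatch({"id": 1})
```

After the dispatch, calls holds one entry per distinct receiver, in connection order.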
Sending Signals
send() is always async — even with InProcessBackend — to ensure forward compatibility when broker backends are introduced in 0.5.0.
# Send with a payload
errors = await user_created.send({'id': 1, 'email': 'alice@example.com'})

# Send with no payload
errors = await order_shipped.send()

# Inspect failures: an empty list means all receivers succeeded
if errors:
    logger.warning("%d receiver(s) failed for signal 'user.created'", len(errors))

# Check whether a signal has any receivers before sending
if user_created:
    await user_created.send(payload)
send() always dispatches to all receivers and returns a list of any exceptions that were raised. A receiver failure never stops other receivers or propagates to the caller — see Error Isolation for details.
Payloads
A payload is the data passed from the sender to all receivers. FastKit Core accepts any Python object but warns at runtime when the payload type would not survive serialization to a message broker.
Recommended payload types
from pydantic import BaseModel
from dataclasses import dataclass

# ✅ dict: simple and always works
await user_created.send({'id': 1, 'email': 'alice@example.com'})

# ✅ Pydantic model: typed, validated, broker-ready
class UserCreatedPayload(BaseModel):
    id: int
    email: str
    name: str

await user_created.send(UserCreatedPayload(id=1, email='alice@example.com', name='Alice'))

# ✅ dataclass: also broker-ready
@dataclass
class OrderPlacedPayload:
    order_id: int
    total: float

await order_placed.send(OrderPlacedPayload(order_id=42, total=99.99))

# ⚠️ SQLAlchemy ORM instance: works now but triggers a UserWarning
await user_created.send(user_orm_instance)
Serialization warning
When a payload is not a dict, Pydantic model, or dataclass, FastKit emits a UserWarning at send time. This warning exists as a forward-compatibility reminder — broker backends introduced in 0.5.0 will require serializable payloads. The signal still fires normally.
| Payload type | Works now | Broker-ready (0.5.0) |
|---|---|---|
| dict | ✓ | ✓ |
| Pydantic BaseModel | ✓ | ✓ |
| dataclass | ✓ | ✓ |
| SQLAlchemy model instance | ✓ (with warning) | ✗ |
| Arbitrary object | ✓ (with warning) | ✗ |
| None | ✓ | ✓ |
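A check along the lines of the table could look like the sketch below. The function names is_broker_ready and warn_if_not_serializable are hypothetical, the warning text is invented, and detecting Pydantic models by the presence of a model_dump method is an assumption made to keep the example free of a pydantic import. FastKit's actual check may differ.

```python
import warnings
from dataclasses import dataclass, is_dataclass
from typing import Any


def is_broker_ready(payload: Any) -> bool:
    """Return True for the payload types the table marks as broker-ready."""
    if payload is None or isinstance(payload, dict):
        return True
    # is_dataclass() is also True for the class itself, so exclude types.
    if is_dataclass(payload) and not isinstance(payload, type):
        return True
    # Assumption: Pydantic v2 models are recognised by their model_dump method.
    return callable(getattr(payload, "model_dump", None))


def warn_if_not_serializable(signal_name: str, payload: Any) -> None:
    """Emit a UserWarning for payloads that may not survive a broker."""
    if not is_broker_ready(payload):
        warnings.warn(
            f"Payload of type {type(payload).__name__} for signal "
            f"'{signal_name}' may not survive broker serialization.",
            UserWarning,
            stacklevel=2,
        )


@dataclass
class OrderPlacedPayload:
    order_id: int
    total: float


class FakeOrmUser:
    """Stands in for an ORM instance in this sketch."""


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    warn_if_not_serializable("order.placed", OrderPlacedPayload(42, 99.99))
    warn_if_not_serializable("user.created", FakeOrmUser())  # only this one warns
```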
Error Isolation
A receiver that raises an exception does not affect other receivers or the code that called send(). The InProcessBackend catches every exception, logs it with full traceback via the standard logging module, and continues to the next receiver.
@user_created.connect
async def send_email(payload: dict) -> None:
    raise ConnectionError("SMTP server down")  # ← this fails

@user_created.connect
async def invalidate_cache(payload: dict) -> None:
    await cache.invalidate('users:*')  # ← this still runs

# send() returns the list of exceptions, does not raise
errors = await user_created.send({'id': 1, 'email': 'alice@example.com'})
# errors == [ConnectionError("SMTP server down")]
# cache was still invalidated
# the route handler that triggered this never saw an exception
The returned list[Exception] lets the caller inspect failures if needed. In most cases you can discard it — the logging output is sufficient. This design choice leaves retry logic and dead-letter handling as the responsibility of 0.5.0 broker backends.
Error isolation means signals are best suited for side effects that are tolerable to fail silently under degraded conditions — cache invalidation, email notifications, search index updates. For operations that must succeed atomically with the main action, use lifecycle hooks directly instead.
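The isolation behaviour described in this section can be approximated by a loop that wraps each receiver call in try/except. The dispatch function below is a minimal sketch under that assumption, not the InProcessBackend source; it also shows one way to handle both async and sync receivers, by awaiting the result only when it is awaitable.

```python
import asyncio
import inspect
import logging
from typing import Any, Callable

logger = logging.getLogger(__name__)


async def dispatch(
    receivers: list[Callable[[Any], Any]], payload: Any
) -> list[Exception]:
    """Call every receiver, collecting exceptions instead of raising them."""
    errors: list[Exception] = []
    for receiver in receivers:
        try:
            result = receiver(payload)
            if inspect.isawaitable(result):
                await result  # async receiver: await the coroutine it returned
        except Exception as exc:
            # logger.exception records the message together with the traceback.
            logger.exception("Receiver %r failed", receiver)
            errors.append(exc)
    return errors


cache_cleared: list[int] = []


async def send_email(payload: dict) -> None:
    raise ConnectionError("SMTP server down")


def invalidate_cache(payload: dict) -> None:  # sync receiver
    cache_cleared.append(payload["id"])


errors = asyncio.run(dispatch([send_email, invalidate_cache], {"id": 1}))
```

The second receiver still runs after the first one fails, and the caller only sees the failure as an entry in the returned list.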
Service Layer Pattern
The recommended structure is to fire signals from service lifecycle hooks. This keeps route handlers completely ignorant of side effects and makes the service independently testable with or without listeners connected.
# users/signals.py
from fastkit_core.events import Signal

user_created = Signal('user.created')
user_updated = Signal('user.updated')
user_deleted = Signal('user.deleted')

# users/listeners.py
from .signals import user_created, user_updated, user_deleted
from fastkit_core.cache import cache

@user_created.connect
async def send_welcome_email(payload: dict) -> None:
    await email_service.send(
        to=payload['email'],
        template='welcome',
        context={'name': payload['name']}
    )

@user_created.connect
async def create_default_settings(payload: dict) -> None:
    await settings_service.create_defaults(user_id=payload['id'])

@user_updated.connect
async def invalidate_user_cache(payload: dict) -> None:
    await cache.delete(f"users:{payload['id']}")

@user_deleted.connect
async def cleanup_user_data(payload: dict) -> None:
    await cache.delete(f"users:{payload['id']}")
    await storage.delete_user_files(payload['id'])

# users/service.py
from fastkit_core.services import AsyncBaseCrudService
from .signals import user_created, user_updated, user_deleted
from models import User
from schemas import UserCreate, UserUpdate, UserResponse

class UserService(AsyncBaseCrudService[User, UserCreate, UserUpdate, UserResponse]):
    def __init__(self, repository):
        super().__init__(repository, response_schema=UserResponse)

    async def before_create(self, data: dict) -> dict:
        data['password'] = await hash_password(data['password'])
        return data

    async def after_create(self, instance: User) -> None:
        await user_created.send({
            'id': instance.id,
            'email': instance.email,
            'name': instance.full_name,
        })

    async def after_update(self, instance: User) -> None:
        await user_updated.send({'id': instance.id})

    async def after_delete(self, id: int) -> None:
        await user_deleted.send({'id': id})
Import listeners.py at application startup so that receivers are registered before any request arrives. The import itself triggers the @signal.connect decorators.
# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI

# Import listeners at startup: this registers all receivers
import users.listeners
import orders.listeners

@asynccontextmanager
async def lifespan(app: FastAPI):
    yield

app = FastAPI(lifespan=lifespan)
If listeners.py is never imported, the @signal.connect decorators never execute and the receivers are never registered. Signals will fire but no receiver will respond — no error, just silence.
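Because a missing import fails silently, a small startup check can make the problem visible. The sketch below relies on bool(signal) being False when no receiver is connected, as documented on this page. The helper find_unconnected is hypothetical, and reading the signal's name from a name attribute is an assumption not confirmed by the source. FakeSignal exists only so the example runs on its own.

```python
from typing import Any, Iterable


def find_unconnected(signals: Iterable[Any]) -> list[str]:
    """Return the names of signals that currently have no receivers."""
    # Assumption: each signal exposes its name as a `name` attribute.
    return [signal.name for signal in signals if not signal]


class FakeSignal:
    """Stand-in providing only what the check needs: a name and __bool__."""

    def __init__(self, name: str, receivers: tuple = ()) -> None:
        self.name = name
        self._receivers = list(receivers)

    def __bool__(self) -> bool:
        return bool(self._receivers)


missing = find_unconnected([
    FakeSignal("user.created", (print,)),
    FakeSignal("order.placed"),  # its listeners module was never imported
])
```

In an application, a check like this could run once at startup and log a warning for each name in the result.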
Testing
The connected_to() context manager and disconnect() make signals straightforward to test in isolation.
Assert a signal was sent
import pytest
from users.signals import user_created

@pytest.mark.asyncio
async def test_create_user_fires_signal(user_service):
    received = []

    async def capture(payload):
        received.append(payload)

    with user_created.connected_to(capture):
        await user_service.create(UserCreate(
            email='alice@example.com',
            name='Alice',
            password='Secret123!'
        ))

    assert len(received) == 1
    assert received[0]['email'] == 'alice@example.com'
Test a receiver in isolation
import pytest
from users.listeners import send_welcome_email

@pytest.mark.asyncio
async def test_send_welcome_email(mock_email_service):
    # Call the receiver function directly, with no signal involved
    await send_welcome_email({'email': 'alice@example.com', 'name': 'Alice'})

    mock_email_service.send.assert_called_once_with(
        to='alice@example.com',
        template='welcome',
        context={'name': 'Alice'}
    )
Isolating tests from production listeners
Receivers are registered only when listeners.py is imported, so tests that never import it will not trigger production side effects, and anything connected with connected_to() is removed when its block exits. If your test suite does import listeners globally (e.g. via conftest), disconnect them per test:
import pytest
from users.signals import user_created
from users.listeners import send_welcome_email

@pytest.fixture(autouse=True)
def disconnect_email_listener():
    """Prevent real emails during tests."""
    user_created.disconnect(send_welcome_email)
    yield
    user_created.connect(send_welcome_email)  # restore after test
Custom Backend
Extend BaseSignalBackend to implement a custom dispatch strategy. The most common use case is a test backend that records dispatched signals:
from fastkit_core.events import BaseSignalBackend
from collections import defaultdict
from typing import Any, Callable

class RecordingBackend(BaseSignalBackend):
    """Records dispatched signals without calling any receivers. Useful in tests."""

    def __init__(self):
        self._receivers: dict[str, list[Callable]] = defaultdict(list)
        self.dispatched: list[tuple[str, Any]] = []

    async def send(self, signal_name: str, payload: Any, **kwargs) -> list[Exception]:
        self.dispatched.append((signal_name, payload))
        return []

    def connect(self, signal_name: str, receiver: Callable) -> None:
        self._receivers[signal_name].append(receiver)

    def disconnect(self, signal_name: str, receiver: Callable) -> None:
        try:
            self._receivers[signal_name].remove(receiver)
        except ValueError:
            pass

    def receivers(self, signal_name: str) -> list[Callable]:
        return list(self._receivers.get(signal_name, []))
Inject a custom backend by patching the module-level singleton directly:
import pytest
import fastkit_core.events.signal as signal_module

@pytest.fixture(autouse=True)
def recording_backend():
    backend = RecordingBackend()
    signal_module._backend_instance = backend
    yield backend
    signal_module._backend_instance = None
API Reference
Signal
Uses the InProcessBackend singleton.
send(): async. Dispatches to all receivers. Returns list[Exception] from failed receivers. Never raises.
list[Callable] of all currently connected receivers.
bool(signal) is True when the signal has at least one receiver connected.
InProcessBackend
send(): async. Dispatches to all receivers. Async receivers are awaited, sync receivers are called directly. Exceptions are caught, logged, and appended to the returned list.
BaseSignalBackend
Complete Example
A full e-commerce order flow demonstrating signals across two modules:
# orders/signals.py
from fastkit_core.events import Signal
order_placed = Signal('order.placed')
order_shipped = Signal('order.shipped')
order_canceled = Signal('order.canceled')

# orders/listeners.py
from .signals import order_placed, order_shipped, order_canceled
from fastkit_core.cache import cache
import logging

logger = logging.getLogger(__name__)

@order_placed.connect
async def send_order_confirmation(payload: dict) -> None:
    await email_service.send(
        to=payload['customer_email'],
        template='order_confirmation',
        context={'order_id': payload['order_id'], 'total': payload['total']}
    )

@order_placed.connect
async def reserve_inventory(payload: dict) -> None:
    for item in payload['items']:
        await inventory_service.reserve(item['sku'], item['quantity'])

@order_placed.connect
async def invalidate_order_cache(payload: dict) -> None:
    await cache.invalidate('orders:*')

@order_shipped.connect
async def send_tracking_email(payload: dict) -> None:
    await email_service.send(
        to=payload['customer_email'],
        template='order_shipped',
        context={'tracking_number': payload['tracking_number']}
    )

@order_canceled.connect
async def release_inventory(payload: dict) -> None:
    for item in payload['items']:
        await inventory_service.release(item['sku'], item['quantity'])

@order_canceled.connect
async def process_refund(payload: dict) -> None:
    if payload.get('payment_id'):
        await payment_service.refund(payload['payment_id'], payload['total'])

# orders/service.py
from fastkit_core.services import AsyncBaseCrudService
from .signals import order_placed, order_shipped, order_canceled
from models import Order
from schemas import OrderCreate, OrderUpdate, OrderResponse

class OrderService(AsyncBaseCrudService[Order, OrderCreate, OrderUpdate, OrderResponse]):
    def __init__(self, repository):
        super().__init__(repository, response_schema=OrderResponse)

    async def after_create(self, instance: Order) -> None:
        await order_placed.send({
            'order_id': instance.id,
            'customer_email': instance.customer_email,
            'total': instance.total,
            'items': [{'sku': i.sku, 'quantity': i.quantity}
                      for i in instance.items],
        })

    async def ship(self, order_id: int, tracking_number: str) -> OrderResponse:
        order = await self.update(order_id, OrderUpdate(
            status='shipped',
            tracking_number=tracking_number
        ))
        await order_shipped.send({
            'order_id': order_id,
            'customer_email': order.customer_email,
            'tracking_number': tracking_number,
        })
        return order

    async def cancel(self, order_id: int) -> OrderResponse:
        order = await self.find_or_fail(order_id)
        updated = await self.update(order_id, OrderUpdate(status='canceled'))
        await order_canceled.send({
            'order_id': order_id,
            'payment_id': order.payment_id,
            'total': order.total,
            'items': [{'sku': i.sku, 'quantity': i.quantity}
                      for i in order.items],
        })
        return updated

# tests/test_order_service.py
import pytest
from orders.signals import order_placed, order_canceled

@pytest.mark.asyncio
async def test_place_order_fires_signal(order_service):
    received = []

    with order_placed.connected_to(lambda p: received.append(p)):
        await order_service.create(OrderCreate(
            customer_email='alice@example.com',
            items=[{'sku': 'WIDGET-1', 'quantity': 2}],
        ))

    assert len(received) == 1
    assert received[0]['customer_email'] == 'alice@example.com'
    assert received[0]['total'] > 0

@pytest.mark.asyncio
async def test_cancel_fires_signal_with_items(order_service, existing_order):
    received = []

    with order_canceled.connected_to(lambda p: received.append(p)):
        await order_service.cancel(existing_order.id)

    assert received[0]['order_id'] == existing_order.id
    assert len(received[0]['items']) > 0