Транзакции нужны, чтобы минимизировать риск неконсистентности данных. Например, при выполнении сложной бизнес-логики, где изменение одной таблицы зависит от изменений в другой.
Пример из реального мира:
- Вы делаете банковский перевод денег от одного пользователя другому.
- Это включает два действия:
- Списание средств с одного счёта.
- Зачисление на другой счёт.
- Если одно из действий завершится ошибкой, транзакция должна откатить изменения, чтобы данные остались консистентными.
Транзакции в FastAPI через SQLAlchemy
В экосистеме FastAPI мы можем эффективно работать с транзакциями, используя SQLAlchemy и асинхронный подход. Давайте рассмотрим, как это сделать на практике.
Шаг 1: установить базовую конфигурацию проекта (скорее всего у вас уже давным давно всё установлено, но бывает всякое)
Если вы не настроили SQLAlchemy для работы с FastAPI, сделайте это сначала. Напомним краткое содержание:
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from fastapi import FastAPI, Depends
from sqlalchemy.ext.declarative import declarative_base
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/db"
engine = create_async_engine(DATABASE_URL, echo=True)
SessionLocal = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
Base = declarative_base()
# Dependency для получения сессии
async def get_db():
async with SessionLocal() as session:
yield session
Шаг 2: использовать транзакции для выполнения операций
Представьте, что мы разрабатываем небольшой API для управления банковскими переводами.
Пример операции без транзакции:
from sqlalchemy.exc import SQLAlchemyError
async def transfer_funds(sender_id: int, recipient_id: int, amount: float, db: AsyncSession):
try:
sender = await db.execute(select(User).where(User.id == sender_id))
recipient = await db.execute(select(User).where(User.id == recipient_id))
sender.balance -= amount
recipient.balance += amount
# Сохранение изменений
await db.commit()
except SQLAlchemyError:
# В случае ошибки никаких изменений в базе не будет
await db.rollback()
Активное применение транзакций
Но что, если половина изменений прошла, а потом вылетела ошибка? Именно здесь помогают транзакции. SQLAlchemy позволяет группировать операции так, чтобы в случае ошибки откатить их и сохранить данные базы в исходном состоянии.
Пример:
async def transfer_funds_with_transaction(sender_id: int, recipient_id: int, amount: float, db: AsyncSession):
try:
async with db.begin(): # Начало транзакции
sender = await db.execute(select(User).where(User.id == sender_id))
recipient = await db.execute(select(User).where(User.id == recipient_id))
sender.balance -= amount
recipient.balance += amount
# COMMIT будет вызван автоматически после завершения блока
except SQLAlchemyError:
# Ошибка приводит к автоматическому вызову ROLLBACK
await db.rollback()
raise
Эта реализация гарантирует атомарность операции: если одно из действий упадёт, изменения в базе данных не применятся.
Транзакции в Django через SQLAlchemy
Интеграция SQLAlchemy в Django — задача не стандартная, но возможная. Однако в большинстве случаев транзакции в Django обрабатываются его родным ORM. Однако, если вы всё же используете SQLAlchemy в Django, всё работает аналогично.
from sqlalchemy.ext.asyncio import AsyncSession
from django.http import JsonResponse
# Django использует middleware, можно обрабатывать транзакции в ручном режиме
async def transfer_funds_django_view(request):
sender_id = request.POST.get("sender_id")
recipient_id = request.POST.get("recipient_id")
amount = float(request.POST.get("amount"))
async with AsyncSession() as session:
try:
async with session.begin(): # Начало транзакции
sender = await session.execute(select(User).where(User.id == sender_id))
recipient = await session.execute(select(User).where(User.id == recipient_id))
sender.balance -= amount
recipient.balance += amount
# Транзакция успешно завершена
return JsonResponse({"status": "success"})
except SQLAlchemyError:
await session.rollback()
return JsonResponse({"status": "error", "message": "Transaction failed"})
Работа с асинхронными операциями
Асинхронность добавляет своей изюминки, особенно когда речь о транзакциях. Например, при выполнении долгих операций (например, внешних HTTP-запросов) внутри транзакции может возникнуть тайм-аут.
Советы по предотвращению проблем с асинхронными транзакциями:
- Минимизируйте время выполнения транзакции: операции, не связанные с базой данных (например, вызовы внешних API), выполняйте до или после транзакции.
- Используйте timeouts: для долгих операций выставляйте разумный тайм-аут на уровне движка базы данных.
- Тестируйте граничные сценарии: проверьте, как транзакции ведут себя при неочевидных ошибках (например, недоступность базы данных).
Практический пример: транзакции в FastAPI
Допустим, мы строим API для обработки заказов в магазине. Нам нужно:
- Сперва проверить, есть ли товар на складе.
- Затем уменьшить количество товара.
- После этого создать запись о новом заказе.
Код:
from sqlalchemy.exc import SQLAlchemyError
@router.post("/create_order/")
async def create_order(order: OrderCreate, db: AsyncSession = Depends(get_db)):
try:
async with db.begin():
product = await db.execute(select(Product).where(Product.id == order.product_id))
product = product.scalar_one_or_none()
if not product or product.stock < order.quantity:
raise HTTPException(status_code=400, detail="Product out of stock")
product.stock -= order.quantity
new_order = Order(customer_id=order.customer_id, product_id=order.product_id, quantity=order.quantity)
db.add(new_order)
# COMMIT произойдёт автоматически, если всё пройдёт успешно
return {"status": "success"}
except SQLAlchemyError as e:
await db.rollback()
return {"status": "error", "message": str(e)}
Особенности работы с транзакциями в Django ORM (например, atomic)
Если вы используете встроенный Django ORM, транзакции обрабатываются через декоратор transaction.atomic. Вот как это выглядит:
from django.db import transaction
@transaction.atomic
def transfer_funds(sender_id, recipient_id, amount):
sender = User.objects.get(id=sender_id)
recipient = User.objects.get(id=recipient_id)
sender.balance -= amount
recipient.balance += amount
sender.save()
recipient.save()
Этот подход автоматически завершает транзакцию, если код выполнен без ошибок, или вызывает rollback, если произошёл какой-либо сбой.
Типичные ошибки при работе с транзакциями
- Забывчивость по поводу rollback: иногда разработчики забывают выполнить
rollbackв местах, где транзакция падает с ошибкой. Это может привести к "висящим" транзакциям в базе. - Игнорирование исключений: никогда не игнорируйте ошибки — транзакция может остаться "подвешенной".
- Долгие операции в транзакции: не делайте долгих операций (например, внешних HTTP-запросов) в блоке транзакции.
В реальной жизни умелое использование транзакций не только делает приложения стабильнее и надёжнее, но и создаёт ощущение "магии". Пользователи удивятся, как всё работает без сбоев, а вы улыбнётесь, потому что знаете: это всё благодаря доведённым до блеска транзакциям.
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ