Давайте представим классическую ситуацию из жизни разработчика: у вас есть банковское приложение, и нужно перевести деньги с одного счета на другой. Очевидно, что эта операция состоит из таких шагов:
- Снимаем деньги с одного счета.
- Кладем деньги на другой счет.
Если на этапе снятия денег все прошло хорошо, а вот на этапе зачисления возникла ошибка — это катастрофа. Деньги куда-то исчезли! Как раз здесь нам на помощь приходят транзакции.
Для начала создадим простую модель Account, чтобы начать работу с транзакциями.
from sqlalchemy import Column, Integer, String, Float
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Account(Base):
__tablename__ = 'accounts'
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False)
balance = Column(Float, default=0.0)
Теперь у нас есть таблица аккаунтов с балансом. Допустим, нам нужно реализовать функцию, которая переводит деньги между двумя аккаунтами. Используем транзакцию для обеспечения атомарности операции:
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
def transfer_money(session: Session, from_account_id: int, to_account_id: int, amount: float):
try:
# Начинаем транзакцию
from_account = session.query(Account).filter(Account.id == from_account_id).first()
to_account = session.query(Account).filter(Account.id == to_account_id).first()
if from_account is None or to_account is None:
raise ValueError("Один из аккаунтов не найден!")
if from_account.balance < amount:
raise ValueError("Недостаточно средств на исходящем счете!")
# Снимаем деньги с одного счета
from_account.balance -= amount
# Добавляем деньги на другой счет
to_account.balance += amount
# Фиксируем изменения
session.commit()
print("Перевод успешно выполнен!")
except Exception as e:
# В случае ошибки откатываем изменения
session.rollback()
print(f"Ошибка выполнения перевода: {e}")
finally:
# Закрываем сессию
session.close()
Вот так выглядит базовая реализация перевода средств с использованием транзакции. Мы фиксируем изменения через commit() только в случае успешного выполнения всех шагов. Если что-то пошло не так, используем rollback().
Обработка ошибок и откат транзакций
Ошибки — это неизбежная часть жизни разработчика. К счастью, транзакции позволяют написать код так, чтобы данные оставались в консистентном состоянии даже в случае сбоев. Мы можем добавить дополнительную обработку ошибок для более детального контроля:
def transfer_money_safe(session: Session, from_account_id: int, to_account_id: int, amount: float):
try:
from_account = session.query(Account).filter(Account.id == from_account_id).with_for_update().first()
to_account = session.query(Account).filter(Account.id == to_account_id).with_for_update().first()
if from_account.balance < amount:
raise ValueError("Недостаточно средств!")
from_account.balance -= amount
to_account.balance += amount
session.commit()
print("Перевод выполнен успешно!")
except ValueError as ve:
session.rollback()
print(f"Ошибка валидации: {ve}")
except IntegrityError as ie:
session.rollback()
print(f"Ошибка целостности: {ie}")
except Exception as e:
session.rollback()
print(f"Непредвиденная ошибка: {e}")
finally:
session.close()
Здесь используется with_for_update() для блокировки строк до завершения транзакции. Это поможет избежать одновременной модификации одних и тех же данных разными процессами.
Практические примеры
Теперь давайте перенесем наш пример в FastAPI и представим, что приложение должно выполнять асинхронные операции.
from fastapi import FastAPI, HTTPException, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
app = FastAPI()
@app.post("/transfer/")
async def transfer_money_api(
from_account_id: int, to_account_id: int, amount: float, session: AsyncSession = Depends(get_db)
):
try:
async with session.begin():
result_from = await session.execute(select(Account).where(Account.id == from_account_id))
from_account = result_from.scalars().first()
result_to = await session.execute(select(Account).where(Account.id == to_account_id))
to_account = result_to.scalars().first()
if not from_account or not to_account:
raise HTTPException(status_code=404, detail="Аккаунт не найден!")
if from_account.balance < amount:
raise HTTPException(status_code=400, detail="Недостаточно средств!")
from_account.balance -= amount
to_account.balance += amount
# Изменения фиксируются автоматически благодаря session.begin()
return {"message": "Перевод выполнен успешно!"}
except Exception as e:
await session.rollback()
raise HTTPException(status_code=500, detail=f"Ошибка перевода: {str(e)}")
Здесь мы используем async with session.begin() для работы с транзакцией в асинхронном режиме. Это автоматизирует открытие и фиксацию транзакции, что значительно упрощает код.
Важные нюансы при работе с транзакциями
- Использование rollback: никогда не забывайте откатывать транзакцию при возникновении ошибки, иначе она будет зависать.
- Закрытие сессии: после работы с транзакцией обязательно закрывайте сессию, используя
session.close()(или автоматически с помощью менеджера контекста). - Асинхронные сессии: если вы работаете в асинхронном контексте, используйте
async with session.begin()для корректной работы.
Таким образом, транзакции и механизм откатов (rollback) позволяют защитить данные вашего приложения от ошибок и сбоев в работе. Эти знания пригодятся вам не только в банкинге, но и в любых проектах, где требуется атомарность операций, таких как системы бронирования, e-commerce и прочие высоконагруженные сервисы.
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ