7.1 Зачем это надо

Мы с вами довольно подробно проговорили все свойства ACID, их предназначение и сценарии использования. Как вы уже поняли, не все БД предлагают гарантии ACID, жертвуя ими ради более высокой производительности. Поэтому вполне может случиться, что на вашем проекте будет выбрана БД, не предлагающая ACID, и вам может понадобиться воплотить часть необходимого функционала ACID на стороне приложения. А если ваша система будет спроектирована как микросервисы, или какой-то другой вид распределённых приложений, то, что в одном сервисе было бы обычной локальной транзакцией, теперь станет распределённой транзакцией – и, конечно, потеряет свою ACID-природу, даже если БД каждого отдельного микросервиса будет ACID.

Я не хочу давать вам исчерпывающее руководство по тому, как создать менеджера транзакций – просто потому, что это слишком большая и сложная тема, а я хочу описать лишь несколько основных техник. Если же речь не идёт о распределённых приложениях, то я не вижу смысла пытаться полностью воплотить ACID на стороне приложения, если вам нужны гарантии ACID – ведь проще и дешевле во всех смыслах будет взять уже готовое решение (то есть, БД с ACID).

Но я бы хотел показать вам некоторые техники, которые помогут вам в осуществлении транзакций на стороне приложения. В конце концов, знание этих техник может помочь вам в разных сценариях, даже не обязательно связанных с транзакциями, и сделает вас лучшими разработчиками (надеюсь на это).

7.2 Базовый инструментарий для любителей транзакций

Оптимистичная и пессимистичная блокировка. Это два типа блокировки неких данных, к которым может возникнуть одновременный доступ.

Оптимист полагает, что вероятность одновременного доступа не так велика, а потому он делает следующее: читает нужную строку, запоминает номер её версии (или timestamp, или checksum / hash – если вы не можете изменить схему данных и добавить столбец для версии или timestamp), и перед тем, как записать в БД изменения для этих данных, проверяет, не изменилась ли версия этих данных. Если версия изменилась, то нужно как-то решить создавшийся конфликт и обновить данные (“commit”), либо откатить транзакцию (“rollback”). Минус этого метода в том, что он создаёт благоприятные условия для бага с длинным названием “time-of-check to time-of-use”, сокращённо TOCTOU: состояние в период времени между проверкой и записью может измениться. Я не имею опыта использования оптимистичной блокировки, а «Википедия» в качестве решения предлагает использовать exception handling вместо проверки, что мне лично в контексте баз данных мало о чём говорит, если честно.

В качестве примера я нашёл одну технологию из повседневной жизни разработчика, которая использует нечто вроде оптимистичной блокировки – это протокол HTTP. Ответ на изначальный HTTP-запрос GET может включать в себя заголовок ETag для последующих запросов PUT со стороны клиента, который тот может использовать в заголовке If-Match. Для методов GET и HEAD сервер отправит обратно запрошенный ресурс, только если он соответствует одному из знакомых ему ETag. Для PUT и других небезопасных методов он будет загружать ресурс также только в этом случае. Если вы не знаете, как работает ETag, то вот хороший пример, с использованием библиотеки "feedparser" (которая помогает парсить RSS и прочие feeds).


>>> import feedparser 
>>> d = feedparser.parse('http://feedparser.org/docs/examples/atom10.xml') 
>>> d.etag 
'"6c132-941-ad7e3080"' 
>>> d2 = feedparser.parse('http://feedparser.org/docs/examples/atom10.xml', etag=d.etag) 
>>> d2.feed 
{} 
>>> d2.debug_message 
'The feed has not changed since you last checked, so the server sent no data.  This is a feature, not a bug!' 

Пессимист же исходит из того, что транзакции часто будут «встречаться» на одних и тех же данных, и чтобы упростить себе жизнь и избежать лишних race conditions, он просто блокирует необходимые ему данные. Для того, чтобы воплотить механизм блокировки, вам нужно либо поддерживать соединение с БД для вашей сессии (а не брать соединения из пула – в этом случае вам, скорее всего, придётся работать с оптимистичной блокировкой), либо использовать ID для транзакции, которая может быть использована независимо от соединения. Минус пессимистичной блокировки в том, что её использование замедляет обработку транзакций в целом, но зато вы можете быть спокойны за данные и получаете настоящую изоляцию.

Дополнительная опасность, правда, таится в возможной взаимной блокировке („deadlock“), при которой несколько процессов ожидают ресурсы, заблокированные друг другом. Например, для проведения транзакции нужные ресурсы А и Б. Процесс 1 занял ресурс А, а процесс 2 – ресурс Б. Ни один из двух процессов не может продолжить выполнение. Существуют различные способы решения этого вопроса – я не хочу сейчас вдаваться в детали, поэтому для начала почитайте «Википедию» , но если вкратце, то есть возможность создания иерархии блокировок. Если вы хотите познакомиться подробнее с этой концепцией, то предлагают вам поломать голову над «Задачей об обедающих философах» (“dining philosophers problem”).

Вот тут есть хороший пример того, как поведут себя обе блокировки в одном и том же сценарии.

Касательно реализаций locks. Не хочу вдаваться в подробности, но для распределённых систем существуют менеджеры блокировок, например: ZooKeeper, Redis, etcd, Consul.

7.3 Идемпотентность операций

Идемпотентность кода – это вообще хорошая практика, и это как раз тот случай, когда разработчику хорошо бы уметь это делать вне зависимости от того, использует ли он транзакции или нет. Идемпотентность – это свойство операции давать тот же результат при повторном применении этой операции к объекту. Функция была вызвана – дала результат. Вызвана ещё раз через секунду или пять – дала тот же результат. Конечно, если данные в БД поменялись, то результат будет другой. Данные в третьих системах могут не зависеть от функции, но всё, что зависит – должно быть предсказуемым.

Проявлений у идемпотентности может быть несколько. Одно из них – это просто рекомендация к тому, как надо писать свой код. Вы же помните, что лучшая функция – это та, которая делает одну вещь? И что хорошо бы написать для этой функции unit-тесты? Если вы придерживаетесь этих двух правил, то вы уже повышаете шанс на то, что ваши функции будут идемпотентны. Чтобы не возникло путаницы, уточню, что идемпотентные функции – не обязательные «чистые» (в смысле „function purity“). Чистые функции – это те функции, которые оперируют только теми данными, которые получили на входе, никак их не меняя и возвращая обработанный результат. Это те функции, которые позволяют скалировать приложение, используя техники функционального программирования. Поскольку мы говорим про некие общие данные и БД, то наши функции вряд ли будут чистыми, ибо они будут менять состояние БД или программ (сервисов).

Вот это — чистая функция:


def square(num: int) -> int: 
	return num * num 

А вот эта функция - не чистая, но идемпотентная (прошу не делать выводов о том, как я пишу код, по этим кускам):


def insert_data(insert_query: str, db_connection: DbConnectionType) -> int: 
  db_connection.execute(insert_query) 
  return True 

Вместо множества слов, я могу просто рассказать о том, как я вынужденно научился писать идемпотентные программы. Я много работаю с AWS, как вы уже могли понять, и там есть сервис под названием AWS Lambda. Lambda позволяет не заботиться о серверах, а просто загружать код, который будет запускаться в ответ на какие-то события или по расписанию. Событием могут быть сообщения, которые доставляются брокером (message broker). В AWS таким брокером является AWS SNS. Думаю, что это должно быть понятно даже для тех, кто не работает с AWS: у нас есть брокер, который отправляет сообщения по каналам (“topics”), и микросервисы, которые подписаны на эти каналы, получают сообщения и как-то на них реагируют.

Проблема заключаются в том, что SNS доставляет сообщения «как минимум один раз» („at-least-once delivery“). Что это значит? Что рано или поздно ваш код на Lambda будет вызван дважды. И это действительно случается. Существует целый ряд сценариев, когда ваша функция должна быть идемпотентной: например когда со счёта снимаются деньги, мы можем ожидать, что кто-то снимет одну и ту же сумму дважды, но мы должны убедиться, что это действительно 2 независимых друг от друга раза – иначе говоря, это 2 разные транзакции, а не повтор одной.

Я же для разнообразия приведу другой пример – ограничение частоты запросов к API (“rate limiting”). Наша Lambda принимает событие с неким user_id для которого должна быть сделана проверка, не исчерпал ли пользователь с таким ID своё кол-во возможных запросов к некой нашей API. Мы могли бы хранить в DynamoDB от AWS значение совершённых вызовов, и увеличивать его с каждым вызовов нашей функции на 1.

Но что делать, если эта Lambda-функция будет вызвана одним и тем же событием дважды? Кстати, вы обратили внимание на аргументы функции lambda_handler(). Второй аргумент, context в AWS Lambda даётся по умолчанию, и он содержит разные метаданные, в том числе – request_id , который генерируется для каждого уникального вызова. Это значит, что теперь, вместо того, чтобы хранить в таблице число совершённых вызовов, мы можем хранить список request_id и при каждом вызове наша Lambda будет проверять, был ли данный запрос уже обработан:


import json 
import os 
from typing import Any, Dict 
 
from aws_lambda_powertools.utilities.typing import LambdaContext  # нужно только для аннотации типа аргумента 
import boto3 
 
limit = os.getenv('LIMIT') 
 
def handler_name(event: Dict[str: Any], context: LambdaContext): 
 
	request_id = context.aws_request_id 
 
	# Находим user_id во входящем событии 
	user_id = event["user_id"] 
 
	# Наша таблица на DynamoDB 
	table = boto3.resource('dynamodb').Table('my_table') 
 
	# Делаем update 
	table.update_item( 
    	Key={'pkey': user_id}, 
    	UpdateExpression='ADD requests :request_id', 
    	ConditionExpression='attribute_not_exists (requests) OR (size(requests) < :limit AND NOT contains(requests, :request_id))', 
    	ExpressionAttributeValues={ 
        	':request_id': {'S': request_id}, 
        	':requests': {'SS': [request_id]}, 
        	':limit': {'N': limit} 
    	} 
	) 
 
	# TODO: написать дальнейшую логику 
 
	return { 
    	"statusCode": 200, 
    	"headers": { 
        	"Content-Type": "application/json" 
    	}, 
    	"body": json.dumps({ 
        	"status ": "success" 
    	}) 
	}

Поскольку мой пример фактически взят из интернета, то я оставлю ссылку на первоисточник, тем более, что он даёт чуть больше информации.

Помните, выше я уже упоминал, что что-то наподобие уникального ID транзакции можно использовать для блокировки общих данных? Теперь мы узнали, что его можно использовать и для обеспечения идемпотентности операций. Давайте же узнаем, какими способами можно самим генерировать такие ID.