7.1 Зачем это надо

Мы подробно обсудили все свойства ACID, их назначение и сценарии использования. Не все базы данных предоставляют гарантии ACID, принося в жертву их для получения большей производительности. Поэтому возможно, что для вашего проекта будет выбрана БД, не предоставляющая ACID, и вам придется реализовать часть функционала ACID на стороне приложения. Если же ваша система будет разработана в виде микросервисов или другого распределенного приложения, то то, что было бы обычной локальной транзакцией в одном сервисе, станет распределенной транзакцией и потеряет свою ACID-природу, даже если БД каждого микросервиса будет ACID.

Я не хочу давать подробное руководство по созданию менеджера транзакций, потому что это слишком сложная и объёмная тема. Если же не речь идёт о распределённых приложениях, то нет смысла пытаться полностью реализовать ACID на стороне приложения. Если вам нужны гарантии ACID, то лучше воспользоваться готовым решением, например, БД с ACID.

Но я хотел бы показать вам некоторые техники, которые помогут вам при выполнении транзакций в приложении. В конце концов, знание этих техник может пригодиться в различных ситуациях, даже не обязательно связанных с транзакциями, и сделает вас лучшими разработчиками (надеюсь на это).

7.2 Базовый инструментарий для любителей транзакций

Оптимистичная и пессимистичная блокировка - это два разных способа ограничения доступа к некоторым данным, когда доступ к ним претендуют несколько пользователей одновременно.

Оптимист

Полагает, что вероятность одновременного доступа невелика. Поэтому он читает нужную строку, запоминает номер её версии (или timestamp, или checksum / hash), и прежде чем записать изменения в БД, проверяет, изменилась ли версия этих данных. Если да, то нужно решить конфликт и обновить данные commit, либо откатить транзакцию rollback. Недостаток этого метода в том, что он создает благоприятные условия для бага с длинным названием time-of-check to time-of-use (TOCTOU). Состояние между проверкой и записью может измениться. Я не имею опыта использования оптимистичной блокировки. «Википедия» предлагает использовать exception handling вместо проверки. Это мало говорит мне в контексте баз данных, если честно.

В качестве примера рассмотрим технологию из повседневной жизни разработчика, использующую оптимистичную блокировку - протокол HTTP. Ответ на изначальный запрос GET может содержать заголовок ETag для последующих запросов PUT со стороны клиента. Этот заголовок можно использовать в заголовке If-Match. Если для методов GET и HEAD ресурс соответствует одному из знакомых серверу ETag, то он отправит его обратно. Для PUT и других небезопасных методов ресурс будет загружаться только в этом случае. Например, библиотека "feedparser" помогает парсить RSS и другие feeds с использованием ETag.


import com.sun.syndication.feed.synd.SyndFeed;
import com.sun.syndication.feed.synd.SyndFeedImpl;
import com.sun.syndication.io.SyndFeedInput;
import com.sun.syndication.io.SyndFeedOutput;

import java.io.IOException;
import java.net.URL;

public class FeedParserExample {

    public static void main(String[] args) throws IOException {
        // Parse the Atom feed.
        SyndFeedInput input = new SyndFeedInput();
        SyndFeed feed = input.read(new URL("http://feedparser.org/docs/examples/atom10.xml"));

        // Get the ETag value of the feed.
        String etag = feed.getEtag();

        // Parse the Atom feed again, but this time set the ETag value.
        SyndFeed feed2 = input.read(new URL("http://feedparser.org/docs/examples/atom10.xml"), etag);

        // Check if the feed has changed.
        if (feed2.getFeed() == null) {
            System.out.println("The feed has not changed since you last checked, so the server sent no data. This is a feature, not a bug!");
        } else {
            System.out.println("The feed has changed.");
        }
    }
}

Пессимистичная блокировка предполагает, что транзакции часто будут встречаться на одних и тех же данных и для того, чтобы избежать лишних race conditions, необходимо блокировать необходимые данные. Для этого можно поддерживать соединение с базой данных для сессии или использовать ID транзакции, который может быть использован независимо от соединения. Недостаток пессимистичной блокировки в том, что она замедляет обработку транзакций, но обеспечивает надёжную изоляцию данных.

Дополнительная опасность заключается в возможной взаимной блокировке deadlock, когда несколько процессов ожидают ресурсы, заблокированные друг другом. Например, для проведения транзакции нужны ресурсы А и Б. Процесс 1 занял ресурс А, а процесс 2 – ресурс Б. Процессы не могут продолжить выполнение. Существуют различные способы решения этой проблемы. Например, можно создать иерархию блокировок. Чтобы познакомиться подробнее с этой концепцией, можно почитать о «Задаче об обедающих философах» («dining philosophers problem»).

Вот тут есть хороший пример того, как поведут себя обе блокировки в одном и том же сценарии.

Касательно реализаций блокировок, для распределённых систем существуют менеджеры блокировок, такие как ZooKeeper, Redis, etcd и Consul.

7.3 Идемпотентность операций

Идемпотентность кода – это хорошая практика. Она позволяет разработчику получать один и тот же результат при повторном применении операции к объекту, независимо от того, использует ли он транзакции или нет. Данные в БД и третьих системах могут влиять на результат, но всё, что зависит, должно быть предсказуемым.

Проявления идемпотентности могут быть разными. Одно из них — это рекомендация по написанию кода. Вы помните, что лучшая функция — это та, которая делает одну вещь? А также хорошо бы написать для нее unit-тесты? Если вы придерживаетесь этих двух правил, то увеличиваете шансы на идемпотентность функций. Чтобы не было путаницы, отмечу, что идемпотентные функции — это не обязательно «чистые» (в смысле «function purity»).

Чистые функции — это те, которые оперируют только данными, полученными на входе, не изменяя их и возвращая обработанный результат. Такие функции позволяют масштабировать приложение с помощью функционального программирования. Однако, так как мы говорим о общих данных и БД, наши функции вряд ли будут чистыми, поскольку они будут изменять состояние БД или программ (сервисов).

Вот это — чистая функция:


public static int square(int num) {
    return num * num;
  }

А эта функция не является чистой, но она идемпотентна (прошу не делать выводы о том, как я пишу код, по этим кускам):


  public static boolean insertData(String insertQuery, DbConnection dbConnection) {
    dbConnection.execute(insertQuery);
    return true;
  }

Вместо множества слов, я могу просто рассказать о том, как я вынужденно научился писать идемпотентные программы. Работаю с AWS, а именно с сервисом AWS Lambda. Он позволяет загружать код, который будет автоматически запускаться по расписанию или в ответ на какие-то события. Например, при поступлении сообщений от брокера сообщений (message broker). В AWS таким брокером является AWS SNS. Он отправляет сообщения по каналам topics. Микросервисы, подписанные на эти каналы, получают сообщения и реагируют на них.

Проблема заключается в том, что SNS обеспечивает доставку сообщений «как минимум один раз» («at-least-once delivery»). Что это означает? Ваша функция Lambda будет вызвана дважды. Это действительно происходит. Существует ряд ситуаций, в которых ваша функция должна быть идемпотентной: например, при снятии денег со счета мы можем ожидать, что кто-то снимет одну и ту же сумму дважды, но мы должны быть уверены, что это действительно два независимых друг от друга раза – то есть две разные транзакции, а не повтор одной.

Я приведу другой пример – ограничение частоты запросов к API («rate limiting»). Наша Lambda принимает событие с неким user_id, для которого нужно проверить, исчерпал ли пользователь с таким ID количество возможных запросов к нашей API. Мы можем хранить в DynamoDB от AWS значение совершённых вызовов и увеличивать его на 1 при каждом вызове нашей функции.

Но что делать, если эта Lambda-функция будет вызвана одним и тем же событием дважды? Кстати, вы обратили внимание на аргументы функции lambda_handler(). Второй аргумент, context в AWS Lambda даётся по умолчанию и содержит разные метаданные, в том числе request_id, который генерируется для каждого уникального вызова. Это значит, что теперь, вместо того, чтобы хранить в таблице число совершённых вызовов, мы можем хранить список request_id и при каждом вызове наша Lambda будет проверять, был ли данный запрос уже обработан.


import java.util.HashMap;
import java.util.Map;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.UpdateItemRequest;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

public class Handler implements RequestHandler<Map<String, Object>, Map<String, Object> {

    private static final AmazonDynamoDB DYNAMODB = AmazonDynamoDBClientBuilder.defaultClient();
    private static final String TABLE_NAME = "my_table";
    private static final String LIMIT = System.getenv("LIMIT");

    @Override
    public Map<String, Object> handleRequest(Map<String, Object> event, Context context) {
        String requestId = context.getAwsRequestId();
        String userId = (String) event.get("user_id");

        UpdateItemRequest updateItemRequest = new UpdateItemRequest()
                .withTableName(TABLE_NAME)
                .withKey(new AttributeValue().withS(userId))
                .withUpdateExpression("ADD requests :request_id")
                .withConditionExpression("attribute_not_exists (requests) OR (size(requests) < :limit AND NOT contains(requests, :request_id))")
                .withExpressionAttributeValues(new HashMap<>() {{
                    put(":request_id", new AttributeValue().withS(requestId));
                    put(":requests", new AttributeValue().withSS(new String[]{requestId}));
                    put(":limit", new AttributeValue().withN(LIMIT));
                }});

        DYNAMODB.updateItem(updateItemRequest);

        // TODO: write further logic

        Map<String, Object> response = new HashMap<>();
        response.put("statusCode", 200);
        response.put("headers", new HashMap<>() {{
            put("Content-Type", "application/json");
        }});
        response.put("body", "{\"status\": \"success\"}");

        return response;
    }
}

Поскольку этот пример был взят из интернета, я предоставлю ссылку на источник, который даёт дополнительную информацию.

Ранее мы упоминали, что можно использовать уникальный ID транзакции для блокировки общих данных. Теперь мы знаем, что такой ID можно использовать для обеспечения идемпотентности операций. Давайте рассмотрим, какими способами можно самостоятельно генерировать такие ID.