Java NIO предоставляет ByteBuffer, но многие библиотеки надстраивают свои собственные API-интерфейсы байтовых буферов поверх, особенно для сетевых операций, где повторное использование буферов и/или использование прямых буферов выгодно для производительности. Например, Netty имеет иерархию ByteBuf, Undertow использует XNIO, Jetty использует объединенные байтовые буферы с обратным вызовом для освобождения буфера, и так далее. Модуль spring-core предоставляет набор абстракций для работы с различными API-интерфейсами байтовых буферов в таком виде:

  • DataBufferFactory абстрагирует создание буфера данных.

  • DataBuffer представляет собой байтовый буфер, который может быть объединен в пул.

  • DataBufferUtils предоставляет служебные (вспомогательные/утилитные) методы для буферов данных.

  • <Кодеки декодируют или кодируют потоки буфера данных в объекты более высокого уровня.

DataBufferFactory

DataBufferFactory используется для создания буферов данных одним из двух способов:

  1. Выделение нового буфера данных с предварительным опциональным указанием объема, если он известен, что более эффективно, даже при том, что реализация DataBuffer может увеличиваться и уменьшаться в объеме по требованию.

  2. Оборачивание существующего byte[] или java.nio.ByteBuffer, который декорирует заданные данные реализацией DataBuffer и не включает выделение.

Обратите внимание, что приложения WebFlux не создают DataBufferFactory напрямую, а обращаются к нему через ServerHttpResponse или ClientHttpRequest на стороне клиента. Тип фабрики зависит от базового клиента или сервера, например, NettyDataBufferFactory для Reactor Netty, DefaultDataBufferFactory для других.

DataBuffer

Интерфейс DataBuffer предлагает операции, схожие с java.nio.ByteBuffer, но также привносит несколько дополнительных преимуществ, некоторые из которых вдохновлены ByteBuf из фреймворка Netty. Ниже приведен неполный перечень преимуществ:

  • Чтение и запись с независимыми позициями, т.е. не требующие вызова flip() для чередования чтения и записи.

  • Объем расширяется по требованию, как и в случае с java.lang.StringBuilder.

  • Объединение в пул буферов и подсчет ссылок через PooledDataBuffer.

  • Просмотр буфера как java.nio.ByteBuffer, InputStream или OutputStream.

  • Определение последнего индекса для отдельно взятого байта.

PooledDataBuffer

Как разъяснено в Javadoc по ByteBuffer, байтовые буферы могут быть прямыми или непрямыми. Прямые буферы могут находиться вне кучи Java, что устраняет необходимость копирования для операций ввода-вывода. Это делает прямые буферы особенно полезными для получения и отправки данных через сокет, но они также более затратные с точки зрения ресурсов при создании и освобождении, что приводит к идее объединения буферов в пул.

PooledDataBuffer – это расширение DataBuffer, которое помогает подсчитывать ссылки, что является необходимостью в случае объединения байтовых буферов в пул. Как это работает? Когда выделяется PooledDataBuffer, количество ссылок равно 1. Обращения к retain() увеличивают счетчик, а обращения к функции release() уменьшают его. Пока счетчик больше 0, буфер гарантированно не будет освобожден. Если счетчик уменьшится до 0, буфер, помещенный в пул, можно освободить, что на практике может означать, что зарезервированная память для буфера возвращается в пул памяти.

Обратите внимание, что вместо того, чтобы работать с PooledDataBuffer напрямую, в большинстве случаев лучше использовать вспомогательные методы в DataBufferUtils, которые применяют алгоритмы освобождения буфера или удержания данных в буфере к DataBuffer, только если он является экземпляром PooledDataBuffer.

DataBufferUtils

DataBufferUtils предлагает ряд служебных методов для работы с буферами данных:

  • Объединение потока буферов данных в один буфер с помощью нулевого копирования (zero copy), например, через составные буферы, если их поддерживает базовым API-интерфейс байтовых буферов.

  • Превращение InputStream или NIO Channel в Flux<DataBuffer>, и наоборот Publisher<DataBuffer> в OutputStream или NIO Channel.

  • Методы для освобождения или удержания данных в DataBuffer, если буфер является экземпляром PooledDataBuffer.

  • Возможность пропуска или принятия данных из потока байтов до достижения определенного количества байтов.

Кодеки

Пакет org.springframework.core.codec предоставляет следующие интерфейсы стратегий:

  • Encoder для кодирования Publisher<T> в поток буферов данных.

  • Decoder для декодирования Publisher<DataBuffer> в поток объектов более высокого уровня.

Модуль spring-core предоставляет реализации кодера и декодера byte[], ByteBuffer, DataBuffer, Resource и String. Модуль spring-web добавляет Jackson JSON, Jackson Smile, JAXB2, Protocol Buffers и другие кодеры и декодеры.

Использование DataBuffer

При работе с буферами данных необходимо уделять особое внимание тому, чтобы буферы освобождались, поскольку они могут быть объединены в пул. Мы будем использовать кодеки, чтобы проиллюстрировать, как это работает, но эти концепции применимы и в более общем смысле. Давайте взглянем, как кодеки управляют буферами данных внутри.

Decoder считывает входные буферы данных последним, до создания объектов более высокого уровня, и поэтому он должен освобождать их следующим образом:

  1. Если Decoder просто считывает каждый входной буфер и готов немедленно освободить его, он может сделать это через DataBufferUtils.release(dataBuffer).

  2. Если Decoder использует операторы Flux или Mono, такие как flatMap, reduce и другие, которые предварительно осуществляют выборку и кэшируют элементы данных внутри, или использует операторы, такие как filter, skip и другие, которые отбрасывают элементы, то в композиционный ряд необходимо добавить doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release), чтобы гарантированно освобождать такие буферы перед тем, как они будут пропущены, вероятно, в следствие ошибки или поступления сигнала отмены.

  3. Если Decoder удерживает данные в одном нескольких буферах данных каким-либо иным способом, он должен обеспечить их освобождение после полного прочтения, а также в случае возникновения ошибки или поступления сигналов отмены до того, как буферы с кэшированными данными будут прочитаны и освобождены.

Обратите внимание, что DataBufferUtils#join предлагает безопасный и эффективный способ объединения потока буферов данных в один буфер данных. Аналогично, skipUntilByteCount и takeUntilByteCount являются дополнительными безопасными методами для использования декодерами.

Encoder выделяет буферы данных, которые другие должны прочитать (и освободить). Таким образом, для Encoder остается не так много работы. Однако Encoder должен способствовать тому, чтобы освободить буфер данных, если при заполнении буфера данными произошла ошибка сериализации. Например:

Java
DataBuffer buffer = factory.allocateBuffer();
boolean release = true;
try {
    // сериализуем и заполняем буфер...
    release = false;
}
finally {
    if (release) {
        DataBufferUtils.release(buffer);
    }
}
return buffer;
Kotlin
val buffer = factory.allocateBuffer()
var release = true
try {
    // сериализуем и заполняем буфер...
    release = false
} finally {
    if (release) {
        DataBufferUtils.release(buffer)
    }
}
return buffer

Потребитель Encoder отвечает за освобождение буферов данных, которые он получает. В приложении WebFlux результат работы Encoder используется для записи в ответ HTTP-сервера или HTTP-запрос клиента, и в этом случае освобождение буферов данных лежит на коде, записывающего в ответ сервера или запрос клиента.

Обратите внимание, что при работе на Netty существуют опции отладки для устранения утечек буфера.