Java NIO предоставляет ByteBuffer
, но многие библиотеки надстраивают свои собственные API-интерфейсы байтовых буферов поверх, особенно для сетевых операций, где повторное использование буферов и/или использование прямых буферов выгодно для производительности. Например, Netty имеет иерархию ByteBuf
, Undertow использует XNIO, Jetty использует объединенные байтовые буферы с обратным вызовом для освобождения буфера, и так далее. Модуль spring-core
предоставляет набор абстракций для работы с различными API-интерфейсами байтовых буферов в таком виде:
-
DataBufferFactory
абстрагирует создание буфера данных. -
DataBuffer
представляет собой байтовый буфер, который может быть объединен в пул. -
DataBufferUtils
предоставляет служебные (вспомогательные/утилитные) методы для буферов данных. -
<Кодеки декодируют или кодируют потоки буфера данных в объекты более высокого уровня.
DataBufferFactory
DataBufferFactory
используется для создания буферов данных одним из двух способов:
-
Выделение нового буфера данных с предварительным опциональным указанием объема, если он известен, что более эффективно, даже при том, что реализация
DataBuffer
может увеличиваться и уменьшаться в объеме по требованию. -
Оборачивание существующего
byte[]
илиjava.nio.ByteBuffer
, который декорирует заданные данные реализациейDataBuffer
и не включает выделение.
Обратите внимание, что приложения WebFlux не создают DataBufferFactory
напрямую, а обращаются к нему через ServerHttpResponse
или ClientHttpRequest
на стороне клиента. Тип фабрики зависит от базового клиента или сервера, например, NettyDataBufferFactory
для Reactor Netty, DefaultDataBufferFactory
для других.
DataBuffer
Интерфейс DataBuffer
предлагает операции, схожие с java.nio.ByteBuffer
, но также привносит несколько дополнительных преимуществ, некоторые из которых вдохновлены ByteBuf
из фреймворка Netty. Ниже приведен неполный перечень преимуществ:
-
Чтение и запись с независимыми позициями, т.е. не требующие вызова
flip()
для чередования чтения и записи. -
Объем расширяется по требованию, как и в случае с
java.lang.StringBuilder
. -
Объединение в пул буферов и подсчет ссылок через
PooledDataBuffer
. -
Просмотр буфера как
java.nio.ByteBuffer
,InputStream
илиOutputStream
. -
Определение последнего индекса для отдельно взятого байта.
PooledDataBuffer
Как разъяснено в Javadoc по ByteBuffer, байтовые буферы могут быть прямыми или непрямыми. Прямые буферы могут находиться вне кучи Java, что устраняет необходимость копирования для операций ввода-вывода. Это делает прямые буферы особенно полезными для получения и отправки данных через сокет, но они также более затратные с точки зрения ресурсов при создании и освобождении, что приводит к идее объединения буферов в пул.
PooledDataBuffer
– это расширение DataBuffer
, которое помогает подсчитывать ссылки, что является необходимостью в случае объединения байтовых буферов в пул. Как это работает? Когда выделяется PooledDataBuffer,
количество ссылок равно 1. Обращения к retain()
увеличивают счетчик, а обращения к функции release()
уменьшают его. Пока счетчик больше 0, буфер гарантированно не будет освобожден. Если счетчик уменьшится до 0, буфер, помещенный в пул, можно освободить, что на практике может означать, что зарезервированная память для буфера возвращается в пул памяти.
Обратите внимание, что вместо того, чтобы работать с PooledDataBuffer
напрямую, в большинстве случаев лучше использовать вспомогательные методы в DataBufferUtils
, которые применяют алгоритмы освобождения буфера или удержания данных в буфере к DataBuffer
, только если он является экземпляром PooledDataBuffer
.
DataBufferUtils
DataBufferUtils
предлагает ряд служебных методов для работы с буферами данных:
-
Объединение потока буферов данных в один буфер с помощью нулевого копирования (zero copy), например, через составные буферы, если их поддерживает базовым API-интерфейс байтовых буферов.
-
Превращение
InputStream
или NIOChannel
вFlux<DataBuffer>
, и наоборотPublisher<DataBuffer>
вOutputStream
или NIOChannel
. -
Методы для освобождения или удержания данных в
DataBuffer
, если буфер является экземпляромPooledDataBuffer
. -
Возможность пропуска или принятия данных из потока байтов до достижения определенного количества байтов.
Кодеки
Пакет org.springframework.core.codec
предоставляет следующие интерфейсы стратегий:
-
Encoder
для кодированияPublisher<T>
в поток буферов данных. -
Decoder
для декодированияPublisher<DataBuffer>
в поток объектов более высокого уровня.
Модуль spring-core
предоставляет реализации кодера и декодера byte[]
, ByteBuffer
, DataBuffer
, Resource
и String
. Модуль spring-web
добавляет Jackson JSON, Jackson Smile, JAXB2, Protocol Buffers и другие кодеры и декодеры.
Использование DataBuffer
При работе с буферами данных необходимо уделять особое внимание тому, чтобы буферы освобождались, поскольку они могут быть объединены в пул. Мы будем использовать кодеки, чтобы проиллюстрировать, как это работает, но эти концепции применимы и в более общем смысле. Давайте взглянем, как кодеки управляют буферами данных внутри.
Decoder
считывает входные буферы данных последним, до создания объектов более высокого уровня, и поэтому он должен освобождать их следующим образом:
-
Если
Decoder
просто считывает каждый входной буфер и готов немедленно освободить его, он может сделать это черезDataBufferUtils.release(dataBuffer)
. -
Если
Decoder
использует операторыFlux
илиMono
, такие какflatMap
,reduce
и другие, которые предварительно осуществляют выборку и кэшируют элементы данных внутри, или использует операторы, такие какfilter
,skip
и другие, которые отбрасывают элементы, то в композиционный ряд необходимо добавитьdoOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)
, чтобы гарантированно освобождать такие буферы перед тем, как они будут пропущены, вероятно, в следствие ошибки или поступления сигнала отмены. -
Если
Decoder
удерживает данные в одном нескольких буферах данных каким-либо иным способом, он должен обеспечить их освобождение после полного прочтения, а также в случае возникновения ошибки или поступления сигналов отмены до того, как буферы с кэшированными данными будут прочитаны и освобождены.
Обратите внимание, что DataBufferUtils#join
предлагает безопасный и эффективный способ объединения потока буферов данных в один буфер данных. Аналогично, skipUntilByteCount
и takeUntilByteCount
являются дополнительными безопасными методами для использования декодерами.
Encoder
выделяет буферы данных, которые другие должны прочитать (и освободить). Таким образом, для Encoder
остается не так много работы. Однако Encoder
должен способствовать тому, чтобы освободить буфер данных, если при заполнении буфера данными произошла ошибка сериализации. Например:
DataBuffer buffer = factory.allocateBuffer();
boolean release = true;
try {
// сериализуем и заполняем буфер...
release = false;
}
finally {
if (release) {
DataBufferUtils.release(buffer);
}
}
return buffer;
val buffer = factory.allocateBuffer()
var release = true
try {
// сериализуем и заполняем буфер...
release = false
} finally {
if (release) {
DataBufferUtils.release(buffer)
}
}
return buffer
Потребитель Encoder
отвечает за освобождение буферов данных, которые он получает. В приложении WebFlux результат работы Encoder
используется для записи в ответ HTTP-сервера или HTTP-запрос клиента, и в этом случае освобождение буферов данных лежит на коде, записывающего в ответ сервера или запрос клиента.
Обратите внимание, что при работе на Netty существуют опции отладки для устранения утечек буфера.
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ