RSocketRequester
предоставляет свободный API для выполнения запросов RSocket, принимая и возвращая объекты для данных и метаданных вместо буферов данных низкого уровня. Он может использоваться симметрично, для выполнения запросов от клиентов и для выполнения запросов от серверов.
Реквестер на стороне клиента
Для получения RSocketRequester
на стороне клиента необходимо подключиться к серверу, что предполагает отправку фрейма SETUP
по протоколу RSocket с настройками соединения. RSocketRequester
предусматривает средство сборки, которое помогает подготовить io.rsocket.core.RSocketConnector
, включая настройки соединения для фрейма SETUP
.
Это самый простой способ подключения с настройками по умолчанию:
RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);
URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
val requester = RSocketRequester.builder().tcp("localhost", 7000)
URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)
При вышеизложенном соединение устанавливается не сразу. При поступлении запросов прозрачно устанавливается и используется общее соединение.
Настройка подключения
RSocketRequester.Builder
предусматривает следующее для настройки начального фрейма SETUP
:
-
dataMimeType(MimeType)
– устанавливает MIME-тип данных для соединения. -
metadataMimeType(MimeType)
– устанавливает MIME-тип метаданных для соединения. -
setupData(Object)
– данные для включения вSETUP
. -
setupRoute(String, Object…)
– маршрут в метаданных для включения вSETUP
. -
setupMetadata(Object, MimeType)
– другие метаданные для включения вSETUP
.
Для данных MIME-тип по умолчанию определяется первым сконфигурированным Decoder
. Для метаданных MIME-тип по умолчанию – составные метаданные, что позволяет использовать несколько пар значений метаданных и MIME-типов для каждого запроса. Как правило, оба не требуют замены.
Данные и метаданные в фрейме SETUP
являются необязательными. На стороне сервера методы, помеченные аннотацией @ConnectMapping, могут использоваться для обработки начала соединения и содержимого фрейма SETUP
. Метаданные могут использоваться для обеспечения безопасности на уровне соединения.
Стратегии
RSocketRequester.Builder
принимает RSocketStrategies
для конфигурирования реквестера. Вам нужно будет использовать его, чтобы передавать фреймы и декодеры для (де)-сериализации данных и значений метаданных. По умолчанию зарегистрированытолько базовые кодеки из spring-core
для String
, byte[]
и ByteBuffer
. Добавление spring-web
обеспечивает доступ к большему количеству функций, которые можно зарегистрировать следующим образом:
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000);
val strategies = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.build()
val requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000)
RSocketStrategies
предназначен для повторного использования. В некоторых сценариях, например, когда клиент и сервер находятся в одном приложении, может быть предпочтительнее объявить это в конфигурации Spring.
Респондеры на стороне клиента
RSocketRequester.Builder
можно использовать для настройки респондеров, реагирующих на запросы от сервера.
Можно использовать аннотированные обработчики для реагирования на стороне клиента, основанные на той же инфраструктуре, что и серверная часть, но зарегистрированные программно следующим образом:
RSocketStrategies strategies = RSocketStrategies.builder()
.routeMatcher(new PathPatternRouteMatcher())
.build();
SocketAcceptor responder =
RSocketMessageHandler.responder(strategies, new ClientHandler());
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(responder))
.tcp("localhost", 7000);
- Используем
PathPatternRouteMatcher
, если имеетсяspring-web
, для более эффективного подбора маршрута. - Создаем респондер из класса с методами, помеченными аннотацией
@MessageMapping
и/или@ConnectMapping
. - Регистрируем респондер.
val strategies = RSocketStrategies.builder()
.routeMatcher(PathPatternRouteMatcher())
.build()
val responder =
RSocketMessageHandler.responder(strategies, new ClientHandler());
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(responder) }
.tcp("localhost", 7000)
- Используем
PathPatternRouteMatcher
, если имеетсяspring-web
, для более эффективного подбора маршрута. - Создаем респондер из класса с методами, помеченными аннотацией
@MessageMapping
и/или@ConnectMapping
. - Регистрируем респондер.
Обратите внимание, что приведенное выше сокращение предназначено исключительно для программной регистрации клиентских респондеров. В альтернативных сценариях, если клиентские респондеры находятся в конфигурации Spring, можно объявить RSocketMessageHandler
как бин Spring и затем применять следующим образом:
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(handler.responder()))
.tcp("localhost", 7000);
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(handler.responder()) }
.tcp("localhost", 7000)
Для вышеописанного также может понадобиться использовать setHandlerPredicate
в RSocketMessageHandler
, чтобы переключиться на другую стратегию обнаружения клиентских респондеров, например, на основе пользовательской аннотации, такой как @RSocketClientResponder
, а не стандартной @Controller
. Это необходимо в сценариях с клиентом и сервером или несколькими клиентами в одном приложении.
Расширенный режим
RSocketRequesterBuilder
предусматривает возможность совершения обратного вызова, открывающего базовый io.rsocket.core.RSocketConnector
для получения доступа к дополнительным параметрам конфигурации интервалов поддержания в активном состоянии (keepalive), возобновления сессии, перехватчиков и т. д. Вы можете настроить параметры на этом уровне следующим образом:
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> {
// ...
})
.tcp("localhost", 7000);
val requester = RSocketRequester.builder()
.rsocketConnector {
//...
}
.tcp("localhost", 7000)
Реквестер на стороне сервера
Для осуществления запросов от сервера к подключенным клиентам необходимо получить от сервера реквестер для подключенного клиента.
В аннотированных респондерах методы с аннотациями @ConnectMapping
и @MessageMapping
поддерживают аргумент RSocketRequester
. Используйте его для получения доступа к конкретному реквестеру для конкретного соединения. Помните, что методы, помеченные аннотацией @ConnectMapping
, по сути являются обработчиками фрейма SETUP
, который должен быть обработан до отправки запросов. Поэтому запросы в самом начале необходимо отделить от процедуры обработки. Например:
@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
requester.route("status").data("5")
.retrieveFlux(StatusReport.class)
.subscribe(bar -> {
// ...
});
return ...
}
- Запускаем запрос асинхронно, независимо от процедуры обработки.
- Выполняем обработку и возвращаем завершение
Mono<Void>
.
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
GlobalScope.launch {
requester.route("status").data("5").retrieveFlow<StatusReport>().collect {
// ...
}
}
/// ... (2)
}
- Запускаем запрос асинхронно, независимо от процедуры обработки.
- Perform handling in the suspending function.
Запросы
Сразу после получения реквестера для client или server, запросы можно отправлять следующим образом:
ViewBox viewBox = ... ;
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.data(viewBox)
.retrieveFlux(AirportLocation.class);
- Задаем маршрут для включения в метаданные сообщения-запроса.
- Указываем данные для сообщения-запроса.
- Объявляем ожидаемый ответ.
val viewBox: ViewBox = ...
val locations = requester.route("locate.radars.within")
.data(viewBox)
.retrieveFlow<AirportLocation>()
- Задаем маршрут для включения в метаданные сообщения-запроса.
- Указываем данные для сообщения-запроса.
- Объявляем ожидаемый ответ.
Тип взаимодействия определяется неявно по мощности множества (cardinality) ввода и вывода. Приведенный выше пример является Request-Stream
, поскольку отправляется одно значение, а принимается поток значений. По большей части об этом думать не требуется, пока выбор ввода и вывода соответствует типу взаимодействия RSocket и типам ввода и вывода, ожидаемым респондером. Единственным примером недопустимой комбинации является комбинация "многие к одному".
Метод data(Object)
также принимает любой Publisher
из спецификации Reactive Streams, включая Flux
и Mono
, а также любой другой генератор значений, зарегистрированный в ReactiveAdapterRegistry
. В случае многозначного Publisher
, такого как Flux
, который генерирует одинаковые типы значений, рассмотрите возможность использования одного из перегруженных методов data
, чтобы избежать проверки типов и поиска Encoder
для каждого элемента:
data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
Этап с data(Object)
является необязательным. Пропускаем его в случае запросов, которые не отправляют данные:
Mono<AirportLocation> location = requester.route("find.radar.EWR"))
.retrieveMono(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveAndAwait
val location = requester.route("find.radar.EWR")
.retrieveAndAwait<AirportLocation>()
Дополнительные значения метаданных можно добавить, если используются составные метаданные (по умолчанию) и если значения поддерживаются зарегистрированным Encoder
. Например:
String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlux(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveFlow
val requester: RSocketRequester = ...
val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")
val locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlow<AirportLocation>()
Для Fire-and-Forget
используйте метод send()
, который возвращает Mono<Void>
. Обратите внимание, что Mono
указывает только на то, что сообщение было успешно отправлено, а не на то, что оно было обработано.
Для Metadata-Push
используйте метод sendMetadata()
с возвращаемым значением Mono<Void>
.