RSocketRequester предоставляет свободный API для выполнения запросов RSocket, принимая и возвращая объекты для данных и метаданных вместо буферов данных низкого уровня. Он может использоваться симметрично, для выполнения запросов от клиентов и для выполнения запросов от серверов.

Реквестер на стороне клиента

Для получения RSocketRequester на стороне клиента необходимо подключиться к серверу, что предполагает отправку фрейма SETUP по протоколу RSocket с настройками соединения. RSocketRequester предусматривает средство сборки, которое помогает подготовить io.rsocket.core.RSocketConnector, включая настройки соединения для фрейма SETUP.

Это самый простой способ подключения с настройками по умолчанию:

Java
RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);
URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
Kotlin
val requester = RSocketRequester.builder().tcp("localhost", 7000)
URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)

При вышеизложенном соединение устанавливается не сразу. При поступлении запросов прозрачно устанавливается и используется общее соединение.

Настройка подключения

RSocketRequester.Builder предусматривает следующее для настройки начального фрейма SETUP:

  • dataMimeType(MimeType) – устанавливает MIME-тип данных для соединения.

  • metadataMimeType(MimeType) – устанавливает MIME-тип метаданных для соединения.

  • setupData(Object)– данные для включения в SETUP.

  • setupRoute(String, Object…) – маршрут в метаданных для включения в SETUP.

  • setupMetadata(Object, MimeType)– другие метаданные для включения в SETUP.

Для данных MIME-тип по умолчанию определяется первым сконфигурированным Decoder. Для метаданных MIME-тип по умолчанию – составные метаданные, что позволяет использовать несколько пар значений метаданных и MIME-типов для каждого запроса. Как правило, оба не требуют замены.

Данные и метаданные в фрейме SETUP являются необязательными. На стороне сервера методы, помеченные аннотацией @ConnectMapping, могут использоваться для обработки начала соединения и содержимого фрейма SETUP. Метаданные могут использоваться для обеспечения безопасности на уровне соединения.

Стратегии

RSocketRequester.Builder принимает RSocketStrategies для конфигурирования реквестера. Вам нужно будет использовать его, чтобы передавать фреймы и декодеры для (де)-сериализации данных и значений метаданных. По умолчанию зарегистрированытолько базовые кодеки из spring-core для String, byte[] и ByteBuffer. Добавление spring-web обеспечивает доступ к большему количеству функций, которые можно зарегистрировать следующим образом:

Java
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000);
Kotlin
val strategies = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.build()
val requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000)

RSocketStrategies предназначен для повторного использования. В некоторых сценариях, например, когда клиент и сервер находятся в одном приложении, может быть предпочтительнее объявить это в конфигурации Spring.

Респондеры на стороне клиента

RSocketRequester.Builder можно использовать для настройки респондеров, реагирующих на запросы от сервера.

Можно использовать аннотированные обработчики для реагирования на стороне клиента, основанные на той же инфраструктуре, что и серверная часть, но зарегистрированные программно следующим образом:

Java
RSocketStrategies strategies = RSocketStrategies.builder()
.routeMatcher(new PathPatternRouteMatcher())
.build();
SocketAcceptor responder =
RSocketMessageHandler.responder(strategies, new ClientHandler());
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(responder))
.tcp("localhost", 7000);
  1. Используем PathPatternRouteMatcher, если имеется spring-web, для более эффективного подбора маршрута.
  2. Создаем респондер из класса с методами, помеченными аннотацией @MessageMapping и/или @ConnectMapping.
  3. Регистрируем респондер.
Kotlin
val strategies = RSocketStrategies.builder()
.routeMatcher(PathPatternRouteMatcher())
.build()
val responder =
RSocketMessageHandler.responder(strategies, new ClientHandler());
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(responder) }
.tcp("localhost", 7000)
  1. Используем PathPatternRouteMatcher, если имеется spring-web, для более эффективного подбора маршрута.
  2. Создаем респондер из класса с методами, помеченными аннотацией @MessageMapping и/или @ConnectMapping.
  3. Регистрируем респондер.

Обратите внимание, что приведенное выше сокращение предназначено исключительно для программной регистрации клиентских респондеров. В альтернативных сценариях, если клиентские респондеры находятся в конфигурации Spring, можно объявить RSocketMessageHandler как бин Spring и затем применять следующим образом:

Java
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(handler.responder()))
.tcp("localhost", 7000);
Kotlin
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(handler.responder()) }
.tcp("localhost", 7000)

Для вышеописанного также может понадобиться использовать setHandlerPredicate в RSocketMessageHandler, чтобы переключиться на другую стратегию обнаружения клиентских респондеров, например, на основе пользовательской аннотации, такой как @RSocketClientResponder, а не стандартной @Controller. Это необходимо в сценариях с клиентом и сервером или несколькими клиентами в одном приложении.

Расширенный режим

RSocketRequesterBuilder предусматривает возможность совершения обратного вызова, открывающего базовый io.rsocket.core.RSocketConnector для получения доступа к дополнительным параметрам конфигурации интервалов поддержания в активном состоянии (keepalive), возобновления сессии, перехватчиков и т. д. Вы можете настроить параметры на этом уровне следующим образом:

Java
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> {
// ...
})
.tcp("localhost", 7000);
Kotlin
val requester = RSocketRequester.builder()
.rsocketConnector {
    //...
}
.tcp("localhost", 7000)

Реквестер на стороне сервера

Для осуществления запросов от сервера к подключенным клиентам необходимо получить от сервера реквестер для подключенного клиента.

В аннотированных респондерах методы с аннотациями @ConnectMapping и @MessageMapping поддерживают аргумент RSocketRequester. Используйте его для получения доступа к конкретному реквестеру для конкретного соединения. Помните, что методы, помеченные аннотацией @ConnectMapping, по сути являются обработчиками фрейма SETUP, который должен быть обработан до отправки запросов. Поэтому запросы в самом начале необходимо отделить от процедуры обработки. Например:

Java
@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
requester.route("status").data("5")
.retrieveFlux(StatusReport.class)
.subscribe(bar -> {
    // ...
});
return ...
}
  1. Запускаем запрос асинхронно, независимо от процедуры обработки.
  2. Выполняем обработку и возвращаем завершение Mono<Void>.
Kotlin
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
GlobalScope.launch {
requester.route("status").data("5").retrieveFlow<StatusReport>().collect {
    // ...
}
}
/// ... (2)
}
  1. Запускаем запрос асинхронно, независимо от процедуры обработки.
  2. Perform handling in the suspending function.

Запросы

Сразу после получения реквестера для client или server, запросы можно отправлять следующим образом:

Java
ViewBox viewBox = ... ;
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.data(viewBox)
.retrieveFlux(AirportLocation.class);
  1. Задаем маршрут для включения в метаданные сообщения-запроса.
  2. Указываем данные для сообщения-запроса.
  3. Объявляем ожидаемый ответ.
Kotlin
val viewBox: ViewBox = ...
val locations = requester.route("locate.radars.within")
.data(viewBox)
.retrieveFlow<AirportLocation>()
  1. Задаем маршрут для включения в метаданные сообщения-запроса.
  2. Указываем данные для сообщения-запроса.
  3. Объявляем ожидаемый ответ.

Тип взаимодействия определяется неявно по мощности множества (cardinality) ввода и вывода. Приведенный выше пример является Request-Stream, поскольку отправляется одно значение, а принимается поток значений. По большей части об этом думать не требуется, пока выбор ввода и вывода соответствует типу взаимодействия RSocket и типам ввода и вывода, ожидаемым респондером. Единственным примером недопустимой комбинации является комбинация "многие к одному".

Метод data(Object) также принимает любой Publisher из спецификации Reactive Streams, включая Flux и Mono, а также любой другой генератор значений, зарегистрированный в ReactiveAdapterRegistry. В случае многозначного Publisher, такого как Flux, который генерирует одинаковые типы значений, рассмотрите возможность использования одного из перегруженных методов data, чтобы избежать проверки типов и поиска Encoder для каждого элемента:

data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);

Этап с data(Object) является необязательным. Пропускаем его в случае запросов, которые не отправляют данные:

Java
Mono<AirportLocation> location = requester.route("find.radar.EWR"))
.retrieveMono(AirportLocation.class);
Kotlin
import org.springframework.messaging.rsocket.retrieveAndAwait
val location = requester.route("find.radar.EWR")
.retrieveAndAwait<AirportLocation>()

Дополнительные значения метаданных можно добавить, если используются составные метаданные (по умолчанию) и если значения поддерживаются зарегистрированным Encoder. Например:

Java
String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlux(AirportLocation.class);
Kotlin
import org.springframework.messaging.rsocket.retrieveFlow
val requester: RSocketRequester = ...
val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")
val locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlow<AirportLocation>()

Для Fire-and-Forget используйте метод send(), который возвращает Mono<Void>. Обратите внимание, что Mono указывает только на то, что сообщение было успешно отправлено, а не на то, что оно было обработано.

Для Metadata-Push используйте метод sendMetadata() с возвращаемым значением Mono<Void>.