RSocketRequester надає вільний API для виконання запитів RSocket, приймаючи та повертаючи об'єкти для даних та метаданих замість буферів даних низького рівня. Він може використовуватися симетрично для виконання запитів від клієнтів і для виконання запитів від серверів.

Реквестер на стороні клієнта

Для отримання RSocketRequester на стороні клієнта необхідно підключитися до сервера, що передбачає відправку кадру SETUP за протоколом RSocket з налаштуваннями з'єднання. RSocketRequester передбачає засіб збирання, який допомагає підготувати io.rsocket.core.RSocketConnector, включно з налаштуваннями з'єднання для кадру SETUP.

Це найпростіший спосіб підключення з налаштуваннями за замовчуванням:

Java

RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);
URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
Kotlin
val requester = RSocketRequester.builder().tcp("localhost", 7000)
URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)

За умов, які описано вище, з'єднання встановлюється не відразу. Під час надходження запитів прозоро встановлюється та використовується спільне з'єднання.

Налаштування підключення

RSocketRequester.Builder передбачає наступне для налаштування початкового кадру SETUP :

  • dataMimeType(MimeType) — встановлює MIME-тип даних для з'єднання.

  • metadataMimeType(MimeType) — встановлює MIME-тип метаданих для з'єднання.

  • setupData(Object) — дані для включення до SETUP.

  • setupRoute(String, Object…) — маршрут метаданих для включення до SETUP.

  • setupMetadata(Object, MimeType) — інші метадані для включення до SETUP.

Для даних MIME-тип за замовчуванням визначається першим конфігурованим Decoder. Для метаданих MIME-тип за замовчуванням — складові метадані, що дозволяє використовувати кілька пар значень метаданих та MIME-типів для кожного запиту. Зазвичай обидва не потребують заміни.

Дані та метадані у кадрі SETUP є необов'язковими. На стороні сервера методи, позначені анотацією @ConnectMapping, можуть використовуватися для обробки початку з'єднання та вмісту кадру SETUP. Метадані можуть використовуватися для забезпечення безпеки на рівні з'єднання.

Стратегії

RSocketRequester.Builder приймає RSocketStrategies для конфігурування реквестера. Тобі потрібно буде використовувати його, щоб передавати фрейми та декодери для (де)-серіалізації даних та значень метаданих. За замовчуванням зареєстровано лише базові кодеки зі spring-core для String, byte[] та ByteBuffer. Додавання spring-web забезпечує доступ до більшої кількості функцій, які можна зареєструвати таким чином:

Java
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000);
Kotlin
val strategies = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.build()
val requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000) 

RSocketStrategies призначений для повторного використання. У деяких сценаріях, наприклад, коли клієнт і сервер знаходяться в одному додатку, може бути краще оголосити це в конфігурації Spring.

Респондери на стороні клієнта

RSocketRequester.Builder можна використовувати для налаштування респондерів, які реагують на запити від сервера.

Можно використовувати анотовані обробники для реагування на стороні клієнта, що базуються на тій самій інфраструктурі, що й серверна частина, але зареєстровані програмно таким чином:

Java
RSocketStrategies strategies = RSocketStrategies.builder()
.routeMatcher(new PathPatternRouteMatcher())   
.build();
SocketAcceptor responder =
RSocketMessageHandler.responder(strategies, new ClientHandler());  
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(responder))  
.tcp("localhost", 7000);
  1. Використовуємо PathPatternRouteMatcher, якщо є spring-web, для більш ефективного підбору маршруту.
  2. Створюємо респондер із класу з методами, позначеними анотацією @MessageMapping та/або @ConnectMapping.
  3. Реєструємо респондер.
Kotlin
val strategies = RSocketStrategies.builder()
.routeMatcher(PathPatternRouteMatcher())   
.build()
val responder =
RSocketMessageHandler.responder(strategies, new ClientHandler());  
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(responder) }   
.tcp("localhost", 7000)
  1. Використовуємо PathPatternRouteMatcher, якщо є spring -web, для більш ефективного підбору маршруту.
  2. Створюємо респондер із класу з методами, позначеними анотацією @MessageMapping та/або @ConnectMapping.
  3. Реєструємо респондер.

Зверни увагу, що наведене вище скорочення призначене виключно для програмної реєстрації клієнтських респондерів. В альтернативних сценаріях, якщо клієнтські респондери знаходяться в конфігурації Spring, можна оголосити RSocketMessageHandler як бін Spring і потім застосовувати наступним чином:

Java
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(handler.responder()))
.tcp("localhost", 7000);
Kotlin
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(handler.responder()) }
.tcp("localhost", 7000)

Дляописаного вище також може знадобитися використовувати setHandlerPredicate у RSocketMessageHandler, щоб перейти на іншу стратегію виявлення клієнтських респондерів, наприклад, на основі анотації користувача, такої як @RSocketClientResponder, а не стандартний @Controller. Це необхідно в сценаріях з клієнтом і сервером або кількома клієнтами в одному додатку.

Розширений режим

RSocketRequesterBuilder передбачає можливість здійснення зворотного виклику, який відкриває базовий io.rsocket.core.RSocketConnector для отримання доступу до додаткових параметрів конфігурації інтервалів підтримки в активному стані (keepalive), відновлення сесії, перехоплювачів тощо. Ти можеш налаштувати параметри на цьому рівні таким чином:

Java
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> {
// ...
})
.tcp("localhost", 7000);
Kotlin
val requester = RSocketRequester.builder()
.rsocketConnector {
    //...
}
.tcp("localhost", 7000)

Реквестер на стороні сервера

Для здійснення запитів від сервера до підключених клієнтів необхідно отримати від сервера реквестер для підключеного клієнта.

В анотованих респондерах методи з анотаціями @ConnectMapping та @MessageMapping підтримують аргумент RSocketRequester. Використовуй його, щоб отримати доступ до конкретного реквестера для конкретного з'єднання. Пам'ятай, що методи, позначені анотацією @ConnectMapping, по суті є обробниками кадру SETUP, який повинен бути оброблений до відправки запитів. Тому запити на початку необхідно відокремити від процедури обробки. Наприклад:

Java
@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
requester.route("status").data("5")
.retrieveFlux(StatusReport.class)
.subscribe(bar -> { 
    // ...
});
return ... 
}
  1. Запускаємо запит асинхронно, незалежно від процедури обробки.
  2. Виконуємо обробку та повертаємо завершення Mono<Void>.
Kotlin
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
GlobalScope.launch {
requester.route("status").data("5").retrieveFlow<StatusReport>().collect { 
    // ...
}
}
/// ... (2)
}
  1. Запускаємо запит асинхронно, незалежно від процедури обробки.
  2. Додаткове розповсюдження у виконанні suspending.

Запити

Відразу після отримання реквестера для client або server, запити можна надсилати так:

Java
ViewBox viewBox = ... ;
Flux<AirportLocation> locations = requester.route("locate.radars.within") 
.data(viewBox) 
.retrieveFlux(AirportLocation.class); 
  1. Вказуємо маршрут для включення до метаданих повідомлення-запиту.
  2. Вказуємо дані для повідомлення-запиту.
  3. Оголошуємо очікувану відповідь.
Kotlin
val viewBox: ViewBox = ...
val locations = requester.route("locate.radars.within") 
.data(viewBox) 
.retrieveFlow<AirportLocation>()
  1. Вказуємо маршрут для включення в метадані повідомлення-запиту.
  2. Вказуємо дані для повідомлення-запиту.
  3. Оголошуємо очікувану відповідь.

Тип взаємодії визначається неявно за потужністю множини (cardinality) введення та виведення. Наведений вище приклад є Request-Stream, оскільки надсилається одне значення, а приймається потік значень. Здебільшого про це думати не потрібно, поки вибір введення та виведення відповідає типу взаємодії RSocket та типу введення та виведення, очікуваним респондером. Єдиним прикладом неприпустимої комбінації є комбінація "багато до одного".

Метод data(Object) також приймає будь-який Publisher зі специфікації Reactive Streams, включно з Flux та Mono, а також будь-який інший генератор значень, зареєстрований у ReactiveAdapterRegistry. У випадку багатозначного Publisher, такого як Flux, який генерує однакові типи значень, розглянь можливість використання одного з перевантажених методів data, щоб уникнути перевірки типів та пошуку Encoder для кожного елемента:


data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);

Етап з data(Object) є необов'язковим. Пропускаємо його у разі запитів, які не надсилають дані:

Java
Mono<AirportLocation> location = requester.route("find.radar.EWR"))
.retrieveMono(AirportLocation.class);
Kotlin

import org.springframework.messaging.rsocket.retrieveAndAwait
val location = requester.route("find.radar.EWR")
.retrieveAndAwait<AirportLocation>()

Додаткові значення метаданих можна додати, якщо використовуються складені метадані (за замовчуванням) і якщо значення підтримуються зареєстрованим Encoder. Наприклад:

Java
String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlux(AirportLocation.class);
Kotlin
import org.springframework.messaging.rsocket.retrieveFlow
val requester: RSocketRequester = ...
val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")
val locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlow<AirportLocation>()

Для Fire-and-Forget використовуй метод send(), який повертає Mono<Void>.Зверни увагу, що Mono вказує лише на те, що повідомлення було успішно надіслано, а не на те, що воно було оброблено.

Для Metadata-Push використовуй метод sendMetadata() зі значенням Mono<Void>, що повертається.