RSocketRequester надає вільний API для виконання запитів RSocket, приймаючи та повертаючи об'єкти для даних та метаданих замість буферів даних низького рівня. Він може використовуватися симетрично для виконання запитів від клієнтів і для виконання запитів від серверів.

Реквестер на стороні клієнта

Для отримання RSocketRequester на стороні клієнта необхідно підключитися до сервера, що передбачає відправку кадру SETUP за протоколом RSocket з налаштуваннями з'єднання. RSocketRequester передбачає засіб збирання, який допомагає підготувати io.rsocket.core.RSocketConnector, включаючи налаштування з'єднання для кадру SETUP.

Це найпростіший спосіб підключення з налаштуваннями за замовчуванням:

Java
RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000); URI url = URI.create("https://example.org:8080/rsocket"); RSocketRequester requester = RSocketRequester.builder().webSocket(url);
Kotlin
val requester = RSocketRequester.builder().tcp("localhost", 7000) URI url = URI.create("https://example.org:8080/rsocket"); val requester = RSocketRequester.builder().webSocket(url)

При вищевикладеному з'єднання встановлюється не відразу. Під час надходження запитів прозоро встановлюється та використовується спільне з'єднання.

Налаштування підключення

RSocketRequester.Builder передбачає наступне для налаштування початкового кадру SETUP :

  • dataMimeType(MimeType) – встановлює MIME-тип даних для з'єднання.

  • metadataMimeType(MimeType) – встановлює MIME-тип метаданих для з'єднання.

  • setupData(Object)– дані для включення до SETUP.

  • setupRoute(String, Object…) – маршрут метаданих для включення до SETUP .

  • setupMetadata(Object, MimeType)– інші метадані для включення до SETUP.

Для даних MIME-тип за замовчуванням визначається першим конфігурованим Decoder. Для метаданих MIME-тип за умовчанням – складові метадані, що дозволяє використовувати кілька пар значень метаданих та MIME-типів для кожного запиту. Як правило, обидва не потребують заміни.

Дані та метадані у кадрі SETUP є необов'язковими. На стороні сервера методи, позначені анотацією @ConnectMapping, можуть використовуватися для обробки початку з'єднання та вмісту кадру SETUP. Метадані можуть використовуватися для забезпечення безпеки на рівні з'єднання.

Стратегії

RSocketRequester.Builder приймає RSocketStrategies для конфігурування реквестера. Вам потрібно буде використовувати його, щоб передавати фрейми та декодери для (де)-серіалізації даних та значень метаданих. За промовчанням зареєстровано лише базові кодеки з spring-core для String, byte[] та ByteBuffer. Додавання spring-web забезпечує доступ до більшої кількості функцій, які можна зареєструвати таким чином:

Java
RSocketStrategies strategies = RSocketStrategies.builder( ) .encoders(encoders -> encoders.add(new Jackson2CborEncoder())) .decoders(decoders -> decoders.add(new Jackson2CborDecoder())) .build(); RSocketRequester requester = RSocketRequester.builder() .rsocketStrategies(strategies) .tcp("localhost", 7000) ;
Kotlin
val strategies = RSocketStrategies .builder() .encoders { it.add(Jackson2CborEncoder()) } .decoders { it.add(Jackson2CborDecoder()) } .build() val requester = RSocket .builder() .rsocketStrategies(strategies) .tcp("localhost", 7000) 

RSocketStrategies призначений для повторного використання. У деяких сценаріях, наприклад, коли клієнт і сервер знаходяться в одному додатку, може бути краще оголосити це в конфігурації Spring.

Респондери на стороні клієнта

RSocketRequester.Builder можна використовувати для налаштування респондерів, які реагують на запити від сервера.

Можно використовувати анотовані обробники для реагування на стороні клієнта, що базуються на тій самій інфраструктурі, що й серверна частина, але зареєстровані програмно таким чином:

Java
RSocketStrategies strategies = RSocketStrategies.builder() .routeMatcher(new PathPatternRouteMatcher())  .build(); SocketAcceptor responder = RSocketMessageHandler.responder(strategies, new ClientHandler());  RSocketRequester requester = RSocketRequester.builder() .rsocketConnector(connector -> connector.acceptor(responder))  .tcp("localhost", 7000 );
  1. Використовуємо PathPatternRouteMatcher, якщо є spring-web, для більш ефективного підбору маршруту.
  2. Створюємо респондер із класу з методами, поміченими анотацією @MessageMapping та/або @ConnectMapping.
  3. Реєструємо респондер.
Kotlin
val strategies = RSocketStrategies.builder() .routeMatcher(PathPatternRouteMatcher())  .build() val responder = RSocketMessageHandler.responder(strategies, new ClientHandler());  val requester = RSocketRequester.builder() .rsocketConnector { it.acceptor( responder) }  .tcp("localhost", 7000)
  1. Використовуємо PathPatternRouteMatcher, якщо є spring -web, для більш ефективного підбору маршруту.
  2. Створюємо респондер із класу з методами, поміченими анотацією @MessageMapping та/або @ConnectMapping .
  3. Реєструємо респондер.

Зверніть увагу, що наведене вище скорочення призначене виключно для програмної реєстрації клієнтських респондерів. В альтернативних сценаріях, якщо клієнтські респондери знаходяться в конфігурації Spring, можна оголосити RSocketMessageHandler як бін Spring і потім застосовувати наступним чином:

Java
 ApplicationContext context = ...; RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class); RSocketRequester requester = RSocketRequester.builder() .rsocketConnector(connector -> connector.acceptor(handler.responder())) .tcp("localhost", 7000);
Kotlin
import org.springframework.beans.factory.getBean val context: ApplicationContext = ... val handler = context.getBean<RSocketMessageHandler>() val requester = RSocketRequester.builder() .rsocketConnector { it.acceptor(handler.responder() ) } .tcp("localhost", 7000)

Для вищеописаного також може знадобитися використовувати setHandlerPredicate у RSocketMessageHandler, щоб перейти на іншу стратегію виявлення клієнтських респондерів, наприклад, на основі анотації користувача, такої як @RSocketClientResponder, а не стандартний @Controller. Це необхідно в сценаріях з клієнтом і сервером або кількома клієнтами в одному додатку. io.rsocket.core.RSocketConnector для отримання доступу до додаткових параметрів конфігурації інтервалів підтримки в активному стані (keepalive), відновлення сесії, перехоплювачів тощо. Ви можете налаштувати параметри на цьому рівні таким чином:

Java
RSocketRequester requester =
        RSocketRequester.builder() .rsocketConnector(connector -> { // ... }) .tcp("localhost", 7000);
Kotlin
val requester = RSocketRequester.builder() .rsocketConnector { //... } .tcp("localhost", 7000)

Реквестер на стороні сервера

Для здійснення запитів від сервера до підключених клієнтів необхідно отримати від сервера реквестер для підключеного клієнта.

В анотованих респондерах методи з анотаціями @ConnectMapping та @MessageMapping підтримують аргумент RSocketRequester. Використовуйте його, щоб отримати доступ до конкретного реквестера для конкретного з'єднання. Пам'ятайте, що методи, позначені анотацією @ConnectMapping, по суті, є обробниками кадру SETUP, який повинен бути оброблений до відправки запитів. Тому запити на початку необхідно відокремити від процедури обробки. Наприклад:

Java
@ConnectMapping Mono<Void> ; handle(RSocketRequester requester) { requester.route("status").data("5") .retrieveFlux(StatusReport.class) .subscribe(bar -> {  // ... }); return ...  }
  1. Запускаємо запит асинхронно, незалежно від процедури обробки.
  2. Виконуємо обробку та повертаємо завершення Mono<Void>.
Kotlin
@ConnectMapping suspend fun handle(requester: RSocketRequester) { GlobalScope.launch { requester.route("status").data("5").retrieveFlow< StatusReport>().collect {  // ... } } /// ... (2) }
  1. Запускаємо запит асинхронно, незалежно від процедури обробки.
  2. Додаткове розповсюдження у виконанні suspending.

Запити

Відразу після отримання реквестера для client або server, запити можна надсилати так:

Java
ViewBox viewBox = ... ; Flux<AirportLocation> locations = requester.route("locate.radars.within")  . data(viewBox)  .retrieveFlux(AirportLocation.class); 
  1. Задаємо маршрут для включення до метаданих повідомлення-запиту.
  2. Вказуємо дані для повідомлення-запиту.
  3. Оголошуємо очікувану відповідь.
Kotlin
val viewBox: ViewBox = ... val locations = requester .route("locate.radars.within")  .data(viewBox)  .retrieveFlow<AirportLocation>() 
  1. Задаємо маршрут для включення в метадані повідомлення-запиту.
  2. Вказуємо дані для повідомлення-запиту.
  3. Оголошуємо очікувану відповідь.

Тип взаємодії визначається неявно за потужністю множини (cardinality ) введення та виведення. Наведений вище приклад є Request-Stream, оскільки надсилається одне значення, а приймається потік значень. Здебільшого про це думати не потрібно, поки вибір введення та виведення відповідає типу взаємодії RSocket та типу введення та виведення, очікуваним респондером. Єдиним прикладом неприпустимої комбінації є комбінація "багато до одного".

Метод data(Object) також приймає будь-який Publisher зі специфікації Reactive Streams, включаючи Flux та Mono, а також будь-який інший генератор значень, зареєстрований у ReactiveAdapterRegistry. У випадку багатозначного Publisher, такого як Flux, який генерує однакові типи значень, розгляньте можливість використання одного з перевантажених методів data, щоб уникнути перевірки типів та пошуку Encoder для кожного елемента:

data(Object producer, Class<?> elementClass); data(Object producer, ParameterizedTypeReference<?> elementTypeRef);

Етап з data(Object) є необов'язковим. Пропускаємо його у разі запитів, які не надсилають дані:

Java
Mono<AirportLocation> location = requester.route("find.radar.EWR")) .retrieveMono(AirportLocation.class);
Kotlin
import org.springframework.messaging.rsocket.retrieveAndAwait val location = requester.route("find.radar.EWR") .retrieveAndAwait<AirportLocation>()

Додаткові значення метаданих можна додати, якщо використовуються складові метадані (за замовчуванням) і якщо значення підтримуються зареєстрованим Encoder. Наприклад:

Java
String securityToken = ... ; ViewBox viewBox = ...; MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0"); Flux<AirportLocation> locations = requester.route("locate.radars.within") .metadata(securityToken, mimeType) .data(viewBox) .retrieveFlux(AirportLocation.class);
Kotlin
import org.springframework.messaging. rsocket.retrieveFlow val requester: RSocketRequester = ... val securityToken: String = ... val viewBox: ViewBox = ... val mimeType = MimeType.valueOf( "message/x.rsocket.authentication.bearer.v0") val locations = requester.route(" locate.radars.within") .metadata(securityToken, mimeType) .data(viewBox) .retrieveFlow<AirportLocation>()

Для Fire-and-Forget використовуйте метод send(), який повертає Mono<Void>.Зверніть увагу, що Mono вказує лише на те, що повідомлення було успішно надіслано, а не на те, що воно було оброблено.

Для Metadata-Push використовуйте метод sendMetadata() з значенням, що повертається Mono<Void>.