Протокол WebSocket визначає два типи повідомлень (текстові та двійкові), але їх зміст не визначається. Протокол визначає клієнту і серверу механізм для узгодження субпротоколу (тобто протоколу обміну повідомленнями вищого рівня), який використовуватиметься поверх WebSocket для визначення того, які повідомлення кожен із новачків може надсилати, який їх формат, зміст кожного повідомлення тощо. Використання субпротоколу необов'язкове, але в будь-якому випадку між клієнтом та сервером має бути узгоджений певний протокол, який визначає зміст повідомлення.

Короткий опис

Протокол STOMP (Simple Text Oriented Messaging Protocol) спочатку був створений для скриптових мов (таких як Ruby, Python і Perl) для підключення до корпоративних брокерів повідомлень. Він призначений для роботи з простою підмножиною шаблонів обміну повідомленнями, що часто використовуються. STOMP можна використовувати через будь-який надійний двонаправлений потоковий мережевий протокол, такий як TCP та WebSocket. Хоча STOMP є текстово-орієнтованим протоколом, корисні дані повідомлення можуть мати як текстову, так і двійкову форму.

STOMP — це протокол на основі кадрів, кадри якого змодельовані на основі HTTP. У наступному лістингу показано структуру кадру STOMP:

COMMAND
header1:value1
header2:value2
Body^@

Клієнти можуть використовувати команди SEND або SUBSCRIBE для надсилання або підписки на повідомлення разом із заголовком destination, який описує, про що повідомлення і хто має його отримати. Це дозволяє створити простий механізм публікації-підписки, який можна використовувати для надсилання повідомлень через брокер іншим підключеним клієнтам або для надсилання повідомлень на сервер із запитом на виконання певної роботи.

Якщо ти використовуєш засоби підтримки протоколу STOMP від Spring, додаток WebSocket в Spring виступає як брокер STOMP для клієнтів. Повідомлення надсилаються в методи обробки повідомлень, позначені анотацією @Controller, або доо простого брокера зі зберіганням в оперативній пам'яті, який відстежує підписки і розсилає повідомлення користувачам, що підписалися. Також можна налаштувати Spring на роботу зі спеціальним STOM-брокером (таким як RabbitMQ, ActiveMQ та іншими) для фактичного широкомовного розсилання повідомлень. У цьому випадку Spring підтримує встановлення TCP-з'єднань з брокером, ретранслює йому повідомлення та передає повідомлення від нього підключеним WebSocket-клієнтам. Таким чином, вебдодатки Spring можуть використовувати уніфіковану безпеку на основі HTTP-протоколу, загальну валідацію та знайому модель програмування для обробки повідомлень. періодично (наприклад, через заплановане завдання, яке надсилає повідомлення через SimpMessagingTemplate брокеру):

SUBSCRIBE
id:sub-1
destination:/topic/price.stock.*
^@

У наступному прикладі показаний клієнт, що надсилає торгову заявку, яку сервер може обробити за допомогою методу з анотацією @MessageMapping:

SEND
destination:/queue/trade
content-type:application/json
content-length:44
{"action":"BUY","ticker":"MMM","shares",44}^@

Після виконання сервер може передати клієнту повідомлення про підтвердження угоди та детальну інформацію.

Значення адреси призначення навмисно залишено непрозорим у специфікації STOMP. Це може бути будь-який рядок, а сервери STOMP повністю визначають семантику та синтаксис адрес призначення, які вони підтримують. Однак дуже часто адреси призначення є рядками, подібними до шляхів, де /topic/.. передбачає публікацію-підписку (один до багатьох), а /queue/ передбачає обмін повідомленнями "точка-точка" (один до одного).

Сервери STOMP можуть використовувати команду MESSAGE для розсилки повідомлень усім передплатникам. У наступному прикладі показано, як сервер відправляє котирування акцій клієнту, що підписався:

MESSAGE
message-id:nxahklf6-1
subscription:sub-1
destination:/topic/price.stock.MMM
{"ticker":"MMM","price":129.45}^@

Сервер не може надсилати непотрібні повідомлення. Всі повідомлення від сервера повинні бути відповіддю на певну підписку клієнта, а заголовок subscription-id повідомлення сервера повинен збігатися із заголовком id підписки клієнта.

Попередній короткий опис покликаний забезпечити базове розуміння протоколу STOMP. Ми рекомендуємо повністю ознайомитися зі специфікацією протоколу.

Переваги

Використання STOMP як субпротоколу дозволяє Spring Framework і Spring Security забезпечити більш повнофункціональну модель програмування порівняно з використанням сирих вебсокетів протоколу WebSocket. Те саме можна сказати і про порівняння HTTP з сирим TCP і про те, як це дозволяє Spring MVC та іншим веб-фреймворкам забезпечувати багату функціональність. Нижче наведено перелік переваг:

  • Не потрібно вигадувати кастомний протокол обміну повідомленнями та формат повідомлень.

  • Клієнти STOMP, включно з Java-клієнтом, доступні в Spring Framework.

  • Можна (опційно) використовувати брокери повідомлень (такі як RabbitMQ, ActiveMQ та інші) для керування підписками та широкомовними повідомленнями.

  • Логіку програми можна організувати в будь-якій кількості екземплярів @Controller, а повідомлення можна маршрутизувати до них на основі заголовка адреси призначення STOMP на відміну від обробки сирих WebSocket-повідомлень за допомогою єдиного WebSocketHandler для цього з'єднання.

  • Можна використовувати Spring Security для захисту повідомлень на основі адрес STOMP і типів повідомлень.

Активація STOMP

Підтримка STOMP поверх WebSocket доступна в модулях spring-messaging та spring-websocket. Щойно у тебе з'являться ці залежності, можна буде відкрити кінцеві точки STOMP поверх WebSocket за допомогою запасного варіанту через протокол SockJS, як показано в наступному прикладі:

import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").withSockJS(); 
    }
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.setApplicationDestinationPrefixes("/app"); 
        config.enableSimpleBroker("/topic", "/queue"); 
    }
}
  1. /portfolio — це URL-адреса протоколу HTTP для кінцевої точки, до якої клієнт WebSocket (або SockJS) повинен підключитися для підтвердження встановлення зв'язку за протоколом WebSocket.
  2. Повідомлення STOMP, заголовок адреси призначення яких починається з /app, направляються до методів, анотованих @MessageMapping у класах з анотацією @Controller.
  3. Використовуємо вбудований брокер повідомлень для підписки та широкомовної розсилки та надсилаємо повідомлення, заголовок адреси призначення яких починається з /topic `або `/queue в брокер.

У цьому прикладі показано XML-еквівалент конфігурації з попереднього прикладу:


<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        https://www.springframework.org/schema/websocket/spring-websocket.xsd">
    <websocket:message-broker application-destination-prefix="/app">
        <websocket:stomp-endpoint path="/portfolio">
            <websocket:sockjs/>
        </websocket:stomp-endpoint>
        <websocket:simple-broker prefix="/topic, /queue"/>
    </websocket:message-broker>
</beans>
У разі вбудованого простого брокера префікси /topic та /queue не мають особливого значення. Вони є лише угодою, що дозволяє провести розмежування між обміном повідомленнями на кшталт "видавець-передплатник" та "точка-точка" (тобто між багатьма передплатниками та одним споживачем). Якщо ти використовуєш зовнішній брокер, звернися до сторінки STOMP брокера, щоб зрозуміти, які адреси призначення та префікси STOMP він підтримує.

Для підключення з браузера, при використанні SockJS, можна використовувати sockjs-client. Для STOMP багато програм використовували бібліотеку jmesnil/stomp-websocket (також відому як stomp.js), яка є повнофункціональною та використовувалась протягом багатьох років, але більше не підтримується. Нині JSteunou/webstomp-client є наступником цієї бібліотеки, що найбільш активно підтримується і розвивається. Наступний приклад коду на ній:


var socket = new SockJS("/spring-websocket-portfolio/portfolio");
var stompClient = webstomp.over(socket);
stompClient.connect({}, function(frame) {
}

До того ж, якщо підключення здійснюється через WebSocket (без SockJS), то можна використовувати наступний код :


var socket = new WebSocket("/spring-websocket-portfolio/portfolio");
var stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
}

Зверни увагу, що в попередньому прикладі stompClient не потрібно вказувати заголовки login та passcode Навіть якби це було зроблено, вони були б проігноровані (або, швидше, перевизначені) на стороні сервера.

Додаткові приклади коду див.:

Сервер WebSocket

Для налаштування базового сервера WebSocket застосовується інформація з розділу "Серверна конфігурація". У випадку Jetty, однак, необхідно встановити HandshakeHandler та WebSocketPolicy через StompEndpointRegistry:


@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").setHandshakeHandler(handshakeHandler());
    }
    @Bean
    public DefaultHandshakeHandler handshakeHandler() {
        WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
        policy.setInputBufferSize(8192);
        policy.setIdleTimeout(600000);
        return new DefaultHandshakeHandler(
                new JettyRequestUpgradeStrategy(new WebSocketServerFactory(policy)));
    }
}

Потік повідомлень

Якщо кінцева точка STOMP відкрита, програма Spring стає брокером STOMP для підключених клієнтів. У цьому розділі описується потік повідомлень на стороні сервера.

Модуль spring-messaging містить фундаментальні засоби підтримки програм для обміну повідомленнями, які зародилися ще в проєкті Spring Integration, а потім були вилучені та включені до Spring Framework для ширшого використання в багатьох проєктах Spring та сценаріях застосування. У наступному списку коротко описано деякі з доступних абстракцій обміну повідомленнями:

  • Message: Просте подання повідомлення, включно з заголовками та корисними даними.

  • MessageHandler : Угода про обробку повідомлення.

  • MessageChannel: Угода про надсилання повідомлення, що забезпечує вільну взаємодію між відправниками та одержувачами.

  • SubscribableChannel: MessageChannel з передплатниками MessageHandler.

  • ExecutorSubscribableChannel: SubscribableChannel, який використовує для доставки повідомлень.

Як Java-конфігурація (тобто анотація @EnableWebSocketMessageBroker), так і конфігурація простору імен XML (тобто <websocket:message-broker>) використовують попередні компоненти для асемблювання робочого процесу передачі повідомлення. На наступній схемі показані компоненти, що використовуються при активації простого вбудованого брокера повідомлень:

На попередній схемі показано три канали повідомлень:

  • clientInboundChannel: Для надсилання повідомлень, отриманих від клієнтів WebSocket.

  • clientOutboundChannel: Для надсилання серверних повідомлень клієнтам WebSocket.

  • brokerChannel: Для надсилання повідомлень брокеру повідомлень з коду програми на стороні сервера.

На наступній схемі показані компоненти, які використовуються, якщо зовнішній брокер (наприклад, RabbitMQ) налаштований для керування підписками та широкомовною розсилкою повідомлень:

Основна відмінність між попередніми схемами полягає у використанні "ретранслятора брокера (broker relay)" для передачі повідомлень до зовнішнього STOMP-брокера поверх TCP і для низхідної передачі повідомлень від брокера до клієнтів, що підписалися.

Якщо повідомлення приймаються від WebSocket-з'єднання, вони декодуються в STOMP-фрейми, перетворюються на подання Message зі Spring і надсилаються за clientInboundChannel для подальшої обробки. Наприклад, STOMP-повідомлення, заголовки адрес призначення яких починаються з /app, можуть бути маршрутизовані до методів з анотацією @MessageMapping в анотованих контролерах, у той час як повідомлення /topic та /queue можуть бути направлені безпосередньо до брокера повідомлень.

Анотований @Controller, який обробляє STOMP-повідомлення від клієнта, може надіслати повідомлення брокеру повідомлень через brokerChannel, а брокер надсилає повідомлення відповідним передплатникам через clientOutboundChannel. Той самий контролер може робити те саме у відповідь на HTTP-запити, тому клієнт може виконати HTTP-метод POST, а потім метод з анотацією @PostMapping може надіслати повідомлення брокеру повідомлень для розсилки передплатникам.

Ми можемо простежити цей процес на простому прикладі. Розглянемо наступний приклад, у якому налаштовується сервер:


@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio");
    }
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/app");
        registry.enableSimpleBroker("/topic");
    }
}
@Controller
public class GreetingController {
    @MessageMapping("/greeting")
    public String handle(String greeting) {
        return "[" + getTimestamp() + ": " + greeting;
    }
}

Попередній приклад підтримує наступний потік:

  1. Клієнт підключається до http://localhost:8080/portfolio і щойоно встановлюється WebSocket-з'єднання, за ним починають передаватися фрейми STOMP.

  2. Клієнт відправляє кадр SUBSCRIBE із заголовком призначення /topic/greeting. Після отримання та декодування повідомлення відправляється в clientInboundChannel, а потім маршрутизується до брокера повідомлень, який зберігає підписку клієнта.

  3. Клієнт відправляє кадр SEND до /app/greeting. Префікс /app допомагає маршрутизувати його до анотованих контролерів. Після видалення префікса /app частина призначення /greeting, що залишилася, відображається на метод, анотований @MessageMapping, у GreetingController.

  4. Значення GreetingController,що повертається, перетворюється на Message зі Spring з корисними даними, заснованими на значенні, що повертається, і заголовком призначення за замовчуванням /topic/greeting (отриманим із вхідного призначення із заміною /app на /topic). Отримане повідомлення надсилається до brokerChannel та обробляється брокером повідомлень.

  5. Брокер повідомлень знаходить усіх відповідних передплатників і відправляє кожному з них фрейм MESSAGE каналом clientOutboundChannel, звідки повідомлення кодуються як фрейми STOMP і відправляються по з'єднанню на основі протоколу WebSocket. значень, що повертаються.

Анотовані контролери

Програми можуть використовувати класи, позначені анотацією @Controller, для обробки повідомлень від клієнтів. Такі класи можуть оголошувати методи з анотацією @MessageMapping, @SubscribeMapping та @ExceptionHandler, як описано в наступних темах:

  • @MessageMapping

  • @SubscribeMapping

  • @MessageExceptionHandler

@MessageMapping

Можна використовувати анотацію @MessageMapping для анотування методів, що маршрутизують повідомлення на основі їхнього призначення. Вона підтримується як на рівні методу, і лише на рівні типу. На рівні типів анотація @MessageMapping використовується для вираження загальних відображень для всіх методів контролера.

За замовчуванням значення відображення є шаблонами шляхів у стилі Ant (наприклад, /thing* , /thing/**), включно з підтримкою змінних шаблонів (наприклад, /thing/{id}). На значення можна посилатися через аргументи методу, позначеного анотацією @DestinationVariable. Програми також можуть переключитися на угоду про розділення точкою призначення повідомлення для відображення.

Аргументи методу, що підтримуються

У наступній таблиці описані аргументи методу:

Аргумент методу Опис

Message

Забезпечує доступ до повного повідомлення.

MessageHeaders

Забезпечує доступ до заголовків усередині Message.

MessageHeaderAccessor, SimpMessageHeaderAccessor та StompHeaderAccessor

Забезпечують доступ до заголовків через типізовані методи доступу.

@Payload

Забезпечує доступ до корисних даних повідомлення, перетворених (наприклад, з JSON) за допомогою налаштованого MessageConverter.

Наявність цієї анотації не є обов'язковою, оскільки за замовчуванням вона передбачена, якщо не збігається жоден інший аргумент.

Модна анотувати аргументи корисних даних за допомогою @javax.validation.Valid або @Validated із Spring, щоб аргументи корисних даних валідувалися автоматично.

@Header

Забезпечує доступ до певного значення заголовка — разом із перетворенням типу за допомогою org.springframework.core.convert.converter.Converter, якщо це необхідно.

@Headers

Забезпечує доступ до всіх заголовків у повідомленні. Цей аргумент має бути присвоєним для java.util.Map.

@DestinationVariable

Забезпечує доступ до змінних шаблонів, вилучених із призначення повідомлення. Значення при необхідності перетворюються відповідно до оголошеного типу аргументу методу.

java.security.Principal

Відображає користувача, який увійшов до системи під час підтвердження встановлення зв'язку за протоколом WebSocket через протокол HTTP.

Значення, що повертаються

За замовчуванням повертається значення методу, позначеного анотацією @MessageMapping, серіалізується в корисні дані через відповідний MessageConverter і відправляється у вигляді Message за brokerChannel, звідки воно розсилається підписникам. Призначення вихідного повідомлення таке саме, як і вхідного, але з префіксом /topic.

Можна використовувати анотації @SendTo та @SendToUser для налаштування вихідного повідомлення. Анотація @SendTo використовується для налаштування цільового призначення або для вказання кількох призначень. Анотація @SendToUser використовується для надсилання вихідного повідомлення тільки користувачу, пов'язаному з вхідним повідомленням.

Можна одночасно використовувати @SendTo та @SendToUser в тому самому методі, причому обидва методи підтримуються на рівні класу, і в цьому випадку вони будуть діяти за замовчуванням для методів у класі. Однак пам'ятай, що будь-яке анотування за допомогою @SendTo або @SendToUser на рівні методу перевизначає будь-які подібні анотації на рівні класу.

Повідомлення можуть оброблятися асинхронно, а метод з анотацією @MessageMapping може повертати ListenableFuture, CompletableFuture або CompletionStage.

Зверни увагу, що анотації @SendTo і @SendToUser — це просто зручний спосіб, який дорівнює використанню SimpMessagingTemplate для надсилання повідомлень. При необхідності, у більш складних сценаріях, методи, позначені анотацією @MessageMapping, можуть відкотитися до використання SimpMessagingTemplate безпосередньо. Це може статися замість або, можливо, на додаток до повернення значення.

@SubscribeMapping

Анотація @SubscribeMapping аналогічна @MessageMapping, але звужує відображення лише до повідомлень підписки. Вона підтримує самі аргументи методу, як і анотація @MessageMapping. Однак у разі значення, що повертається, за замовчуванням повідомлення відправляється безпосередньо клієнту (через clientOutboundChannel, у відповідь на підписку), а не брокеру (через brokerChannel, у вигляді широкомовної розсилки за відповідним підписки). Додавання анотації @SendTo або анотації @SendToUser перевизначає цю логіку роботи і натомість надсилає повідомлення брокеру.

У яких випадках це практично? Припустимо, що брокер відображено на /topic та /queue, а контролери додатків відображені на /app. За такої конфігурації брокер зберігає всі підписки на /topic та /queue, призначені для повторних розсилок, і додаток не має необхідності брати в цьому участь. Клієнт також може підписатися на певну адресу призначення /app, а контролер може повернути значення у відповідь на цю підписку без участі брокера, не зберігаючи і не використовуючи підписку повторно (фактично одноразовий обмін на кшталт "запит-відповідь"). Одним із варіантів використання цього механізму є заповнення інтерфейсу користувача початковими даними при запуску.

У яких випадках це непрактично? Не намагайся відображати брокер і контролери з тим самим префіксом адреси призначення, якщо з якоїсь причини тобі не потрібно, щоб обидва незалежно обробляли повідомлення, включно з підписками. Вхідні повідомлення обробляються паралельно. Немає жодних гарантій щодо того, що першим опрацює це повідомлення — брокер чи контролер. Якщо метою є отримання повідомлення про те, що передплата збережена і готова до широкомовної розсилки, клієнт повинен запитати підтвердження отримання, якщо сервер підтримує його (простий брокер не підтримує). Наприклад, за допомогою клієнта STOMP Java можна зробити таке, щоб додати підтвердження отримання:


@Autowired
private TaskScheduler messageBrokerTaskScheduler;
// Під час ініціалізації...
stompClient.setTaskScheduler(this.messageBrokerTaskScheduler);
// Підписка...
StompHeaders headers = new StompHeaders();
headers.setDestination("/topic/...");
headers.setReceipt("r1");
FrameHandler handler = ...;
stompSession.subscribe(headers, handler).addReceiptTask(receiptHeaders -> {
     // Передплата готова...
});

На стороні сервера можна зареєструвати ExecutorChannelInterceptor для brokerChannel та реалізувати метод afterMessageHandled, який викликається після обробки повідомлень, включно з підписками.

@MessageExceptionHandler

Програма може використовувати методи, анотовані @MessageExceptionHandler, для обробки винятків із методів, анотованих @MessageMapping. Можна оголосити винятки в самій анотації або через аргумент методу, якщо необхідно отримати доступ до екземпляра винятку. У наступному прикладі виняток оголошується через аргумент методу:

 
@Controller
public class MyController {
    // ...
    @MessageExceptionHandler
    public ApplicationError handleException(MyException exception) {
        // ...
        return appError;
    }
}

Методи з анотацією @MessageExceptionHandler підтримують гнучкі сигнатури методів і ті ж типи аргументів методу і значення, що повертаються, що і методи з анотацією @MessageMapping.

Зазвичай методи, позначені анотацією @MessageExceptionHandler, застосовуються в межах класу з анотацією @Controller ( або ієрархії класів), у якому вони оголошені. Якщо тобі потрібно, щоб такі методи застосовувалися більш глобально (у всіх контролерах), можна оголосити їх у класі, позначеному анотацією @ControllerAdvice. Це можна порівняти з аналогічними засобами підтримки, доступними в Spring MVC.

Надсилання повідомлень

Що робити, якщо необхідно надсилати повідомлення підключеним клієнтам з будь-якої частини програми? Будь-який компонент програми може надсилати повідомлення через brokerChannel. Найпростіший спосіб зробити це — впровадити SimpMessagingTemplate та використовувати його для надсилання повідомлень. Зазвичай він впроваджується за типом, як це показано в наступному прикладі:


@Controller
public class GreetingController {
    private SimpMessagingTemplate template;
    @Autowired
    public GreetingController(SimpMessagingTemplate template) {
        this.template = template;
    }
    @RequestMapping(path="/greetings", method=POST)
    public void greet(String greeting) {
        String text = "[" + getTimestamp() + "]:" + greeting;
        this.template.convertAndSend("/topic/greetings", text);
    }
}

Однак також можна визначити його на ім'я (brokerMessagingTemplate), якщо існує інший бін того ж типу.

Простий брокер

Вбудований простий брокер повідомлень обробляє запити на підписку від клієнтів, зберігає їх у пам'яті та розсилає повідомлення підключеним клієнтам з відповідними адресами призначення. Брокер підтримує шляхи адрес призначення, включаючи підписки на шаблони адрес призначення в стилі Ant.

Програми також можуть використовувати адреси призначення, розділені точками (а не косою рискою).

Якщо налаштований планувальник завдань, простий брокер буде підтримувати heartbeat -повідомлення STOMP. Щоб налаштувати планувальник, можна оголосити власний бін TaskScheduler і встановити його через MessageBrokerRegistry. До того ж, можна використовувати той, який автоматично оголошується у вбудованій конфігурації WebSocket, проте тоді знадобиться анотація @Lazy, щоб уникнути циклічності між вбудованою конфігурацією WebSocket та вашим WebSocketMessageBrokerConfigurer. Наприклад:


@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    private TaskScheduler messageBrokerTaskScheduler;
    @Autowired
    public void setMessageBrokerTaskScheduler(@Lazy TaskScheduler taskScheduler) {
        this.messageBrokerTaskScheduler = taskScheduler;
    }
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/queue/", "/topic/")
                .setHeartbeatValue(new long[] {10000, 20000})
                .setTaskScheduler(this.messageBrokerTaskScheduler);
        // ...
    }
}

Зовнішній брокер

Простий брокер відмінно підходить для початку роботи, але підтримує тільки частину команд STOMP (він не підтримує символи ack, підтвердження отримання та деякі інші функції), використовує простий цикл надсилання повідомлень і не підходить для кластеризації. Як варіант, можна модернізувати програми для повнофункціонального брокера повідомлень.

Звернися до документації по STOMP для обраного брокера повідомлень (наприклад, RabbitMQ, ActiveMQ та інші), встанови брокер і запусти його з активованою підтримкою STOMP. Потім можна активувати ретранслятор брокера STOMP (замість простого брокера) у конфігурації Spring.

Наступний приклад конфігурації передбачає повнофункціональний брокер:


@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").withSockJS();
    }
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/topic", "/queue");
        registry.setApplicationDestinationPrefixes("/app");
    }
}

У наступному прикладі показаний XML-еквівалент конфігурації з попереднього прикладу:


<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        https://www.springframework.org/schema/websocket/spring-websocket.xsd">
    <websocket:message-broker application-destination-prefix="/app">
        <websocket:stomp-endpoint path="/portfolio" />
            <websocket:sockjs/>
        </websocket:stomp-endpoint>
        <websocket:stomp-broker-relay prefix="/topic,/queue" />
    </websocket:message-broker>
</beans>

Ретранслятором брокера STOMP у попередній конфігурації є MessageHandler зі Spring, який обробляє повідомлення, пересилаючи їх зовнішньому брокеру повідомлень. Для цього він встановлює TCP-з'єднання з брокером, направляє йому всі повідомлення, а потім пересилає всі повідомлення, отримані від брокера, клієнтам через WebSocket-сесії. По суті, він діє як "ретранслятор", що пересилає повідомлення в обох напрямках. Додай до проєкту залежності io.projectreactor.netty:reactor-netty та io.netty:netty-all для керування TCP-з'єднаннями.

Більше того, компоненти додатків (такі як методи обробки HTTP-запитів, бізнес- сервіси та інші) також можуть надсилати повідомлення ретранслятору брокера.

По суті, ретранслятор брокера забезпечує надійне та масштабоване розсилання повідомлень.

Підключення до брокера

Ретранслятор брокера STOMP підтримує єдине "системне" TCP-з'єднання з брокером. Це з'єднання використовується виключно для повідомлень, що надходять від програми на стороні сервера, але не для отримання повідомлень. Можна налаштувати облікові дані STOMP (тобто заголовки login та passcode у кадрі STOMP) для цього з'єднання. Це буде відображено як у просторі імен XML, так і в Java-конфігурації у вигляді властивостей systemLogin та systemPasscode зі значеннями за замовчуванням guest та guest.

Ретранслятор брокера STOMP також створює окреме TCP-з'єднання для кожного підключеного клієнта WebSocket. Можна налаштувати облікові дані STOMP, які використовуються для всіх з'єднань TCP, створених від імені клієнтів. Це буде відображено як у просторі імен XML, так і в Java-конфігурації у вигляді властивостей clientLogin та clientPasscode зі значеннями за замовчуванням guest та guest.

Ретранслятор брокера STOMP завжди встановлює заголовки login та passcode у кожному кадрі CONNECT, який він пересилає брокеру від імені клієнтів. Тому клієнтам WebSocket не потрібно встановлювати ці заголовки. Вони ігноруються. Клієнти WebSocket повинні вдаватися до HTTP-аутентифікації для захисту кінцевої точки WebSocket та встановлення особистості клієнта. Можна налаштувати інтервали надсилання та отримання heartbeat-повідомлень (за замовчуванням 10 секунд). Якщо зв'язок з брокером буде втрачено, ретранслятор брокера намагатиметься відновити з'єднання кожні 5 секунд доти, доки це не вдасться зробити.

Будь-який бін Spring може реалізувати ApplicationListener<BrokerAvailabilityEvent> для отримання повідомлень, якщо "системне" з'єднання з брокером втрачено і відновлено. Наприклад, служба Stock Quote, що розсилає котирування акцій, може припинити спроби відправлення повідомлень за відсутності активного "системного" з'єднання. За замовчуванням ретранслятор брокера STOMP завжди підключається – а в разі необхідності й перепідключається, якщо з'єднання пропаде, – до одного й того ж самого хосту та порту. Якщо потрібно надавати кілька адрес при кожній спробі підключення, можна налаштувати постачальника адрес замість фіксованого хоста і порту. У цьому прикладі показано, як це зробити:


@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
    // ...
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/queue/", "/topic/").setTcpClient(createTcpClient());
        registry.setApplicationDestinationPrefixes("/app");
    }
    private ReactorNettyTcpClient<byte[]> createTcpClient() {
        return new ReactorNettyTcpClient<>(
                client -> client.addressSupplier(() -> ... ),
                new StompReactorNettyCodec());
    }
}

Також можна налаштувати ретранслятор брокера STOMP за допомогою властивості virtualHost. Значення цієї властивості встановлюється як заголовок host кожного кадру CONNECT і може бути корисним (наприклад, у хмарному середовищі, де фактичний хост, з яким встановлюється TCP-з'єднання, відрізняється від хоста, що надає хмарний STOMP-сервіс).

Крапки як роздільники

Якщо повідомлення надсилаються в методи, анотовані @MessageMapping, вони зіставляються з AntPathMatcher. За замовчуванням очікується, що шаблони будуть використовувати косу межу (/) як роздільник. Це хороша угода для вебзастосунків, аналогічна URL-адресам протоколу HTTP. Однак якщо ви більше звикли до угод для обміну повідомленнями, можна перейти на використання крапки (.) як роздільника.

У наступному прикладі показано, як це зробити за допомогою Java- конфігурації:


@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    // ...
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setPathMatcher(new AntPathMatcher("."));
        registry.enableStompBrokerRelay("/queue", "/topic");
        registry.setApplicationDestinationPrefixes("/app");
    }
}

У наступному прикладі показаний XML-еквівалент конфігурації з попереднього прикладу:


<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:websocket="http://www.springframework.org/schema/websocket"
       xsi:schemaLocation="
                http://www.springframework.org/schema/beans
                https://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/websocket
                https://www.springframework.org/schema/websocket/spring-websocket.xsd">
    <websocket:message-broker application-destination-prefix="/app" path-matcher="pathMatcher">
        <websocket:stomp-endpoint path="/stomp"/>
        <websocket:stomp-broker-relay prefix="/topic,/queue" />
    </websocket:message-broker>
    <bean id="pathMatcher" class="org.springframework.util.AntPathMatcher">
        <constructor-arg index="0" value="."/>
    </bean>
</beans>

Після цього контролер зможе використовувати крапку (.) як роздільник у методах, позначених анотацією @MessageMapping, як показано в наступному прикладі:


@Controller
@MessageMapping("red")
public class RedController {
    @MessageMapping("blue.{green}")
    public void handleGreen(@DestinationVariable String green) {
        // ...
    }
}

Тепер клієнт може надіслати повідомлення на /app/red.blue.green123.

У попередньому прикладі ми не змінювали префікси для "ретранслятора брокера", тому що вони залежать від зовнішнього брокера повідомлень. Звернися до сторінок документації STOMP, присвячених брокеру, який ти використовуєш, щоб дізнатися, які угоди він підтримує для заголовка адрес призначення.

З іншого боку, "простий брокер" покладається на налаштований PathMatcher, тому, якщо ти перемкнеш роздільник, ця зміна також застосовується до брокера і до того, як брокер зіставляє адреси призначення з повідомлення з шаблонами у підписках.

Аутентифікація

Кожен сеанс обміну повідомленнями STOMP WebSocket починається з запиту HTTP. Це може бути запит на перехід до вебсокетів протоколу WebSocket (тобто підтвердження встановлення зв'язку за протоколом WebSocket) або, у разі запасних варіантів через протокол SockJS, серія HTTP-запитів на механізми передачі для SockJS.

Багато вебзастосунків вже мають засоби автентифікації та авторизації для захисту HTTP-запитів. Зазвичай користувач аутентифікується через Spring Security за допомогою будь-якого механізму, наприклад, сторінки входу в систему, базової HTTP-автентифікації або іншим способом. Контекст безпеки для автентифікованого користувача зберігається в HTTP-сесії і зв'язується з наступними запитами в тій же сесії на основі cookie. Тому в разі підтверждення встановлення зв'язку за протоколом WebSocket або HTTP-запитів на механізми передачі для SockJS зазвичай вже існує автентифікований користувач, доступний через HttpServletRequest#getUserPrincipal(). Spring автоматично пов'язує цього користувача зі створеною для нього сесією WebSocket або SockJS і, згодом, з усіма повідомленнями STOMP, що передаються через цю сесію через заголовок користувача. Тобто, типовому вебзастосунку не потрібно робити нічого надмір того, що він вже здійснює для забезпечення безпеки. Користувач аутентифікується на рівні HTTP-запиту за допомогою контексту безпеки, який підтримується на рівні HTTP-сесії на основі cookie (яка потім зв'язується з сесіями WebSocket або SockJS, створеними для цього користувача), внаслідок чого заголовок користувача додається до кожного Message, що проходить через програму.

Протокол STOMP дійсно має заголовки login і passcode у кадрі CONNECT. Вони спочатку були розроблені для використання STOMP поверх TCP і необхідні для нього. Однак, у разі використання STOMP поверх WebSocket, за замовчуванням, Spring ігнорує заголовки автентифікації на рівні протоколу STOMP, і припускає, що користувач вже автентифікований на рівні HTTP-механізмів передачі. Передбачається, що сесія WebSocket або SockJS міститиме автентифікованого користувача.

Аутентифікація за допомогою токенів

Проєкт Spring Security OAuth забезпечує підтримку безпеки на основі токенів, включаючи JSON Web Token (JWT). Ти можеш використовувати його як механізм автентифікації у вебдодатках, включно з STOMP поверх WebSocket, як описано в попередньому розділі (тобто для збереження ідентичності через сесію на основі cookie).

В той же час сесії на основі cookie не завжди підходять краще (наприклад, у додатках, які не підтримують сесію на стороні сервера, або в мобільних додатках, де зазвичай використовуються заголовки для автентифікації).

Протокол WebSocket, RFC 6455 "не передбачає жодного конкретного способу, яким сервери можуть аутентифікувати клієнтів під час підтвердження встановлення зв'язку за протоколом WebSocket". Насправді, браузерні клієнти можуть використовувати лише стандартні заголовки аутентифікації (тобто базову аутентифікацію HTTP) або файли cookie, але не можуть (наприклад) надавати кастомні заголовки. Аналогічно, клієнт SockJS на JavaScript не має можливості надсилати HTTP-заголовки за транспортними запитами SockJS. Див. sockjs-client issue 196.Натомість він дозволяє відправляти параметри запиту, які ти можеш використовувати для відправки токена, але в цьому є свої недоліки (наприклад, токен може бути випадково зареєстрований разом з URL-адресою в журналах сервера).

Попередні обмеження відносяться до клієнтів на базі браузера і не поширюються на клієнт STOMP на базі Java у Spring, який підтримує відправлення заголовків із запитами за протоколами WebSocket та SockJS.

Тому для додатків, в яких потрібно уникнути використання файлів cookie, може не знайти хороших альтернатив для аутентифікації на рівні протоколу HTTP. У такому разі замість використання файлів cookie можна віддати перевагу аутентифікації за допомогою заголовків на рівні протоколу обміну повідомленнями STOMP. Для цього необхідно виконати дві прості дії:

  1. Використовуй клієнт STOMP для передачі заголовків аутентифікації під час підключення.

  2. Опрацюй заголовки аутентифікації з за допомогою ChannelInterceptor.

У наступному прикладі використовується конфігурація на стороні сервера для реєстрації кастомного перехоплювача аутентифікації. Зверни увагу, що перехоплювачу необхідно лише пройти автентифікацію та встановити заголовок користувача в Message виду CONNECT. Spring відзначає та зберігає автентифікованого користувача та пов'язує його з наступними повідомленнями STOMP у тій же сесії. У цьому прикладі показано, як зареєструвати кастомний перехоплювач аутентифікації:


@Configuration
@EnableWebSocketMessageBroker
public class MyConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(new ChannelInterceptor() {
            @Override
            public Message preSend(Message message, MessageChannel channel) {
                StompHeaderAccessor accessor =
                        MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
                if (StompCommand.CONNECT.equals(accessor.getCommand())) {
                    Authentication user = ... ; // access authentication header(s)
                    accessor.setUser(user);
                }
                return message;
            }
        });
    }
}}

Зверни увагу також, що при використанні авторизації Spring Security для повідомлень наразі необхідно переконатися, що конфігурація ChannelInterceptor для автентифікації перебуває в порядку перед конфігурацією Spring Security. Це найкраще зробити, оголосивши користувальницький перехоплювач у власній реалізації WebSocketMessageBrokerConfigurer, яка позначена анотацією @Order(Ordered.HIGHEST_PRECEDENCE + 99).

Авторизація

Spring Security передбачає авторизацію за субпротоколом WebSocket, який використовує ChannelInterceptor для авторизації повідомлень на основі заголовка користувача. До того ж, Spring Session забезпечує інтеграцію протоколу WebSocket, яка гарантує, що HTTP-сесія користувача не закінчиться, поки WebSocket-сесія все ще активна.

Користувацькі адреси призначення

Додаток може надсилати повідомлення, адресовані конкретному користувачу, і для цього засоби підтримки STOMP у Spring розпізнають адреси призначення з префіксом /user/. Наприклад, клієнт може підписатися на адресу призначення /user/queue/position-updates. UserDestinationMessageHandler обробляє цю адресу призначення і перетворює її на адресу призначення, унікальну для сесії користувача (наприклад, /queue/position-updates-user123). Це забезпечує зручність підписки на загальну адресу призначення і водночас гарантує відсутність конфліктів з іншими користувачами, що підписалися на ту саму адресу призначення, тому кожен користувач зможе отримувати унікальні оновлення біржових позицій.

При роботі з кастомними адресами призначення важливо налаштувати префікси адрес призначення брокера та програми, інакше брокер оброблятиме повідомлення з префіксом "/user", які повинні оброблятися тільки UserDestinationMessageHandler.

На стороні передачі повідомлення можуть надсилати за адресою призначення, такою як /user/{username}/queue/position-updates, яка у свою чергу перетворюється на UserDestinationMessageHandler в одну або кілька адрес призначення, по одній для кожної сесії, пов'язаної з користувачем. Це дозволяє будь-якому компоненту програми надсилати повідомлення, призначені для конкретного користувача, не знаючи нічого, крім його імені та загальної адреси призначення. Це також можна зробити за допомогою анотації та шаблону обміну повідомленнями.

Метод обробки повідомлень може надсилати повідомлення користувачу, пов'язаному з обробленим повідомленням, за допомогою анотації @SendToUser (також підтримується на рівні класу для загальної адреси призначення), як показано в наступному прикладі:


@Controller
public class PortfolioController {
    @MessageMapping("/trade")
    @SendToUser("/queue/position-updates")
    public TradeResult executeTrade(Trade trade, Principal principal) {
        // ...
        return tradeResult;
    }
}

Якщо користувач має більше однієї сесії, за замовчуванням усі сесії, підписані на цю адресу призначення, є цільовими. Однак іноді може знадобитися вибрати метою лише ту сесію, яка надіслала оброблене повідомлення. Зробити це можна, встановивши атрибут broadcast у false, як показано в наступному прикладі:


@Controller
public class MyController {
    @MessageMapping("/action")
    public void handleAction() throws Exception{
        // MyBusinessException is thrown here
    }
    @MessageExceptionHandler
    @SendToUser(destinations="/queue/errors", broadcast=false)
    public ApplicationError handleException(MyBusinessException exception) {
        // ...
        return appError;
    }
}
Хоча адреси призначення користувачів зазвичай мають на увазі автентифікованого користувача, це не є суворо обов'язковим. Сесія WebSocket, не пов'язана з автентифікованим користувачем, може підписатися на адресу призначення користувача. У таких випадках анотація @SendToUser функціонує так само, як і при broadcast=false (тобто вибирає метою лише ту сесію, яка надіслала оброблене повідомлення).

Можна надіслати повідомлення на адреси призначення користувача з будь-якого компонента програми, наприклад, впровадивши SimpMessagingTemplate, створений Java-конфігурацією або простором імен XML. (Ім'я біна - brokerMessagingTemplate, якщо потрібно для повного уточнення імені за допомогою анотації @Qualifier). У цьому прикладі показано, як це зробити:


@Service
public class TradeServiceImpl implements TradeService {
    private final SimpMessagingTemplate messagingTemplate;
    @Autowired
    public TradeServiceImpl(SimpMessagingTemplate messagingTemplate) {
        this.messagingTemplate = messagingTemplate;
    }
    // ...
    public void afterTradeExecuted(Trade trade) {
        this.messagingTemplate.convertAndSendToUser(
                trade.getUserName(), "/queue/position-updates", trade.getResult());
    }
}
Якщо використовуються призначені для користувача адреси призначення із зовнішнім брокером повідомлень, то слід ознайомитися з документацією до брокеру щодо того, як керувати неактивними чергами, щоб після завершення сесії користувача всі унікальні черги користувача були видалені. Наприклад, RabbitMQ створює черги з автоматичним видаленням, якщо використовуються такі адреси призначення, як /exchange/amq.direct/position-updates. У цьому випадку клієнт може підписатися на /user/exchange/amq.direct/position-updates. Аналогічно, ActiveMQ має конфігураційні опції для очищення неактивних адрес призначення.

У сценарії з кількома серверами програм користувача адреса призначення може залишитися невирішеною, оскільки користувач підключений до іншого сервера. У таких випадках можна налаштувати адресу призначення на широкомовну розсилку недозволених повідомлень, щоб інші сервери могли спробувати зробити це. Здійснити це можна за допомогою властивості userDestinationBroadcast реєстру MessageBrokerRegistry в Java-конфігурації та атрибуту user-destination-broadcast елемента message-broker у XML.

Порядок повідомлень

Повідомлення від брокера публікуються через clientOutboundChannel, звідки вони записуються до WebSocket-сесій. Оскільки канал підтримується ThreadPoolExecutor, повідомлення обробляються в різних потоках, а результуюча послідовність, отримана клієнтом, може не збігатися з точним порядком публікації.

Якщо це є проблемою, активуй прапорець setPreservePublishOrder, як показано в наступному прикладі:


@Configuration
@EnableWebSocketMessageBroker
public class MyConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    protected void configureMessageBroker(MessageBrokerRegistry registry) {
        // ...
        registry.setPreservePublishOrder(true);
    }
}

У наступному прикладі показаний XML-еквівалент конфігурації з попереднього прикладу:


<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        https://www.springframework.org/schema/websocket/spring-websocket.xsd">
    <websocket:message-broker preserve-publish-order="true">
            <!-- ... -->
    </websocket:message-broker>
</beans>

Якщо прапорець встановлений, повідомлення в межах однієї клієнтської сесії публікуються через clientOutboundChannel по одному за раз, що забезпечує необхідний порядок публікації. Зверни увагу, що це призводить до невеликого зниження продуктивності, тому необхідно активувати цей прапор тільки в разі потреби.

Події

Кілька подій ApplicationContext публікуються і можуть бути отримані шляхом реалізації інтерфейсу ApplicationListener для Spring:

  • BrokerAvailabilityEvent: Вказує, коли брокер стає доступним або недоступним. Хоча "простий" брокер стає доступним відразу під час запуску і залишається таким під час роботи програми, "ретранслятор брокера" STOMP може втратити з'єднання з повнофункціональним брокером (наприклад, при перезапуску брокера). Ретранслятор брокера містить логіку перепідключення та відновлює "системне" з'єднання з брокером, коли він знову активується. В результаті ця подія публікується щоразу, коли стан змінюється з підключеного на відключений і навпаки. Компонентам, які використовують SimpMessagingTemplate, потрібно підписатися на цю подію та уникати надсилання повідомлень, поки брокер недоступний. У будь-якому випадку, вони повинні бути підготовлені до обробки MessageDeliveryException під час надсилання повідомлення.

  • SessionConnectEvent: Публікується при отриманні нового кадру CONNECT протоколу STOMP для позначення початку нової клієнтської сесії. Подія містить повідомлення, що представляє підключення, включно з ідентифікатором сесії, інформацією про користувача (якщо така є) і будь-якими заголовками, що надіслані клієнтом. Це корисно для відстеження клієнтських сесій. Компоненти, підписані на цю подію, можуть обернути повідомлення за допомогою SimpMessageHeaderAccessor або StompMessageHeaderAccessor.

  • SessionConnectedEvent: Публікується незабаром після SessionConnectEvent, коли брокер відправив кадр CONNECTED протоколу STOMP у відповідь на кадр CONNECT. На цьому етапі STOMP-сесію можна вважати повністю встановленою.

  • SessionUnsubscribeEvent: Публікується при отриманні нового кадру UNSUBSCRIBE протоколу STOMP.

  • SessionDisconnectEvent: Публікується при завершенні STOMP-сесії. Фрейм DISCONNECT може передатися від клієнта або автоматично згенеруватися під час закриття WebSocket-сесії. У деяких випадках ця подія публікується більше ніж один раз за сесію. Компоненти повинні бути ідемпотентними по відношенню до кількох подій втрати з'єднання. Якщо ти використовуєш повнофункціональний брокер, "ретранслятор брокера" STOMP автоматично встановлює "системне" з'єднання, якщо брокер стає тимчасово недоступним. Однак, клієнтські з'єднання автоматично повторно не встановлюються. Якщо heartbeat-повідомлення включено, то клієнт зазвичай реагує на те, що брокер не відповідає протягом 10 секунд. У клієнтах має бути реалізована власна логіка повторного встановлення з'єднання.

Перехоплення

Події передають повідомлення про життєвий цикл STOMP-з'єднання, але не про кожне повідомлення клієнта. Програми також можуть реєструвати ChannelInterceptor для перехоплення будь-якого повідомлення у будь-якій частині ланцюжка обробки. У цьому прикладі показано, як перехоплювати вхідні повідомлення від клієнтів:


@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(new MyChannelInterceptor());
    }
}

Користувацький ChannelInterceptor може використовувати StompHeaderAccessor або SimpMessageHeaderAccessor для отримання доступу до інформації про повідомлення, як показано в наступному прикладі:


public class MyChannelInterceptor implements ChannelInterceptor {
    @Override
    public Message preSend(Message message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getStompCommand();
        // ...
        return message;
    }
}

Програми також можуть реалізувати ExecutorChannelInterceptor, який є підінтерфейсом ChannelInterceptor зі зворотними викликами в потоці, у якому обробляються повідомлення. Хоча ChannelInterceptor викликається один раз для кожного повідомлення, надісланого в канал, ExecutorChannelInterceptor передбачає перехоплювачі у потоці кожного MessageHandler, підписаного на повідомлення з каналу.

Зверни увагу, що, як і у випадку з SessionDisconnectEvent, описаним раніше, повідомлення DISCONNECT може бути отримане від клієнта, а також може бути автоматично згенероване під час закриття сесії WebSocket. У деяких випадках перехоплювач може перехопити це повідомлення більше одного разу кожної сесії. Компоненти повинні бути ідемпотентними по відношенню до кількох подій втрати з'єднання.

Клієнт STOMP

Spring надає клієнт STOMP поверх WebSocket і клієнт STOMP поверх TCP.

Для початку можна створити та налаштувати WebSocketStompClient, як показано в наступному прикладі:


WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // for heartbeats

У попередньому прикладі можна замінити StandardWebSocketClient на SockJsClient, оскільки він також є реалізацією WebSocketClient. SockJsClient може використовувати WebSocket або механізм передачі на основі HTTP як запасний варіант.

Далі можна встановити з'єднання та надати обробник для сесії STOMP, як показано в наступному прикладі:


String url = "ws://127.0. 0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
stompClient.connect(url, sessionHandler);

Коли сесія буде готова до використання, обробник отримає повідомлення, як показано в наступному прикладі:


public class MyStompSessionHandler extends StompSessionHandlerAdapter {
    @Override
    public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
        // ...
    }
}

Щойно буде встановлена, можна надсилати будь-які корисні дані, які серіалізуються за допомогою налаштованого MessageConverter, як показано в наступному прикладі:

session.send("/topic/something", "payload");

Ти також можеш підписатися на адреси призначення. Методи subscribe вимагають наявності обробника повідомлень для передплати та повертають дескриптор (хендл) Subscription, який ви можете використовувати для відписки. Для кожного отриманого повідомлення обробник може вказати цільовий тип Object, у який мають бути десеріалізовані корисні дані, як показано в наступному прикладі:


session.subscribe("/topic/something", new StompFrameHandler() {
    @Override
    public Type getPayloadType(StompHeaders headers) {
        return String.class;
    }
    @Override
    public void handleFrame(StompHeaders headers, Object payload) {
        // ...
    }
});

Щоб увімкнути heartbeat-повідомлення STOMP, можна налаштувати WebSocketStompClient з використанням TaskScheduler та опціонально налаштувати інтервали передачі heartbeat-повідомлень (10 секунд для відсутності дій запису, що викличе відправлення heart і 10 секунд для відсутності дій читання, що викличе закриття з'єднання).

WebSocketStompClient надсилає heartbeat-повідомлення лише в разі бездіяльності, тобто коли не надсилаються інші повідомлення. Це може стати проблемою при використанні зовнішнього брокера, оскільки повідомлення з адресою призначення, що не є брокерською, відображають активність, але фактично не надсилаються брокеру. У цьому випадку можна налаштувати TaskScheduler при ініціалізації зовнішнього брокера, який забезпечує пересилання heartbeat-повідомлення брокеру також і в тому випадку, якщо надсилаються лише повідомлення з адресою призначення, що не є брокерською.

Якщо ти використовуєш WebSocketStompClient для тестів продуктивності, щоб імітувати тисячі клієнтів з однієї машини, подумай про вимкнення heartbeat-повідомлень, оскільки кожне з'єднання планує власні завдання, а оптимізація цього процесу для великої кількості клієнтів, що працюють на одній машині, відсутня.

Протокол STOMP також забезпечує підтримку підтвердження отримання, якщо клієнт повинен додати заголовок receipt, на який сервер відповідає кадром RECEIPT після обробки надсилання або підписки. Для забезпечення такої підтримки StompSession пропонує setAutoReceipt(boolean), який забезпечує додавання заголовка receipt при кожній подальшій події відправки або підписки. До того ж, можна вручну додати заголовок підтвердження отримання в StompHeaders. І надсилання, і підписка повертають екземпляр Receiptable, який можна використовувати для реєстрації зворотних викликів успішного та невдалого отримання. Для цієї функції необхідно налаштувати клієнта за допомогою TaskScheduler та вказанням кількості часу до закінчення терміну дії підтвердження отримання (за замовчуванням 15 секунд).

Зверни увагу, що StompSessionHandler сам є StompFrameHandler, що дозволяє йому обробляти кадри ERROR на додаток до зворотного виклику handleException, призначеного для винятків, згенерованих при обробці повідомлень, та handleTransportError, призначеного для помилок на рівні передачі, включаючи ConnectionLostException.

Область доступності протоколу WebSocket

Кожна сесія WebSocket має Map атрибутів. Map прив'язується як заголовок до вхідних повідомлень клієнта і може бути доступна з методу контролера, як показано в наступному прикладі:


@Controller
public class MyController {
    @MessageMapping("/action")
    public void handle(SimpMessageHeaderAccessor headerAccessor) {
        Map<String, Object> attrs = headerAccessor.getSessionAttributes();
        // ...
    }
}

Можна оголосити керований Spring бін в області доступності websocket. Ти можеш впроваджувати біни, що входять до області доступності WebSocket, у контролери та будь-які перехоплювачі каналу, зареєстровані через clientInboundChannel. Зазвичай вони є одинаками, а їхній життєвий цикл довше, ніж кожна окрема WebSocket-сесія. Тому для бінів, що входять до області доступності WebSocket, необхідно використовувати режим проксі для області доступності, як показано в наступному прикладі:


@Component
@Scope(scopeName = "websocket", proxyMode = ScopedProxyMode.TARGET_CLASS)
public class MyBean {
    @PostConstruct
    public void init() {
        // Викликається після впровадження залежностей
    }
    // ...
    @PreDestroy
    public void destroy() {
        // Викликається після впровадження залежностей
    }
}
@Controller
public class MyController {
    private final MyBean myBean;
    @Autowired
    public MyController(MyBean myBean) {
        this.myBean = myBean;
    }
    @MessageMapping("/action")
    public void handle() {
        // this.myBean з поточної сесії WebSocket
    }
}

Як і будь-яка кастомна область доступності, Spring ініціалізує новий екземпляр MyBean при першому зверненні до нього з контролера та зберігає його в атрибутах сесії WebSocket. Цей екземпляр згодом повертається до закінчення сесії. Для бінів, що входять до області доступності WebSocket, викликаються всі методи життєвого циклу Spring, як показано в попередніх прикладах.

Продуктивність

Коли справа доходить до продуктивності, жодного єдино правильного рішення немає. На неї впливає безліч факторів, включно з розміром та обсяг повідомлень, виконання прикладними методами роботи, що вимагає блокування, та зовнішні фактори (такі як швидкість мережі та інші деталі). Мета цього розділу — коротко описати доступні варіанти конфігурації, а також висловити деякі ідеї та поміркувати про масштабування.

У додатку для обміну повідомленнями повідомлення передаються каналами для асинхронного виконання, які підтримуються пулами потоків. Конфігурація такої програми вимагає хорошого розуміння принципів роботи каналів та потоку повідомлень.

Очевидно, що почати слід з налаштування пулів потоків, що підтримують clientInboundChannel та clientOutboundChannel. За замовчуванням обидві конфігурації налаштовані на подвоєну кількість доступних процесорів.

Якщо обробка повідомлень в анотованих методах в основному залежить від процесора, кількість потоків для clientInboundChannel повинна залишатися близькою до кількості процесорів. Якщо робота, яку вони виконують, більше пов'язана з введенням-виведенням і вимагає блокування або очікування в базі даних або іншій зовнішній системі, розмір пулу потоків, ймовірно, потрібно збільшити.

ThreadPoolExecutor має три важливі властивості: розмір основного пулу потоків, максимальний розмір пулу потоків та місткість черги для зберігання завдань, для яких немає вільних потоків.

Часто виникає плутанина: конфігурування розміру основного пулу (наприклад, 10) та максимального розміру пулу (наприклад, 20) призводить до створення пулу потоків з 10-20 потоками. Насправді, якщо залишити значення місткості за умовчанням Integer.MAX_VALUE, пул потоків ніколи не перевищить розмір основного пулу, оскільки всі додаткові завдання ставляться в чергу. Дивися javadoc про ThreadPoolExecutor, щоб дізнатися, як працюють ці властивості, і ознайомитися з різними стратегіями постановки в чергу.

Якщо клієнти знаходяться у швидкій мережі, кількість потоків повинна залишатися близькою до кількості доступних процесорів. Якщо вони повільні або мають низьку пропускну здатність, то довше споживають повідомлення та створюють навантаження на пул потоків. Тому потрібно збільшити розмір пулу потоків.

Хоча робоче навантаження для каналу clientInboundChannel можна передбачити — зрештою, воно залежить від того, чим займається програма — налаштувати канал "clientOutboundChannel" вже складніше, оскільки його робота заснована на факторах, які не залежать від застосування. З цієї причини з надсиланням повідомлень пов'язані дві додаткові властивості: sendTimeLimit та sendBufferSizeLimit. Ти можеш використовувати ці методи для того, щоб налаштувати тривалість відправлення та обсяг даних, що підлягають буферизації, при надсиланні повідомлень клієнту. Всі додаткові повідомлення, тим часом, буферизуються, і ви можете використовувати ці властивості, щоб вирішити, скільки часу має займати надсилання повідомлення і скільки даних може буферизуватися за цей час. Важливі додаткові подробиці див. у javadoc та документації до XML-схеми.

У наступному прикладі показано можливу конфігурацію:


@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setSendTimeLimit(15 * 1000).setSendBufferSizeLimit(512 * 1024);
    }
    // ...
}

У наступному прикладі показаний XML-еквівалент конфігурації з попереднього прикладу:


<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        https://www.springframework.org/schema/websocket/spring-websocket.xsd">
    <websocket:message-broker>
        <websocket:transport send-timeout="15000" send-buffer-size="524288" />
        <!-- ... -->
    </websocket:message-broker>
</beans>

Також можна використовувати конфігурацію механізму передачі WebSocket, показану раніше, для налаштування максимально допустимого розміру вхідних повідомлень STOMP. Теоретично розмір повідомлення WebSocket може бути практично необмеженим. Насправді сервери WebSocket встановлюють обмеження – наприклад, 8 Кбайт для Tomcat і 64 Кбайт для Jetty. З цієї причини клієнти STOMP (такі як webstomp-client з JavaScript та інші) розбивають великі повідомлення STOMP у досягненні розміру в 16 Кбайт і надсилають їх у вигляді кількох повідомлень WebSocket, що вимагає від сервера буферизації та повторного асемблювання.

Засоби підтримки STOMP поверх WebSocket у Spring здійснюють це, тому програми можуть конфігурувати максимальний розмір STOMP-повідомлень від розмірів повідомлень, специфічних для WebSocket сервера. Пам'ятай, що розмір повідомлення WebSocket автоматично коригується, якщо необхідно, щоб гарантовано передавати WebSocket-повідомлення розміром мінімум 16 Кбайт.

У наступному прикладі показано одну з можливих конфігурацій:


@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setMessageSizeLimit(128 * 1024);
    }
    // ...
}

У наступному прикладі показаний XML-еквівалент конфігурації з попереднього прикладу:


<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        https://www.springframework.org/schema/websocket/spring-websocket.xsd">
    <websocket:message-broker>
        <websocket:transport message-size="131072" />
            <!-- ... -->
    </websocket:message-broker>
</beans>

Важливим моментом у масштабуванні є використання кількох екземплярів додатків. Наразі не можна зробити це за допомогою простого брокера. Однак при використанні повнофункціонального брокера (наприклад, RabbitMQ) кожен екземпляр програми підключається до брокера, а повідомлення, що надсилаються одним екземпляром програми, можуть надсилатися через брокер клієнтам WebSocket, підключеним через будь-які інші екземпляри додатків.

Моніторинг

Якщо ти використовуєш анотацію @EnableWebSocketMessageBroker або <websocket:message-broker>, ключові компоненти інфраструктури автоматично збирають статистику та лічильники, які дають важливе уявлення про внутрішній стан програми. У конфігурації також оголошується бін типу WebSocketMessageBrokerStats, який збирає всю доступну інформацію в одному місці та за умовчанням реєструє її на рівні INFO раз на 30 хвилин. Цей бін може бути експортований в JMX через MBeanExporter з Spring для перегляду під час виконання (наприклад, через jconsole з JDK). У наступному списку наведено коротку інформацію:

Клієнтські сесії WebSocket
Поточна (Current)

Показує, скільки клієнтських сесій існує в даний час, з подальшою розбивкою потокової передачі даних через протокол WebSocket щодо протоколу HTTP і сесіям поллінгу через протокол SockJS.

Усього (Total)

Вказує, скільки всього сесій було встановлено.

Аварійно закрита
Збої в з'єднанні (Connect Failures)

Сесії, які були встановлені, але були закриті після того, як протягом 60 секунд не було отримано жодного повідомлення. Зазвичай це свідчить про проблеми з проксі-сервером або мережею.

Перевищено ліміт надсилання (Send Limit Exceeded)

Сесії закриваються після перевищення налаштованого часу очікування відправки або ліміту буфера відправки, що може статися з повільними клієнтами (див. попередній розділ).

Помилки передачі (Transport Errors)

Сесії закриваються після помилки передачі, наприклад, через неможливості читання або запису в WebSocket-з'єднання або HTTP-запит або відповідь.

Фрейми STOMP (STOMP Frames)

Загальна кількість оброблених кадрів CONNECT, CONNECTED і DISCONNECT, що показує, скільки клієнтів підключилося на рівні STOMP. Зверни увагу, що значення лічильника кадру DISCONNECT може бути меншим, якщо сесії закриті аварійно або якщо клієнти закриті без надсилання кадру DISCONNECT.

Ретранслятор брокера STOMP (STOMP Broker Relay)
TCP-з'єднання

Вказує, скільки TCP-з'єднань від імені клієнтських WebSocket-сесій встановлено з брокером. Значення має дорівнювати кількості клієнтських WebSocket-сесій + 1 додаткове загальне "системне" з'єднання для надсилання повідомлень з програми.

Фрейми STOMP (STOMP Frames)

Загальна кількість кадрів CONNECT, CONNECTED та DISCONNECT, переданих брокеру або отриманих від нього від імені клієнтів. Зверни увагу, що фрейм DISCONNECT надсилається брокеру незалежно від того, як було закрито клієнтську WebSocket-сесію. Тому менша кількість фреймів DISCONNECT є ознакою того, що брокер активно закриває з'єднання (можливо, через heartbeat-повідомлення, що не вчасно прийшло, недійсного вхідного кадру або іншої проблеми).

Вхідний канал клієнта

Статистика з пулу потоків, що підтримує clientInboundChannel, яка дає уявлення про стан обробки вхідних повідомлень. Завдання, що стоять у черзі, є ознакою того, що програма може занадто повільно обробляти повідомлення. Якщо є завдання, пов'язані з введенням-виведенням (наприклад, повільні запити до бази даних, HTTP-запити до стороннього REST API тощо), розглянь можливість збільшення розміру пулу потоків.

Вихідний канал клієнта (Client Outbound Channel)

Статистика з пулу потоків, що підтримує clientOutboundChannel, яка дає уявлення про працездатність широкомовної розсилки повідомлень клієнтам. Завдання, що стоять у черзі, є ознакою того, що клієнти надто повільно приймають повідомлення. Один із способів вирішення цієї проблеми – збільшити розмір пулу потоків, щоб вмістити очікувану кількість одночасних повільних клієнтів. Інший варіант – знизити обмеження на час очікування відправки та розмір буфера відправки (див. попередній розділ).

Вхідний канал клієнта

Статистика з пулу потоків планувальника завдань SockJS, який використовується для надсилання heartbeat-повідомлень. Зверни увагу, що при узгодженні heartbeat-повідомлень на рівні STOMP відправка heartbeat-повідомлень для SockJS вимкнена.

Тестування

Існує два основних підходи до тестування додатків, коли ти використовуєш підтримку STOMP поверх WebSocket у Spring. Перший полягає в написанні тестів на стороні сервера для перевірки функціональності контролерів та їх анотованих методів обробки повідомлень. Другий — це написання повних наскрізних тестів, які включають запуск клієнта і сервера.

Ці два підходи не є взаємовиключними. Навпаки, кожному їх відведено своє місце у загальній стратегії тестування. Тести на стороні сервера більш вузькоспрямовані, їх легше писати та супроводжувати. З іншого боку, наскрізні інтеграційні тести є більш повними і охоплюють набагато більше, але вони також складніші в написанні та супроводі.

Найпростішою формою тестування на стороні сервера є написання модульних тестів контролера. Однак це недостатньо практично, оскільки багато з того, що робить контролер, залежить від його анотацій. Чисті модульні тести просто не зможуть протестувати його роботу.

В ідеалі, контролери, що тестуються, повинні викликатися в тому вигляді, в якому вони знаходяться під час виконання програми, подібно до підходу до тестування контролерів, що обробляють HTTP-запити, за допомогою фреймворка Spring MVC Test — без запуску контейнера сервлетів, але з використанням Spring Framework для виклику анотованих контролерів. Як і у випадку зі Spring MVC Test, тут у вас є дві можливі альтернативи: використовувати "контекстно-орієнтовану" або "автономну" конфігурацію:

  • Завантаж актуальну конфігурацію Spring за допомогою фреймворку Spring TestContext, впровадь clientInboundChannel як тестове поле і використовуй його для надсилання повідомлень, які будуть оброблятися методами контролера.

  • Вручну встанови мінімальну інфраструктуру фреймворку Spring, необхідну для виклику контролерів (а саме SimpAnnotationMethodMessageHandler) і передавай повідомлення для контролерів безпосередньо йому.

Обидва ці сценарії налаштування продемонстровані в тестах прикладу програми для портфеля акцій.

Другий підхід полягає у створенні наскрізних інтеграційних тестів. Для цього необхідно запустити WebSocket-сервер у вбудованому режимі та підключитися до нього як WebSocket-клієнт, який надсилає WebSocket-повідомлення, що містять STOMP-фрейми. Тести прикладу програми для портфеля акцій також застосовують цей підхід, використовуючи Tomcat як вбудований сервер WebSocket і простого клієнта STOMP для цілей тестування.