Протокол WebSocket визначає два типи повідомлень (текстові та двійкові), але їх зміст не визначається. Протокол визначає клієнту і серверу механізм для узгодження субпротоколу (тобто протоколу обміну повідомленнями вищого рівня), який використовуватиметься поверх WebSocket визначення того, які повідомлення кожен із новачків може відправляти, який їх формат, зміст кожного повідомлення тощо. Використання субпротоколу необов'язкове, але в будь-якому випадку між клієнтом та сервером має бути узгоджений деякий протокол, який визначає зміст повідомлення.

Короткий опис

Протокол STOMP (Simple Text Oriented Messaging Protocol) спочатку був створений для скриптових мов (таких як Ruby, Python і Perl) для підключення до корпоративних брокерів повідомлень. Він призначений для роботи з простим підмножиною шаблонів обміну повідомленнями, що часто використовуються. STOMP можна використовувати через будь-який надійний двонаправлений потоковий мережевий протокол, такий як TCP та WebSocket. Хоча STOMP є текстово-орієнтованим протоколом, корисні дані повідомлення можуть мати як текстову, так і двійкову форму.

STOMP – це протокол на основі кадрів, кадри якого змодельовані на основі HTTP. У наступному лістингу показано структуру кадру STOMP:

COMMAND header1:value1 header2: value2 Body^@

Клієнти можуть використовувати команди SEND або SUBSCRIBE для надсилання або підписки на повідомлення разом із заголовком destination, який описує, про що повідомлення і хто має його отримати. Це дозволяє створити простий механізм публікації-підписки, який можна використовувати для надсилання повідомлень через брокер іншим підключеним клієнтам або для надсилання повідомлень на сервер із запитом на виконання певної роботи.

Якщо ви використовуєте засоби підтримки протоколу STOMP від Spring, додаток WebSocket в Spring виступає як брокер STOMP для клієнтів. Повідомлення надсилаються в методи обробки повідомлень, позначені анотацією @Controller, або в простий брокер зі зберіганням в оперативній пам'яті, який відстежує підписки і розсилає повідомлення користувачам, що підписалися. Також можна налаштувати Spring на роботу зі спеціальним STOM-брокером (таким як RabbitMQ, ActiveMQ та іншими) для фактичного широкомовного розсилання повідомлень. У цьому випадку Spring підтримує встановлення TCP-з'єднань з брокером, ретранслює йому повідомлення та передає повідомлення від нього підключеним WebSocket-клієнтам. Таким чином, веб-додатки Spring можуть використовувати уніфіковану безпеку на основі HTTP-протоколу, загальну валідацію та знайому модель програмування для обробки повідомлень. періодично (наприклад, через заплановане завдання, яке надсилає повідомлення через SimpMessagingTemplate брокеру):

SUBSCRIBE id:sub-1 destination:/topic/price.stock.* ^@

У наступному прикладі показаний клієнт, що відправляє торгову заявку, яку сервер може обробити за допомогою методу з анотацією @MessageMapping:

SEND destination:/queue/trade
        content-type:application/json content-length:44 {"action":"BUY","ticker":"MMM","shares",44}^ @

Після виконання сервер може передати клієнту повідомлення про підтвердження угоди та детальну інформацію.

Значення адреси призначення навмисно залишено непрозорим у специфікації STOMP. Це може бути будь-який рядок, а сервери STOMP повністю визначають семантику та синтаксис адрес призначення, які вони підтримують. Однак дуже часто адреси призначення є рядками, подібними до шляхів, де /topic/.. передбачає публікацію-підписку (один до багатьох), а /queue/ передбачає обмін повідомленнями " точка-точка" (один до одного).

Сервери STOMP можуть використовувати команду MESSAGE для розсилки повідомлень усім передплатникам. У наступному прикладі показано, як сервер відправляє котирування акцій клієнту, що підписався:

MESSAGE message-id:nxahklf6-1 subscription:sub-1 destination:/topic/price.stock.MMM {"ticker":"MMM","price":129.45}^@

Сервер не може надсилати непотрібні повідомлення. Всі повідомлення від сервера повинні бути відповіддю на певну підписку клієнта, а заголовок subscription-id повідомлення сервера повинен збігатися із заголовком id підписки клієнта.

Попереднє коротке опис покликаний забезпечити базове розуміння протоколу STOMP. Ми рекомендуємо повністю ознайомитися з специфікацією протоколу.

Переваги

Використання STOMP як субпротокол дозволяє Spring Framework і Spring Security забезпечити більш повнофункціональну модель програмування порівняно з використанням сирих веб-сокетів протоколу WebSocket. Те саме можна сказати і про порівняння HTTP з сирим TCP і про те, як це дозволяє Spring MVC та іншим веб-фреймворкам забезпечувати багату функціональність. Нижче наведено перелік переваг:

  • Немає вигадувати кастомний протокол обміну повідомленнями та формат повідомлень.

  • Клієнти STOMP, включаючи Java-клієнт , доступні в Spring Framework.

  • Можна (опційно) використовувати брокери повідомлень (такі як RabbitMQ, ActiveMQ та інші) для керування підписками та широкомовними повідомленнями.

  • Логіку програми можна організувати в будь-якій кількості екземплярів @Controller, а повідомлення можна маршрутизувати до них на основі заголовка адреси призначення STOMP на відміну від обробки сирих WebSocket-повідомлень за допомогою єдиного WebSocketHandler для цього з'єднання.

  • Можна використовувати Spring Security для захисту повідомлень на основі адрес STOMP і типів повідомлень.

Активація STOMP

Підтримка STOMP поверх WebSocket доступна в модулях spring-messaging та spring-websocket. Як тільки у вас з'являться ці залежності, можна буде відкрити кінцеві точки STOMP поверх WebSocket за допомогою запасного варіанту через протокол SockJS, як показано в наступному прикладі:

import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import org.springframework.web.socket.config.annotation.StompEndpointRegistry; @Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/portfolio").wiS;  } @Override public void configureMessageBroker(MessageBrokerRegistry config) { config.setApplicationDestinationPrefixes("/app");  config.enableSimpleBroker("/topic", "/queue");  } }
  1. /portfolio – це URL-адреса протоколу HTTP для кінцевої точки, до якої клієнт WebSocket (або SockJS) повинен підключитися для підтвердження встановлення зв'язку за протоколом WebSocket.
  2. Повідомлення STOMP, заголовок адреси призначення яких починається з /app, направляються до методів, анотованих @MessageMapping у класах з анотацією @Controller.
  3. Використовуємо вбудований брокер повідомлень для передплати та широкомовної розсилки та надсилаємо повідомлення, заголовок адреси призначення яких починається з /topic `або `/queue в брокер.

У цьому прикладі показано XML-еквівалент конфігурації з попереднього прикладу:

<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:websocket ="http://www.springframework.org/schema/websocket" xsi:schemaLocation=" http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring- beans.xsd http://www.springframework.org/schema/websocket https://www.springframework.org/schema/websocket/spring-websocket.xsd"> <websocket:message-broker application-destination-prefix="/app"> <websocket:stomp-endpoint path="/portfolio"> <websocket:sockjs/> </websocket:stomp-endpoint> <websocket:simple-broker prefix="/topic, /queue"/> </websocket:message-broker> </beans>
У разі вбудованого простого брокера префікси /topic та /queue не мають особливого значення. Вони є лише угодою, що дозволяє провести розмежування між обміном повідомленнями на кшталт "видавець-передплатник" та "крапка-крапка" (тобто між багатьма передплатниками та одним споживачем). Якщо ви використовуєте зовнішній брокер, зверніться до сторінки STOMP брокера, щоб зрозуміти, які адреси призначення та префікси STOMP він підтримує.

Для підключення з браузера, при використанні SockJS, можна використовувати sockjs-client. Для STOMP багато програм використовували бібліотеку jmesnil/stomp-websocket (також відому як stomp.js), яка є повнофункціональною та використовувалась у виробництві протягом багатьох років, але більше не підтримується. В даний час JSteunou/webstomp-client є наступником цієї бібліотеки, що найбільш активно підтримується і розвивається. Наступний приклад коду на ній:

var socket = new SockJS("/spring-websocket-portfolio/portfolio"); var stompClient = webstomp.over(socket); stompClient.connect({}, function(frame) { }

Крім того, якщо підключення здійснюється через WebSocket (без SockJS), то можна використовувати наступний код :

var socket=new WebSocket ("/spring-websocket-portfolio/portfolio"); var stompClient = Stomp.over(socket); stompClient.connect({}, function(frame) { }

Зверніть увагу, що в попередньому прикладі stompClient не потрібно вказувати заголовки login та passcode Навіть якби це було зроблено, вони були б проігноровані (або, швидше, перевизначені) на стороні сервера.

Додаткові приклади коду див.:

Сервер WebSocket

Для налаштування базового сервера WebSocket застосовується інформація з розділу "Серверна конфігурація". У випадку Jetty, однак, необхідно встановити HandshakeHandler та WebSocketPolicy через StompEndpointRegistry:

@Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { .addEndpoint("/portfolio").setHandshakeHandler(handshakeHandler()); } @Bean public DefaultHandshakeHandler handshakeHandler() { WebSocketPolicy policy = новий WebSocketPolicy(WebSocketBehavior.SERVER); policy.setInputBufferSize(8192); policy.setIdleTimeout(600000); return new DefaultHandshakeHandler( new JettyRequestUpgradeStrategy(new WebSocketServerFactory(policy)))); } }

Потік повідомлень

Якщо кінцева точка STOMP відкрита, програма Spring стає брокером STOMP для підключених клієнтів. У цьому розділі описується потік повідомлень на стороні сервера.

Модуль spring-messaging містить фундаментальні засоби підтримки програм для обміну повідомленнями, які зародилися ще в проекті Spring Integration, а потім були вилучені та включені до Spring Framework для ширшого використання в багатьох проектах Spring та сценаріях застосування. У наступному списку коротко описано деякі з доступних абстракцій обміну повідомленнями:

  • Message: Просте подання повідомлення, включаючи заголовки та корисні дані.

  • MessageHandler : Угода про обробку повідомлення.

  • MessageChannel: Угода про надсилання повідомлення, що забезпечує вільну взаємодію між відправниками та одержувачами.

  • SubscribableChannel: MessageChannel з передплатниками MessageHandler.

  • ExecutorSubscribableChannel: SubscribableChannel, який використовує для доставки повідомлень.

Як Java-конфігурація (тобто анотація @EnableWebSocketMessageBroker), так і конфігурація простору імен XML (тобто є <websocket:message-broker>) використовують попередні компоненти для асемблювання робочого процесу передачі повідомлення. На наступній схемі показані компоненти, що використовуються при активації простого вбудованого брокера повідомлень:

/cdn.javarush.com/images/article/5eb30f69-aee7-436e-8736-917c7014b67f/800.jpeg" alt="">

На попередній схемі показано три канали повідомлень:

  • clientInboundChannel: Для надсилання повідомлень, отриманих від клієнтів WebSocket.

  • clientOutboundChannel: Для надсилання серверних повідомлень клієнтам WebSocket.

  • brokerChannel: Для надсилання повідомлень брокеру повідомлень з коду програми на стороні сервера.

На наступній схемі показані компоненти, які використовуються, якщо зовнішній брокер (наприклад, RabbitMQ) налаштований для керування підписками та широкомовною розсилкою повідомлень:

попередніми схемами полягає у використанні "ретранслятора брокера (broker relay)" для передачі повідомлень до зовнішнього STOMP-брокера поверх TCP і для низхідної передачі повідомлень від брокера до клієнтів, що підписалися.

Якщо повідомлення приймаються від WebSocket-з'єднання, вони декодуються в STOMP-фрейми, перетворюються на представлення Message з Spring і відправляються clientInboundChannel для подальшої обробки. Наприклад, STOMP-повідомлення, заголовки адрес призначення яких починаються з /app, можуть бути маршрутизовані до методів з анотацією @MessageMapping в анотованих контролерах, у той час як повідомлення /topic та /queue можуть бути направлені безпосередньо в брокер повідомлень.

Анотований @Controller, який обробляє STOMP-повідомлення від клієнта, може надіслати повідомлення брокеру повідомлень через brokerChannel, а брокер надсилає повідомлення відповідним передплатникам через clientOutboundChannel. Той самий контролер може робити те саме у відповідь на HTTP-запити, тому клієнт може виконати HTTP-метод POST, а потім метод з анотацією @PostMapping може надіслати повідомлення брокеру повідомлень для розсилки передплатникам.

>

Ми можемо простежити цей процес на простому прикладі. Розглянемо наступний приклад, у якому налаштовується сервер:

 @Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/portfolio") } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.setApplicationDestinationPrefixes("/app"); registry.enableSimpleBroker("/topic"); } } @Controller public class GreetingController { @MessageMapping("/greeting") public String handle(String greeting) { return "[" + getTimestamp() + ": " + greeting; } }

Попередній приклад підтримує наступний потік:

  1. Клієнт підключається до http://localhost:8080/portfolio і, як тільки встановлюється WebSocket-з'єднання, по ньому починають передаватися фрейми STOMP.

  2. Клієнт відправляє кадр SUBSCRIBE із заголовком призначення /topic/greeting. Після отримання та декодування повідомлення відправляється в clientInboundChannel, а потім маршрутизується до брокера повідомлень, який зберігає підписку клієнта.

  3. Клієнт відправляє кадр SEND до /app/greeting. Префікс /app допомагає маршрутизувати його до анотованих контролерів. Після видалення префікса /app частина призначення /greeting, що залишилася, відображається на метод, анотований @MessageMapping, у GreetingController.

  4. Значення, що повертається GreetingController, перетворюється на Message з Spring з корисними даними, заснованими на значенні, що повертається, і заголовком призначення за замовчуванням /topic/greeting (отриманим із вхідного призначення із заміною /app на /topic). Отримане повідомлення надсилається до brokerChannel та обробляється брокером повідомлень.

  5. Брокер повідомлень знаходить усіх відповідних передплатників і відправляє кожному з них фрейм MESSAGE каналом clientOutboundChannel , звідки повідомлення кодуються як фрейми STOMP і відправляються по з'єднанню на основі протоколу WebSocket. значень, що повертаються.

    Анотовані контролери

    Програми можуть використовувати класи, позначені анотацією @Controller, для обробки повідомлень від клієнтів. Такі класи можуть оголошувати методи з анотацією @MessageMapping, @SubscribeMapping та @ExceptionHandler, як описано в наступних темах:

    • @MessageMapping

    • @SubscribeMapping

    • @MessageExceptionHandler

    @MessageMapping

    Можна використовувати інструкцію @MessageMapping для анотування методів, що маршрутизують повідомлення на основі їхнього призначення. Вона підтримується як у рівні методу, і лише на рівні типу. На рівні типів анотація @MessageMapping використовується для вираження загальних відображень для всіх методів контролера.

    За замовчуванням значення відображення є шаблонами шляхів у стилі Ant (наприклад, /thing* , /thing/**), включаючи підтримку змінних шаблонів (наприклад, /thing/{id}). На значення можна посилатися через аргументи методу, позначеного інструкцією @DestinationVariable. Програми також можуть переключитися на угоду про розділення точкою призначення повідомлення для відображення.

    Аргументи методу, що підтримуються

    У наступній таблиці описані аргументи методу:

    Аргумент методу Опис

    Message

    Забезпечує доступ до повного повідомлення.

    >

    MessageHeaders

    Забезпечує доступ до заголовків усередині Message.

    MessageHeaderAccessor, SimpMessageHeaderAccessor та StompHeaderAccessor

    Забезпечують доступ до заголовків через типізовані методи доступу.

    @ Payload

    Забезпечує доступ до корисних даних повідомлення, перетворених (наприклад, з JSON) за допомогою налаштованого MessageConverter.

    Наявність цієї анотації не є обов'язковою, оскільки за умовчанням вона передбачена, якщо не збігається жоден інший аргумент.

    Ви можете анотувати аргументи корисних даних за допомогою @javax.validation.Valid або @Validated із Spring, щоб аргументи корисних даних валідувалися автоматично.

    @Header

    Забезпечує доступ до певного значення заголовка – разом із перетворенням типу за допомогою org.springframework.core.convert.converter.Converter, якщо це необхідно.

    @Headers

    Забезпечує доступ до всіх заголовків у повідомленні. Цей аргумент має бути присвоєним для java.util.Map.

    @DestinationVariable

    Забезпечує доступ до змінних шаблонів, вилучених із призначення повідомлення. Значення при необхідності перетворюються відповідно до оголошеного типу аргументу методу.

    java.security.Principal

    Відображає користувача, який увійшов до системи під час підтвердження встановлення зв'язку за протоколом WebSocket через протокол HTTP.

    Значення, що повертаються

    За умовчанням повертається значення методу, позначеного анотацією @MessageMapping, серіалізується в корисні дані через відповідний MessageConverter і відправляється у вигляді Message за brokerChannel, звідки воно розсилається передплатникам. Призначення вихідного повідомлення таке саме, як і вхідного, але з префіксом /topic.

    Можна використовувати анотації @SendTo та @SendToUser для налаштування вихідного повідомлення. Анотація @SendTo використовується для налаштування цільового призначення або для вказівки кількох призначень. Анотація @SendToUser використовується для надсилання вихідного повідомлення тільки користувачу, пов'язаному з вхідним повідомленням.

    Можна одночасно використовувати @SendTo та @SendToUser в тому самому методі, причому обидва методи підтримуються на рівні класу, і в цьому випадку вони будуть діяти за умовчанням для методів у класі. Однак пам'ятайте, що будь-яке анотування за допомогою @SendTo або @SendToUser на рівні методу перевизначає будь-які подібні анотації на рівні класу.

    Повідомлення можуть оброблятися асинхронно, а метод з анотацією @MessageMapping може повертати ListenableFuture, CompletableFuture або CompletionStage.

    Зверніть увага, що анотації @SendTo і @SendToUser – це просто зручний захід, який дорівнює використання SimpMessagingTemplate для відправки повідомлень. При необхідності, у більш складних сценаріях, методи, позначені анотацією @MessageMapping, можуть відкотитися до використання SimpMessagingTemplate безпосередньо. Це може статися замість або, можливо, на додаток до повернення значення. >@MessageMapping, але звужує відображення лише до повідомлень передплати. Вона підтримує самі аргументи методу, як і інструкція @MessageMapping. Однак у разі значення, що повертається, за умовчанням, повідомлення відправляється безпосередньо клієнту (через clientOutboundChannel, у відповідь на підписку), а не брокеру (через brokerChannel, у вигляді широкомовної розсилки за відповідним підписки). Додавання анотації @SendTo або анотації @SendToUser перевизначає цю логіку роботи і натомість надсилає повідомлення брокеру.

    У яких випадках це практично? Припустимо, що брокер відображено на /topic та /queue, а контролери додатків відображені на /app. За такої конфігурації брокер зберігає всі підписки на /topic та /queue, призначені для повторних розсилок, і додаток немає необхідності брати в цьому участь. Клієнт також може підписатися на деяку адресу призначення /app, а контролер може повернути значення у відповідь на цю підписку без участі брокера, не зберігаючи і не використовуючи передплату повторно (фактично одноразовий обмін на кшталт "запит-відповідь" ). Одним із варіантів використання цього механізму є заповнення інтерфейсу користувача початковими даними при запуску.

    У яких випадках це непрактично? Не намагайтеся відображати брокер і контролери з тим самим префіксом адреси призначення, якщо з якоїсь причини вам не потрібно, щоб обидва незалежно обробляли повідомлення, включаючи підписки. Вхідні повідомлення обробляються паралельно. Немає жодних гарантій щодо того, що першим опрацює це повідомлення – брокер або контролер. Якщо метою є отримання повідомлення про те, що передплата збережена і готова до широкомовної розсилки, клієнт повинен запитати підтвердження отримання, якщо сервер підтримує його (простий брокер не підтримує). Наприклад, за допомогою клієнта STOMP Java можна зробити таке, щоб додати підтвердження отримання:

    @Autowired private TaskScheduler messageBrokerTaskScheduler; // Під час ініціалізації... stompClient.setTaskScheduler(this.messageBrokerTaskScheduler); // Підписка... StompHeaders headers = new StompHeaders(); headers.setDestination("/topic/..."); headers.setReceipt("r1"); FrameHandler handler = ...; stompSession.subscribe(headers, handler).addReceiptTask(receiptHeaders -> { // Передплата готова... });

    На стороні сервера можна зареєструвати ExecutorChannelInterceptor для brokerChannel та реалізувати метод afterMessageHandled, який викликається після обробки повідомлень, включаючи підписки.

    @MessageExceptionHandler

    Програма може використовувати методи, анотовані @MessageExceptionHandler, для обробки винятків із методів, анотованих @MessageMapping. Можна оголосити винятки в самій анотації або через аргумент методу, якщо необхідно отримати доступ до екземпляра винятку. У наступному прикладі виняток оголошується через аргумент методу:

     @Controller public class MyController { // ... @MessageExceptionHandler public ApplicationError handleException(MyException exception) { // ... return appError; } }

    Методи з анотацією @MessageExceptionHandler підтримують гнучкі сигнатури методів і ті ж типи аргументів методу і значення, що повертаються, що і методи з анотацією @MessageMapping.

    Зазвичай методи, позначені анотацією @MessageExceptionHandler, застосовуються в межах класу з анотацією @Controller ( або ієрархії класів), у якому вони оголошені. Якщо вам потрібно, щоб такі методи застосовувалися більш глобально (у всіх контролерах), можна оголосити їх у класі, позначеному анотацією @ControllerAdvice. Це можна порівняти з аналогічними засобами підтримки, доступними в Spring MVC.

    Надсилання повідомлень

    Що якщо вам необхідно надсилати повідомлення підключеним клієнтам з будь-якої частини програми? Будь-який компонент програми може надсилати повідомлення через brokerChannel. Найпростіший спосіб зробити це – впровадити SimpMessagingTemplate та використовувати його для надсилання повідомлень. Як правило, він впроваджується за типом, як це показано в наступному прикладі:

    @Controller public class GreetingController { private SimpMessagingTemplate template; @Autowired public GreetingController(SimpMessagingTemplate template) { this.template = template; } @RequestMapping(path="/greetings", method=POST) public void greet(String greeting) { String text = "[" + getTimestamp() + "]:" + greeting; this.template.convertAndSend("/topic/greetings", text); } }

    Однак також можна визначити його на ім'я(brokerMessagingTemplate), якщо існує інший бін того ж типу.

    >

    Простий брокер

    Вбудований простий брокер повідомлень обробляє запити на підписку від клієнтів, зберігає їх у пам'яті та розсилає повідомлення підключеним клієнтам з відповідними адресами призначення. Брокер підтримує шляхи адрес призначення, включаючи підписки на шаблони адрес призначення в стилі Ant.

    Програми також можуть використовувати адреси призначення, розділені точками (а не косою рисою) ).

    Якщо налаштований планувальник завдань, простий брокер буде підтримувати heartbeat -повідомлення STOMP. Щоб налаштувати планувальник, можна оголосити свій власний бін TaskScheduler і встановити його через MessageBrokerRegistry. Крім того, можна використовувати той, який автоматично оголошується у вбудованій конфігурації WebSocket, проте тоді знадобиться анотація @Lazy, щоб уникнути циклічності між вбудованою конфігурацією WebSocket та вашим WebSocketMessageBrokerConfigurer. Наприклад:

    @Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { private TaskScheduler messageBrokerTaskScheduler; @Autowired public void setMessageBrokerTaskScheduler(@Lazy TaskScheduler taskScheduler) { this.messageBrokerTaskScheduler = taskScheduler; } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableSimpleBroker("/queue/", "/topic/") .setHeartbeatValue(new long[] {10000, 20000B) ; // ... } }

    Зовнішній брокер

    Простий брокер відмінно підходить для початку роботи, але підтримує тільки частину команд STOMP ( він не підтримує символи ack, підтвердження отримання та деякі інші функції), використовує простий цикл надсилання повідомлень і не підходить для кластеризації. Як варіант, можна модернізувати програми для повнофункціонального брокера повідомлень.

    Зверніться до документації по STOMP для вибраного вами брокера повідомлень (наприклад, RabbitMQ, ActiveMQ та інші), встановіть брокер і запустіть його з активованою підтримкою STOMP. Потім можна активувати ретранслятор брокера STOMP (замість простого брокера) у конфігурації Spring.

    Наступний приклад конфігурації передбачає повнофункціональний брокер:

    @Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { @Override public void registerStompEndpoint folio ").withSockJS(); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableStompBrokerRelay("/topic", "/queue"); registry.setApplicationDestinationPrefixes("/app"); } }

    У наступному прикладі показаний XML-еквівалент конфігурації з попереднього прикладу:

    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xmlns:websocket="http://www.springframework.org/schema/websocket" xsi:schemaLocation=" http://www.springframework. org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/websocket https://www.springframework.org/schema/websocket /spring-websocket.xsd"> <websocket:message-broker application-destination-prefix="/app"> <websocket:stomp-endpoint path="/portfolio" /> <websocket:sockjs/> </websocket:stomp-endpoint> <websocket:stomp-broker-relay prefix="/topic,/queue" /> </websocket:message-broker> </beans>

    Ретранслятор брокера STOMP у попередній конфігурації є MessageHandler з Spring, який обробляє повідомлення, пересилаючи їх зовнішньому брокеру повідомлень. Для цього він встановлює TCP-з'єднання з брокером, направляє йому всі повідомлення, а потім пересилає всі повідомлення, отримані від брокера, клієнтам через WebSocket-сесії. По суті, він діє як "ретранслятор", що пересилає повідомлення в обох напрямках. Додайте в проект залежності io.projectreactor.netty:reactor -netty та io.netty:netty-all для керування TCP-з'єднаннями.

    Більше того, компоненти додатків (такі як методи обробки HTTP-запитів, бізнес- сервіси та інші) також можуть надсилати повідомлення ретранслятору брокера.

    По суті, ретранслятор брокера забезпечує надійне та масштабоване розсилання повідомлень.

    Підключення до брокера

    Ретранслятор брокера STOMP підтримує єдине "системне" TCP-з'єднання з брокером. Це з'єднання використовується виключно для повідомлень, що надходять від програми на стороні сервера, але не для отримання повідомлень. Можна налаштувати облікові дані STOMP (тобто заголовки login та passcode у кадрі STOMP) для цього з'єднання. Це буде відображено як у просторі імен XML, так і в Java-конфігурації у вигляді властивостей systemLogin та systemPasscode зі значеннями за промовчанням guest та guest.

    Ретранслятор брокера STOMP також створює окреме TCP-з'єднання для кожного підключеного клієнта WebSocket. Можна налаштувати облікові дані STOMP, які використовуються для всіх з'єднань TCP, створених від імені клієнтів. Це буде відображено як у просторі імен XML, так і в Java-конфігурації у вигляді властивостей clientLogin та clientPasscode зі значеннями за промовчанням guest та guest.

    Ретранслятор брокера STOMP завжди встановлює заголовки login та passcode у кожному кадрі CONNECT, який він пересилає брокеру від імені клієнтів. Тому клієнтам WebSocket не потрібно встановлювати ці заголовки. Вони ігноруються. Клієнти WebSocket повинні вдаватися до HTTP-аутентифікації для захисту кінцевої точки WebSocket та встановлення особистості клієнта. Можна налаштувати інтервали надсилання та отримання heartbeat-повідомлень (за замовчуванням 10 секунд). Якщо зв'язок з брокером буде втрачено, ретранслятор брокера продовжуватиме намагатися відновити з'єднання кожні 5 секунд доти, доки це не вдасться зробити.

    Будь-який бін Spring може реалізувати ApplicationListener<BrokerAvailabilityEvent> для отримання повідомлень, якщо "системне" з'єднання з брокером втрачено і відновлено. Наприклад, служба Stock Quote, що розсилає котирування акцій, може припинити спроби відправлення повідомлень за відсутності активного "системного" з'єднання. одному й тому ж хосту та порту. Якщо потрібно надавати кілька адрес при кожній спробі підключення, можна настроїти постачальника адрес замість фіксованого хоста і порту. У цьому прикладі показано, як це зробити:

     @Configuration @EnableWebSocketMessageBroker public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer { // ... @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableStompBroker ient(createTcpClient() ); registry.setApplicationDestinationPrefixes("/app"); } private ReactorNettyTcpClient<byte[]> createTcpClient() { return new ReactorNettyTcpClient<>( client -> client.addressSupplier(() -> ... ), new StompReactorNettyCodec()); } }

    Ви також можете налаштувати ретранслятор брокера STOMP за допомогою властивості virtualHost. Значення цієї властивості встановлюється як заголовок host кожного кадру CONNECT і може бути корисним (наприклад, у хмарному оточенні, де фактичний хост, з яким встановлюється TCP-з'єднання, відрізняється від хоста , що надає хмарний STOMP-сервіс).

    Точки як роздільники

    Якщо повідомлення надсилаються в методи, анотовані @MessageMapping, вони зіставляються з AntPathMatcher. За промовчанням очікується, що шаблони будуть використовувати косу межу (/) як роздільник. Це хороша угода для веб-застосунків, аналогічна URL-адресам протоколу HTTP. Однак якщо ви більше звикли до угод для обміну повідомленнями, то можете перейти на використання точки (.) як роздільник.

    У наступному прикладі показано, як це зробити за допомогою Java- конфігурації:

    @Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { // ... @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.setPathMatcher(new AntPathMatcher(".")); registry.enableStompBrokerRelay("/queue", "/topic"); registry.setApplicationDestinationPrefixes("/app"); } }

    У наступному прикладі показаний XML-еквівалент конфігурації з попереднього прикладу:

    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xmlns:websocket="http://www.springframework.org/schema/websocket" xsi:schemaLocation=" http://www.springframework. org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/websocket https://www.springframework.org/schema/websocket /spring-websocket.xsd"> <websocket:message-broker application-destination-prefix="/app" path-matcher="pathMatcher"> <websocket:stomp-endpoint path="/stomp"/> <websocket:stomp-broker-relay prefix="/topic,/queue" /> </websocket:message-broker> <bean id="pathMatcher" class="org.springframework.util.AntPathMatcher"> <constructor-arg index="0" value="."/> </bean> </beans>

    Після цього контролер зможе використовувати точку (.) як роздільник у методах, помічених анотацією @MessageMapping, як показано в наступному прикладі:

    @Controller @MessageMapping("red") public class RedController { @MessageMapping("blue.{green}") public void handleGreen(@DestinationVariable String green) { // ... } }

    Тепер клієнт може надіслати повідомлення на /app/red.blue.green123.

    У попередньому прикладі ми не змінювали префікси для "ретранслятора брокера", тому що вони залежать від зовнішнього брокера повідомлень. Зверніться до сторінок документації STOMP, присвячених використаному вами брокеру, щоб дізнатися, які угоди він підтримує для заголовка адрес призначення.

    З іншого боку, "простий брокер" покладається на налаштований PathMatcher, тому, якщо ви перемкнете роздільник, ця зміна також застосовується до брокера і до того, як брокер зіставляє адреси призначення з повідомлення з шаблонами у підписках.

    Аутентифікація

    Кожен сеанс обміну повідомленнями STOMP WebSocket починається з запиту HTTP. Це може бути запит на перехід до веб-сокетів протоколу WebSocket (тобто підтвердження встановлення зв'язку за протоколом WebSocket) або, у разі запасних варіантів через протокол SockJS, серія HTTP-запитів на механізми передачі для SockJS.

    Багато веб-застосунки вже мають засоби автентифікації та авторизації для захисту HTTP-запитів. Як правило, користувач аутентифікується через Spring Security за допомогою будь-якого механізму, наприклад, сторінки входу в систему, базової HTTP-автентифікації або іншим способом. Контекст безпеки для автентифікованого користувача зберігається в HTTP-сесії і зв'язується з наступними запитами в тій же сесії на основі cookie. правило, вже існує автентифікований користувач, доступний через HttpServletRequest#getUserPrincipal(). Spring автоматично пов'язує цього користувача зі створеною для нього сесією WebSocket або SockJS і, згодом, з усіма повідомленнями STOMP, що передаються через цю сесію через заголовок користувача. , що вона вже здійснює для забезпечення безпеки. Користувач аутентифікується на рівні HTTP-запиту за допомогою контексту безпеки, який підтримується на рівні HTTP-сесії на основі cookie (яка потім зв'язується з сесіями WebSocket або SockJS, створеними для цього користувача), внаслідок чого заголовок користувача додається до кожного Message , що проходить через програму.

    Протокол STOMP дійсно має заголовки login і passcode у кадрі CONNECT. Вони спочатку були розроблені для використання STOMP поверх TCP і необхідні для нього. Однак, у разі використання STOMP поверх WebSocket, за замовчуванням, Spring ігнорує заголовки автентифікації на рівні протоколу STOMP, і припускає, що користувач вже автентифікований на рівні HTTP-механізмів передачі. Передбачається, що сесія WebSocket або SockJS міститиме автентифікованого користувача.

    Аутентифікація за допомогою токенів

    Проект Spring Security OAuth забезпечує підтримку безпеки на основі токенів, включаючи JSON Web Token (JWT). Ви можете використовувати його як механізм автентифікації у веб-додатках, включаючи STOMP поверх WebSocket, як описано в попередньому розділі (тобто для збереження ідентичності через сесію на основі cookie).

    В той же час сесії на основі cookie не завжди підходять краще (наприклад, у додатках, які не підтримують сесію на стороні сервера, або в мобільних додатках, де зазвичай використовуються заголовки для автентифікації).

    Протокол WebSocket, RFC 6455 "не передбачає жодного конкретного способу, яким сервери можуть аутентифікувати клієнтів під час підтвердження встановлення зв'язку за протоколом WebSocket". Насправді, браузерні клієнти можуть використовувати лише стандартні заголовки аутентифікації (тобто базову аутентифікацію HTTP) або файли cookie, але не можуть (наприклад) надавати кастомні заголовки. Аналогічно, клієнт SockJS на JavaScript не має можливості надсилати HTTP-заголовки за транспортними запитами SockJS. Див. sockjs-client issue 196.Натомість він дозволяє відправляти параметри запиту, які ви можете використовувати для відправки токена, але в цьому є свої недоліки (наприклад, токен може бути випадково зареєстрований разом з URL-адресою в журналах сервера).

    Попередні обмеження відносяться до клієнтів на базі браузера і не поширюються на клієнт STOMP на базі Java у Spring, який підтримує відправлення заголовків із запитами по протоколах WebSocket та SockJS.

    Тому для додатків, в яких потрібно уникнути використання файлів cookie, може не знайти хороших альтернатив для аутентифікації на рівні протоколу HTTP. У такому разі замість використання файлів cookie можна віддати перевагу аутентифікації за допомогою заголовків на рівні протоколу обміну повідомленнями STOMP. Для цього необхідно виконати дві прості дії:

    1. Використовуйте клієнт STOMP для передачі заголовків аутентифікації під час підключення.

    2. Опрацюйте заголовки аутентифікації з за допомогою ChannelInterceptor.

    У наступному прикладі використовується конфігурація на стороні сервера для реєстрації кастомного перехоплювача аутентифікації. Зверніть увагу, що перехоплювачу необхідно лише пройти автентифікацію та встановити заголовок користувача в Message виду CONNECT. Spring відзначає та зберігає автентифікованого користувача та пов'язує його з наступними повідомленнями STOMP у тій же сесії. У цьому прикладі показано, як зареєструвати кастомний перехоплювач аутентифікації:

    @Configuration @EnableWebSocketMessageBroker public class MyConfig implements WebSocketMessageBrokerConfigurer { @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors(? <?> message , MessageChannel channel) { Accessor = StompHeaderAccessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);if (StompCommand.CONNECT.equals(accessor.getCommand())) { Authentication user = ... ; accessor.setUser(user); } return message; } }); } }

    Зверніть увагу також, що при використанні авторизації Spring Security для повідомлень в даний час необхідно переконатися, що конфігурація ChannelInterceptor для автентифікації перебуває в порядку перед конфігурацією Spring Security. Це найкраще зробити, оголосивши користувальницький перехоплювач у власній реалізації WebSocketMessageBrokerConfigurer, яка позначена інструкцією @Order(Ordered.HIGHEST_PRECEDENCE + 99).

    Авторизація

    Spring Security передбачає авторизацію за субпротоколом WebSocket, який використовує ChannelInterceptor для авторизації повідомлень на основі заголовка користувача. Крім того, Spring Session забезпечує інтеграцію протоколу WebSocket, яка гарантує, що інтеграцію протоколу WebSocket що HTTP-сесія користувача не закінчиться, поки WebSocket-сесія все ще активна. розпізнають адреси призначення з префіксом /user/. Наприклад, клієнт може передплатити адресу призначення /user/queue/position-updates. UserDestinationMessageHandler обробляє цю адресу призначення і перетворює її на адресу призначення, унікальну для сесії користувача (наприклад, /queue/position-updates-user123). Це забезпечує зручність підписки на загальну адресу призначення і в той же час гарантує відсутність конфліктів з іншими користувачами, що підписалися на ту саму адресу призначення, тому кожен користувач зможе отримувати унікальні оновлення біржових позицій.

    При роботі з кастомними адресами призначення важливо налаштувати префікси адрес призначення брокера та програми, інакше брокер оброблятиме повідомлення з префіксом "/user", які повинні оброблятися тільки UserDestinationMessageHandler.

    На стороні передачі повідомлення можуть надсилати за адресою призначення, такою як /user/{username}/queue/position-updates, яка у свою чергу перетворюється на UserDestinationMessageHandler в одну або кілька адрес призначення, по одній для кожної сесії, пов'язаної з користувачем. Це дозволяє будь-якому компоненту програми надсилати повідомлення, призначені для конкретного користувача, не знаючи нічого, крім його імені та загальної адреси призначення. Це також можна зробити за допомогою анотації та шаблону обміну повідомленнями.

    Метод обробки повідомлень може надсилати повідомлення користувачу, пов'язаному з обробленим повідомленням, за допомогою анотації @SendToUser (також підтримується на рівні класу для загальної адреси призначення), як показано в наступному прикладі:

    @Controller public class PortfolioController { @MessageMapping("/trade") @SendToUser("/queue/position-updates") public TradeResult executeTrade(Trade trade, Principal principal) { // ... return tradeResult ; } }

    Якщо користувач має більше однієї сесії, за замовчуванням усі сесії, підписані на цю адресу призначення, є цільовими. Однак іноді може знадобитися вибрати метою лише ту сесію, яка надіслала оброблене повідомлення. Зробити це можна, встановивши атрибут broadcast у false, як показано в наступному прикладі:

    @Controller public class MyController { @MessageMapping("/action") public void handleAction() throws Exception{ // тут генерується MyBusinessException } @MessageExceptionHandler @Send destinations="/queue/errors", broadcast=false) public ApplicationError handleException(MyBusinessException exception) { // ... return appError; } }
    Хоча адреси призначення користувачів зазвичай мають на увазі автентифікованого користувача, це не є суворо обов'язковим. Сесія WebSocket, не пов'язана з автентифікованим користувачем, може підписатися на адресу призначення користувача. У таких випадках анотація @SendToUser функціонує так само, як і при broadcast=false (тобто вибирає метою лише ту сесію, яка надіслала оброблене повідомлення).

    Можна надіслати повідомлення на адреси призначення користувача з будь-якого компонента програми, наприклад, впровадивши SimpMessagingTemplate, створений Java-конфігурацією або простором імен XML. (Ім'я біна - brokerMessagingTemplate, якщо потрібно для повного уточнення імені за допомогою анотації @Qualifier). У цьому прикладі показано, як це зробити:

     @Service public class TradeServiceImpl implements TradeService { private final SimpMessagingTemplate messagingTemplate; @Autowired public TradeServiceImpl(SimpMessagingTemplate messagingTemplate) { this.messagingTemplate = messagingTemplate; } // ... public void afterTradeExecuted(Trade trade) { this.messagingTemplate.convertAndSendToUser( trade.getUserName(), "/queue/position-updates", trade.getResult()); } }
    Якщо використовуються призначені для користувача адреси призначення із зовнішнім брокером повідомлень, то слід ознайомитися з документацією до брокеру щодо того, як керувати неактивними чергами, щоб після завершення сесії користувача всі унікальні черги користувача були видалені. Наприклад, RabbitMQ створює черги з автоматичним видаленням, якщо використовуються такі адреси призначення, як /exchange/amq.direct/position-updates. У цьому випадку клієнт може підписатися на /user/exchange/amq.direct/position-updates. Аналогічно, ActiveMQ має конфігураційні опції для очищення неактивних адрес призначення.

    У сценарії з кількома серверами програм користувача адреса призначення може залишитися невирішеною, оскільки користувач підключений до іншого сервера. У таких випадках можна налаштувати адресу призначення на широкомовну розсилку недозволених повідомлень, щоб інші сервери могли спробувати зробити це. Здійснити це можна за допомогою властивості userDestinationBroadcast реєстру MessageBrokerRegistry в Java-конфігурації та атрибуту user-destination-broadcast елемента message-broker у XML.

    Порядок повідомлень

    Повідомлення від брокера публікуються через clientOutboundChannel, звідки вони записуються в WebSocket-сесії. Оскільки канал підтримується ThreadPoolExecutor, повідомлення обробляються в різних потоках, а результуюча послідовність, отримана клієнтом, може не збігатися з точним порядком публікації.

    Якщо це є проблемою, активуйте прапорець setPreservePublishOrder, як показано в наступному прикладі:

    @Configuration @EnableWebSocketMessageBroker public class MyConfig implements WebSocketMessageBrokerConfigurer { @Override protected void configureMessageBroker(MessageBrokerRegistry registry) { // ... registry.setPreservePu } }

    У наступному прикладі показаний XML-еквівалент конфігурації з попереднього прикладу:

    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xmlns:websocket="http://www.springframework.org/schema/websocket" xsi:schemaLocation=" http://www.springframework. org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/websocket https://www.springframework.org/schema/websocket /spring-websocket.xsd"> <websocket:message-broker preserve-publish-order="true"> <!-- ... --> </websocket:message-broker> </beans>

    Якщо прапор встановлений, повідомлення в рамках однієї клієнтської сесії публікуються через clientOutboundChannel по одному за раз, що забезпечує необхідний порядок публікації. Зверніть увагу, що це призводить до невеликого зниження продуктивності, тому необхідно активувати цей прапор тільки в разі потреби.

    Події

    Кілька подій ApplicationContext публікуються і можуть бути отримані шляхом реалізації інтерфейсу ApplicationListener для Spring:

    • BrokerAvailabilityEvent: Вказує, коли брокер стає доступним або недоступним. Хоча "простий" брокер стає доступним відразу під час запуску і залишається таким під час роботи програми, "ретранслятор брокера" STOMP може втратити з'єднання з повнофункціональним брокером (наприклад, при перезапуску брокера). Ретранслятор брокера містить логіку перепідключення та відновлює "системне" з'єднання з брокером, коли він знову активується. В результаті ця подія публікується щоразу, коли стан змінюється з підключеного на відключений і навпаки. Компонентам, які використовують SimpMessagingTemplate, потрібно підписатися на цю подію та уникати надсилання повідомлень, поки брокер недоступний. У будь-якому випадку, вони повинні бути підготовлені до обробки MessageDeliveryException під час надсилання повідомлення.

    • SessionConnectEvent: Публікується при отриманні нового кадру CONNECT протоколу STOMP для позначення початку нової клієнтської сесії. Подія містить повідомлення, що представляє підключення, включаючи ідентифікатор сесії, інформацію про користувача (якщо така є) і будь-які заголовки, відправлені клієнтом. Це корисно для відстеження клієнтських сесій. Компоненти, підписані на цю подію, можуть обернути повідомлення за допомогою SimpMessageHeaderAccessor або StompMessageHeaderAccessor.

    • SessionConnectedEvent: Публікується незабаром після SessionConnectEvent, коли брокер відправив кадр CONNECTED протоколу STOMP у відповідь на кадр CONNECT. На цьому етапі STOMP-сесію можна вважати повністю встановленою.

    • SessionUnsubscribeEvent: Публікується при отриманні нового кадру UNSUBSCRIBE протоколу STOMP.

    • SessionDisconnectEvent: Публікується при завершення STOMP-сесії. Фрейм DISCONNECT може бути переданий від клієнта або автоматично згенерований під час закриття WebSocket-сесії. У деяких випадках ця подія публікується більше ніж один раз за сесію. Компоненти повинні бути ідемпотентними по відношенню до кількох подій втрати з'єднання. Якщо ви використовуєте повнофункціональний брокер, "ретранслятор брокера" STOMP автоматично встановлює "системне" з'єднання, якщо брокер стає тимчасово недоступним. Однак, клієнтські з'єднання автоматично повторно не встановлюються. Якщо heartbeat-повідомлення включено, то клієнт зазвичай реагує на те, що брокер не відповідає протягом 10 секунд. У клієнтах має бути реалізована їхня власна логіка повторного встановлення з'єднання.

    Перехоплення

    Події передають повідомлення про життєвий цикл STOMP-з'єднання, але не про кожне повідомлення клієнта. Програми також можуть реєструвати ChannelInterceptor для перехоплення будь-якого повідомлення у будь-якій частині ланцюжка обробки. У цьому прикладі показано, як перехоплювати вхідні повідомлення від клієнтів:

    @Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors(new MyChannel) } }

    Користувацький ChannelInterceptor може використовувати StompHeaderAccessor або SimpMessageHeaderAccessor для отримання доступу до інформації про повідомлення, як показано в наступному прикладі:

    public class MyChannelInterceptor implements ChannelInterceptor { @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); StompCommand command = accessor.getStompCommand(); // ... return message; } }

    Програми також можуть реалізувати ExecutorChannelInterceptor, який є підінтерфейсом ChannelInterceptor із зворотними викликами в потоці , у якому обробляються повідомлення. Хоча ChannelInterceptor викликається один раз для кожного повідомлення, відправленого в канал, ExecutorChannelInterceptor передбачає перехоплювачі у потоці кожного MessageHandler, підписаного на повідомлення з каналу.

    Зверніть увагу, що, як і у випадку з SessionDisconnectEvent, описаним раніше, повідомлення DISCONNECT може бути отримане від клієнта, а також може бути автоматично згенероване під час закриття сесії WebSocket. У деяких випадках перехоплювач може перехопити це повідомлення більше одного разу кожної сесії. Компоненти повинні бути ідемпотентними по відношенню до кількох подій втрати з'єднання.

    Клієнт STOMP

    Spring надає клієнт STOMP поверх WebSocket і клієнт STOMP поверх TCP.

    Для початку можна створити та налаштувати WebSocketStompClient, як показано в наступному прикладі:

    WebSocketClient webSocketClient = новий StandardWebSocketClient(); WebSocketStompClient stompClient = новий WebSocketStompClient(webSocketClient); stompClient.setMessageConverter(new StringMessageConverter()); stompClient.setTaskScheduler(taskScheduler); // for heartbeats

    У попередньому прикладі можна замінити StandardWebSocketClient на SockJsClient, оскільки він також є реалізацією WebSocketClient. SockJsClient може використовувати WebSocket або механізм передачі на основі HTTP як запасний варіант.

    Далі можна встановити з'єднання та надати обробник для сесії STOMP, як показано в наступному прикладі:

    String url = "ws://127.0. 0.1:8080/endpoint"; StompSessionHandler sessionHandler = New MyStompSessionHandler(); stompClient.connect(url, sessionHandler);

    Коли сесія буде готова до використання, обробник отримає повідомлення, як показано в наступному прикладі:

    public class MyStompSessionHandler extends StompSessionHandlerAdapter { @Override public void afterConnected (StompSession session, StompHeaders connectedHeaders) { // ... } }

    Як тільки сесія буде встановлена, можна надсилати будь-які корисні дані, які серіалізуються за допомогою налаштованого MessageConverter, як показано в наступному прикладі:

    session.send("/topic/something", "payload");

    Ви також можете підписатися на адреси призначення. Методи subscribe вимагають наявності обробника повідомлень для передплати та повертають дескриптор (хендл) Subscription, який ви можете використовувати для відписки. Для кожного отриманого повідомлення обробник може вказати цільовий тип Object, у який мають бути десеріалізовані корисні дані, як показано в наступному прикладі:

    session.subscribe("/topic/something", новий StompFrameHandler() { @Override public Type getPayloadType(StompHeaders headers ) { return String.class; } @Override public void handleFrame(StompHeaders headers, Object payload) { // ... } });

    Щоб увімкнути heartbeat-повідомлення STOMP, ви можете налаштувати WebSocketStompClient з використанням TaskScheduler та опціонально налаштувати інтервали передачі heartbeat-повідомлень (10 секунд для відсутності дій запису, що викличе відправлення heart і 10 секунд для відсутності дій читання, що викличе закриття з'єднання). коли не надсилаються інші повідомлення. Це може стати проблемою при використанні зовнішнього брокера, оскільки повідомлення з адресою призначення, що не є брокерською, відображають активність, але фактично не надсилаються брокеру. У цьому випадку можна налаштувати TaskScheduler при ініціалізації зовнішнього брокера, який забезпечує пересилання heartbeat-повідомлення брокеру також і в тому випадку, якщо надсилаються лише повідомлення з адресою призначення, що не є брокерською.

    Якщо ви використовуєте WebSocketStompClient для тестів продуктивності, щоб імітувати тисячі клієнтів з однієї машини, подумайте про відключення heartbeat-повідомлень, оскільки кожне з'єднання планує свої власні -завдання, а оптимізація цього процесу для великої кількості клієнтів, що працюють на одній машині, відсутня.

    Протокол STOMP також забезпечує підтримку підтвердження отримання, якщо клієнт повинен додати заголовок receipt, на який сервер відповідає кадром RECEIPT після обробки надсилання або підписки. Для забезпечення такої підтримки StompSession пропонує setAutoReceipt(boolean), який забезпечує додавання заголовка receipt при кожній подальшій події відправки або підписки. Крім того, можна вручну додати заголовок підтвердження отримання в StompHeaders. І відправка, і передплата повертають екземпляр Receiptable, який можна використовувати для реєстрації зворотних викликів успішного та невдалого отримання. Для цієї функції необхідно налаштувати клієнт за допомогою TaskScheduler та завданням кількості часу до закінчення терміну дії підтвердження отримання (за замовчуванням 15 секунд).

    Зверніть увагу, що StompSessionHandler сам є StompFrameHandler, що дозволяє йому обробляти кадри ERROR на додаток до зворотного виклику handleException, призначеного для винятків, згенерованих при обробці повідомлень, та handleTransportError, призначеного для помилок на рівні передачі, включаючи ConnectionLostException.

    Область доступності протоколу WebSocket

    Кожна сесія WebSocket має Map атрибутів. Map прив'язується як заголовок до вхідних повідомлень клієнта і може бути доступна з методу контролера, як показано в наступному прикладі:

    @Controller public class MyController { @MessageMapping("/action") public void handle(SimpMessageHeaderAccessor headerAccessor) { Map<String, Object> attrs = headerAccessor.getSessionAttributes(); // ... } }

    Можна оголосити керований Spring бін в області доступності websocket. Ви можете впроваджувати біни, що входять до області доступності WebSocket, у контролери та будь-які перехоплювачі каналу, зареєстровані через clientInboundChannel. Як правило, вони є одинаками, а їхній життєвий цикл довше, ніж кожна окрема WebSocket-сесія. Тому для бінів, що входять до області доступності WebSocket, необхідно використовувати режим проксі для області доступності, як показано в наступному прикладі:

    @Component @Scope(scopeName = "websocket", proxyMode = ScopedProxyMode.TARGET_CLASS) public class MyBean { @PostConstruct public void init() { // Викликається після впровадження залежностей } // ... @PreDestroy public void destroy() { // Викликається після впровадження залежностей } } @Controller public class MyController { private final MyBean myBean; @Autowired public MyController(MyBean myBean) { this.myBean = myBean; } @MessageMapping("/action") public void handle() { // this.myBean з поточної сесії WebSocket } }

    Як і будь-яка кастомна область доступності, Spring ініціалізує новий екземпляр MyBean при першому зверненні до нього з контролера та зберігає його в атрибутах сесії WebSocket. Цей екземпляр згодом повертається до закінчення сесії. Для бінів, що входять в область доступності WebSocket, викликаються всі методи життєвого циклу Spring, як показано в попередніх прикладах. На неї впливає безліч факторів, включаючи розмір та обсяг повідомлень, виконання прикладними методами роботи, що вимагає блокування, та зовнішні фактори (такі як швидкість мережі та інші деталі). Мета цього розділу – коротко описати доступні варіанти конфігурації, а також висловити деякі ідеї та поміркувати про масштабування.

    У додатку для обміну повідомленнями повідомлення передаються каналами для асинхронного виконання, які підтримуються пулами потоків. Конфігурація такої програми вимагає хорошого розуміння принципів роботи каналів та потоку повідомлень.

    Очевидно, що почати слід з налаштування пулів потоків, що підтримують clientInboundChannel та clientOutboundChannel. За умовчанням обидві конфігурації налаштовані на подвоєну кількість доступних процесорів.

    Якщо обробка повідомлень в анотованих методах в основному залежить від процесора, кількість потоків для clientInboundChannel повинна залишатися близькою до кількості процесорів. Якщо робота, яку вони виконують, більше пов'язана з введенням-виводом і вимагає блокування або очікування в базі даних або іншій зовнішній системі, розмір пулу потоків, ймовірно, потрібно збільшити.

    ThreadPoolExecutor має три важливі властивості: розмір основного пулу потоків, максимальний розмір пулу потоків та місткість черги для зберігання завдань, для яких немає вільних потоків.

    Часто виникає плутанина : конфігурування розміру основного пулу (наприклад, 10) та максимального розміру пулу (наприклад, 20) призводить до створення пулу потоків з 10-20 потоками. Насправді, якщо залишити значення місткості за умовчанням Integer.MAX_VALUE, пул потоків ніколи не перевищить розмір основного пулу, оскільки всі додаткові завдання ставляться в чергу. щоб дізнатися, як працюють ці властивості, і ознайомитися з різними стратегіями постановки в чергу. Якщо клієнти знаходяться у швидкій мережі, кількість потоків повинна залишатися близькою до кількості доступних процесорів. Якщо вони повільні або мають низьку пропускну здатність, то довше споживають повідомлення та створюють навантаження на пул потоків. Тому потрібно збільшити розмір пулу потоків.

    Хоча робоче навантаження для каналу clientInboundChannel можна передбачити — зрештою, воно залежить від того, чим займається програма — налаштувати канал "clientOutboundChannel" вже складніше , оскільки його робота заснована на факторах, які не залежать від застосування. З цієї причини з надсиланням повідомлень пов'язані дві додаткові властивості: sendTimeLimit та sendBufferSizeLimit. Ви можете використовувати ці методи для того, щоб налаштувати тривалість відправлення та обсяг даних, що підлягають буферизації, при надсиланні повідомлень клієнту. . Всі додаткові повідомлення, тим часом, буферизуються, і ви можете використовувати ці властивості, щоб вирішити, скільки часу має займати надсилання повідомлення і скільки даних може буферизуватися за цей час. Важливі додаткові подробиці див. у javadoc та документації до XML-схеми.

    У наступному прикладі показано можливу конфігурацію:

    @Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { @Override public void configureWebSocketTransport(WebSocketTransport( 1000) .setSendBufferSizeLimit(512 * 1024); } // ... }

    У наступному прикладі показаний XML-еквівалент конфігурації з попереднього прикладу:

    <beans xmlns="http://www.springframework.org/schema/ beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:websocket="http://www.springframework.org/schema/websocket" xsi:schemaLocation=" http:/ /www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/websocket https://www.springframework. org/schema/websocket/spring-websocket.xsd"> <websocket:message-broker> <websocket:transport send-timeout="15000" send-buffer-size="524288" /> <!-- ... --> </websocket:message-broker> </beans>

    Ви також можете використовувати конфігурацію механізму передачі WebSocket, показану раніше, для налаштування максимально допустимого розміру вхідних повідомлень STOMP. Теоретично розмір повідомлення WebSocket може бути практично необмеженим. Насправді сервери WebSocket встановлюють обмеження – наприклад, 8 Кбайт для Tomcat і 64 Кбайт для Jetty. З цієї причини клієнти STOMP (такі як webstomp-client з JavaScript та інші) розбивають великі повідомлення STOMP у досягненні розміру в 16 Кбайт і надсилають їх у вигляді кількох повідомлень WebSocket, що вимагає від сервера буферизації та повторного асемблювання.

    Засоби підтримки STOMP поверх WebSocket у Spring здійснюють це, тому програми можуть конфігурувати максимальний розмір STOMP-повідомлень від розмірів повідомлень, специфічних для WebSocket сервера. Пам'ятайте, що розмір повідомлення WebSocket автоматично коригується, якщо необхідно, щоб гарантовано передавати WebSocket-повідомлення розміром мінімум 16 Кбайт. code-block">

    @Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { @Override registration) {registration.setMessageSizeLimit(128 * 1024); } // ... }

    У наступному прикладі показаний XML-еквівалент конфігурації з попереднього прикладу:

    <beans xmlns="http://www.springframework.org/schema/ beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:websocket="http://www.springframework.org/schema/websocket" xsi:schemaLocation=" http:/ /www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/websocket https://www.springframework. org/schema/websocket/spring-websocket.xsd"> <websocket:message-broker> <websocket:transport message-size="131072" /> <!-- ... --> </websocket:message-broker> </beans>

    Важливим моментом у масштабуванні є використання кількох екземплярів додатків. В даний час не можна зробити це за допомогою простого брокера. Однак при використанні повнофункціонального брокера (наприклад, RabbitMQ) кожен екземпляр програми підключається до брокера, а повідомлення, що надсилаються одним екземпляром програми, можуть надсилатися через брокер клієнтам WebSocket, підключеним через будь-які інші екземпляри додатків.

    Моніторинг

    Якщо ви використовуєте анотацію @EnableWebSocketMessageBroker або <websocket:message-broker>, ключові компоненти інфраструктури автоматично збирають статистику та лічильники, які дають важливе уявлення про внутрішній стан програми. У конфігурації також оголошується бін типу WebSocketMessageBrokerStats, який збирає всю доступну інформацію в одному місці та за умовчанням реєструє її на рівні INFO раз на 30 хвилин. Цей бін може бути експортований в JMX через MBeanExporter з Spring для перегляду під час виконання (наприклад, через jconsole з JDK). У наступному списку наведено коротку інформацію:

    Клієнтські сесії WebSocket
    Поточна (Current)

    Показує, скільки клієнтських сесій існує в даний час, з подальшою розбивкою потокової передачі даних через протокол WebSocket щодо протоколу HTTP і сесіям поллінгу через протокол SockJS.

    Усього (Total)

    Вказує, скільки всього сесій було встановлено.

    Аварійно закрита
    Збої в з'єднанні (Connect Failures)

    Сесії, які були встановлені, але були закриті після того, як протягом 60 секунд не було отримано жодного повідомлення. Зазвичай це свідчить про проблеми з проксі-сервером або мережею. настроєного часу очікування відправки або ліміту буфера відправки, що може статися з повільними клієнтами (див. попередній розділ). >

    Сесії закриваються після помилки передачі, наприклад, через неможливості читання або запису в WebSocket-з'єднання або HTTP-запит або відповідь.

    Фрейми STOMP (STOMP Frames)

    Загальна кількість оброблених кадрів CONNECT, CONNECTED і DISCONNECT, що показує, скільки клієнтів підключилося на рівні STOMP. Зверніть увагу, що значення лічильника кадру DISCONNECT може бути меншим, якщо сесії закриті аварійно або якщо клієнти закриті без надсилання кадру DISCONNECT.

    Ретранслятор брокера STOMP (STOMP Broker Relay)
    TCP-з'єднання

    Вказує, скільки TCP-з'єднань від імені клієнтських WebSocket-сесій встановлено з брокером. Значення має дорівнювати кількості клієнтських WebSocket-сесій + 1 додаткове загальне "системне" з'єднання для надсилання повідомлень з програми.

    Фрейми STOMP (STOMP Frames)

    Загальна кількість кадрів CONNECT, CONNECTED та DISCONNECT, переданих брокеру або отриманих від нього від імені клієнтів. Зверніть увагу, що фрейм DISCONNECT відправляється брокеру незалежно від того, як було закрито клієнтську WebSocket-сесію. Тому менша кількість фреймів DISCONNECT є ознакою того, що брокер активно закриває з'єднання (можливо, через heartbeat-повідомлення, що не вчасно прийшло, недійсного вхідного кадру або іншої проблеми).

    Вхідний канал клієнта

    Статистика з пулу потоків, що підтримує clientInboundChannel, яка дає уявлення про стан обробки вхідних повідомлень. Завдання, що стоять у черзі, є ознакою того, що програма може занадто повільно обробляти повідомлення. Якщо є завдання, пов'язані з введенням-виводом (наприклад, повільні запити до бази даних, HTTP-запити до стороннього REST API тощо), розгляньте можливість збільшення розміру пулу потоків.

    Вихідний канал клієнта (Client Outbound Channel)

    Статистика з пулу потоків, що підтримує clientOutboundChannel, яка дає уявлення про працездатність широкомовної розсилки повідомлень клієнтам. Завдання, що стоять у черзі, є ознакою того, що клієнти надто повільно приймають повідомлення. Один із способів вирішення цієї проблеми – збільшити розмір пулу потоків, щоб вмістити очікувану кількість одночасних повільних клієнтів. Інший варіант – знизити обмеження на час очікування відправки та розмір буфера відправки (див. попередній розділ). >

    Статистика з пулу потоків планувальника завдань SockJS, який використовується для надсилання heartbeat-повідомлень. Зверніть увагу, що при узгодженні heartbeat-повідомлень на рівні STOMP відправка heartbeat-повідомлень для SockJS вимкнена.

    Тестування

    Існує два основних підходу до тестування додатків, коли ви використовуєте підтримку STOMP поверх WebSocket у Spring. Перший полягає в написанні тестів на стороні сервера для перевірки функціональності контролерів та їх анотованих методів обробки повідомлень. Другий – це написання повних наскрізних тестів, які включають запуск клієнта і сервера.

    Ці два підходи не є взаємовиключними. Навпаки, кожному їх відведено своє місце у загальній стратегії тестування. Тести на стороні сервера більш вузькоспрямовані, їх легше писати та супроводжувати. З іншого боку, наскрізні інтеграційні тести є більш повними і охоплюють набагато більше, але вони також складніші в написанні та супроводі.

    Найпростішою формою тестування на стороні сервера є написання модульних тестів контролера. Однак це недостатньо практично, оскільки багато з того, що робить контролер, залежить від його інструкцій. Чисті модульні тести просто не зможуть протестувати його роботу.

    В ідеалі, контролери, що тестуються, повинні викликатися в тому вигляді, в якому вони знаходяться під час виконання програми, подібно до підходу до тестування контролерів, що обробляють HTTP-запити, за допомогою фреймворка Spring MVC Test – без запуску контейнера сервлетів, але з використанням Spring Framework для виклику анотованих контролерів. Як і у випадку з Spring MVC Test, тут у вас є дві можливі альтернативи: використовувати "контекстно-орієнтовану" або "автономну" конфігурацію:

    • Завантажте актуальну конфігурацію Spring за допомогою фреймворку Spring TestContext, впровадьте clientInboundChannel як тестове поле і використовуйте його для надсилання повідомлень, які будуть оброблятися методами контролера.

    • Вручну встановіть мінімальну інфраструктуру фреймворку Spring, необхідну для виклику контролерів (а саме SimpAnnotationMethodMessageHandler) і передавайте повідомлення для контролерів безпосередньо йому.

    Обидва ці сценарії настройки продемонстровані в тестах прикладу програми для портфеля акцій.

    Другий підхід полягає у створенні наскрізних інтеграційних тестів. Для цього необхідно запустити WebSocket-сервер у вбудованому режимі та підключитися до нього як WebSocket-клієнт, який надсилає WebSocket-повідомлення, що містять STOMP-фрейми. Тести прикладу програми для портфеля акцій також застосовують цей підхід, використовуючи Tomcat як вбудований сервер WebSocket і простого клієнта STOMP для цілей тестування.