Протокол WebSocket определяет два типа сообщений (текстовые и двоичные), но их содержание не определяется. Протокол определяет клиенту и серверу механизм для согласования субпротокола (то есть протокола обмена сообщениями более высокого уровня), который будет использоваться поверх WebSocket для определения того, какие сообщения каждый из них может отправлять, каков их формат, содержание каждого сообщения и так далее. Использование субпротокола необязательно, но в любом случае между клиентом и сервером должен быть согласован некоторый протокол, определяющий содержание сообщения.

Краткое описание

Протокол STOMP (Simple Text Oriented Messaging Protocol) изначально был создан для скриптовых языков (таких как Ruby, Python и Perl) для подключения к корпоративным брокерам сообщений. Он предназначен для работы с простым подмножеством часто используемых шаблонов обмена сообщениями. STOMP можно использовать через любой надежный двунаправленный потоковый сетевой протокол, такой как TCP и WebSocket. Хотя STOMP является текстово-ориентированным протоколом, полезные данные сообщения могут иметь как текстовую, так и двоичную форму.

STOMP – это протокол на основе фреймов, фреймы которого смоделированы на основе HTTP. В следующем листинге показана структура фрейма STOMP:

COMMAND
header1:value1
header2:value2
Body^@

Клиенты могут использовать команды SEND или SUBSCRIBE для отправки или подписки на сообщения вместе с заголовком destination, который описывает, о чем сообщение и кто должен его получить. Это позволяет создать простой механизм публикации-подписки, который можно использовать для отправки сообщений через брокер другим подключенным клиентам или для отправки сообщений на сервер с запросом на выполнение определенной работы.

Если вы используете средства поддержки протокола STOMP от Spring, приложение WebSocket в Spring выступает в качестве брокера STOMP для клиентов. Сообщения направляются в методы обработки сообщений, помеченные аннотацией @Controller, или в простой брокер с хранением в оперативной памяти, который отслеживает подписки и рассылает сообщения подписавшимся пользователям. Также можно сконфигурировать Spring на работу со специальным STOM-брокером (таким как RabbitMQ, ActiveMQ и другими) для фактической широковещательной рассылки сообщений. В этом случае Spring поддерживает установление TCP-соединений с брокером, ретранслирует ему сообщения и передает сообщения от него подключенным WebSocket-клиентам. Таким образом, веб-приложения Spring могут использовать унифицированную безопасность на основе HTTP-протокола, общую валидацию и знакомую модель программирования для обработки сообщений.

В следующем примере показан клиент, подписавшийся на получение котировок акций, которые сервер может генерировать периодически (например, через запланированную задачу, которая отправляет сообщения через SimpMessagingTemplate брокеру):

SUBSCRIBE
id:sub-1
destination:/topic/price.stock.*
^@

В следующем примере показан клиент, отправляющий торговую заявку, которую сервер может обработать с помощью метода с аннотацией @MessageMapping:

SEND
destination:/queue/trade
content-type:application/json
content-length:44
{"action":"BUY","ticker":"MMM","shares",44}^@

После выполнения сервер может передать клиенту сообщение о подтверждении сделки и подробную информацию.

Значение адреса назначения намеренно оставлено непрозрачным в спецификации STOMP. Это может быть любая строка, а серверы STOMP полностью определяют семантику и синтаксис адресов назначения, которые они поддерживают. Однако очень часто адреса назначения представляют собой строки, подобные пути, где /topic/.. подразумевает публикацию-подписку (один ко многим), а /queue/ подразумевает обмен сообщениями "точка-точка" (один к одному).

Серверы STOMP могут использовать команду MESSAGE для рассылки сообщений всем подписчикам. В следующем примере показано, как сервер отправляет котировку акций подписавшемуся клиенту:

MESSAGE
message-id:nxahklf6-1
subscription:sub-1
destination:/topic/price.stock.MMM
{"ticker":"MMM","price":129.45}^@

Сервер не может отправлять незатребованные сообщения. Все сообщения от сервера должны быть ответом на определенную подписку клиента, а заголовок subscription-id сообщения сервера должен совпадать с заголовком id подписки клиента.

Предыдущее краткое описание призвано обеспечить самое базовое понимание протокола STOMP. Мы рекомендуем полностью ознакомиться со спецификацией протокола.

Преимущества

Использование STOMP в качестве субпротокола позволяет Spring Framework и Spring Security обеспечить более полнофункциональную модель программирования по сравнению с использованием сырых веб-сокетов протокола WebSocket. То же самое можно сказать и о сравнении HTTP с сырым TCP и о том, как это позволяет Spring MVC и другим веб-фреймворкам обеспечивать богатую функциональность. Ниже приведен перечень преимуществ:

  • Нет необходимости выдумывать кастомный протокол обмена сообщениями и формат сообщений.

  • Клиенты STOMP, включая Java-клиент, доступны в Spring Framework.

  • Можно (опционально) использовать брокеры сообщений (такие как RabbitMQ, ActiveMQ и другие) для управления подписками и широковещательными сообщениями.

  • Логику приложения можно организовать в любом количестве экземпляров @Controller, а сообщения можно маршрутизировать к ним на основе заголовка адреса назначения STOMP в отличие от обработки сырых WebSocket-сообщений с помощью единственного WebSocketHandler для данного соединения.

  • Можно использовать Spring Security для защиты сообщений на основе адресов назначения STOMP и типов сообщений.

Активация STOMP

Поддержка STOMP поверх WebSocket доступна в модулях spring-messaging и spring-websocket. Как только у вас появятся эти зависимости, то можно будет открыть конечные точки STOMP поверх WebSocket с помощью запасного варианта через протокол SockJS, как показано в следующем примере:

import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").withSockJS();  
    }
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.setApplicationDestinationPrefixes("/app"); 
        config.enableSimpleBroker("/topic", "/queue"); 
    }
}
  1. /portfolio – это URL-адрес протокола HTTP для конечной точки, к которой клиент WebSocket (или SockJS) должен подключиться для подтверждения установления связи по протоколу WebSocket.
  2. Сообщения STOMP, заголовок адреса назначения которых начинается с /app, направляются в методы, аннотированные @MessageMapping в классах с аннотацией @Controller.
  3. Используем встроенный брокер сообщений для подписки и широковещательной рассылки и направляем сообщения, заголовок адреса назначения которых начинается с /topic `или `/queue в брокер.

В следующем примере показан XML-эквивалент конфигурации из предыдущего примера:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        https://www.springframework.org/schema/websocket/spring-websocket.xsd">
    <websocket:message-broker application-destination-prefix="/app">
        <websocket:stomp-endpoint path="/portfolio">
            <websocket:sockjs/>
        </websocket:stomp-endpoint>
        <websocket:simple-broker prefix="/topic, /queue"/>
    </websocket:message-broker>
</beans>
В случае встроенного простого брокера префиксы /topic и /queue не имеют специального значения. Они являются лишь соглашением, позволяющим провести разграничение между обменом сообщениями по типу "издатель-подписчик" и "точка-точка" (то есть между многими подписчиками и одним потребителем). Если вы используете внешний брокер, обратитесь к странице, STOMP брокера, чтобы понять, какие адреса назначения и префиксы STOMP он поддерживает.

Для подключения из браузера, при использовании SockJS, можно использовать sockjs-client. Для STOMP многие приложения использовали библиотеку jmesnil/stomp-websocket (также известную как stomp.js), которая является полнофункциональной и использовалась в производстве в течение многих лет, но больше не поддерживается. В настоящее время JSteunou/webstomp-client является наиболее активно поддерживаемым и развивающимся преемником этой библиотеки. Следующий пример кода основан на ней:

var socket = new SockJS("/spring-websocket-portfolio/portfolio");
var stompClient = webstomp.over(socket);
stompClient.connect({}, function(frame) {
}

Кроме того, если подключение осуществляется через WebSocket (без SockJS), то можно использовать следующий код:

var socket = new WebSocket("/spring-websocket-portfolio/portfolio");
var stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
}

Обратите внимание, что в предыдущем примере stompClient не нужно указывать заголовки login и passcode. Даже если бы это было сделано, они были бы проигнорированы (или, скорее, переопределены) на стороне сервера.

Дополнительные примеры кода см.:

Сервер WebSocket

Для настройки базового сервера WebSocket применима информация из раздела "Серверная конфигурация". В случае Jetty, однако, необходимо установить HandshakeHandler и WebSocketPolicy через StompEndpointRegistry:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").setHandshakeHandler(handshakeHandler());
    }
    @Bean
    public DefaultHandshakeHandler handshakeHandler() {
        WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
        policy.setInputBufferSize(8192);
        policy.setIdleTimeout(600000);
        return new DefaultHandshakeHandler(
                new JettyRequestUpgradeStrategy(new WebSocketServerFactory(policy)));
    }
}

Поток сообщений

Если конечная точка STOMP открыта, приложение Spring становится брокером STOMP для подключенных клиентов. В этом разделе описывается поток сообщений на стороне сервера.

Модуль spring-messaging содержит фундаментальные средства поддержки приложений для обмена сообщениями, которые зародились еще в проекте Spring Integration, а затем были извлечены и включены в Spring Framework для более широкого использования во многих проектах Spring и сценариях применения. В следующем списке кратко описаны некоторые из доступных абстракций обмена сообщениями:

  • Message: Простое представление сообщения, включая заголовки и полезные данные.

  • MessageHandler: Соглашение об обработке сообщения.

  • MessageChannel: Соглашение об отправке сообщения, обеспечивающее свободное взаимодействие между отправителями и получателями.

  • SubscribableChannel: MessageChannel с подписчиками MessageHandler.

  • ExecutorSubscribableChannel: SubscribableChannel, который использует Executor для доставки сообщений.

Как Java-конфигурация (то есть аннотация @EnableWebSocketMessageBroker), так и конфигурация пространства имен XML (то есть <websocket:message-broker>) используют предыдущие компоненты для ассемблирования рабочего процесса передачи сообщения. На следующей схеме показаны компоненты, используемые при активации простого встроенного брокера сообщений:

На предыдущей схеме показаны три канала сообщений:

  • clientInboundChannel: Для передачи сообщений, полученных от клиентов WebSocket.

  • clientOutboundChannel: Для отправки серверных сообщений клиентам WebSocket.

  • brokerChannel: Для отправки сообщений брокеру сообщений из кода приложения на стороне сервера.

На следующей схеме показаны компоненты, используемые, если внешний брокер (например, RabbitMQ) сконфигурирован для управления подписками и широковещательной рассылкой сообщений:

Основное различие между двумя предыдущими схемами заключается в использовании "ретранслятора брокера (broker relay)" для передачи сообщений до внешнего STOMP-брокера поверх TCP и для нисходящей передачи сообщений от брокера к подписавшимся клиентам.

Если сообщения принимаются от WebSocket-соединения, они декодируются в STOMP-фреймы, превращаются в представление Message из Spring и отправляются по clientInboundChannel для дальнейшей обработки. Например, STOMP-сообщения, заголовки адресов назначения которых начинаются с /app, могут быть маршрутизированы к методам с аннотацией @MessageMapping в аннотированных контроллерах, в то время как сообщения /topic и /queue могут быть направлены непосредственно в брокер сообщений.

Аннотированный @Controller, который обрабатывает STOMP-сообщение от клиента, может отправить сообщение брокеру сообщений через brokerChannel, а брокер рассылает сообщение соответствующим подписчикам через clientOutboundChannel. Тот же контроллер может делать то же самое в ответ на HTTP-запросы, поэтому клиент может выполнить HTTP-метод POST, а затем метод с аннотацией @PostMapping может отправить сообщение брокеру сообщений для рассылки подписчикам.

Мы можем проследить этот процесс на простом примере. Рассмотрим следующий пример, в котором настраивается сервер:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio");
    }
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/app");
        registry.enableSimpleBroker("/topic");
    }
}
@Controller
public class GreetingController {
    @MessageMapping("/greeting")
    public String handle(String greeting) {
        return "[" + getTimestamp() + ": " + greeting;
    }
}

Предыдущий пример поддерживает следующий поток:

  1. Клиент подключается к http://localhost:8080/portfolio и, как только устанавливается WebSocket-соединение, по нему начинают передаваться фреймы STOMP.

  2. Клиент отправляет фрейм SUBSCRIBE с заголовком назначения /topic/greeting. После получения и декодирования сообщение отправляется в clientInboundChannel, а затем маршрутизируется в брокер сообщений, который хранит подписку клиента.

  3. Клиент отправляет фрейм SEND в /app/greeting. Префикс /app помогает маршрутизировать его к аннотированным контроллерам. После удаления префикса /app оставшаяся часть назначения /greeting отображается на метод, аннотированный @MessageMapping, в GreetingController.

  4. Значение, возвращаемое GreetingController, превращается в Message из Spring с полезными данными, основанными на возвращаемом значении, и заголовком назначения по умолчанию /topic/greeting (полученным из входного назначения с заменой /app на /topic). Полученное сообщение отправляется в brokerChannel и обрабатывается брокером сообщений.

  5. Брокер сообщений находит всех подходящих подписчиков и отправляет каждому из них фрейм MESSAGE по каналу clientOutboundChannel, откуда сообщения кодируются как фреймы STOMP и отправляются по соединению на основе протокола WebSocket.

Следующий раздел содержит более подробную информацию об аннотированных методах, включая поддерживаемые типы аргументов и возвращаемых значений.

Аннотированные контроллеры

Приложения могут использовать классы, помеченные аннотацией @Controller, для обработки сообщений от клиентов. Такие классы могут объявлять методы с аннотацией @MessageMapping, @SubscribeMapping и @ExceptionHandler, как описано в следующих темах:

  • @MessageMapping

  • @SubscribeMapping

  • @MessageExceptionHandler

@MessageMapping

Можно использовать аннотацию @MessageMapping для аннотирования методов, которые маршрутизируют сообщения на основе их назначения. Она поддерживается как на уровне метода, так и на уровне типа. На уровне типов аннотация @MessageMapping используется для выражения общих отображений для всех методов контроллера.

По умолчанию значения отображения представляют собой шаблоны путей в стиле Ant (например, /thing*, /thing/**), включая поддержку переменных шаблона (например, /thing/{id}). На значения можно ссылаться через аргументы метода, помеченного аннотацией @DestinationVariable. Приложения также могут переключиться на соглашение о разделении точкой назначения сообщения для отображений.

Поддерживаемые аргументы метода

В следующей таблице описаны аргументы метода:

Аргумент метода Описание

Message

Обеспечивает доступ к полному сообщению.

MessageHeaders

Обеспечивает доступ к заголовкам внутри Message.

MessageHeaderAccessor, SimpMessageHeaderAccessor, и StompHeaderAccessor

Обеспечивают доступ к заголовкам через типизированные методы доступа.

@Payload

Обеспечивает доступ к полезным данным сообщения, преобразованным (например, из JSON) с помощью сконфигурированного MessageConverter.

Наличие этой аннотации не является обязательным, так как по умолчанию она предусмотрена, если не совпадает ни один другой аргумент.

Вы можете аннотировать аргументы полезных данных с помощью @javax.validation.Valid или @Validated из Spring, чтобы аргументы полезных данных валидировались автоматически.

@Header

Обеспечивает доступ к определенному значению заголовка – вместе с преобразованием типа с помощью org.springframework.core.convert.converter.Converter, если это необходимо.

@Headers

Обеспечивает доступ ко всем заголовкам в сообщении. Этот аргумент должен быть присваиваемым для java.util.Map.

@DestinationVariable

Обеспечивает доступ к переменным шаблона, извлеченным из назначения сообщения. Значения при необходимости преобразуются в соответствии с объявленным типом аргумента метода.

java.security.Principal

Отражает пользователя, вошедшего в систему во время подтверждения установления связи по протоколу WebSocket через протокол HTTP.

Возвращаемые значения

По умолчанию возвращаемое значение метода, помеченного аннотацией @MessageMapping, сериализуется в полезные данные через соответствующий MessageConverter и отправляется в виде Message по brokerChannel, откуда оно рассылается подписчикам. Назначение исходящего сообщения такое же, как и у входящего, но с префиксом /topic.

Можно использовать аннотации @SendTo и @SendToUser для настройки назначения выходного сообщения. Аннотация @SendTo используется для настройки целевого назначения или для указания нескольких назначений. Аннотация @SendToUser используется для направления выходного сообщения только пользователю, связанному с входным сообщением.

Можно одновременно использовать @SendTo и @SendToUser в одном и том же методе, причем оба метода поддерживаются на уровне класса, и в этом случае они будут действовать по умолчанию для методов в классе. Однако помните, что любой аннотирование с помощью @SendTo или @SendToUser на уровне метода переопределяет любые подобные аннотации на уровне класса.

Сообщения могут обрабатываться асинхронно, а метод с аннотацией @MessageMapping может возвращать ListenableFuture, CompletableFuture или CompletionStage.

Обратите внимание, что аннотации @SendTo и @SendToUser – это просто удобная мера, которая равносильна использованию SimpMessagingTemplate для отправки сообщений. При необходимости, в более сложных сценариях, методы, помеченные аннотацией @MessageMapping, могут откатиться к использованию SimpMessagingTemplate напрямую. Это может произойти вместо или, возможно, в дополнение к возврату значения.

@SubscribeMapping

Аннотация @SubscribeMapping аналогична @MessageMapping, но сужает отображение только до сообщений подписки. Она поддерживает те же аргументы метода, что и аннотация @MessageMapping. Однако в случае возвращаемого значения, по умолчанию, сообщение отправляется непосредственно клиенту (через clientOutboundChannel, в ответ на подписку), а не брокеру (через brokerChannel, в виде широковещательной рассылки по соответствующим подпискам). Добавление аннотации @SendTo или аннотации @SendToUser переопределяет эту логику работы и вместо этого отправляет сообщения брокеру.

В каких случаях это практично? Предположим, что брокер отображен на /topic и /queue, а контроллеры приложений отображены на /app. При такой конфигурации брокер хранит все подписки на /topic и /queue, предназначенные для повторных рассылок, и приложению нет необходимости принимать в этом участие. Клиент также может подписаться на некоторый адрес назначения /app, а контроллер может вернуть значение в ответ на эту подписку без участия брокера, не храня и не используя подписку повторно (фактически одноразовый обмен по типу "запрос-ответ"). Одним из вариантов использования этого механизма является заполнение пользовательского интерфейса начальными данными при запуске.

В каких случаях это непрактично? Не пытайтесь отображать брокер и контроллеры с одним и тем же префиксом адреса назначения, если по какой-то причине вам не нужно, чтобы оба независимо обрабатывали сообщения, включая подписки. Входящие сообщения обрабатываются параллельно. Нет никаких гарантий относительно того, что первым обработает данное сообщение – брокер или контроллер. Если целью является получение уведомления о том, что подписка сохранена и готова к широковещательной рассылке, клиент должен запросить подтверждение получения, если сервер поддерживает его (простой брокер не поддерживает). Например, с помощью клиента STOMP на Java можно сделать следующее, чтобы добавить подтверждение получения:

@Autowired
private TaskScheduler messageBrokerTaskScheduler;
// Во время инициализации...
stompClient.setTaskScheduler(this.messageBrokerTaskScheduler);
// При подписке...
StompHeaders headers = new StompHeaders();
headers.setDestination("/topic/...");
headers.setReceipt("r1");
FrameHandler handler = ...;
stompSession.subscribe(headers, handler).addReceiptTask(receiptHeaders -> {
    // Подписка готова...
});

На стороне сервера можно зарегистрировать ExecutorChannelInterceptor для brokerChannel и реализовать метод afterMessageHandled, который вызывается после обработки сообщений, включая подписки.

@MessageExceptionHandler

Приложение может использовать методы, аннотированные @MessageExceptionHandler, для обработки исключений из методов, аннотированных @MessageMapping. Можно объявить исключения в самой аннотации или через аргумент метода, если нужно получить доступ к экземпляру исключения. В следующем примере исключение объявляется через аргумент метода:

@Controller
public class MyController {
    // ...
    @MessageExceptionHandler
    public ApplicationError handleException(MyException exception) {
        // ...
        return appError;
    }
}

Методы с аннотацией @MessageExceptionHandler поддерживают гибкие сигнатуры методов и те же типы аргументов метода и возвращаемые значения, что и методы с аннотацией @MessageMapping.

Обычно методы, помеченные аннотацией @MessageExceptionHandler, применяются в пределах класса с аннотацией @Controller (или иерархии классов), в котором они объявлены. Если вам требуется, чтобы такие методы применялись более глобально (во всех контроллерах), то можно объявить их в классе, помеченном аннотацией @ControllerAdvice. Это сравнимо с аналогичными средствами поддержки, доступными в Spring MVC.

Отправка сообщений

Что если вам необходимо отправлять сообщения подключенным клиентам из любой части приложения? Любой компонент приложения может отправлять сообщения через brokerChannel. Самый простой способ сделать это – внедрить SimpMessagingTemplate и использовать его для отправки сообщений. Как правило, он внедряется по типу, как это показано в следующем примере:

@Controller
public class GreetingController {
    private SimpMessagingTemplate template;
    @Autowired
    public GreetingController(SimpMessagingTemplate template) {
        this.template = template;
    }
    @RequestMapping(path="/greetings", method=POST)
    public void greet(String greeting) {
        String text = "[" + getTimestamp() + "]:" + greeting;
        this.template.convertAndSend("/topic/greetings", text);
    }
}

Однако также можно определить его по имени(brokerMessagingTemplate), если существует другой бин того же типа.

Простой брокер

Встроенный простой брокер сообщений обрабатывает запросы на подписку от клиентов, хранит их в памяти и рассылает сообщения подключенным клиентам с подходящими адресами назначения. Брокер поддерживает пути адресов назначения, включая подписки на шаблоны адресов назначения в стиле Ant.

Приложения также могут использовать адреса назначения, разделенные точками (а не косой чертой).

Если сконфигурирован планировщик задач, простой брокер будет поддерживать heartbeat-сообщения STOMP. Чтобы сконфигурировать планировщик, можно объявить свой собственный бин TaskScheduler и установить его через MessageBrokerRegistry. Кроме того, можно использовать тот, который автоматически объявляется во встроенной конфигурации WebSocket, однако тогда понадобится аннотация @Lazy, чтобы избежать цикличности между встроенной конфигурацией WebSocket и вашим WebSocketMessageBrokerConfigurer. Например:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    private TaskScheduler messageBrokerTaskScheduler;
    @Autowired
    public void setMessageBrokerTaskScheduler(@Lazy TaskScheduler taskScheduler) {
        this.messageBrokerTaskScheduler = taskScheduler;
    }
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/queue/", "/topic/")
                .setHeartbeatValue(new long[] {10000, 20000})
                .setTaskScheduler(this.messageBrokerTaskScheduler);
        // ...
    }
}

Внешний брокер

Простой брокер отлично подходит для начала работы, но поддерживает только часть команд STOMP (он не поддерживает символы ack, подтверждения получения и некоторые другие функции), использует простой цикл отправки сообщений и не подходит для кластеризации. Как вариант, можно модернизировать приложения для использования полнофункционального брокера сообщений.

Обратитесь к документации по STOMP для выбранного вами брокера сообщений (например, RabbitMQ, ActiveMQ и другие), установите брокер и запустите его с активированной поддержкой STOMP. Затем можно активировать ретранслятор брокера STOMP (вместо простого брокера) в конфигурации Spring.

Следующий пример конфигурации предусматривает полнофункциональный брокер:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").withSockJS();
    }
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/topic", "/queue");
        registry.setApplicationDestinationPrefixes("/app");
    }
}

В следующем примере показан XML-эквивалент конфигурации из предыдущего примера:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        https://www.springframework.org/schema/websocket/spring-websocket.xsd">
    <websocket:message-broker application-destination-prefix="/app">
        <websocket:stomp-endpoint path="/portfolio" />
            <websocket:sockjs/>
        </websocket:stomp-endpoint>
        <websocket:stomp-broker-relay prefix="/topic,/queue" />
    </websocket:message-broker>
</beans>

Ретранслятор брокера STOMP в предыдущей конфигурации представляет собой MessageHandler из Spring, который обрабатывает сообщения, пересылая их внешнему брокеру сообщений. Для этого он устанавливает TCP-соединение с брокером, направляет ему все сообщения, а затем пересылает все сообщения, полученные от брокера, клиентам через их WebSocket-сессии. По сути, он действует как "ретранслятор", пересылающий сообщения в обоих направлениях.

Добавьте в проект зависимости io.projectreactor.netty:reactor-netty и io.netty:netty-all для управления TCP-соединениями.

Более того, компоненты приложений (такие как методы обработки HTTP-запросов, бизнес-сервисы и другие) также могут отправлять сообщения ретранслятору брокера.

По сути, ретранслятор брокера обеспечивает надежную и масштабируемую рассылку сообщений.

Подключение к брокеру

Ретранслятор брокера STOMP поддерживает единственное "системное" TCP-соединение с брокером. Это соединение используется исключительно для сообщений, исходящих от приложения на стороне сервера, но не для получения сообщений. Можно сконфигурировать учетные данные STOMP (т.е. заголовки login и passcode в фрейме STOMP) для этого соединения. Это будет отображено как в пространстве имен XML, так и в Java-конфигурации в виде свойств systemLogin и systemPasscode со значениями по умолчанию guest и guest.

Ретранслятор брокера STOMP также создает отдельное TCP-соединение для каждого подключенного клиента WebSocket. Можно сконфигурировать учетные данные STOMP, которые используются для всех TCP-соединений, созданных от имени клиентов. Это будет отображено как в пространстве имен XML, так и в Java-конфигурации в виде свойств clientLogin и clientPasscode со значениями по умолчанию guest и guest.

Ретранслятор брокера STOMP всегда устанавливает заголовки login и passcode в каждом фрейме CONNECT, который он пересылает брокеру от имени клиентов. Поэтому клиентам WebSocket не нужно устанавливать эти заголовки. Они игнорируются. Клиенты WebSocket должны прибегать к HTTP-аутентификации для защиты конечной точки WebSocket и установления личности клиента.

Ретранслятор брокера STOMP также отправляет и получает heartbeat-сообщения на брокер сообщений и от него через "системное" TCP-соединение. Можно настроить интервалы отправки и получения heartbeat-сообщений (по умолчанию 10 секунд). Если связь с брокером будет потеряна, ретранслятор брокера будет продолжать пытаться восстановить соединение каждые 5 секунд до тех пор, пока это не удастся сделать.

Любой бин Spring может реализовать ApplicationListener<BrokerAvailabilityEvent> для получения уведомлений, если "системное" соединение с брокером потеряно и затем восстановлено. Например, служба Stock Quote, рассылающая котировки акций, может прекратить попытки отправки сообщений при отсутствии активного "системного" соединения.

По умолчанию ретранслятор брокера STOMP всегда подключается – а при необходимости и переподключается, если соединение пропадет, – к одному и тому же хосту и порту. Если необходимо предоставлять несколько адресов при каждой попытке подключения, то можно настроить поставщика адресов вместо фиксированного хоста и порта. В следующем примере показано, как это сделать:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
    // ...
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/queue/", "/topic/").setTcpClient(createTcpClient());
        registry.setApplicationDestinationPrefixes("/app");
    }
    private ReactorNettyTcpClient<byte[]> createTcpClient() {
        return new ReactorNettyTcpClient<>(
                client -> client.addressSupplier(() -> ... ),
                new StompReactorNettyCodec());
    }
}

Вы также можете сконфигурировать ретранслятор брокера STOMP с помощью свойства virtualHost. Значение этого свойства устанавливается в качестве заголовка host каждого фрейма CONNECT и может быть полезным (например, в облачном окружении, где фактический хост, с которым устанавливается TCP-соединение, отличается от хоста, предоставляющего облачный STOMP-сервис).

Точки как разделители

Если сообщения направляются в методы, аннотированные @MessageMapping, они сопоставляются с AntPathMatcher. По умолчанию ожидается, что шаблоны будут использовать косую черту (/) в качестве разделителя. Это хорошее соглашение для веб-приложений, аналогичное URL-адресам протокола HTTP. Однако если вы больше привыкли к соглашениям для обмена сообщениями, то можете перейти на использование точки (.) в качестве разделителя.

В следующем примере показано, как это сделать с помощью Java-конфигурации:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    // ...
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setPathMatcher(new AntPathMatcher("."));
        registry.enableStompBrokerRelay("/queue", "/topic");
        registry.setApplicationDestinationPrefixes("/app");
    }
}

В следующем примере показан XML-эквивалент конфигурации из предыдущего примера:

<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:websocket="http://www.springframework.org/schema/websocket"
        xsi:schemaLocation="
                http://www.springframework.org/schema/beans
                https://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/websocket
                https://www.springframework.org/schema/websocket/spring-websocket.xsd">
    <websocket:message-broker application-destination-prefix="/app" path-matcher="pathMatcher">
        <websocket:stomp-endpoint path="/stomp"/>
        <websocket:stomp-broker-relay prefix="/topic,/queue" />
    </websocket:message-broker>
    <bean id="pathMatcher" class="org.springframework.util.AntPathMatcher">
        <constructor-arg index="0" value="."/>
    </bean>
</beans>

После этого контроллер сможет использовать точку (.) в качестве разделителя в методах, помеченных аннотацией @MessageMapping, как показано в следующем примере:

@Controller
@MessageMapping("red")
public class RedController {
    @MessageMapping("blue.{green}")
    public void handleGreen(@DestinationVariable String green) {
        // ...
    }
}

Теперь клиент может отправить сообщение на /app/red.blue.green123.

В предыдущем примере мы не меняли префиксы для "ретранслятора брокера", потому что они полностью зависят от внешнего брокера сообщений. Обратитесь к страницам документации STOMP, посвященным используемому вами брокеру, чтобы узнать, какие соглашения он поддерживает для заголовка адресов назначения.

С другой стороны, "простой брокер" полагается на сконфигурированный PathMatcher, поэтому, если вы переключите разделитель, это изменение также применится к брокеру и к тому, как брокер сопоставляет адреса назначения из сообщения с шаблонами в подписках.

Аутентификация

Каждый сеанс обмена сообщениями STOMP поверх WebSocket начинается с HTTP-запроса. Это может быть запрос на переход к веб-сокетам протокола WebSocket (то есть подтверждение установления связи по протоколу WebSocket) или, в случае запасных вариантов через протокол SockJS, серия HTTP-запросов на механизмы передачи для SockJS.

Многие веб-приложения уже имеют средства аутентификации и авторизации для защиты HTTP-запросов. Как правило, пользователь аутентифицируется через Spring Security с помощью какого-либо механизма, например, страницы входа в систему, базовой HTTP-аутентификации или другим способом. Контекст безопасности для аутентифицированного пользователя сохраняется в HTTP-сессии и связывается с последующими запросами в той же сессии на основе cookie.

Поэтому в случае подтверждения установления связи по протоколу WebSocket или HTTP-запросов на механизмы передачи для SockJS, как правило, уже существует аутентифицированный пользователь, доступный через HttpServletRequest#getUserPrincipal(). Spring автоматически связывает этого пользователя с созданной для него сессией WebSocket или SockJS и, впоследствии, со всеми сообщениями STOMP, передаваемыми через эту сессию через заголовок пользователя.

Одним словом, типичному веб-приложению не нужно осуществлять ничего сверх того, что оно уже осуществляет для обеспечения безопасности. Пользователь аутентифицируется на уровне HTTP-запроса с помощью контекста безопасности, который поддерживается на уровне HTTP-сессии на основе cookie (которая затем связывается с сессиями WebSocket или SockJS, созданными для этого пользователя), в результате чего заголовок пользователя добавляется к каждому Message, проходящему через приложение.

Протокол STOMP действительно имеет заголовки login и passcode в фрейме CONNECT. Они были первоначально разработаны для использования STOMP поверх TCP и необходимы для него. Однако, в случае использования STOMP поверх WebSocket, по умолчанию, Spring игнорирует заголовки аутентификации на уровне протокола STOMP, и предполагает, что пользователь уже аутентифицирован на уровне HTTP-механизмов передачи. Предполагается, что сессия WebSocket или SockJS будет содержать аутентифицированного пользователя.

Аутентификация с помощью токенов

Проект Spring Security OAuth обеспечивает поддержку безопасности на основе токенов, включая JSON Web Token (JWT). Вы можете использовать его в качестве механизма аутентификации в веб-приложениях, включая STOMP поверх WebSocket, как описано в предыдущем разделе (то есть для сохранения идентичности через сессию на основе cookie).

В то же время сессии на основе cookie не всегда подходят лучше (например, в приложениях, которые не поддерживают сессию на стороне сервера, или в мобильных приложениях, где обычно используются заголовки для аутентификации).

Протокол WebSocket, RFC 6455 "не предписывает никакого конкретного способа, которым серверы могут аутентифицировать клиентов во время подтверждения установления связи по протоколу WebSocket". На практике, однако, браузерные клиенты могут использовать только стандартные заголовки аутентификации (то есть базовую аутентификацию HTTP) или файлы cookie, но не могут (например) предоставлять кастомные заголовки. Аналогично, клиент SockJS на JavaScript не имеет возможности отправлять HTTP-заголовки по транспортным запросам SockJS. См. sockjs-client issue 196. Вместо этого он позволяет отправлять параметры запроса, которые вы можете использовать для отправки токена, но в этом есть и свои недостатки (например, токен может быть случайно зарегистрирован вместе с URL-адресом в журналах сервера).

Предыдущие ограничения относятся к клиентам на базе браузера и не распространяются на клиент STOMP на базе Java в Spring, который поддерживает отправку заголовков с запросами по протоколам WebSocket и SockJS.

Поэтому для приложений, в которых нужно избежать использования файлов cookie, может не найтись хороших альтернатив для аутентификации на уровне протокола HTTP. В таком случае вместо использования файлов cookie можно отдать предпочтение аутентификации с помощью заголовков на уровне протокола обмена сообщениями STOMP. Для этого необходимо выполнить два простых действия:

  1. Используйте клиент STOMP для передачи заголовков аутентификации во время подключения.

  2. Обработайте заголовки аутентификации с помощью ChannelInterceptor.

В следующем примере используется конфигурация на стороне сервера для регистрации кастомного перехватчика аутентификации. Обратите внимание, что перехватчику необходимо лишь пройти аутентификацию и установить заголовок пользователя в Message вида CONNECT. Spring отмечает и сохраняет аутентифицированного пользователя и связывает его с последующими сообщениями STOMP в той же сессии. В следующем примере показано, как зарегистрировать кастомный перехватчик аутентификации:

@Configuration
@EnableWebSocketMessageBroker
public class MyConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(new ChannelInterceptor() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                StompHeaderAccessor accessor =
                        MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
                if (StompCommand.CONNECT.equals(accessor.getCommand())) {
                    Authentication user = ... ; // access authentication header(s)
                    accessor.setUser(user);
                }
                return message;
            }
        });
    }
}

Также обратите внимание, что при использовании авторизации Spring Security для сообщений в настоящее время необходимо удостовериться, что конфигурация ChannelInterceptor для аутентификации находится по порядку перед конфигурацией Spring Security. Это лучше всего сделать, объявив пользовательский перехватчик в собственной реализации WebSocketMessageBrokerConfigurer, которая помечена аннотацией @Order(Ordered.HIGHEST_PRECEDENCE + 99).

Авторизация

Spring Security предусматривает авторизацию по субпротоколу WebSocket, который использует ChannelInterceptor для авторизации сообщений на основе содержащегося в них заголовка пользователя. Кроме того, Spring Session обеспечивает интеграцию протокола WebSocket, которая гарантирует, что HTTP-сессия пользователя не истечет, пока WebSocket-сессия все еще активна.

Пользовательские адреса назначения

Приложение может отправлять сообщения, адресованные конкретному пользователю, и для этого средства поддержки STOMP в Spring распознают адреса назначения с префиксом /user/. Например, клиент может подписаться на адрес назначения /user/queue/position-updates. UserDestinationMessageHandler обрабатывает этот адрес назначения и преобразует его в адрес назначения, уникальный для сессии пользователя (например, /queue/position-updates-user123). Это обеспечивает удобство подписки на общий адрес назначения и в то же время гарантирует отсутствие конфликтов с другими пользователями, подписавшимися на тот же адрес назначения, так что каждый пользователь сможет получать уникальные обновления биржевых позиций.

При работе с кастомными адресами назначения важно сконфигурировать префиксы адресов назначения брокера и приложения, иначе брокер будет обрабатывать сообщения с префиксом "/user", которые должны обрабатываться только UserDestinationMessageHandler.

На стороне передачи сообщения могут отправлять по адресу назначения, такому как /user/{username}/queue/position-updates, который в свою очередь преобразуется UserDestinationMessageHandler в один или несколько адресов назначения, по одному для каждой сессии, связанной с пользователем. Это позволяет любому компоненту приложения отправлять сообщения, предназначенные для конкретного пользователя, не зная ничего, кроме его имени и общего адреса назначения. Это также можно сделать при помощи аннотации и шаблона обмена сообщениями.

Метод обработки сообщений может отправлять сообщения пользователю, связанному с обрабатываемым сообщением, с помощью аннотации @SendToUser (также поддерживается на уровне класса для общего адреса назначения), как показано в следующем примере:

@Controller
public class PortfolioController {
    @MessageMapping("/trade")
    @SendToUser("/queue/position-updates")
    public TradeResult executeTrade(Trade trade, Principal principal) {
        // ...
        return tradeResult;
    }
}

Если пользователь имеет более одной сессии, по умолчанию все сессии, подписанные на данный адрес назначения, являются целевыми. Однако иногда может потребоваться выбрать целью только ту сессию, которая отправила обрабатываемое сообщение. Сделать это можно, установив атрибут broadcast в false, как показано в следующем примере:

@Controller
public class MyController {
    @MessageMapping("/action")
    public void handleAction() throws Exception{
        // здесь генерируется MyBusinessException
    }
    @MessageExceptionHandler
    @SendToUser(destinations="/queue/errors", broadcast=false)
    public ApplicationError handleException(MyBusinessException exception) {
        // ...
        return appError;
    }
}
Хотя адреса назначения пользователей обычно подразумевают аутентифицированного пользователя, это не является строго обязательным. Сессия WebSocket, не связанная с аутентифицированным пользователем, может подписаться на пользовательский адрес назначения. В таких случаях аннотация @SendToUser функционирует точно так же, как и при broadcast=false (то есть выбирает целью только ту сессию, которая отправила обрабатываемое сообщение).

Можно отправить сообщение на пользовательские адреса назначения из любого компонента приложения, например, внедрив SimpMessagingTemplate, созданный Java-конфигурацией или пространством имен XML. (Имя бина - brokerMessagingTemplate, если требуется для полного уточнения имени с помощью аннотации @Qualifier). В следующем примере показано, как это сделать:

@Service
public class TradeServiceImpl implements TradeService {
    private final SimpMessagingTemplate messagingTemplate;
    @Autowired
    public TradeServiceImpl(SimpMessagingTemplate messagingTemplate) {
        this.messagingTemplate = messagingTemplate;
    }
    // ...
    public void afterTradeExecuted(Trade trade) {
        this.messagingTemplate.convertAndSendToUser(
                trade.getUserName(), "/queue/position-updates", trade.getResult());
    }
}
Если используются пользовательские адреса назначения с внешним брокером сообщений, то следует ознакомиться с документацией к брокеру касательно того, как управлять неактивными очередями, чтобы после завершения пользовательской сессии все уникальные очереди пользователя были удалены. Например, RabbitMQ создает очереди с автоматическим удалением, если испольщуются такие адреса назначения, как /exchange/amq.direct/position-updates. В этом случае клиент может подписаться на /user/exchange/amq.direct/position-updates. Аналогично, ActiveMQ имеет конфигурационные опции для очистки неактивных адресов назначения.

В сценарии с несколькими серверами приложений пользовательский адрес назначения может остаться неразрешенным, поскольку пользователь подключен к другому серверу. В таких случаях можно сконфигурировать адрес назначения на широковещательную рассылку неразрешенных сообщений, чтобы дргие сервера могли попытаться сделать это. Осуществить это можно при помощи свойства userDestinationBroadcast реестра MessageBrokerRegistry в Java-конфигурации и атрибута user-destination-broadcast элемента message-broker в XML.

Порядок сообщений

Сообщения от брокера публикуются через канал clientOutboundChannel, откуда они записываются в WebSocket-сессии. Поскольку канал поддерживается ThreadPoolExecutor, сообщения обрабатываются в разных потоках, а результирующая последовательность, полученная клиентом, может не совпадать с точным порядком публикации.

Если это является проблемой, то активируйте флаг setPreservePublishOrder, как показано в следующем примере:

@Configuration
@EnableWebSocketMessageBroker
public class MyConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    protected void configureMessageBroker(MessageBrokerRegistry registry) {
        // ...
        registry.setPreservePublishOrder(true);
    }
}

В следующем примере показан XML-эквивалент конфигурации из предыдущего примера:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        https://www.springframework.org/schema/websocket/spring-websocket.xsd">
    <websocket:message-broker preserve-publish-order="true">
        <!-- ... -->
    </websocket:message-broker>
</beans>

Если флаг установлен, сообщения в рамках одной клиентской сессии публикуются через clientOutboundChannel по одному за раз, что обеспечивает нужный порядок публикации. Обратите внимание, что это приводит к небольшому снижению производительности, поэтому необходимо активировать этот флаг только в случае необходимости.

События

Несколько событий ApplicationContext публикуются и могут быть получены путем реализации интерфейса ApplicationListener для Spring:

  • BrokerAvailabilityEvent: Указывает, когда брокер становится доступен или недоступен. Хотя "простой" брокер и становится доступным сразу при запуске и остается таковым во время работы приложения, "ретранслятор брокера" STOMP может потерять соединение с полнофункциональным брокером (например, при перезапуске брокера). Ретранслятор брокера содержит логику переподключения и восстанавливает "системное" соединение с брокером, когда он снова активируется. В результате это событие публикуется всякий раз, когда состояние меняется с подключенного на отключенное и наоборот. Компонентам, использующим SimpMessagingTemplate, нужно подписаться на это событие и избегать отправки сообщений, пока брокер недоступен. В любом случае, они должны быть подготовлены к обработке MessageDeliveryException при отправке сообщения.

  • SessionConnectEvent: Публикуется при получении нового фрейма CONNECT протокола STOMP для обозначения начала новой клиентской сессии. Событие содержит сообщение, представляющее подключение, включая идентификатор сессии, информацию о пользователе (если таковая имеется) и любые пользовательские заголовки, отправленные клиентом. Это полезно для отслеживания клиентских сессий. Компоненты, подписанные на это событие, могут обернуть содержащееся сообщение с помощью SimpMessageHeaderAccessor или StompMessageHeaderAccessor.

  • SessionConnectedEvent: Публикуется вскоре после SessionConnectEvent, когда брокер отправил фрейм CONNECTED протокола STOMP в ответ на фрейм CONNECT. На этом этапе STOMP-сессию можно считать полностью установленной.

  • SessionSubscribeEvent: Публикуется при получении нового фрейма SUBSCRIBE протокола STOMP.

  • SessionUnsubscribeEvent: Публикуется при получении нового фрейма UNSUBSCRIBE протокола STOMP.

  • SessionDisconnectEvent: Публикуется при завершении STOMP-сессии. Фрейм DISCONNECT может быть передан от клиента или автоматически сгенерирован при закрытии WebSocket-сессии. В некоторых случаях это событие публикуется более одного раза за сессию. Компоненты должны быть идемпотентными по отношению к нескольким событиям потери соединения.

Если вы используете полнофункциональный брокер, "ретранслятор брокера" STOMP автоматически повторно устанавливает "системное" соединение, если брокер становится временно недоступным. Однако клиентские соединения автоматически повторно не устанавливаются. Если heartbeat-сообщения включены, то клиент обычно реагирует на то, что брокер не отвечает в течение 10 секунд. В клиентах должна быть реализована их собственная логика повторного установления соединения.

Перехват

События передают уведомления о жизненном цикле STOMP-соединения, но не о каждом сообщении клиента. Приложения также могут регистрировать ChannelInterceptor для перехвата любого сообщения в любой части цепочки обработки. В следующем примере показано, как перехватывать входящие сообщения от клиентов:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(new MyChannelInterceptor());
    }
}

Пользовательский ChannelInterceptor может использовать StompHeaderAccessor или SimpMessageHeaderAccessor для получения доступа к информации о сообщении, как показано в следующем примере:

public class MyChannelInterceptor implements ChannelInterceptor {
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getStompCommand();
        // ...
        return message;
    }
}

Приложения также могут реализовать ExecutorChannelInterceptor, который является подинтерфейсом ChannelInterceptor с обратными вызовами в потоке, в котором обрабатываются сообщения. Хотя ChannelInterceptor вызывается один раз для каждого сообщения, отправленного в канал, ExecutorChannelInterceptor предусматривает перехватчики в потоке каждого MessageHandler, подписанного на сообщения из канала.

Обратите внимание, что, как и в случае с SessionDisconnectEvent, описанным ранее, сообщение DISCONNECT может быть получено от клиента, а также может быть автоматически сгенерировано при закрытии сессии WebSocket. В некоторых случаях перехватчик может перехватить это сообщение более одного раза для каждой сессии. Компоненты должны быть идемпотентными по отношению к нескольким событиям потери соединения.

Клиент STOMP

Spring предоставляет клиент STOMP поверх WebSocket и клиент STOMP поверх TCP.

Для начала можно создать и сконфигурировать WebSocketStompClient, как показано в следующем примере:

WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // for heartbeats

В предыдущем примере можно заменить StandardWebSocketClient на SockJsClient, поскольку он также является реализацией WebSocketClient. SockJsClient может использовать WebSocket или механизм передачи на основе HTTP в качестве запасного варианта.

Далее можно установить соединение и предоставить обработчик для сессии STOMP, как показано в следующем примере:

String url = "ws://127.0.0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
stompClient.connect(url, sessionHandler);

Когда сессия будет готова к использованию, обработчик получит уведомление, как показано в следующем примере:

public class MyStompSessionHandler extends StompSessionHandlerAdapter {
    @Override
    public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
        // ...
    }
}

Как только сессия будет установлена, можно отправлять любые полезные данные, которые сериализуются с помощью сконфигурированного MessageConverter, как показано в следующем примере:

session.send("/topic/something", "payload");

Вы также можете подписаться на адреса назначения. Методы subscribe требуют наличия обработчика сообщений для подписки и возвращают дескриптор (хэндл) Subscription, который вы можете использовать для отписки. Для каждого полученного сообщения обработчик может указать целевой тип Object, в который должны быть десериализованы полезные данные, как показано в следующем примере:

session.subscribe("/topic/something", new StompFrameHandler() {
    @Override
    public Type getPayloadType(StompHeaders headers) {
        return String.class;
    }
    @Override
    public void handleFrame(StompHeaders headers, Object payload) {
        // ...
    }
});

Чтобы включить heartbeat-сообщения STOMP, вы можете сконфигурировать WebSocketStompClient с использованием TaskScheduler и опционально настроить интервалы передачи heartbeat-сообщений (10 секунд для отсутствия действий записи, что вызовет отправку heartbeat-сообщения, и 10 секунд для отсутствия действий чтения, что вызовет закрытие соединения).

WebSocketStompClient отправляет heartbeat-сообщение только в случае бездействия, т.е. когда не отправляются другие сообщения. Это может стать проблемой при использовании внешнего брокера, поскольку сообщения с адресом назначения, не являющимся брокерским, отражают активность, но фактически не пересылаются брокеру. В этом случае можно сконфигурировать TaskScheduler при инициализации внешнего брокера, который обеспечивает пересылку heartbeat-сообщения брокеру также и в том случае, если отправляются лишь сообщения с адресом назначения, не являющимся брокерским.

Если вы используете WebSocketStompClient для тестов производительности, чтобы имитировать тысячи клиентов с одной машины, подумайте об отключении heartbeat-сообщений, поскольку каждое соединение планирует свои собственные heartbeat-задачи, а оптимизация этого процесса для большого количества клиентов, работающих на одной машине, отсутствует.

Протокол STOMP также обеспечивает поддержку подтверждения получения, если клиент должен добавить заголовок receipt, на который сервер отвечает фреймом RECEIPT после обработки отправки или подписки. Для обеспечения такой поддержки StompSession предлагает setAutoReceipt(boolean), который обеспечивает добавление заголовка receipt при каждом последующем событии отправки или подписки. Кроме того, можно вручную добавить заголовок подтверждения получения в StompHeaders. И отправка, и подписка возвращают экземпляр Receiptable, который можно использовать для регистрации обратных вызовов успешного и неудачного получения. Для этой функции необходимо сконфигурировать клиент с использованием TaskScheduler и заданием количества времени до истечения срока действия подтверждения получения (по умолчанию 15 секунд).

Обратите внимание, что StompSessionHandler сам является StompFrameHandler, что позволяет ему обрабатывать фреймы ERROR в дополнение к обратному вызову handleException, предназначенного для исключений, сгенерированных при обработке сообщений, и handleTransportError, предназначенного для ошибок на уровне передачи, включая ConnectionLostException.

Область доступности протокола WebSocket

Каждая сессия WebSocket имеет Map атрибутов. Map привязывается в качестве заголовка к входящим сообщениям клиента и может быть доступна из метода контроллера, как показано в следующем примере:

@Controller
public class MyController {
    @MessageMapping("/action")
    public void handle(SimpMessageHeaderAccessor headerAccessor) {
        Map<String, Object> attrs = headerAccessor.getSessionAttributes();
        // ...
    }
}

Можно объявить управляемый Spring бин в области доступности websocket. Вы можете внедрять бины, входящие в область доступности WebSocket, в контроллеры и любые перехватчики канала, зарегистрированные через clientInboundChannel. Как правило, они являются одиночками, а их жизненный цикл дольше, чем каждая отдельная WebSocket-сессия. Поэтому для бинов, входящих в область доступности WebSocket, необходимо использовать режим прокси для области доступности, как показано в следующем примере:

@Component
@Scope(scopeName = "websocket", proxyMode = ScopedProxyMode.TARGET_CLASS)
public class MyBean {
    @PostConstruct
    public void init() {
        // Вызывается после внедрения зависимостей
    }
    // ...
    @PreDestroy
    public void destroy() {
        // Вызывается после внедрения зависимостей
    }
}
@Controller
public class MyController {
    private final MyBean myBean;
    @Autowired
    public MyController(MyBean myBean) {
        this.myBean = myBean;
    }
    @MessageMapping("/action")
    public void handle() {
        // this.myBean из текущей сессии WebSocket
    }
}

Как и любая кастомная область доступности, Spring инициализирует новый экземпляр MyBean при первом обращении к нему из контроллера и сохраняет его в атрибутах сессии WebSocket. Этот же экземпляр впоследствии возвращается до окончания сессии. Для бинов, входящих в область доступности WebSocket, вызываются все методы жизненного цикла Spring, как показано в предыдущих примерах.

Производительность

Когда дело доходит до производительности, никакого единственно верного решения нет. На неё влияет множество факторов, включая размер и объем сообщений, выполнение прикладными методами работы, требующей блокировки, и внешние факторы (такие как скорость сети и другие детали). Цель этого раздела – кратко описать доступные варианты конфигурации, а также высказать некоторые идеи и порассуждать о масштабировании.

В приложении для обмена сообщениями сообщения передаются по каналам для асинхронного выполнения, которые поддерживаются пулами потоков. Конфигурирование такого приложения требует хорошего понимания принципов работы каналов и потока сообщений.

Очевидно, что начать следует с настройки пулов потоков, поддерживающих clientInboundChannel и clientOutboundChannel. По умолчанию обе конфигурации настроены на удвоенное количество доступных процессоров.

Если обработка сообщений в аннотированных методах в основном зависит от процессора, количество потоков для clientInboundChannel должно оставаться близким к количеству процессоров. Если выполняемая ими работа больше связана с вводом-выводом и требует блокировки или ожидания в базе данных или другой внешней системе, размер пула потоков, вероятно, нужно увеличить.

ThreadPoolExecutor имеет три важных свойства: размер основного пула потоков, максимальный размер пула потоков и вместимость очереди для хранения задач, для которых нет свободных потоков.

Часто возникает путаница: конфигурирование размера основного пула (например, 10) и максимального размера пула (например, 20) приводит к созданию пула потоков с 10-20 потоками. На самом деле, если оставить значение вместимости по умолчанию Integer.MAX_VALUE, пул потоков никогда не превысит размер основного пула, поскольку все дополнительные задачи ставятся в очередь.

Смотрите javadoc по ThreadPoolExecutor, чтобы узнать, как работают эти свойства, и ознакомиться с различными стратегиями постановки в очередь.

На стороне clientOutboundChannel речь идет об отправке сообщений клиентам WebSocket. Если клиенты находятся в быстрой сети, количество потоков должно оставаться близким к количеству доступных процессоров. Если они медленные или имеют низкую пропускную способность, то дольше потребляют сообщения и создают нагрузку на пул потоков. Поэтому требуется увеличить размер пула потоков.

Хотя рабочую нагрузку для канала clientInboundChannel возможно предсказать — в конце концов, она зависит от того, чем занимается приложение — сконфигурировать канал "clientOutboundChannel" уже сложнее, поскольку его работа основана на факторах, не зависящих от приложения. По этой причине с отправкой сообщений связаны два дополнительных свойства: sendTimeLimit и sendBufferSizeLimit. Вы можете использовать эти методы для того, чтобы сконфигурировать длительность отправки и объем данных, подлежащих буферизации, при отправке сообщений клиенту.

Общая идея заключается в том, что в любой момент времени для отправки клиенту может использоваться только один поток. Все дополнительные сообщения, тем временем, буферизируются, и вы можете использовать эти свойства, чтобы решить, сколько времени должна занимать отправка сообщения и сколько данных может быть буферизировано за это время. Важные дополнительные подробности см. в javadoc и документации к XML-схеме.

В следующем примере показана возможная конфигурация:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setSendTimeLimit(15 * 1000).setSendBufferSizeLimit(512 * 1024);
    }
    // ...
}

В следующем примере показан XML-эквивалент конфигурации из предыдущего примера:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        https://www.springframework.org/schema/websocket/spring-websocket.xsd">
    <websocket:message-broker>
        <websocket:transport send-timeout="15000" send-buffer-size="524288" />
        <!-- ... -->
    </websocket:message-broker>
</beans>

Вы также можете использовать конфигурацию механизма передачи WebSocket, показанную ранее, для настройки максимально допустимого размера входящих сообщений STOMP. Теоретически, размер сообщения WebSocket может быть практически неограниченным. На практике серверы WebSocket устанавливают ограничения – например, 8 Кбайт для Tomcat и 64 Кбайт для Jetty. По этой причине клиенты STOMP (такие как webstomp-client из JavaScript и другие) разбивают большие сообщения STOMP при достижении размера в 16 Кбайт и отправляют их в виде нескольких сообщений WebSocket, что требует от сервера буферизации и повторного ассемблирования.

Средства поддержки STOMP поверх WebSocket в Spring осуществляют это, поэтому приложения могут конфигурировать максимальный размер STOMP-сообщений независимо от размеров сообщений, специфичных для WebSocket-сервера. Помните, что размер сообщения WebSocket автоматически корректируется, если необходимо, чтобы гарантированно передавать WebSocket-сообщения размером минимум 16 Кбайт.

В следующем примере показана одна из возможных конфигураций:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setMessageSizeLimit(128 * 1024);
    }
    // ...
}

В следующем примере показан XML-эквивалент конфигурации из предыдущего примера:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        https://www.springframework.org/schema/websocket/spring-websocket.xsd">
    <websocket:message-broker>
        <websocket:transport message-size="131072" />
        <!-- ... -->
    </websocket:message-broker>
</beans>

Важным моментом в масштабировании является использование нескольких экземпляров приложений. В настоящее время нельзя сделать это с помощью простого брокера. Однако при использовании полнофункционального брокера (например, RabbitMQ) каждый экземпляр приложения подключается к брокеру, а сообщения, передаваемые одним экземпляром приложения, могут рассылаться через брокер клиентам WebSocket, подключенным через любые другие экземпляры приложений.

Мониторинг

Если вы используете аннотацию @EnableWebSocketMessageBroker или <websocket:message-broker>, ключевые компоненты инфраструктуры автоматически собирают статистику и счетчики, которые дают важное представление о внутреннем состоянии приложения. В конфигурации также объявляется бин типа WebSocketMessageBrokerStats, который собирает всю доступную информацию в одном месте и по умолчанию регистрирует ее на уровне INFO раз в 30 минут. Этот бин может быть экспортирован в JMX через MBeanExporter из Spring для просмотра во время выполнения (например, через jconsole из JDK). В следующем списке представлена краткая информация:

Клиентские сессии WebSocket
Текущая (Current)

Показывает, сколько клиентских сессий существует в настоящее время, с дальнейшей разбивкой по потоковой передачи данных через протокол WebSocket относительно протокола HTTP и сессиям поллинга через протокол SockJS.

Всего (Total)

Указывает, сколько всего сессий было установлено.

Аварийно закрытая
Сбои в соединении (Connect Failures)

Сессии, которые были установлены, но были закрыты после того, как в течение 60 секунд не было получено ни одного сообщения. Обычно это свидетельствует о проблемах с прокси-сервером или сетью.

Превышен лимит отправки (Send Limit Exceeded)

Сессии закрываются после превышения настроенного времени ожидания отправки или лимита буфера отправки, что может произойти с медленными клиентами (см. предыдущий раздел).

Ошибки передачи (Transport Errors)

Сессии закрываются после ошибки передачи, например, ввиду невозможности чтения или записи в WebSocket-соединение или HTTP-запрос или ответ.

Фреймы STOMP (STOMP Frames)

Общее количество обработанных фреймов CONNECT, CONNECTED и DISCONNECT, показывающее, сколько клиентов подключилось на уровне STOMP. Обратите внимание, что значение счетчика фрейма DISCONNECT может быть меньше, если сессии закрыты аварийно или если клиенты закрыты без отправки фрейма DISCONNECT.

Ретранслятор брокера STOMP (STOMP Broker Relay)
TCP-соединения

Указывает, сколько TCP-соединений от имени клиентских WebSocket-сессий установлено с брокером. Значение должно быть равно количеству клиентских WebSocket-сессий + 1 дополнительное общее "системное" соединение для отправки сообщений из приложения.

Фреймы STOMP (STOMP Frames)

Общее количество фреймов CONNECT, CONNECTED и DISCONNECT, переданных брокеру или полученных от него от имени клиентов. Обратите внимание, что фрейм DISCONNECT отправляется брокеру независимо от того, как была закрыта клиентская WebSocket-сессия. Поэтому меньшее количество фреймов DISCONNECT является признаком того, что брокер активно закрывает соединения (возможно, из-за не вовремя пришедшего heartbeat-сообщения, недействительного входного фрейма или другой проблемы).

Входящий канал клиента

Статистика из пула потоков, поддерживающего clientInboundChannel, которая дает представление о состоянии обработки входящих сообщений. Задачи, стоящие в очереди, являются признаком того, что приложение может слишком медленно обрабатывает сообщения. Если есть задачи, связанные с вводом-выводом (например, медленные запросы к базе данных, HTTP-запросы к стороннему REST API и так далее), рассмотрите возможность увеличения размера пула потоков.

Исходящий канал клиента (Client Outbound Channel)

Статистика из пула потоков, поддерживающего clientOutboundChannel, которая дает представление о работоспособности широковещательной рассылки сообщений клиентам. Задачи, стоящие в очереди, являются признаком того, что клиенты слишком медленно принимают сообщения. Один из способов решения этой проблемы – увеличить размер пула потоков, чтобы вместить ожидаемое количество одновременных медленных клиентов. Другой вариант – снизить ограничения на время ожидания отправки и размер буфера отправки (см. предыдущий раздел).

Планировщик задач SockJS (SockJS Task Scheduler)

Статистика из пула потоков планировщика задач SockJS, который используется для отправки heartbeat-сообщений. Обратите внимание, что при согласовании heartbeat-сообщений на уровне STOMP отправка heartbeat-сообщений для SockJS отключена.

Тестирование

Существует два основных подхода к тестированию приложений, когда вы используете поддержку STOMP поверх WebSocket в Spring. Первый заключается в написании тестов на стороне сервера для проверки функциональности контроллеров и их аннотированных методов обработки сообщений. Второй – это написание полных сквозных тестов, которые включают в себя запуск клиента и сервера.

Эти два подхода не являются взаимоисключающими. Напротив, каждому из них отведено свое место в общей стратегии тестирования. Тесты на стороне сервера более узконаправленные, их легче писать и сопровождать. С другой стороны, сквозные интеграционные тесты являются более полными и охватывают гораздо больше, но они также более сложны в написании и сопровождении.

Простейшей формой тестирования на стороне сервера является написание модульных тестов контроллера. Однако это недостаточно практично, поскольку многое из того, что делает контроллер, зависит от его аннотаций. Чистые модульные тесты просто не смогут протестировать его работу.

В идеале, тестируемые контроллеры должны вызываться в том виде, в котором они находятся во время выполнения программы, подобно подходу к тестированию контроллеров, обрабатывающих HTTP-запросы, с помощью фреймворка Spring MVC Test – то есть без запуска контейнера сервлетов, но с использованием Spring Framework для вызова аннотированных контроллеров. Как и в случае с Spring MVC Test, здесь у вас есть две возможные альтернативы: использовать "контекстно-ориентированную" или "автономную" конфигурацию:

  • Загрузите актуальную конфигурацию Spring с помощью фреймворка Spring TestContext, внедрите clientInboundChannel в качестве тестового поля и используйте его для отправки сообщений, которые будут обрабатываться методами контроллера.

  • Вручную установите минимальную инфраструктуру фреймворка Spring, необходимую для вызова контроллеров (а именно SimpAnnotationMethodMessageHandler) и передавайте сообщения для контроллеров непосредственно ему.

Оба этих сценария настройки продемонстрированы в тестах примера приложения для портфеля акций.

Второй подход заключается в создании сквозных интеграционных тестов. Для этого необходимо запустить WebSocket-сервер во встроенном режиме и подключиться к нему как WebSocket-клиент, который отправляет WebSocket-сообщения, содержащие STOMP-фреймы. Тесты примера приложения для портфеля акций также применяют этот подход, используя Tomcat в качестве встроенного сервера WebSocket и простого клиента STOMP для целей тестирования.