Протокол WebSocket, RFC 6455, предусматривает стандартизированный способ установления полнодуплексного двустороннего канала связи между клиентом и сервером поверх одного TCP-соединения. Это отличный от HTTP протокол TCP, но он предназначен для работы поверх HTTP, использует порты 80 и 443 и позволяет повторно использовать существующие правила брандмауэра.

Взаимодействие WebSocket начинается с HTTP-запроса, который использует HTTP-заголовок Upgrade для обновления или, в данном случае, для перехода на протокол WebSocket. В следующем примере показано такое взаимодействие:

GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
  1. Заголовок Upgrade.
  2. Использование соединения Upgrade.

Вместо обычного кода состояния 200 сервер с поддержкой WebSocket выдает сообщение, похожее на следующее:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
  1. Переключатель протоколов

После успешного подтверждения установления связи сокет TCP, лежащий в основе запроса на обновление HTTP, остается открытым, чтобы клиент и сервер могли продолжать отправлять и получать сообщения.

Полное введение в работу веб-сокетов протокола WebSocket выходит за рамки этого документа. См. "RFC 6455", главу, посвященную WebSocket в HTML5, или любое из многочисленных описаний и учебных пособий в Интернете.

Обратите внимание, что если сервер WebSocket работает за веб-сервером (например, nginx), то, скорее всего, потребуется сконфигурировать его на передачу запросов на обновление WebSocket серверу. Аналогичным образом, если приложение работает в облачной среде, проверьте инструкции поставщика облачных услуг, касающиеся поддержки WebSocket.

HTTP против WebSocket

Несмотря на то, что протокол WebSocket разработан как HTTP-совместимый и исходит из HTTP-запроса, важно понимать, что эти два протокола влекут за собой совершенно разные архитектуры и модели программирования приложений.

В HTTP и REST приложение моделируется как множество URL-адресов. Чтобы взаимодействовать с приложением, клиенты обращаются к этим URL-адресам в стиле "запрос-ответ". Серверы направляют запросы к соответствующему обработчику на основе URL-адреса, метода и заголовков HTTP.

Напротив, в веб-сокетах протокола WebSocket для первоначального подключения обычно используется только один URL-адрес. Впоследствии все сообщения приложения передаются по этому же TCP-соединению. Это указывает на совершенно иную асинхронную, управляемую событиями архитектуру обмена сообщениями.

WebSocket также является низкоуровневым транспортным протоколом, который, в отличие от HTTP, не предписывает никакой семантики содержимому сообщений. Это означает, что не существует способа маршрутизации или обработки сообщения, пока между клиентом и сервером не будет согласована семантика сообщения.

Клиенты и серверы на WebSocket могут согласовать использование протокола обмена сообщениями более высокого уровня (например, STOMP) с помощью заголовка Sec-WebSocket-Protocol в HTTP-запросе подтверждения установления связи. В отсутствие этого им нужно придумать свои собственные соглашения.

Когда следует использовать веб-сокеты протокола WebSocket?

Веб-сокеты протокола WebSocket могут сделать веб-страницу динамичной и интерактивной. Однако во многих случаях сочетание Ajax и HTTP-потока или длинного поллинга (опроса) может быть простым и эффективным решением.

Например, новости, почта и социальные ленты должны обновляться динамически, но вполне допустимо делать это раз в несколько минут. С другой стороны, приложения для совместной работы, игры и финансовые приложения должны чаще работать в реальном времени.

Сама по себе задержка не является решающим фактором. Если объем сообщений относительно невелик (например, при мониторинге сетевых сбоев), эффективным решением может стать потоковая передача или поллинг по HTTP-протоколу. Именно сочетание низкой задержки, высокой частоты и большого объема является наилучшим аргументом в пользу использования WebSocket.

Помните также, что в Интернете ограничительные прокси-серверы, которые находятся вне вашего контроля, могут препятствовать взаимодействию WebSocket, либо потому что они не настроены на передачу заголовка Upgrade, либо потому что они закрывают долговременные соединения, которые, как представляется, неактивны. Это означает, что использование WebSocket для внутренних приложений в рамках брандмауэра является более простым решением, чем для публичных приложений.

WebSocket API

Spring Framework предоставляет API для протокола WebSocket, который можно использовать для написания клиентских и серверных приложений, обрабатывающих сообщения WebSocket.

Сервер

Чтобы создать WebSocket-сервер, можно сначала создать WebSocketHandler. В следующем примере показано, как это сделать:

Java
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
public class MyWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
// ...
}
}
Kotlin
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
class MyWebSocketHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
// ...
}
}

Затем можно сопоставить его с URL-адресом:

Java
@Configuration
class WebConfig {
@Bean
public HandlerMapping handlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/path", new MyWebSocketHandler());
int order = -1; // before annotated controllers
return new SimpleUrlHandlerMapping(map, order);
}
}
Kotlin
@Configuration
class WebConfig {
@Bean
fun handlerMapping(): HandlerMapping {
val map = mapOf("/path" to MyWebSocketHandler())
val order = -1 // before annotated controllers
return SimpleUrlHandlerMapping(map, order)
}
}

Если используется конфигурация WebFlux, то больше ничего делать не потребуется, в противном случае, если конфигурация WebFlux не используется, потребуется объявить WebSocketHandlerAdapter, как это показано ниже:

Java
@Configuration
class WebConfig {
// ...
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
Kotlin
@Configuration
class WebConfig {
// ...
@Bean
fun handlerAdapter() =  WebSocketHandlerAdapter()
}

WebSocketHandler

Метод handle в WebSocketHandler принимает WebSocketSession и возвращает Mono<Void>, чтобы обозначить, что обработка сессии приложением завершена. Сессия обрабатывается через два потока, один для входящих и один для исходящих сообщений. В следующей таблице описаны два метода, которые работают с потоками:

Метод WebSocketSession Описание

Flux<WebSocketMessage> receive()

Предоставляет доступ к потоку входящих сообщений и завершает свою работу при закрытии соединения.

Mono<Void> send(Publisher<WebSocketMessage>)

WebSocketHandler должен объединять входящий и исходящий потоки в единый поток и вернуть Mono<Void>, который отражает завершение этого потока. В зависимости от требований приложения, единый поток завершается, когда:

Принимает источник исходящих сообщений, записывает сообщения и возвращает Mono<Void>, который завершает свою работу, когда источник прекращает свою работу, а запись закончена.

  • Завершается либо входящий, либо исходящий поток сообщений.

  • Входящий поток завершается (то есть соединение закрывается), а исходящий поток бесконечен.

  • В выбранный момент, через метод close для WebSocketSession.

Если входящий и исходящий потоки сообщений скомпонованы вместе, нет необходимости проверять, открыто ли соединение, поскольку Reactive Streams сигнализирует о завершении активности. Входящий поток получает сигнал о завершении или ошибке, а исходящий поток получает сигнал об отмене.

Самая базовая реализация обработчика – это та, которая обрабатывает входящий поток. В следующем примере показана такая реализация:

Java
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.receive()
        .doOnNext(message -> {
            // ...                  (2)
        })
        .concatMap(message -> {
            // ...                  (3)
        })
        .then();
}
}
  1. Получаем доступ к потоку входящих сообщений.
  2. Осуществляем какие-нибудь действия над каждым сообщением.
  3. Выполняем вложенные асинхронные операции, которые используют содержимое сообщения.
  4. Возвращаем Mono<Void>, который завершается при получении "completes".
Kotlin
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
return session.receive()
        .doOnNext {
            // ...                  (2)
        }
        .concatMap {
            // ...                  (3)
        }
        .then()
}
}
  1. Получаем доступ к потоку входящих сообщений.
  2. Осуществляем какие-нибудь действия над каждым сообщением.
  3. Выполняем вложенные асинхронные операции, которые используют содержимое сообщения.
  4. Возвращаем Mono<Void>, который завершается при получении "completes".
Для вложенных асинхронных операций может потребоваться вызов message.retain() на основных серверах, использующих объединенные буферы данных (например, Netty). В противном случае буфер данных может быть освобожден до того, как будут прочитаны данные.

Следующая реализация объединяет входящий и исходящий потоки:

Java
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> output = session.receive()
        .doOnNext(message -> {
            // ...
        })
        .concatMap(message -> {
            // ...
        })
        .map(value -> session.textMessage("Echo " + value));
return session.send(output);
}
}
  1. Обрабатываем поток входящих сообщений.
  2. Создаем исходящее сообщение, произведя объединенный поток.
  3. Возвращаем Mono<Void>, который не будет завершен, пока мы продолжим получать данные.
Kotlin
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val output = session.receive()
        .doOnNext {
            // ...
        }
        .concatMap {
            // ...
        }
        .map { session.textMessage("Echo $it") }
return session.send(output)
}
}
  1. Обрабатываем поток входящих сообщений.
  2. Создаем исходящее сообщение, произведя объединенный поток.
  3. Возвращаем Mono<Void>, который не будет завершен, пока мы продолжим получать данные.

Входящий и исходящий потоки могут быть независимыми и объединяться только для завершения, как это показано в следующем примере:

Java
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Mono<Void> input = session.receive()
        .doOnNext(message -> {
            // ...
        })
        .concatMap(message -> {
            // ...
        })
        .then();
Flux<String> source = ... ;
Mono<Void> output = session.send(source.map(session::textMessage));
return Mono.zip(input, output).then();
}
}
  1. Обрабатываем поток входящих сообщений.
  2. Отправляем исходящие сообщения.
  3. Объединяем потоки и возвращением Mono<Void>, который завершается, если любой из потоков прекращается.
Kotlin
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive()
        .doOnNext {
            // ...
        }
        .concatMap {
            // ...
        }
        .then()
val source: Flux<String> = ...
val output = session.send(source.map(session::textMessage))
return Mono.zip(input, output).then()
}
}
  1. Обрабатываем поток входящих сообщений.
  2. Отправляем исходящие сообщения.
  3. Объединяем потоки и возвращением Mono<Void>, который завершается, если любой из потоков прекращается.

DataBuffer

DataBuffer - это представление для байтового буфера в WebFlux. Важно понимать, что на некоторых серверах, таких как Netty, байтовые буферы объединены в пул и подсчитываются по ссылкам, и должны быть освобождены при потреблении, чтобы избежать утечки памяти.

При работе на Netty приложениям нужно использовать DataBufferUtils.retain(dataBuffer), если требуется удерживать буферы входных данных без их освобождения, а затем использовать DataBufferUtils.release(dataBuffer), когда данные из буферов будут потреблены.

Подтверждение установления связи

WebSocketHandlerAdapter делегирует полномочия WebSocketService. По умолчанию это экземпляр HandshakeWebSocketService, который выполняет основную проверку WebSocket-запроса, а затем использует RequestUpgradeStrategy для используемого сервера. В настоящее время имеется встроенная поддержка Reactor Netty, Tomcat, Jetty и Undertow.

HandshakeWebSocketService открывает свойство sessionAttributePredicate, которое позволяет установить Predicate<String> для извлечения атрибутов из WebSession и вставки их в атрибуты WebSocketSession.

Серверная конфигурация

RequestUpgradeStrategy для каждого сервера открывает конфигурацию, специфичную для основного серверного механизма WebSocket. При использовании Java-конфигурации WebFlux можно настраивать такие свойства, или же, если конфигурация WebFlux не используется, то можно использовать те свойства, что приведены ниже:

Java
@Configuration
class WebConfig {
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter(webSocketService());
}
@Bean
public WebSocketService webSocketService() {
TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
strategy.setMaxSessionIdleTimeout(0L);
return new HandshakeWebSocketService(strategy);
}
}
Kotlin
@Configuration
class WebConfig {
@Bean
fun handlerAdapter() =
    WebSocketHandlerAdapter(webSocketService())
@Bean
fun webSocketService(): WebSocketService {
val strategy = TomcatRequestUpgradeStrategy().apply {
    setMaxSessionIdleTimeout(0L)
}
return HandshakeWebSocketService(strategy)
}
}

Смотрите стратегию обновления для вашего сервера, чтобы узнать, какие варианты доступны. В настоящее время лишь Tomcat и Jetty предоставляют такие опции.

CORS

Самый простой способ настроить CORS и ограничить доступ к конечной точке WebSocket – это заставить ваш WebSocketHandler реализовывать CorsConfigurationSource и возвращать CorsConfiguration с использованием допустимых источников, заголовков и другой информации. Если это сделать невозможно, также можно установить свойство corsConfigurations для SimpleUrlHandler, чтобы задать настройки CORS по URL-шаблону. Если заданы оба, они объединяются с помощью метода combine для CorsConfiguration.

Клиент

Spring WebFlux предусматривает абстракцию WebSocketClient с реализациями для Reactor Netty, Tomcat, Jetty, Undertow и стандартного Java (то есть JSR-356).

Клиент Tomcat фактически является расширением стандартного Java-клиента с некоторой дополнительной функциональностью в части обработки WebSocketSession, позволяющей использовать API, специфичный для Tomcat, чтобы приостанавливать получение сообщений для обеспечения обратной реакции.

Чтобы начать WebSocket-сессию, можно создать экземпляр клиента и использовать его методы execute:

Java
WebSocketClient client = new ReactorNettyWebSocketClient();
URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
session.receive()
        .doOnNext(System.out::println)
        .then());
Kotlin
val client = ReactorNettyWebSocketClient()
val url = URI("ws://localhost:8080/path")
client.execute(url) { session ->
    session.receive()
            .doOnNext(::println)
    .then()
}

Некоторые клиенты, такие как Jetty, реализуют Lifecycle и их нужно останавливать и запускать еще до того, как можно будет использовать. Все клиенты имеют параметры конструктора, связанные с конфигурацией базового клиента WebSocket.