Протокол WebSocket, RFC 6455, предусматривает стандартизированный способ установления полнодуплексного двустороннего канала связи между клиентом и сервером поверх одного TCP-соединения. Это отличный от HTTP протокол TCP, но он предназначен для работы поверх HTTP, использует порты 80 и 443 и позволяет повторно использовать существующие правила брандмауэра.
Взаимодействие WebSocket начинается с HTTP-запроса, который использует HTTP-заголовок Upgrade
для обновления или, в данном случае, для перехода на протокол WebSocket. В следующем примере показано такое взаимодействие:
GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
- Заголовок
Upgrade
. - Использование соединения
Upgrade
.
Вместо обычного кода состояния 200 сервер с поддержкой WebSocket выдает сообщение, похожее на следующее:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
- Переключатель протоколов
После успешного подтверждения установления связи сокет TCP, лежащий в основе запроса на обновление HTTP, остается открытым, чтобы клиент и сервер могли продолжать отправлять и получать сообщения.
Полное введение в работу веб-сокетов протокола WebSocket выходит за рамки этого документа. См. "RFC 6455", главу, посвященную WebSocket в HTML5, или любое из многочисленных описаний и учебных пособий в Интернете.
Обратите внимание, что если сервер WebSocket работает за веб-сервером (например, nginx), то, скорее всего, потребуется сконфигурировать его на передачу запросов на обновление WebSocket серверу. Аналогичным образом, если приложение работает в облачной среде, проверьте инструкции поставщика облачных услуг, касающиеся поддержки WebSocket.
HTTP против WebSocket
Несмотря на то, что протокол WebSocket разработан как HTTP-совместимый и исходит из HTTP-запроса, важно понимать, что эти два протокола влекут за собой совершенно разные архитектуры и модели программирования приложений.
В HTTP и REST приложение моделируется как множество URL-адресов. Чтобы взаимодействовать с приложением, клиенты обращаются к этим URL-адресам в стиле "запрос-ответ". Серверы направляют запросы к соответствующему обработчику на основе URL-адреса, метода и заголовков HTTP.
Напротив, в веб-сокетах протокола WebSocket для первоначального подключения обычно используется только один URL-адрес. Впоследствии все сообщения приложения передаются по этому же TCP-соединению. Это указывает на совершенно иную асинхронную, управляемую событиями архитектуру обмена сообщениями.
WebSocket также является низкоуровневым транспортным протоколом, который, в отличие от HTTP, не предписывает никакой семантики содержимому сообщений. Это означает, что не существует способа маршрутизации или обработки сообщения, пока между клиентом и сервером не будет согласована семантика сообщения.
Клиенты и серверы на WebSocket могут согласовать использование протокола обмена сообщениями более высокого уровня (например, STOMP) с помощью заголовка Sec-WebSocket-Protocol
в HTTP-запросе подтверждения установления связи. В отсутствие этого им нужно придумать свои собственные соглашения.
Когда следует использовать веб-сокеты протокола WebSocket?
Веб-сокеты протокола WebSocket могут сделать веб-страницу динамичной и интерактивной. Однако во многих случаях сочетание Ajax и HTTP-потока или длинного поллинга (опроса) может быть простым и эффективным решением.
Например, новости, почта и социальные ленты должны обновляться динамически, но вполне допустимо делать это раз в несколько минут. С другой стороны, приложения для совместной работы, игры и финансовые приложения должны чаще работать в реальном времени.
Сама по себе задержка не является решающим фактором. Если объем сообщений относительно невелик (например, при мониторинге сетевых сбоев), эффективным решением может стать потоковая передача или поллинг по HTTP-протоколу. Именно сочетание низкой задержки, высокой частоты и большого объема является наилучшим аргументом в пользу использования WebSocket.
Помните также, что в Интернете ограничительные прокси-серверы, которые находятся вне вашего контроля, могут препятствовать взаимодействию WebSocket, либо потому что они не настроены на передачу заголовка Upgrade
, либо потому что они закрывают долговременные соединения, которые, как представляется, неактивны. Это означает, что использование WebSocket для внутренних приложений в рамках брандмауэра является более простым решением, чем для публичных приложений.
WebSocket API
Spring Framework предоставляет API для протокола WebSocket, который можно использовать для написания клиентских и серверных приложений, обрабатывающих сообщения WebSocket.
Сервер
Чтобы создать WebSocket-сервер, можно сначала создать WebSocketHandler
. В следующем примере показано, как это сделать:
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
public class MyWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
// ...
}
}
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
class MyWebSocketHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
// ...
}
}
Затем можно сопоставить его с URL-адресом:
@Configuration
class WebConfig {
@Bean
public HandlerMapping handlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/path", new MyWebSocketHandler());
int order = -1; // before annotated controllers
return new SimpleUrlHandlerMapping(map, order);
}
}
@Configuration
class WebConfig {
@Bean
fun handlerMapping(): HandlerMapping {
val map = mapOf("/path" to MyWebSocketHandler())
val order = -1 // before annotated controllers
return SimpleUrlHandlerMapping(map, order)
}
}
Если используется конфигурация WebFlux, то больше ничего делать не потребуется, в противном случае, если конфигурация WebFlux не используется, потребуется объявить WebSocketHandlerAdapter
, как это показано ниже:
@Configuration
class WebConfig {
// ...
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
@Configuration
class WebConfig {
// ...
@Bean
fun handlerAdapter() = WebSocketHandlerAdapter()
}
WebSocketHandler
Метод handle
в WebSocketHandler
принимает WebSocketSession
и возвращает Mono<Void>
, чтобы обозначить, что обработка сессии приложением завершена. Сессия обрабатывается через два потока, один для входящих и один для исходящих сообщений. В следующей таблице описаны два метода, которые работают с потоками:
Метод WebSocketSession |
Описание |
---|---|
|
Предоставляет доступ к потоку входящих сообщений и завершает свою работу при закрытии соединения. |
|
|
Принимает источник исходящих сообщений, записывает сообщения и возвращает Mono<Void>
, который завершает свою работу, когда источник прекращает свою работу, а запись закончена.
-
Завершается либо входящий, либо исходящий поток сообщений.
-
Входящий поток завершается (то есть соединение закрывается), а исходящий поток бесконечен.
-
В выбранный момент, через метод
close
дляWebSocketSession
.
Если входящий и исходящий потоки сообщений скомпонованы вместе, нет необходимости проверять, открыто ли соединение, поскольку Reactive Streams сигнализирует о завершении активности. Входящий поток получает сигнал о завершении или ошибке, а исходящий поток получает сигнал об отмене.
Самая базовая реализация обработчика – это та, которая обрабатывает входящий поток. В следующем примере показана такая реализация:
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.receive()
.doOnNext(message -> {
// ... (2)
})
.concatMap(message -> {
// ... (3)
})
.then();
}
}
- Получаем доступ к потоку входящих сообщений.
- Осуществляем какие-нибудь действия над каждым сообщением.
- Выполняем вложенные асинхронные операции, которые используют содержимое сообщения.
- Возвращаем
Mono<Void>
, который завершается при получении "completes".
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
return session.receive()
.doOnNext {
// ... (2)
}
.concatMap {
// ... (3)
}
.then()
}
}
- Получаем доступ к потоку входящих сообщений.
- Осуществляем какие-нибудь действия над каждым сообщением.
- Выполняем вложенные асинхронные операции, которые используют содержимое сообщения.
- Возвращаем
Mono<Void>
, который завершается при получении "completes".
message.retain()
на основных серверах, использующих объединенные буферы данных (например, Netty). В противном случае буфер данных может быть освобожден до того, как будут прочитаны данные.Следующая реализация объединяет входящий и исходящий потоки:
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> output = session.receive()
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.map(value -> session.textMessage("Echo " + value));
return session.send(output);
}
}
- Обрабатываем поток входящих сообщений.
- Создаем исходящее сообщение, произведя объединенный поток.
- Возвращаем
Mono<Void>
, который не будет завершен, пока мы продолжим получать данные.
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val output = session.receive()
.doOnNext {
// ...
}
.concatMap {
// ...
}
.map { session.textMessage("Echo $it") }
return session.send(output)
}
}
- Обрабатываем поток входящих сообщений.
- Создаем исходящее сообщение, произведя объединенный поток.
- Возвращаем
Mono<Void>
, который не будет завершен, пока мы продолжим получать данные.
Входящий и исходящий потоки могут быть независимыми и объединяться только для завершения, как это показано в следующем примере:
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Mono<Void> input = session.receive()
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.then();
Flux<String> source = ... ;
Mono<Void> output = session.send(source.map(session::textMessage));
return Mono.zip(input, output).then();
}
}
- Обрабатываем поток входящих сообщений.
- Отправляем исходящие сообщения.
- Объединяем потоки и возвращением
Mono<Void>
, который завершается, если любой из потоков прекращается.
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive()
.doOnNext {
// ...
}
.concatMap {
// ...
}
.then()
val source: Flux<String> = ...
val output = session.send(source.map(session::textMessage))
return Mono.zip(input, output).then()
}
}
- Обрабатываем поток входящих сообщений.
- Отправляем исходящие сообщения.
- Объединяем потоки и возвращением
Mono<Void>
, который завершается, если любой из потоков прекращается.
DataBuffer
DataBuffer
- это представление для байтового буфера в WebFlux. Важно понимать, что на некоторых серверах, таких как Netty, байтовые буферы объединены в пул и подсчитываются по ссылкам, и должны быть освобождены при потреблении, чтобы избежать утечки памяти.
При работе на Netty приложениям нужно использовать DataBufferUtils.retain(dataBuffer)
, если требуется удерживать буферы входных данных без их освобождения, а затем использовать DataBufferUtils.release(dataBuffer)
, когда данные из буферов будут потреблены.
Подтверждение установления связи
WebSocketHandlerAdapter
делегирует полномочия WebSocketService
. По умолчанию это экземпляр HandshakeWebSocketService
, который выполняет основную проверку WebSocket-запроса, а затем использует RequestUpgradeStrategy
для используемого сервера. В настоящее время имеется встроенная поддержка Reactor Netty, Tomcat, Jetty и Undertow.
HandshakeWebSocketService
открывает свойство sessionAttributePredicate
, которое позволяет установить Predicate<String>
для извлечения атрибутов из WebSession
и вставки их в атрибуты WebSocketSession
.
Серверная конфигурация
RequestUpgradeStrategy
для каждого сервера открывает конфигурацию, специфичную для основного серверного механизма WebSocket. При использовании Java-конфигурации WebFlux можно настраивать такие свойства, или же, если конфигурация WebFlux не используется, то можно использовать те свойства, что приведены ниже:
@Configuration
class WebConfig {
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter(webSocketService());
}
@Bean
public WebSocketService webSocketService() {
TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
strategy.setMaxSessionIdleTimeout(0L);
return new HandshakeWebSocketService(strategy);
}
}
@Configuration
class WebConfig {
@Bean
fun handlerAdapter() =
WebSocketHandlerAdapter(webSocketService())
@Bean
fun webSocketService(): WebSocketService {
val strategy = TomcatRequestUpgradeStrategy().apply {
setMaxSessionIdleTimeout(0L)
}
return HandshakeWebSocketService(strategy)
}
}
Смотрите стратегию обновления для вашего сервера, чтобы узнать, какие варианты доступны. В настоящее время лишь Tomcat и Jetty предоставляют такие опции.
CORS
Самый простой способ настроить CORS и ограничить доступ к конечной точке WebSocket – это заставить ваш WebSocketHandler
реализовывать CorsConfigurationSource
и возвращать CorsConfiguration
с использованием допустимых источников, заголовков и другой информации. Если это сделать невозможно, также можно установить свойство corsConfigurations
для SimpleUrlHandler
, чтобы задать настройки CORS по URL-шаблону. Если заданы оба, они объединяются с помощью метода combine
для CorsConfiguration
.
Клиент
Spring WebFlux предусматривает абстракцию WebSocketClient
с реализациями для Reactor Netty, Tomcat, Jetty, Undertow и стандартного Java (то есть JSR-356).
WebSocketSession
, позволяющей использовать API, специфичный для Tomcat, чтобы приостанавливать получение сообщений для обеспечения обратной реакции.Чтобы начать WebSocket-сессию, можно создать экземпляр клиента и использовать его методы execute
:
WebSocketClient client = new ReactorNettyWebSocketClient();
URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
session.receive()
.doOnNext(System.out::println)
.then());
val client = ReactorNettyWebSocketClient()
val url = URI("ws://localhost:8080/path")
client.execute(url) { session ->
session.receive()
.doOnNext(::println)
.then()
}
Некоторые клиенты, такие как Jetty, реализуют Lifecycle
и их нужно останавливать и запускать еще до того, как можно будет использовать. Все клиенты имеют параметры конструктора, связанные с конфигурацией базового клиента WebSocket.