Протокол WebSocket, RFC 6455, передбачає стандартизований спосіб встановлення повнодуплексного двостороннього каналу зв'язку між клієнтом і сервером поверх одного TCP-з'єднання. Це відмінний від HTTP протокол TCP, але він призначений для роботи поверх HTTP, використовує порти 80 і 443 і дозволяє повторно використовувати існуючі правила брандмауера.

Взаємодія WebSocket починається з HTTP-запиту, який використовує HTTP-заголовок Upgrade для оновлення або, у цьому випадку, переходу до протоколу WebSocket. У цьому прикладі показано таку взаємодію:

GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket 
Connection: Upgrade 
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
  1. Заголовок Upgrade.
  2. Використання з'єднання Upgrade.

Замість звичайного коду стану 200 сервер з підтримкою WebSocket видає повідомлення, схоже на наступне:

HTTP/1.1 101 Switching Protocols 
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
  1. Перемикач протоколів

Після успішного підтвердження встановлення зв'язку сокет TCP, що лежить в основі запиту на оновлення HTTP, залишається відкритим, щоб клієнт і сервер могли продовжувати надсилати та отримувати повідомлення.

Повний вступ в роботу вебсокетів протоколу WebSocket виходить за межі цього документа. Див. "RFC 6455", розділ, присвячений WebSocket в HTML5, або будь-який з численних описів та навчальних посібників в Інтернеті.

Зверни увагу, що якщо сервер WebSocket працює за вебсервером (наприклад, nginx), то, швидше за все, потрібно налаштувати його на передачу запитів на оновлення WebSocket серверу. Аналогічно, якщо програма працює в хмарному середовищі, перевір інструкції постачальника хмарних послуг щодо підтримки WebSocket.

HTTP проти WebSocket

Незважаючи на те, що протокол WebSocket розроблений як HTTP-сумісний і виходить з HTTP-запиту, важливо розуміти, що ці два протоколи спричиняють абсолютно різні архітектури та моделі програмування додатків.

У HTTP і REST додаток моделюється як безліч URL-адрес. Щоб взаємодіяти з програмою, клієнти звертаються до цих URL-адрес у стилі "запит-відповідь". Сервери надсилають запити до відповідного обробника на основі URL-адреси, методу та заголовків HTTP.

Натомість у веб-сокетах WebSocket для початкового підключення зазвичай використовується лише одна URL-адреса. Згодом всі повідомлення програми передаються цим же TCP-з'єднання. Це вказує на зовсім іншу асинхронну, керовану подіями архітектуру обміну повідомленнями.

WebSocket також є низькорівневим транспортним протоколом, який, на відміну від HTTP, не передбачає жодної семантики вмісту повідомлень. Це означає, що не існує способу маршрутизації або обробки повідомлення, доки між клієнтом та сервером не буде узгоджено семантику повідомлення.

Клієнти та сервери на WebSocket можуть узгодити використання протоколу обміну повідомленнями вищого рівня (наприклад, STOMP) з за допомогою заголовка Sec-WebSocket-Protocol у HTTP-запиті підтвердження встановлення зв'язку. За відсутності цього їм потрібно вигадати власні угоди.

Коли слід використовувати вебсокети протоколу WebSocket?

Вебсокети протоколу WebSocket можуть зробити вебсторінку динамічною та інтерактивною. Однак у багатьох випадках поєднання Ajax і HTTP-потоку або довгого поллінгу (опитування) може бути простим і ефективним рішенням. З іншого боку, програми для спільної роботи, ігри та фінансові програми повинні частіше працювати в реальному часі.

Сама по собі затримка не є вирішальним фактором. Якщо обсяг повідомлень відносно невеликий (наприклад, при моніторингу мережних збоїв), ефективним рішенням може стати потокова передача або полінг по протоколу HTTP. Саме поєднання низької затримки, високої частоти і великого об'єму є найкращим аргументом на користь використання WebSocket.

Також пам'ятайте, що в інтернеті обмежувальні проксі-сервери, які знаходяться поза вашим контролем, можуть чинити перепону взаємодії WebSocket, або тому, що вони не налаштовані на передачу заголовка Upgrade, або тому, що вони закривають довгострокові з'єднання, які, здається, неактивні. Це означає, що використання WebSocket для внутрішніх програм у межах брандмауера є більш простим рішенням, ніж для публічних програм.

WebSocket API

Spring Framework надає API для протоколу WebSocket, який можна використовувати для написання клієнтських та серверних додатків, що обробляють повідомлення WebSocket.

Сервер

Щоб створити WebSocket-сервер, можна спочатку створити WebSocketHandler. У цьому прикладі показано, як це зробити:

Java
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
public class MyWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
// ...
}
}
Kotlin
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
class MyWebSocketHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
// ...
}
}

Потім можна порівняти його з URL-адресою:

Java
@Configuration
class WebConfig {
@Bean
public HandlerMapping handlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/path", new MyWebSocketHandler());
int order = -1; // before annotated controllers
return new SimpleUrlHandlerMapping(map, order);
}
}
Kotlin
@Configuration
class WebConfig {
@Bean
fun handlerMapping(): HandlerMapping {
val map = mapOf("/path" to MyWebSocketHandler())
val order = -1 // before annotated controllers
return SimpleUrlHandlerMapping(map, order)
}
}

Якщо використовується конфігурація WebFlux, то більше нічого робити не потрібно, інакше, якщо конфігурація WebFlux не використовується, потрібно оголосити WebSocketHandlerAdapter, як це показано нижче:

Java
@Configuration
class WebConfig {
// ...
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
Kotlin
@Configuration
class WebConfig {
// ...
@Bean
fun handlerAdapter() =  WebSocketHandlerAdapter()
}

WebSocketHandler

Метод handle у WebSocketHandler приймає WebSocketSession і повертає Mono<Void>, щоб позначити, що обробка сесії програмою завершена. Сесія обробляється через два потоки, один для вхідних та один для вихідних повідомлень. У наступній таблиці описано два методи, які працюють з потоками:

Метод WebSocketSession Опис

Flux<WebSocketMessage> receive()

Надає доступ до потоку вхідних повідомлень і завершує свою роботу при закритті з'єднання.

Mono<Void> send(Publisher<WebSocketMessage>)

WebSocketHandler повинен об'єднувати вхідний і вихідний потоки в єдиний потік і повернути Mono<Void> ;, який відображає завершення цього потоку. Залежно від вимог програми, єдиний потік завершується, коли:

Приймає джерело вихідних повідомлень, записує повідомлення та повертає Mono<Void>, який завершує свою роботу, коли джерело припиняє свою роботу, а запис закінчено.

  • Завершується або вхідний, або вихідний потік повідомлень.

  • Вхідний потік завершується (тобто з'єднання закривається), а вихідний потік нескінченний.

  • У певний момент через метод close для WebSocketSession.

Якщо вхідний та вихідний потоки повідомлень скомпоновані разом, немає необхідності перевіряти, чи відкрито з'єднання, оскільки Reactive Streams сигналізує про завершення активності. Вхідний потік отримує сигнал про завершення або помилку, а вихідний потік отримує сигнал про скасування. У наступному прикладі показано таку реалізацію:

Java
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.receive()            
        .doOnNext(message -> {
            // ...                  (2)
        })
        .concatMap(message -> {
            // ...                  (3)
        })
        .then();                    
}
  1. Отримуємо доступ до потоку вхідних повідомлень.
  2. Здійснюємо якісь дії над кожним повідомленням.
  3. Виконуємо вкладені асинхронні операції, які використовують вміст повідомлення.
  4. Повертаємо Mono<Void>, який завершується при отриманні "completes".
Kotlin
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
return session.receive()            
        .doOnNext {
            // ...                  (2)
        }
        .concatMap {
            // ...                  (3)
        }
        .then()                     
}
}
  1. Отримуємо доступ до потоку вхідних повідомлень.
  2. Здійснюємо якісь дії над кожним повідомленням.
  3. Виконуємо вкладені асинхронні операції, які використовують вміст повідомлення.
  4. Повертаємо Mono<Void>, який завершується при отриманні "completes".
Для вкладених асинхронних операцій може знадобитися виклик message.retain() на основних серверах, які використовують об'єднані буфери даних (наприклад, Netty). В іншому випадку буфер даних може бути звільнений до того, як будуть прочитані дані.

Наступна реалізація рішення щодо потоків, що надходять і вихідних:

Java
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> output = session.receive()                     
        .doOnNext(message -> {
            // ...
        })
        .concatMap(message -> {
            // ...
        })
        .map(value -> session.textMessage("Echo " + value));    
return session.send(output);                         
}
}
  1. Обробляємо потік вхідних повідомлень.
  2. Створюємо вихідне повідомлення, створивши об'єднаний потік.
  3. Повертаємо Mono<Void>, який не буде завершено, поки ми продовжимо отримувати дані.
Kotlin
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Mono<Void> input = session.receive()                                 
        .doOnNext(message -> {
            // ...
        })
        .concatMap(message -> {
            // ...
        })
        .then();
Flux<String> source = ... ;
Mono<Void> output = session.send(source.map(session::textMessage));   
return Mono.zip(input, output).then();                                
}
}
  1. Обробляємо потік вхідних повідомлень.
  2. Створюємо вихідне повідомлення, здійснивши об'єднаний потік.
  3. Повертаємо Mono<Void>, який не буде завершено, поки ми продовжимо отримувати дані.

Вхідний і вихідний потоки можуть бути незалежними і об'єднуватися тільки для завершення, як це показано в наступному прикладі:

Java
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Mono<Void> input = session.receive()                                  
        .doOnNext(message -> {
            // ...
        })
        .concatMap(message -> {
            // ...
        })
        .then();
Flux<String> source = ... ;
Mono<Void> output = session.send(source.map(session::textMessage));     
return Mono.zip(input, output).then();                           
}
}
  1. Обробляємо потік вхідних повідомлень.
  2. Надсилаємо вихідні повідомлення.
  3. Об'єднуємо потоки та поверненням Mono<Void>, який завершується, якщо будь-який із потоків припиняється.
Kotlin
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive()                                                                     
        .doOnNext {
            // ...
        }
        .concatMap {
            // ...
        }
        .then()
val source: Flux<String> = ...
val output = session.send(source.map(session::textMessage))          
return Mono.zip(input, output).then()                                                      
}
}
  1. Обробляємо потік вхідних повідомлень.
  2. Надсилаємо вихідні повідомлення.
  3. Об'єднуємо потоки та поверненням Mono<Void>, який завершується, якщо будь-який із потоків припиняється.

DataBuffer

DataBuffer — це подання для байтового буфера в WebFlux. Важливо розуміти, що на деяких серверах, таких як Netty, байтові буфери об'єднані в пул і підраховуються за посиланнями, і повинні бути звільнені при споживанні, щоб уникнути витоку пам'яті.DataBufferUtils.retain(dataBuffer), якщо потрібно утримувати буфери вхідних даних без їх звільнення, а потім використовувати DataBufferUtils.release(dataBuffer), коли дані з буферів будуть спожиті.

Підтвердження встановлення зв'язку

WebSocketHandlerAdapter делегує повноваження WebSocketService. За замовчуванням це екземпляр HandshakeWebSocketService, який виконує основну перевірку WebSocket-запиту, а потім використовує RequestUpgradeStrategy для сервера, що використовується. В даний час є вбудована підтримка Reactor Netty, Tomcat, Jetty і Undertow. Для вилучення атрибутів з WebSession та вставки їх у атрибути WebSocketSession.

Серверна конфігурація

RequestUpgradeStrategy для кожного сервера відкриває конфігурацію, специфічну для основного серверного механізму WebSocket. При використанні Java-конфігурації WebFlux можна налаштовувати такі властивості, або якщо конфігурація WebFlux не використовується, то можна використовувати ті властивості, що наведені нижче:

Java
@Configuration
class WebConfig {
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter(webSocketService());
}
@Bean
public WebSocketService webSocketService() {
TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
strategy.setMaxSessionIdleTimeout(0L);
return new HandshakeWebSocketService(strategy);
}
}
Kotlin
@Configuration
class WebConfig {
@Bean
fun handlerAdapter() =
    WebSocketHandlerAdapter(webSocketService())
@Bean
fun webSocketService(): WebSocketService {
val strategy = TomcatRequestUpgradeStrategy().apply {
    setMaxSessionIdleTimeout(0L)
}
return HandshakeWebSocketService(strategy)
}
}

Дивися стратегію оновлення для твого сервера, щоб дізнатися, які варіанти доступні. Нині лише Tomcat і Jetty надають такі опції.

CORS

Найпростіший спосіб налаштувати CORS і обмежити доступ до кінцевої точки WebSocket — це змусити ваш WebSocketHandler реалізовувати CorsConfigurationSource та повертати CorsConfiguration з використанням допустимих джерел, заголовків та іншої інформації. Якщо це зробити неможливо, також можна встановити властивість corsConfigurations для SimpleUrlHandler, щоб встановити параметри CORS по URL-шаблону. Якщо вказані обидва, вони об'єднуються за допомогою combine для CorsConfiguration.

Клієнт

Spring WebFlux передбачає абстракцію WebSocketClient з реалізаціями для Reactor Netty, Tomcat, Jetty, Undertow та стандартного Java (тобто JSR-356).

Клієнт Tomcat фактично є розширенням стандартного Java-клієнта з деякою додатковою функціональністю в частині обробки WebSocketSession, що дозволяє використовувати API, специфічний для Tomcat, щоб призупиняти отримання повідомлень для забезпечення зворотної реакції.

Щоб розпочати WebSocket-сесію, можна створити екземпляр клієнта та використовувати його методи execute:

Java
WebSocketClient client = new ReactorNettyWebSocketClient();
URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
session.receive()
        .doOnNext(System.out::println)
        .then());
Kotlin
val client = ReactorNettyWebSocketClient()
val url = URI("ws://localhost:8080/path")
client.execute(url) { session ->
    session.receive()
            .doOnNext(::println)
    .then()
}

Деякі клієнти, такі як Jetty, реалізують Lifecycle та їх потрібно зупиняти та запускати ще до того, як можна буде використовувати. Усі клієнти мають параметри конструктора, пов'язані зі зміною базового клієнта WebSocket.