Самый простой способ сообщение асинхронно – использовать инфраструктуру конечной точки аннотированного слушателя. Если коротко, она позволяет открывать метод управляемого бина в качестве конечной точки слушателя JMS. В следующем примере показано, как его использовать:

@Component
public class MyService {
    @JmsListener(destination = "myDestination")
    public void processOrder(String data) { ... }
}

Идея предыдущего примера заключается в том, что всякий раз, когда сообщение доступно через javax.jms.Destination myDestination, метод processOrder вызывается соответствующим образом (в данном случае с содержимым JMS-сообщения, аналогично тому, которое передает MessageListenerAdapter).

Инфраструктура аннотированных конечных точек создает контейнер слушателя сообщений "за кулисами" для каждого аннотированного метода, используя JmsListenerContainerFactory. Такой контейнер не регистрируется в контексте приложения, но его можно легко найти с целью управления при помощи бина JmsListenerEndpointRegistry.

@JmsListener является повторяющейся аннотацией в Java 8, поэтому можно связать несколько адресов назначения JMS с одним и тем же методом, добавив к нему дополнительные объявления с аннотацией @JmsListener.

Активация аннотаций конечных точек слушателей

Чтобы активировать поддержку аннотаций @JmsListener, можно добавить аннотацию @EnableJms в один из ваших классов, аннотированных @Configuration, как показано в следующем примере:

@Configuration
@EnableJms
public class AppConfig {
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setDestinationResolver(destinationResolver());
        factory.setSessionTransacted(true);
        factory.setConcurrency("3-10");
        return factory;
    }
}

По умолчанию инфраструктура ищет бин с именем jmsListenerContainerFactory в качестве источника для фабрики, которую нужно использовать для создания контейнеров слушателей сообщений. В этом случае (и не учитывая настройку инфраструктуры JMS), можно вызвать метод processOrder с размером основного пула в три потока и максимальным размером пула в десять потоков.

Можно настроить используемую фабрику контейнеров слушателей для каждой аннотации или настроить явное значение по умолчанию, реализовав интерфейс JmsListenerConfigurer. Значение по умолчанию требуется только в том случае, если хотя бы одна конечная точка зарегистрирована без определенной фабрики контейнеров. Подробности и примеры смотрите в javadoc по классам, реализующим JmsListenerConfigurer.

Если XML-конфигурация более предпочтительна, можно использовать элемент <jms:annotation-driven>, как показано в следующем примере:

<jms:annotation-driven/>
<bean id="jmsListenerContainerFactory"
        class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destinationResolver" ref="destinationResolver"/>
    <property name="sessionTransacted" value="true"/>
    <property name="concurrency" value="3-10"/>
</bean>

Программная регистрация конечных точек

JmsListenerEndpoint передает модель конечной точки JMS и отвечает за конфигурирование контейнера для этой модели. Инфраструктура позволяет программно конфигурировать конечные точки в дополнение к тем, которые обнаруживаются аннотацией JmsListener. В следующем примере показано, как это сделать:

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {
    @Override
    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
        SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
        endpoint.setId("myJmsEndpoint");
        endpoint.setDestination("anotherQueue");
        endpoint.setMessageListener(message -> {
            // processing
        });
        registrar.registerEndpoint(endpoint);
    }
}

В предыдущем примере мы использовали SimpleJmsListenerEndpoint, который передает фактический MessageListener для вызова. Однако можно создать свой собственный вариант конечной точки для описания кастомного механизма вызова.

Обратите внимание, что можно вообще не использовать аннотацию @JmsListener и программно регистрировать исключительно собственные конечные точки через JmsListenerConfigurer.

Аннотированная сигнатура метода конечной точки

До сих пор мы внедряли в конечную точку простую String, но на самом деле она может иметь очень гибкую сигнатуру метода. В следующем примере мы переписываем её для внедрения Order с кастомным заголовком:

@Component
public class MyService {
    @JmsListener(destination = "myDestination")
    public void processOrder(Order order, @Header("order_type") String orderType) {
        ...
    }
}

Основными элементами, которые можно вводить в конечные точки слушателей JMS, являются:

  • Сырой javax.jms.Message или любой из его подклассов (при условии, что он соответствует типу входящего сообщения).

  • javax.jms.Session для обеспечения дополнительного доступа к нативному JMS API (например, для отправки кастомного ответа).

  • org.springframework.messaging.Message, представляющее входящее JMS-сообщение. Обратите внимание, что это сообщение содержит как кастомные, так и стандартные заголовки (как определено в JmsHeaders).

  • Помеченные аннотацией @Header аргументы метода для извлечения конкретного значения заголовка, включая стандартные JMS-заголовки.

  • Аргумент, помеченный аннотацией @Headers, который также должен быть присвоен java.util.Map для получения доступа ко всем заголовкам.

  • Неаннотированный элемент, который не является одним из поддерживаемых типов (Message или Session), считается полезными данными. Можно явно это обозначить, аннотировав параметр с помощью @Payload. Также можно включить валидацию, добавив дополнительную аннотацию @Valid.

Возможность внедрить абстракцию Message из Spring особенно полезна, когда нужно воспользоваться всей информацией, хранящейся в специфическом для механизма передачи сообщении, не полагаясь на специфический для механизма передачи API-интерфейс. В следующем примере показано, как это сделать:

@JmsListener(destination = "myDestination")
public void processOrder(Message<Order> order) { ... }

Обработка аргументов метода обеспечивается DefaultMessageHandlerMethodFactory, которую можно настроить, чтобы обеспечить поддержку дополнительных аргументов метода. Там же можно настроить поддержку преобразования и валидации.

Например, если необходимо убедиться, что наш Order действителен до его обработки, можно пометить полезные данные аннотацией @Valid и настроить необходимый валидатор, как показано в следующем примере:

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {
    @Override
    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myJmsHandlerMethodFactory());
    }
    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setValidator(myValidator());
        return factory;
    }
}

Управление ответами

Существующие средства поддержки в MessageListenerAdapter уже позволяет методу иметь непустой возвращаемый тип. Когда это происходит, результат вызова инкапсулируется в javax.jms.Message, отправляется либо по адресу назначения, указанному в заголовке JMSReplyTo исходного сообщения, либо по адресу назначения по умолчанию, сконфигурированного для слушателя. Теперь можно установить этот адрес назначения по умолчанию с помощью аннотации @SendTo абстракции обмена сообщениями.

Предполагая, что наш метод processOrder теперь должен возвращать OrderStatus, мы можем написать его для автоматической отправки ответа, как показано в следующем примере:

@JmsListener(destination = "myDestination")
@SendTo("status")
public OrderStatus processOrder(Order order) {
    // обработка заказа
    return status;
}
Если существует несколько методов с аннотацией @JmsListener, также можно разместить аннотацию @SendTo на уровне класса для совместного использования адреса назначения ответа по умолчанию.

Если требуется установить дополнительные заголовки независимым от механизма передачи способом, можно вернуть вместо них Message, используя метод, подобный следующему:

@JmsListener(destination = "myDestination")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
    // обработка заказа
    return MessageBuilder
            .withPayload(status)
            .setHeader("code", 1234)
            .build();
}

Если требуется вычислить адрес назначения ответа во время выполнения программы, можно инкапсулировать свой ответ в экземпляр JmsResponse, который также передает адрес назначения для использования во время выполнения. Мы можем переписать предыдущий пример следующим образом:

@JmsListener(destination = "myDestination")
public JmsResponse<Message<OrderStatus>> processOrder(Order order) {
    // обработка заказа
    Message<OrderStatus> response = MessageBuilder
            .withPayload(status)
            .setHeader("code", 1234)
            .build();
    return JmsResponse.forQueue(response, "status");
}

Наконец, если требуется задать некоторые значения QoS для ответа, такие как приоритет или время жизни, можно настроить JmsListenerContainerFactory соответствующим образом, как показано в следующем примере:

@Configuration
@EnableJms
public class AppConfig {
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        QosSettings replyQosSettings = new QosSettings();
        replyQosSettings.setPriority(2);
        replyQosSettings.setTimeToLive(10000);
        factory.setReplyQosSettings(replyQosSettings);
        return factory;
    }
}