Синхронный прием

Хотя JMS обычно ассоциируется с асинхронной обработкой, потреблять сообщения можно и синхронно. Перегруженные методы receive(..) обеспечивают эту функциональность. При синхронном получении вызывающий поток блокируется до тех пор, пока сообщение не станет доступным. Эта операция может быть опасной, поскольку вызывающий поток потенциально может заблокироваться на неопределенное время. Свойство receiveTimeout определяет, как долго получателю требуется ожидать, прежде чем прекратить ожидание сообщения.

Асинхронный прием: POJO, управляемые сообщениями

Spring также поддерживает конечные точки с слушателями, аннотированными с помощью аннотации @JmsListener, и предусматривает открытую инфраструктуру для программной регистрации конечных точек. Это, безусловно, самый удобный способ настройки асинхронного получателя.

По аналогии с управляемыми сообщениями бинами (MDB) в контексте EJB, управляемый сообщениями POJO-объект (MDP) действует как получатель JMS-сообщений. Единственным ограничением для MDP-объекта является то, что ему необходимо реализовывать интерфейс javax.jms.MessageListener. Обратите внимание, что если POJO-объект получает сообщения в нескольких потоках, важно удостовериться, что реализация является потокобезопасной.

В следующем примере показана простая реализация MDP-объекта:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class ExampleListener implements MessageListener {
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                System.out.println(((TextMessage) message).getText());
            }
            catch (JMSException ex) {
                throw new RuntimeException(ex);
            }
        }
        else {
            throw new IllegalArgumentException("Message must be of type TextMessage");
        }
    }
}

После того, как был реализован MessageListener, наступает время создать контейнер слушателя сообщений.

В следующем примере показано, как определить и сконфигурировать один из контейнеров слушателей сообщений, поставляемых с Spring (в данном случае DefaultMessageListenerContainer):

<!-- это управляемый сообщениями POJO-объект (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>
<!-- а это контейнер слушателя сообщений -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener"/>
</bean>

Полное описание функций, поддерживаемых каждой реализацией, смотрите в javadoc Spring по различным контейнерам слушателей сообщений (все они реализуют MessageListenerContainer).

Использование интерфейса SessionAwareMessageListener

Интерфейс SessionAwareMessageListener – это ориентированный на Spring интерфейс, который предусматривает контракт, аналогичный интерфейсу MessageListener из JMS, но также предоставляет методу обработки сообщения доступ к Session по стандарту JMS, из которой было получено Message. В следующем листинге показано определение интерфейса SessionAwareMessageListener:

package org.springframework.jms.listener;
public interface SessionAwareMessageListener {
    void onMessage(Message message, Session session) throws JMSException;
}

Вы можете выбрать для своих MDP-объектов реализацию этого интерфейса (отдавая предпочтение перед стандартным интерфейсом MessageListener из JMS), если хотите, чтобы MDP-объекты имели возможность отвечать на любые полученные сообщения (используя Session, предоставленную в методе onMessage(Message, Session)). Все реализации контейнеров слушателей сообщений, поставляемые с Spring, поддерживают MDP-объекты, которые реализуют интерфейс MessageListener или SessionAwareMessageListener. Нюанс классов, реализующих SessionAwareMessageListener, состоит в том, что они привязаны к Spring через интерфейс. Использовать его или нет – решать вам, как разработчику или архитектору приложений.

Обратите внимание, что метод onMessage(..) интерфейса SessionAwareMessageListener генерирует JMSException. В отличие от стандартного интерфейса MessageListener из JMS, при использовании интерфейса SessionAwareMessageListener за обработку любых сгенерированных исключений отвечает клиентский код.

Использование MessageListenerAdapter

Класс MessageListenerAdapter является последним компонентом средств поддержки асинхронного обмена сообщениями в Spring. Если в двух словах, то он позволяет представить практически любой класс в виде MDP-объекта (хотя существуют некоторые ограничения).

Consider the following interface definition:

public interface MessageDelegate {
    void handleMessage(String message);
    void handleMessage(Map message);
    void handleMessage(byte[] message);
    void handleMessage(Serializable message);
}

Обратите внимание, что, хотя этот интерфейс не расширяет ни MessageListener, ни интерфейс SessionAwareMessageListener, все равно можно использовать его в качестве MDP-объекта при помощи класса MessageListenerAdapter. Учтите также то, что различные методы обработки сообщений сильно типизированы в соответствии с содержимым различных типов Message, которые они могут получать и обрабатывать.

Теперь рассмотрим следующую реализацию интерфейса MessageDelegate:

public class DefaultMessageDelegate implements MessageDelegate {
    // реализация опущена для упрощения
}

В частности, обратите внимание, что предыдущая реализация интерфейса MessageDelegate (класс DefaultMessageDelegate) вообще не имеет зависимостей от JMS. Это действительно POJO-объект, который можно превратить в MDP-объект при помощи следующей конфигурации:

<!-- это управляемый сообщениями POJO-объект (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
        <bean class="jmsexample.DefaultMessageDelegate"/>
    </constructor-arg>
</bean>
<!-- а это контейнер слушателя сообщений... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener"/>
</bean>

В следующем примере показан другой MDP-объект, который может обрабатывать только получение JMS-сообщений TextMessage. Обратите внимание, что метод обработки сообщений на самом деле называется receive (имя метода обработки сообщений в MessageListenerAdapter по умолчанию handleMessage), но этот метод настраиваемый (как вы увидите далее в этом разделе). Обратите внимание также на то, что метод receive(..) строго типизирован для получения и ответа только на JMS-сообщения TextMessage. В следующем листинге показано определение интерфейса TextMessageDelegate:

public interface TextMessageDelegate {
    void receive(TextMessage message);
}

В следующем листинге показан класс, реализующий интерфейс TextMessageDelegate:

public class DefaultTextMessageDelegate implements TextMessageDelegate {
    // реализация опущена для упрощения
}

Тогда конфигурация MessageListenerAdapter будет выглядеть следующим образом:

<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
        <bean class="jmsexample.DefaultTextMessageDelegate"/>
    </constructor-arg>
    <property name="defaultListenerMethod" value="receive"/>
    <!-- нам не требуется автоматическое извлечение контекста сообщения -->
    <property name="messageConverter">
        <null/>
    </property>
</bean>

Обратите внимание, что если messageListener получает JMS-тип Message, отличающийся от TextMessage, будет сгенерировано (и впоследствии "проглочено") исключение IllegalStateException. Еще одной особенностью класса MessageListenerAdapter является способность автоматически отправлять ответное Message, если метод обработчика возвращает не пустое значение. Рассмотрим следующий интерфейс и класс:

public interface ResponsiveTextMessageDelegate {
    // обратите внимание на возвращаемый тип тип...
    String receive(TextMessage message);
}
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
    // реализация опущена для упрощения
}

Если вы используете делегат DefaultResponsiveTextMessageDelegate в сочетании с MessageListenerAdapter, любое не-null значение, возвращаемое при выполнении метода 'receive(..)', преобразуется (в конфигурации по умолчанию) в TextMessage. Полученное TextMessage отправляется в Destination (если существует), определенный в JMS-свойстве Reply-To исходного или в Destination по умолчанию, установленный в MessageListenerAdapter (если был сконфигурирован). Если Destination не обнаружен, генерируется InvalidDestinationException (обратите внимание, что это исключение не "проглатывается" и распространяется вверх по стеку вызовов).

Обработка сообщений внутри транзакций

Для вызова слушателя сообщений в рамках транзакции требуется лишь реконфигурировать контейнер слушателя.

Вы можете активировать локальные транзакции ресурсов с помощью флага sessionTransacted в определении контейнера слушателя. Каждый вызов слушателя сообщений работает в рамках активной транзакции JMS, при этом прием сообщений откатывается в случае сбоя при выполнении слушателя. Отправка сообщения-ответа (через SessionAwareMessageListener) является частью той же локальной транзакции, но любые другие операции с ресурсами (например, доступ к базе данных) работают независимо. Обычно это требует дублирования обнаружения сообщений в реализации слушателя, чтобы охватить случай, когда обработка базы данных зафиксирована, а обработка сообщений – нет.

Рассмотрим следующее определение бина:

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener"/>
    <property name="sessionTransacted" value="true"/>
</bean>

Для задействования во внешне управляемой транзакции необходимо сконфигурировать диспетчер транзакций и использовать контейнер слушателя, который поддерживает внешне управляемые транзакции (обычно это DefaultMessageListenerContainer).

Чтобы сконфигурировать контейнер слушателя сообщений для задействования в XA-транзакциях, необходимо сконфигурировать JtaTransactionManager (который по умолчанию делегируется подсистеме транзакций сервера Java EE). Обратите внимание, что базовая JMS-фабрика ConnectionFactory должна быть XA-совместимой и должным образом зарегистрирована в координаторе транзакций JTA. (Проверьте конфигурацию ресурсов JNDI вашего сервера Java EE). Это позволит принимать сообщения и (например) обращаться к базе данных в рамках одной транзакции (с единой семантикой фиксации, за счет затрат вычислительных ресурсов на журнал XA-транзакций).

Следующее определение бина создает диспетчер транзакций:

<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

Затем необходимо добавить его в нашу предыдущую конфигурацию контейнера. Контейнер займется всем остальным. В следующем примере показано, как это сделать:

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener"/>
    <property name="transactionManager" ref="transactionManager"/>
</bean>
  1. Наш диспетчер транзакций.