如何心跳感知与ActiveMQ服务器的连接状态
发布网友
发布时间:2022-04-19 14:12
我来回答
共1个回答
热心网友
时间:2023-07-11 18:07
当我们需要部署一套安全稳定的ActiveMQ时,可以使用集群方式,这需要部署至少两套的ActiveMQ服务,但是如果ActiveMQ整个集群也宕机,这时消息都无法发送,这可怎么办,还好ActiveMQ提供了消息传输监听(transportListener),可以对ActiveMQConnectionFactory添加一个Activemq的消息传输监听,该监听实现 Activemq的TransportListener接口。该接口实现的监听方法有onCommand(),onException(), transportResumed () ,transportInterupted()等监听方法。拥有这些方法就足以实时感知ActiveMQ服务器的状态了,当发现服务器无法连接时,就采取相应措施,如把消息存储在本地,当服务器恢复时再进行发送。
package com.jms.failover;
import java.io.IOException;
import org.apache.activemq.transport.TransportListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ActiveMQTransportListener implements TransportListener{
protected final Logger logger = LoggerFactory.getLogger(ActiveMQTransportListener.class);
/**
* 对消息传输命令进行监控
* @param command
*/
@Override
public void onCommand(Object o) {
}
/**
* 对监控到的异常进行触发
* @param error
*/
@Override
public void onException(IOException error) {
logger.error("onException -> 消息服务器连接错误......");
}
/**
* 当failover时触发
*/
@Override
public void transportInterupted() {
logger.error("transportInterupted -> 消息服务器连接发生中断......");
//这里就可以状态进行标识了
}
/**
* 监控到failover恢复后进行触发
*/
@Override
public void transportResumed() {
logger.info("transportResumed -> 消息服务器连接已恢复......");
//这里就可以进行状态标识了
}
}
<!-- ActiveMQ 连接工厂,用于连接远程broker -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" destroy-method="stop">
<property name="brokerURL" value="${activemq.broker.failover.uri}" />
<property name="userName" value="${activemq.username}" />
<property name="password" value="${activemq.password}" />
<property name="useAsyncSend" value="true" />
<property name="alwaysSessionAsync" value="true" />
<property name="redeliveryPolicy" ref="redeliveryPolicy" />
<property name="prefetchPolicy" ref="prefetchPolicy" />
<!-- 消息传输* -->
<property name="transportListener">
<bean class="com.jms.failover.ActiveMQTransportListener" />
</property>
<property name="dispatchAsync" value="true" />
</bean>