含以下内容: 1.JMS编程模型 2.消息监听器MessageListener 3.@JmsListener注解
/**
 * {@link MessageListener} that prints the body of every text message it is handed.
 */
public class TopicMessageListener implements MessageListener {

    /**
     * Prints the text payload of {@code message} to standard output, prefixed with "监听器:".
     *
     * <p>Anything that is not a {@link TextMessage} is reported on standard error and skipped,
     * rather than failing with a {@link ClassCastException}.
     *
     * @param message the delivered message
     */
    @Override
    public void onMessage(Message message) {
        if (!(message instanceof TextMessage)) {
            System.err.println("TopicMessageListener ignored non-text message: " + message);
            return;
        }
        try {
            System.out.println("监听器:" + ((TextMessage) message).getText());
        } catch (JMSException e) {
            // onMessage cannot rethrow the checked JMSException, so report it here.
            e.printStackTrace();
        }
    }
}
/**
 * {@link MessageListener} that looks up the physical name of the destination a message
 * arrived on.
 *
 * <p>NOTE(review): the name is only read, never used — the listener currently has no
 * observable effect. Presumably a placeholder for real handling; confirm with the owner.
 */
@Component
public class QueueMessageListener implements MessageListener {

    /**
     * Resolves the physical name of the destination of {@code message}.
     *
     * @param message the delivered message
     */
    @Override
    public void onMessage(Message message) {
        try {
            // Typed as Object so the instanceof guard covers both a null destination and a
            // destination that is not an ActiveMQ one, without needing any extra import.
            Object destination = message.getJMSDestination();
            if (destination instanceof ActiveMQDestination) {
                // Name of the queue the message was received from (not used further yet).
                String queueName = ((ActiveMQDestination) destination).getPhysicalName();
            }
        } catch (JMSException e) {
            // onMessage cannot rethrow the checked JMSException, so report it here.
            e.printStackTrace();
        }
    }
}
/**
 * Topic (publish/subscribe) messaging service. Shows three ways of working with a topic:
 * through {@link JmsTemplate}, through a {@code @JmsListener} method, and through the plain
 * JMS API (connection / session / producer / consumer).
 */
@Service
@EnableJms
public class TopicServiceImpl implements TopicService {

    /** Broker address handed to {@link ActiveMQConnectionFactory}. */
    @Value("${activemq.broker}")
    private String broker;

    /** User name handed to {@link ActiveMQConnectionFactory}. */
    @Value("${activemq.user}")
    private String user;

    /** Password handed to {@link ActiveMQConnectionFactory}. */
    @Value("${activemq.password}")
    private String password;

    /** Name of the topic that {@link #subscribe()} consumes from. */
    @Value("${activemq.destination_topic}")
    private String topic;

    @Resource(name = "jmsTemplate_topic")
    private JmsTemplate jmsTemplate;

    /**
     * Publishes {@code message} to the named topic through the injected {@link JmsTemplate}.
     *
     * @param topic   name of the topic to publish to
     * @param message text to send
     */
    @Override
    public void jmsPublish(String topic, final String message) {
        Topic target = new ActiveMQTopic(topic);
        jmsTemplate.setPubSubDomain(true);
        jmsTemplate.convertAndSend(target, message);
    }

    /**
     * Annotation-driven subscriber for the "firstTopic" destination. The container factory
     * named "jmsListenerContainerFactory" is configured outside this class (in XML,
     * according to the original note).
     *
     * @param message the received payload
     */
    @Override
    @JmsListener(destination = "firstTopic", containerFactory = "jmsListenerContainerFactory")
    public void jmsSubscribe(String message) {
        System.out.println("JmsListener:" + message);
    }

    /**
     * Publishes {@code message} to the named topic using the plain JMS programming model.
     * Failures are printed, not propagated; the connection and session are always closed.
     *
     * @param topic   name of the topic to publish to
     * @param message text to send
     */
    @Override
    public void publish(String topic, String message) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, broker);
        Connection connection = null;
        Session session = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            // Non-transacted session with automatic acknowledgement.
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic destination = session.createTopic(topic);
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            TextMessage textMessage = session.createTextMessage(message);
            // The producer is already bound to the destination; the send(Destination, Message)
            // overload is only specified for producers created without one.
            producer.send(textMessage);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            try {
                closeAll(null, session, connection);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Synchronously consumes text messages from the configured topic and prints them.
     *
     * <p>{@code receive()} is called without a timeout, so each call blocks until a message
     * arrives; the loop ends only when {@code receive()} returns {@code null} or a
     * {@link JMSException} is raised while consuming (which is printed).
     *
     * @throws JMSException if releasing the JMS resources fails
     */
    @Override
    public void subscribe() throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, broker);
        Connection connection = null;
        Session session = null;
        MessageConsumer consumer = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            // Non-transacted session with automatic acknowledgement.
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic destination = session.createTopic(topic);
            consumer = session.createConsumer(destination);
            while (true) {
                // Typed as Object so the type check below needs no additional import.
                Object received = consumer.receive();
                if (received == null) {
                    break;
                }
                if (received instanceof TextMessage) {
                    System.out.println(((TextMessage) received).getText());
                } else {
                    System.err.println("subscribe ignored non-text message: " + received);
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            closeAll(consumer, session, connection);
        }
    }

    /**
     * Closes the given resources in consumer, session, connection order. Null arguments are
     * skipped, and a failure to close one resource does not prevent the later ones from
     * being closed.
     */
    private static void closeAll(MessageConsumer consumer, Session session, Connection connection)
            throws JMSException {
        try {
            if (consumer != null) {
                consumer.close();
            }
        } finally {
            try {
                if (session != null) {
                    session.close();
                }
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }
}
/**
 * Queue (point-to-point) messaging service. Shows producing and consuming through
 * {@link JmsTemplate}, through a {@code @JmsListener} method, and through the plain JMS API
 * in both synchronous and asynchronous form.
 */
@Service
public class QueueServiceImpl implements QueueService {

    /** Broker address handed to {@link ActiveMQConnectionFactory}. */
    @Value("${activemq.broker}")
    private String broker;

    /** User name handed to {@link ActiveMQConnectionFactory}. */
    @Value("${activemq.user}")
    private String user;

    /** Password handed to {@link ActiveMQConnectionFactory}. */
    @Value("${activemq.password}")
    private String password;

    /** Name of the queue that {@link #mqProducer()} sends to. */
    @Value("${activemq.destination1}")
    private String destination1;

    /** Name of the queue read by {@link #syncConsumer()} and {@link #asyncConsumer()}. */
    @Value("${activemq.springBoot.destination}")
    private String destination3;

    @Resource(name = "jmsTemplate_queue")
    private JmsTemplate jmsTemplate;

    /**
     * Prints and returns the concatenation of broker, first queue name, user and password.
     *
     * <p>NOTE(review): this writes the broker password to standard output and returns it to
     * the caller; behaviour kept as-is, but confirm it is not reachable in production.
     *
     * @return the concatenated configuration values
     */
    @Override
    public String test() {
        String summary = broker + destination1 + user + password;
        System.out.println(summary);
        return summary;
    }

    /**
     * Sends {@code message} through the injected {@link JmsTemplate}.
     *
     * @param destination target destination; when {@code null} the template's default
     *                    destination is used
     * @param message     text to send
     */
    @Override
    public void jmsProducer(Destination destination, final String message) {
        Destination target = destination;
        if (target == null) {
            target = jmsTemplate.getDefaultDestination();
        }
        jmsTemplate.setPubSubDomain(false);
        jmsTemplate.convertAndSend(target, message);
    }

    /**
     * Annotation-driven consumer for the "JmsListener" destination.
     *
     * <p>Two listener styles are used in this project: implementing {@link MessageListener}
     * (see filter/QueueMessageListener.java) and the {@code @JmsListener} annotation used here.
     *
     * @param msg the received payload
     */
    @JmsListener(destination = "JmsListener")
    public void jmsListener(String msg) {
        System.out.println(msg);
    }

    /**
     * Consumes messages from the named queue through {@link JmsTemplate} in a blocking loop
     * and prints each text payload. The loop ends when {@code receive} returns {@code null}.
     * Non-text messages are reported on standard error instead of causing a
     * {@link ClassCastException}.
     *
     * @param destination name of the queue to read from
     */
    @Override
    public void jmsConsumer(String destination) {
        while (true) {
            Message received = jmsTemplate.receive(destination);
            if (received == null) {
                break;
            }
            if (!(received instanceof TextMessage)) {
                System.err.println("jmsConsumer ignored non-text message: " + received);
                continue;
            }
            try {
                System.out.println("从队列" + destination + "收到了消息:\t"
                        + ((TextMessage) received).getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Sends one fixed text message to the first configured queue using the plain JMS
     * programming model. Failures are printed, not propagated; the connection and session
     * are always closed.
     */
    @Override
    public void mqProducer() {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, broker);
        Connection connection = null;
        Session session = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            // Non-transacted session with automatic acknowledgement.
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(destination1);
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            sendMessage(session, producer, "JMS实现mq");
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            try {
                closeAll(null, session, connection);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Creates a text message from {@code str} and sends it with {@code producer}.
     * A {@link JMSException} is printed, not propagated.
     *
     * @param session  session used to create the message
     * @param producer producer used to send it
     * @param str      message text
     */
    @Override
    public void sendMessage(Session session, MessageProducer producer, String str) {
        try {
            TextMessage message = session.createTextMessage(str);
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Synchronously consumes text messages from the configured queue and prints them.
     *
     * <p>{@code receive()} is called without a timeout, so each call blocks until a message
     * arrives; the loop ends only when {@code receive()} returns {@code null} or a
     * {@link JMSException} is raised while consuming (which is printed).
     *
     * @throws JMSException if releasing the JMS resources fails
     */
    @Override
    public void syncConsumer() throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, broker);
        Connection connection = null;
        Session session = null;
        MessageConsumer consumer = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            // Non-transacted session with automatic acknowledgement.
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(destination3);
            consumer = session.createConsumer(queue);
            while (true) {
                Message received = consumer.receive();
                if (received == null) {
                    break;
                }
                if (received instanceof TextMessage) {
                    System.out.println(((TextMessage) received).getText());
                } else {
                    System.err.println("syncConsumer ignored non-text message: " + received);
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            closeAll(consumer, session, connection);
        }
    }

    /**
     * Asynchronously consumes text messages from the configured queue, printing each one
     * from a listener, until a byte can be read from standard input.
     *
     * @throws JMSException if releasing the JMS resources fails
     */
    @Override
    public void asyncConsumer() throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, broker);
        Connection connection = null;
        Session session = null;
        MessageConsumer consumer = null;
        try {
            connection = connectionFactory.createConnection();
            // Non-transacted session with automatic acknowledgement.
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(destination3);
            consumer = session.createConsumer(queue);
            // Register on the consumer: Session.setMessageListener is an expert facility
            // for application servers and is not meant for regular JMS clients.
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if (message instanceof TextMessage) {
                        try {
                            String text = ((TextMessage) message).getText();
                            System.out.println(text);
                            // Ignored under AUTO_ACKNOWLEDGE; kept so the listener stays
                            // correct if the session is switched to CLIENT_ACKNOWLEDGE.
                            message.acknowledge();
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            // Start delivery only once the listener is in place.
            connection.start();
            // Keep the connection open until something is read from standard input.
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeAll(consumer, session, connection);
        }
    }

    /**
     * Closes the given resources in consumer, session, connection order. Null arguments are
     * skipped, and a failure to close one resource does not prevent the later ones from
     * being closed.
     */
    private static void closeAll(MessageConsumer consumer, Session session, Connection connection)
            throws JMSException {
        try {
            if (consumer != null) {
                consumer.close();
            }
        } finally {
            try {
                if (session != null) {
                    session.close();
                }
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }
}