strugglm
10/9/2018 - 1:41 PM

Spring 使用 ActiveMQ

包含以下内容: 1. JMS 编程模型; 2. 消息监听器 MessageListener; 3. @JmsListener 注解

/**
 * JMS {@link MessageListener} that prints the body of each text message it receives.
 */
public class TopicMessageListener implements MessageListener {
    /**
     * Prints the text payload of the given message to standard output.
     *
     * <p>Messages that are not {@link TextMessage} instances (including {@code null}) are
     * reported on standard error and otherwise ignored.
     *
     * @param message the message handed to this listener
     */
    @Override
    public void onMessage(Message message) {
        // Guard the cast: a blind (TextMessage) cast throws ClassCastException
        // as soon as any other message type is delivered.
        if (!(message instanceof TextMessage)) {
            System.err.println("TopicMessageListener: ignoring non-text message: " + message);
            return;
        }
        try {
            System.out.println("监听器:" + ((TextMessage) message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
/**
 * Spring-managed JMS {@link MessageListener} for queue messages.
 *
 * <p>As written, the listener only resolves the physical name of the destination the
 * message arrived on; it does not read or process the message body.
 */
@Component
public class QueueMessageListener implements MessageListener {
    /**
     * Resolves the name of the queue the given message was delivered on.
     *
     * @param message the message handed to this listener
     */
    @Override
    public void onMessage(Message message) {
        try {
            // Destination the message was received on.
            Object destination = message.getJMSDestination();
            // Guard the cast: a missing or non-ActiveMQ destination would otherwise
            // throw ClassCastException / NullPointerException.
            if (destination instanceof ActiveMQDestination) {
                // Physical queue name; not used further yet (placeholder for real handling).
                String queueName = ((ActiveMQDestination) destination).getPhysicalName();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
/**
 * Topic (publish/subscribe) operations against ActiveMQ, implemented three ways:
 * through a {@link JmsTemplate}, through a {@code @JmsListener}-annotated method, and
 * through the plain JMS API (connection, session, producer/consumer).
 */
@Service
@EnableJms
public class TopicServiceImpl implements TopicService {
    /** Broker URL, injected from the {@code activemq.broker} property. */
    @Value("${activemq.broker}")
    private String broker;
    /** Broker user name, injected from the {@code activemq.user} property. */
    @Value("${activemq.user}")
    private String user;
    /** Broker password, injected from the {@code activemq.password} property. */
    @Value("${activemq.password}")
    private String password;
    /** Name of the topic read by {@link #subscribe()}, from {@code activemq.destination_topic}. */
    @Value("${activemq.destination_topic}")
    private String topic;

    /** Template bean named {@code jmsTemplate_topic}, used by {@link #jmsPublish}. */
    @Resource(name = "jmsTemplate_topic")
    private JmsTemplate jmsTemplate;

    /**
     * Publishes a message to a topic through the {@link JmsTemplate}.
     *
     * @param topic   name of the topic to publish to
     * @param message payload handed to the template's message conversion
     */
    @Override
    public void jmsPublish(String topic, final String message) {
        Topic destination = new ActiveMQTopic(topic);
        // NOTE(review): this mutates the injected template on every call.
        jmsTemplate.setPubSubDomain(true);
        jmsTemplate.convertAndSend(destination, message);
    }

    /**
     * Listener method for the {@code firstTopic} destination; prints each received payload.
     * It relies on a listener container factory bean named {@code jmsListenerContainerFactory}
     * that is configured outside this class.
     *
     * @param message the received payload
     */
    @Override
    @JmsListener(destination = "firstTopic", containerFactory = "jmsListenerContainerFactory")
    public void jmsSubscribe(String message) {
        System.out.println("JmsListener:" + message);
    }

    /**
     * Publishes a persistent text message to a topic using the plain JMS API.
     *
     * <p>A new connection is opened for the call and is always closed again before the
     * method returns. JMS failures are printed and not propagated.
     *
     * @param topic   name of the topic to publish to
     * @param message text of the message
     */
    @Override
    public void publish(String topic, String message) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, broker);
        Connection connection = null;
        Session session = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            // Non-transacted session with automatic acknowledgement.
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic destination = session.createTopic(topic);
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            TextMessage textMessage = session.createTextMessage(message);
            // The producer is already bound to the destination, so use the
            // single-argument send; the JMS API reserves send(Destination, Message)
            // for producers created without a destination.
            producer.send(textMessage);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // Release the broker connection; previously it was never closed.
            try {
                closeAll(null, session, connection);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Synchronously consumes text messages from the configured topic and prints them.
     *
     * <p>{@code receive()} is called without a timeout, so the loop only ends when it
     * returns {@code null} or a JMS error occurs. A received message that is not a
     * {@link TextMessage} results in a {@link ClassCastException}.
     *
     * @throws JMSException if releasing the JMS resources fails
     */
    @Override
    public void subscribe() throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, broker);
        Connection connection = null;
        Session session = null;
        MessageConsumer consumer = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            // Non-transacted session with automatic acknowledgement.
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic destination = session.createTopic(topic);
            consumer = session.createConsumer(destination);
            while (true) {
                // Blocking receive with no timeout.
                TextMessage message = (TextMessage) consumer.receive();
                if (message == null) {
                    break;
                }
                System.out.println(message.getText());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // Null-safe cleanup: any of these may be unset if setup failed part-way.
            closeAll(consumer, session, connection);
        }
    }

    /**
     * Closes the given JMS resources in consumer, session, connection order, skipping
     * any that are {@code null}. The connection is closed even if an earlier close fails.
     */
    private static void closeAll(MessageConsumer consumer, Session session, Connection connection)
            throws JMSException {
        try {
            if (consumer != null) {
                consumer.close();
            }
            if (session != null) {
                session.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
}
/**
 * Queue (point-to-point) operations against ActiveMQ, implemented through a
 * {@link JmsTemplate}, a {@code @JmsListener}-annotated method, and the plain JMS API.
 */
@Service
public class QueueServiceImpl implements QueueService {
    /** Broker URL, injected from the {@code activemq.broker} property. */
    @Value("${activemq.broker}")
    private String broker;
    /** Broker user name, injected from the {@code activemq.user} property. */
    @Value("${activemq.user}")
    private String user;
    /** Broker password, injected from the {@code activemq.password} property. */
    @Value("${activemq.password}")
    private String password;
    /** Queue name used by {@link #mqProducer()}, from {@code activemq.destination1}. */
    @Value("${activemq.destination1}")
    private String destination1;
    /** Queue name used by the sync/async consumers, from {@code activemq.springBoot.destination}. */
    @Value("${activemq.springBoot.destination}")
    private String destination3;

    /** Template bean named {@code jmsTemplate_queue}. */
    @Resource(name = "jmsTemplate_queue")
    private JmsTemplate jmsTemplate;

    /**
     * Prints and returns the concatenation of broker URL, queue name, user and password.
     *
     * @return {@code broker + destination1 + user + password}
     */
    @Override
    public String test() {
        // NOTE(review): this writes the broker password to stdout and returns it in
        // clear text; kept as-is because callers may depend on the returned value.
        String settings = broker + destination1 + user + password;
        System.out.println(settings);
        return settings;
    }

    /**
     * Sends a message to a queue through the {@link JmsTemplate}.
     *
     * @param destination target destination; when {@code null} the template's default
     *                    destination is used
     * @param message     payload handed to the template's message conversion
     */
    @Override
    public void jmsProducer(Destination destination, final String message) {
        if (null == destination) {
            destination = jmsTemplate.getDefaultDestination();
        }
        // NOTE(review): this mutates the injected template on every call.
        jmsTemplate.setPubSubDomain(false);
        jmsTemplate.convertAndSend(destination, message);
    }

    /**
     * Listener method for the {@code JmsListener} destination; prints each received payload.
     * (The other listening style shown in this project is implementing
     * {@code MessageListener} directly.)
     *
     * @param msg the received payload
     */
    @JmsListener(destination = "JmsListener")
    public void jmsListener(String msg) {
        System.out.println(msg);
    }

    /**
     * Repeatedly receives messages from the named destination through the
     * {@link JmsTemplate} and prints each text message, until a receive returns
     * {@code null}. Messages that are not text messages are reported on standard
     * error and skipped.
     *
     * @param destination name of the destination to receive from
     */
    @Override
    public void jmsConsumer(String destination) {
        while (true) {
            Message received = jmsTemplate.receive(destination);
            if (received == null) {
                break;
            }
            // Guard the cast instead of failing with ClassCastException.
            if (!(received instanceof TextMessage)) {
                System.err.println("Skipping non-text message from queue " + destination
                        + ": " + received);
                continue;
            }
            try {
                System.out.println("从队列" + destination + "收到了消息:\t"
                        + ((TextMessage) received).getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Sends one persistent text message to the {@code destination1} queue using the
     * plain JMS API.
     *
     * <p>A new connection is opened for the call and is always closed again before the
     * method returns. JMS failures are printed and not propagated.
     */
    @Override
    public void mqProducer() {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, broker);
        Connection connection = null;
        Session session = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            // Non-transacted session with automatic acknowledgement.
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(destination1);
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            sendMessage(session, producer, "JMS实现mq");
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // Release the broker connection; previously it was never closed.
            try {
                closeAll(null, session, connection);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Creates a text message on the given session and sends it with the given producer.
     * JMS failures are printed and not propagated.
     *
     * @param session  session used to create the message
     * @param producer producer used to send it
     * @param str      text of the message
     */
    @Override
    public void sendMessage(Session session, MessageProducer producer, String str) {
        try {
            TextMessage message = session.createTextMessage(str);
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Synchronously consumes messages from the {@code destination3} queue and prints
     * the text messages.
     *
     * <p>{@code receive()} is called without a timeout, so the loop only ends when it
     * returns {@code null} or a JMS error occurs. Messages that are not text messages
     * are reported on standard error and skipped.
     *
     * @throws JMSException if releasing the JMS resources fails
     */
    @Override
    public void syncConsumer() throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, broker);
        Connection connection = null;
        Session session = null;
        MessageConsumer consumer = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            // Non-transacted session with automatic acknowledgement.
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(destination3);
            consumer = session.createConsumer(queue);
            while (true) {
                // Blocking receive with no timeout.
                Message received = consumer.receive();
                if (received == null) {
                    break;
                }
                if (received instanceof TextMessage) {
                    System.out.println(((TextMessage) received).getText());
                } else {
                    System.err.println("Skipping non-text message: " + received);
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // Null-safe cleanup: any of these may be unset if setup failed part-way.
            closeAll(consumer, session, connection);
        }
    }

    /**
     * Asynchronously consumes text messages from the {@code destination3} queue by
     * registering a listener on the consumer, then blocks on standard input to keep
     * the connection open until input is available.
     *
     * @throws JMSException if releasing the JMS resources fails
     */
    @Override
    public void asyncConsumer() throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, broker);
        Connection connection = null;
        Session session = null;
        MessageConsumer consumer = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            // Non-transacted session with automatic acknowledgement.
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(destination3);
            consumer = session.createConsumer(queue);
            // Register on the consumer, not the session: the JMS API describes
            // Session.setMessageListener as an expert facility not meant for regular clients.
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if (message instanceof TextMessage) {
                        try {
                            String text = ((TextMessage) message).getText();
                            System.out.println(text);
                            // NOTE(review): the session is AUTO_ACKNOWLEDGE, so this
                            // explicit acknowledge is likely redundant — confirm.
                            message.acknowledge();
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            // Block the calling thread so the listener can keep receiving.
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Null-safe cleanup: any of these may be unset if setup failed part-way.
            closeAll(consumer, session, connection);
        }
    }

    /**
     * Closes the given JMS resources in consumer, session, connection order, skipping
     * any that are {@code null}. The connection is closed even if an earlier close fails.
     */
    private static void closeAll(MessageConsumer consumer, Session session, Connection connection)
            throws JMSException {
        try {
            if (consumer != null) {
                consumer.close();
            }
            if (session != null) {
                session.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
}