zhang-minglei
5/11/2020 - 3:50 AM

RabbitMQ 死信队列使用

/**
 * Declares the direct exchange {@code DELAY_EXCHANGE}.
 *
 * <p>In this snippet the same exchange is used twice: producers publish delayed
 * messages to it, and {@code DEAD_LETTER_QUEUE} names it as its dead-letter exchange.
 *
 * @return the exchange definition (durable, not auto-deleted)
 */
@Bean
public DirectExchange delayExchange() {
    final boolean durable = true;
    final boolean autoDelete = false;
    return new DirectExchange("DELAY_EXCHANGE", durable, autoDelete);
}

/**
 * Declares {@code DEAD_LETTER_QUEUE}, the queue in which delayed messages wait.
 *
 * <p>The queue arguments tell the broker to republish any message that is
 * dead-lettered from this queue to {@code DELAY_EXCHANGE} using the routing key
 * {@code REPEAT_TRADE_QUEUE}.
 *
 * @return the queue definition (durable, non-exclusive, not auto-deleted)
 */
@Bean
public Queue deadLetterQueue() {
    Map<String, Object> deadLetterArgs = new HashMap<>();
    // Where dead-lettered messages are republished ...
    deadLetterArgs.put("x-dead-letter-exchange", "DELAY_EXCHANGE");
    // ... and the routing key they are republished with.
    deadLetterArgs.put("x-dead-letter-routing-key", "REPEAT_TRADE_QUEUE");

    final boolean durable = true;
    final boolean exclusive = false;
    final boolean autoDelete = false;
    return new Queue("DEAD_LETTER_QUEUE", durable, exclusive, autoDelete, deadLetterArgs);
}

/**
 * Binds {@code DEAD_LETTER_QUEUE} to {@code DELAY_EXCHANGE} under the routing key
 * {@code DEAD_LETTER_QUEUE}, so messages published with that key land in the
 * waiting queue.
 *
 * @return the binding definition
 */
@Bean
public Binding deadLetterBinding() {
    Queue waitingQueue = deadLetterQueue();
    DirectExchange exchange = delayExchange();
    return BindingBuilder
            .bind(waitingQueue)
            .to(exchange)
            .with("DEAD_LETTER_QUEUE");
}

/**
 * Declares {@code REPEAT_TRADE_QUEUE}, the queue consumers listen on to receive
 * messages once their delay has elapsed.
 *
 * @return the queue definition (durable, non-exclusive, not auto-deleted)
 */
@Bean
public Queue repeatTradeQueue() {
    return new Queue("REPEAT_TRADE_QUEUE", true, false, false);
}

/**
 * Binds {@code REPEAT_TRADE_QUEUE} to {@code DELAY_EXCHANGE} under the routing key
 * {@code REPEAT_TRADE_QUEUE}.
 *
 * <p>{@code DEAD_LETTER_QUEUE} republishes expired messages to {@code DELAY_EXCHANGE}
 * with exactly this routing key. A direct exchange only delivers to queues bound
 * with a matching key, so without this binding the expired messages would be
 * unroutable and never reach the listener.
 *
 * <p>NOTE(review): no such binding is visible in this snippet; if an equivalent
 * binding is already declared elsewhere, this bean is redundant (declaring the
 * same binding twice is harmless on the broker side).
 *
 * @return the binding definition
 */
@Bean
public Binding repeatTradeBinding() {
    return BindingBuilder.bind(repeatTradeQueue()).to(delayExchange()).with("REPEAT_TRADE_QUEUE");
}

// 使用方式
/**
 * Publishes a message that should only be consumed after a delay.
 *
 * <p>The message is sent to {@code DELAY_EXCHANGE} with routing key
 * {@code DEAD_LETTER_QUEUE} and carries {@code times} as its expiration property.
 * Blank {@code content} is silently ignored and nothing is sent.
 *
 * <p>NOTE(review): {@code times} is passed to the broker unvalidated. RabbitMQ
 * presumably expects a non-negative integer number of milliseconds as a string —
 * confirm against callers. Also confirm whether mixed delays are used: RabbitMQ
 * is documented to expire messages only from the head of a queue, so a short
 * delay queued behind a long one may fire late.
 *
 * @param content payload to place on the message queue
 * @param times   delay before the message becomes available to consumers
 */
public void delayOperation(String content, String times) {
    // Guard clause: nothing to publish for a blank payload.
    if (!StringUtils.isNotBlank(content)) {
        return;
    }
    rabbitTemplate.convertAndSend("DELAY_EXCHANGE", "DEAD_LETTER_QUEUE", content, message -> {
        // Per-message expiration drives the delay.
        message.getMessageProperties().setExpiration(times);
        return message;
    });
}

/**
 * Listener for {@code REPEAT_TRADE_QUEUE}: receives messages after they have been
 * forwarded out of the waiting queue.
 *
 * <p>NOTE(review): {@code log} and {@code consumerMessage} are not defined in the
 * visible snippet; they are assumed to be supplied by the real class.
 */
@Component
@RabbitListener(queues = "REPEAT_TRADE_QUEUE")
public class ReceiveDelayDataOperation {

    /**
     * Logs the received payload and hands it to {@code consumerMessage}.
     *
     * @param content the message body as a string
     */
    @RabbitHandler
    public void process(@Payload String content) {
        final String logLine = "接收转发队列的消息:" + content;
        log.info(logLine);
        consumerMessage(content);
    }
}

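/**
 * 说明:
 * 生产者通过 exchange:DELAY_EXCHANGE(routing key:DEAD_LETTER_QUEUE)把带有过期时间的消息投递到队列:DEAD_LETTER_QUEUE 中,
 * DEAD_LETTER_QUEUE 本身就是延时(等待)队列,它通过 x-dead-letter-exchange / x-dead-letter-routing-key
 * 把过期的消息(死信)重新发布到 DELAY_EXCHANGE,routing key 为 REPEAT_TRADE_QUEUE,
 * 所以到达延时之后消息会被转投到 REPEAT_TRADE_QUEUE 中
 * (前提:REPEAT_TRADE_QUEUE 已使用 routing key REPEAT_TRADE_QUEUE 绑定到 DELAY_EXCHANGE)
 * 消费者监听队列:REPEAT_TRADE_QUEUE 即可
 */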