// Dead-letter exchange
/**
 * Declares the direct exchange {@code DELAY_EXCHANGE}.
 *
 * <p>Within this section the same exchange is used both as the publish target
 * for delayed messages and as the {@code x-dead-letter-exchange} of
 * {@code DEAD_LETTER_QUEUE}.
 *
 * @return a durable, non-auto-deleted direct exchange named {@code DELAY_EXCHANGE}
 */
@Bean
public DirectExchange delayExchange() {
    final boolean durable = true;
    final boolean autoDelete = false;
    return new DirectExchange("DELAY_EXCHANGE", durable, autoDelete);
}
/**
 * Declares the holding queue {@code DEAD_LETTER_QUEUE}.
 *
 * <p>The queue carries dead-letter arguments: a message that is dead-lettered
 * from it is republished to exchange {@code DELAY_EXCHANGE} with routing key
 * {@code REPEAT_TRADE_QUEUE}.
 *
 * <p>NOTE(review): for the republished message to be delivered, some queue must
 * be bound to {@code DELAY_EXCHANGE} with routing key {@code REPEAT_TRADE_QUEUE};
 * confirm that such a binding is declared.
 *
 * @return a durable, non-exclusive, non-auto-deleted queue with dead-letter arguments
 */
@Bean
public Queue deadLetterQueue() {
    Map<String, Object> deadLetterArgs = new HashMap<>();
    // Routing key used when a dead-lettered message is republished.
    deadLetterArgs.put("x-dead-letter-routing-key", "REPEAT_TRADE_QUEUE");
    // Exchange that receives dead-lettered messages from this queue.
    deadLetterArgs.put("x-dead-letter-exchange", "DELAY_EXCHANGE");
    // Constructor flags: durable = true, exclusive = false, autoDelete = false.
    return new Queue("DEAD_LETTER_QUEUE", true, false, false, deadLetterArgs);
}
/**
 * Binds {@code DEAD_LETTER_QUEUE} to {@code DELAY_EXCHANGE} under routing key
 * {@code DEAD_LETTER_QUEUE}, so messages published with that key land in the
 * holding queue.
 *
 * @return the binding between the holding queue and the exchange
 */
@Bean
public Binding deadLetterBinding() {
    Queue holdingQueue = deadLetterQueue();
    DirectExchange exchange = delayExchange();
    return BindingBuilder
            .bind(holdingQueue)
            .to(exchange)
            .with("DEAD_LETTER_QUEUE");
}
/**
 * Declares the consumer-facing queue {@code REPEAT_TRADE_QUEUE}.
 *
 * @return a durable, non-exclusive, non-auto-deleted queue named {@code REPEAT_TRADE_QUEUE}
 */
@Bean
public Queue repeatTradeQueue() {
    // Constructor flags: durable = true, exclusive = false, autoDelete = false.
    return new Queue("REPEAT_TRADE_QUEUE", true, false, false);
}

/**
 * Binds {@code REPEAT_TRADE_QUEUE} to {@code DELAY_EXCHANGE} under routing key
 * {@code REPEAT_TRADE_QUEUE}.
 *
 * <p>{@code DEAD_LETTER_QUEUE} dead-letters its messages to {@code DELAY_EXCHANGE}
 * with exactly this routing key. A direct exchange only delivers to queues bound
 * with a matching key, so without this binding the dead-lettered messages would
 * be unroutable and never reach the listener on {@code REPEAT_TRADE_QUEUE}.
 *
 * @return the binding that routes dead-lettered messages to {@code REPEAT_TRADE_QUEUE}
 */
@Bean
public Binding repeatTradeBinding() {
    return BindingBuilder.bind(repeatTradeQueue()).to(delayExchange()).with("REPEAT_TRADE_QUEUE");
}
// Usage
/**
 * Schedules a delayed operation through the message queue.
 *
 * <p>The payload is published to exchange {@code DELAY_EXCHANGE} with routing key
 * {@code DEAD_LETTER_QUEUE}, and the message's expiration property is set to the
 * requested delay. Blank content is ignored and nothing is sent.
 *
 * <p>NOTE(review): per RabbitMQ's TTL documentation, the expiration property is a
 * string holding a non-negative integer number of milliseconds, and a message with
 * per-message TTL is only expired once it reaches the head of its queue, so a
 * long delay queued ahead of a short one can hold the short one back. Confirm this
 * is acceptable for the callers' delay values.
 *
 * @param content data to put on the message queue; when blank, the call is a no-op
 * @param times   delay before the message expires, as a decimal string of milliseconds
 * @throws IllegalArgumentException if {@code content} is not blank and {@code times}
 *         is null, not a valid integer, or negative
 */
public void delayOperation(String content, String times) {
    if (StringUtils.isBlank(content)) {
        return;
    }

    // Validate before publishing: a null expiration would leave the message without a
    // TTL (it would never leave the holding queue), and a non-numeric or negative value
    // is not a valid expiration.
    final long delayMillis;
    try {
        delayMillis = Long.parseLong(times == null ? "" : times.trim());
    } catch (NumberFormatException e) {
        throw new IllegalArgumentException(
                "times must be a non-negative integer number of milliseconds, got: " + times, e);
    }
    if (delayMillis < 0) {
        throw new IllegalArgumentException(
                "times must be a non-negative integer number of milliseconds, got: " + times);
    }

    // Send the canonical decimal form (no surrounding whitespace, no leading '+').
    final String expiration = Long.toString(delayMillis);
    MessagePostProcessor processor = message -> {
        message.getMessageProperties().setExpiration(expiration);
        return message;
    };
    rabbitTemplate.convertAndSend("DELAY_EXCHANGE", "DEAD_LETTER_QUEUE", content, processor);
}
/**
 * Listener for {@code REPEAT_TRADE_QUEUE}.
 *
 * <p>Each received payload is logged and then handed to {@code consumerMessage},
 * which is defined outside this section.
 */
@Component
@RabbitListener(queues = "REPEAT_TRADE_QUEUE")
public class ReceiveDelayDataOperation {

    /**
     * Handles one message taken from {@code REPEAT_TRADE_QUEUE}.
     *
     * @param content the message payload as a string
     */
    @RabbitHandler
    public void process(@Payload String content) {
        String logLine = "接收转发队列的消息:" + content;
        log.info(logLine);
        consumerMessage(content);
    }
}
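/**
 * How it works:
 * The producer publishes a message to exchange DELAY_EXCHANGE with routing key
 * DEAD_LETTER_QUEUE, so it lands in queue DEAD_LETTER_QUEUE and waits there until
 * its per-message expiration elapses.
 * DEAD_LETTER_QUEUE is declared with x-dead-letter-exchange = DELAY_EXCHANGE and
 * x-dead-letter-routing-key = REPEAT_TRADE_QUEUE, so once the delay is reached the
 * expired message is republished to DELAY_EXCHANGE with routing key REPEAT_TRADE_QUEUE
 * and is delivered to queue REPEAT_TRADE_QUEUE (which must be bound to DELAY_EXCHANGE
 * with that routing key).
 * Consumers only need to listen on queue REPEAT_TRADE_QUEUE.
 */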