RabbitMQ默认没有携带和安装延时插件,所以需要我们自己手动安装、启用。
RabbitMQ插件下载页面: https://www.rabbitmq.com/community-plugins.html
在浏览器进入下载页面后,Ctrl + F 搜索:rabbitmq_delayed_message_exchange

如上图所示,找到自己RabbitMQ对应的版本下载,然后解压下载得到类似这样的文件rabbitmq_delayed_message_exchange-x-x.x.x.ez
,把解压后的文件放入RabbitMQ安装目录下的plugins
目录。
然后进入进入sbin目录启用插件:
# windows
./rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange
# linux
./rabbitmq-plugins enable rabbitmq_delayed_message_exchange
然后检查安装的插件,看是否安装成功(判断是否有rabbitmq_delayed_message相关的插件):
# windows
./rabbitmq-plugins.bat list
#linux
./rabbitmq-plugins list
在SpringBoot(SpringCloud)下使用延时插件:
package com.molicloud.moli.mq.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* 消息发送服务
*
* @author feitao <yyimba@qq.com> 2019/9/26 14:42
*/
@Component
@Slf4j
public class RabbitMQService {
@Autowired
private AmqpTemplate amqpTemplate;
/**
* 发送普通消息
*
* @param exchange 交换机
* @param bindingKey 队列绑定的路由key
* @param msgContent 发送的消息内容
*/
public void sendMsg(String exchange, String bindingKey, Object msgContent) {
log.debug("===============队列生产消息====================");
log.debug("交换机:{}, 路由:{}, 发送时间:{}, 发送内容:{}", exchange, bindingKey, LocalDateTime.now(), msgContent);
amqpTemplate.convertAndSend(exchange, bindingKey, msgContent);
}
/**
* 发送延时消息
*
* @param exchange 交换机
* @param bindingKey 队列绑定的路由key
* @param msgContent 发送的消息内容
* @param delayedTime 延时时间(单位:毫秒)
*/
public void sendDelayedMsg(String exchange, String bindingKey, Object msgContent, Long delayedTime) {
log.debug("===============延时队列生产消息====================");
log.debug("交换机:{}, 路由:{}, 发送时间:{}, 发送内容:{}", exchange, bindingKey, LocalDateTime.now(), msgContent);
amqpTemplate.convertAndSend(exchange, bindingKey, msgContent, message -> {
message.getMessageProperties().setHeader("x-delay", delayedTime);
return message;
}
);
log.debug("{}ms后执行", delayedTime);
}
}
定义交换机、队列和绑定关系(举例):
package com.molicloud.moli.mq.define;
/**
* RabbitMQ 常量定义
*
* @author feitao <yyimba@qq.com> 2019/9/25 20:44
*/
public interface RabbitMQConstants {
/**
* 发送交换机
*/
String TOPIC_TEST_EXCHANGE = "topic.test.exchange";
/**
* 发送队列
*/
String TEST_QUEUE = "test.queue";
/**
* 发送队列绑定到topic交换机
*/
String TEST_QUEUE_TOPIC = "test.queue.topic";
}
package com.molicloud.moli.mq.config;
import com.molicloud.moli.mq.define.RabbitMQConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ 消息配置
*
* @author feitao <yyimba@qq.com> 2019/9/26 9:16
*/
@Configuration
public class RabbitMQConfig {
@Bean
public TopicExchange topicTestExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(RabbitMqConstants.TOPIC_TEST_EXCHANGE, "x-delayed-message", true, false, args);
}
@Bean
public Queue testQueue() {
return new Queue(RabbitMQConstants.TEST_QUEUE, true);
}
@Bean
public Binding testQueueTopic(Queue testQueue, TopicExchange topicTestExchange) {
return BindingBuilder.bind(testQueue).to(topicTestExchange).with(RabbitMqConstants.TEST_QUEUE_TOPIC).noargs();
}
}
具体发送的时候,直接注入RabbitMQService
,然后调用sendDelayedMsg
就行:
@Autowired
private RabbitMQService mqService;
public void send() {
// 发送延时消息(延时3秒发送)
mqService.sendDelayedMsg(RabbitMQConstants.TOPIC_TEST_EXCHANGE, RabbitMQConstants.TEST_QUEUE_TOPIC, mail, 3000L);
}
未经允许不得转载:小茉莉 » RabbitMQ安装和使用延时插件
最新评论
最多嵌套多少层?
秀,被你找到了,还没有公布
我靠666
秀得很
老哥测试一下