RabbitMQ安装和使用延时插件

RabbitMQ默认没有携带和安装延时插件,所以需要我们自己手动安装、启用。

RabbitMQ插件下载页面: https://www.rabbitmq.com/community-plugins.html

在浏览器进入下载页面后,Ctrl + F 搜索:rabbitmq_delayed_message_exchange

如上图所示,找到自己RabbitMQ对应的版本下载,然后解压下载得到类似这样的文件rabbitmq_delayed_message_exchange-x-x.x.x.ez,把解压后的文件放入RabbitMQ安装目录下的plugins目录。

然后进入进入sbin目录启用插件:

# windows
./rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange

# linux
./rabbitmq-plugins enable rabbitmq_delayed_message_exchange

然后检查安装的插件,看是否安装成功(判断是否有rabbitmq_delayed_message相关的插件):

# windows
./rabbitmq-plugins.bat list

#linux
./rabbitmq-plugins list

在SpringBoot(SpringCloud)下使用延时插件:

package com.molicloud.moli.mq.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

/**
 * 消息发送服务
 *
 * @author feitao <yyimba@qq.com> 2019/9/26 14:42
 */
@Component
@Slf4j
public class RabbitMQService {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * 发送普通消息
     *
     * @param exchange 交换机
     * @param bindingKey 队列绑定的路由key
     * @param msgContent 发送的消息内容
     */
    public void sendMsg(String exchange, String bindingKey, Object msgContent) {
        log.debug("===============队列生产消息====================");
        log.debug("交换机:{}, 路由:{}, 发送时间:{}, 发送内容:{}", exchange, bindingKey, LocalDateTime.now(), msgContent);
        amqpTemplate.convertAndSend(exchange, bindingKey, msgContent);
    }

    /**
     * 发送延时消息
     *
     * @param exchange 交换机
     * @param bindingKey 队列绑定的路由key
     * @param msgContent 发送的消息内容
     * @param delayedTime 延时时间(单位:毫秒)
     */
    public void sendDelayedMsg(String exchange, String bindingKey, Object msgContent, Long delayedTime) {
        log.debug("===============延时队列生产消息====================");
        log.debug("交换机:{}, 路由:{}, 发送时间:{}, 发送内容:{}", exchange, bindingKey, LocalDateTime.now(), msgContent);
        amqpTemplate.convertAndSend(exchange, bindingKey, msgContent, message -> {
                message.getMessageProperties().setHeader("x-delay", delayedTime);
                return message;
            }
        );
        log.debug("{}ms后执行", delayedTime);
    }

}

定义交换机、队列和绑定关系(举例):

package com.molicloud.moli.mq.define;

/**
 * RabbitMQ 常量定义
 *
 * @author feitao <yyimba@qq.com> 2019/9/25 20:44
 */
public interface RabbitMQConstants {

    /**
     * 发送交换机
     */
    String TOPIC_TEST_EXCHANGE = "topic.test.exchange";

    /**
     * 发送队列
     */
    String TEST_QUEUE = "test.queue";

    /**
     * 发送队列绑定到topic交换机
     */
    String TEST_QUEUE_TOPIC = "test.queue.topic";

}
package com.molicloud.moli.mq.config;

import com.molicloud.moli.mq.define.RabbitMQConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 消息配置
 *
 * @author feitao <yyimba@qq.com> 2019/9/26 9:16
 */
@Configuration
public class RabbitMQConfig {

    @Bean
    public TopicExchange topicTestExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(RabbitMqConstants.TOPIC_TEST_EXCHANGE, "x-delayed-message", true, false, args);
    }

    @Bean
    public Queue testQueue() {
        return new Queue(RabbitMQConstants.TEST_QUEUE, true);
    }

    @Bean
    public Binding testQueueTopic(Queue testQueue, TopicExchange topicTestExchange) {
        return BindingBuilder.bind(testQueue).to(topicTestExchange).with(RabbitMqConstants.TEST_QUEUE_TOPIC).noargs();
    }

}

具体发送的时候,直接注入RabbitMQService,然后调用sendDelayedMsg就行:

@Autowired
private RabbitMQService mqService;

public void send() {
    // 发送延时消息(延时3秒发送)
    mqService.sendDelayedMsg(RabbitMQConstants.TOPIC_TEST_EXCHANGE, RabbitMQConstants.TEST_QUEUE_TOPIC, mail, 3000L);
}

未经允许不得转载:小茉莉 » RabbitMQ安装和使用延时插件

赞 (3) 打赏

评论

4+5=

觉得文章有用就打赏一下小茉莉

支付宝扫一扫打赏

微信扫一扫打赏