RabbitMQ消息队列

简介

RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。

RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。RabbitMQ主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

相关概念

通常我们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。

img

  • 左侧 P 代表 生产者,也就是往 RabbitMQ 发消息的程序。
  • 中间即是 RabbitMQ,其中包括了 交换机 和 队列。
  • 右侧 C 代表 消费者,也就是往 RabbitMQ 拿消息的程序。

那么,其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。

  • 虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”。
  • 交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。
    这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
  • 绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。

交换机(Exchange)

交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:Direct, topic, Headers and Fanout

  • Direct:direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去.
  • Topic:按规则转发消息(最灵活)
  • Headers:设置header attribute参数类型的交换机
  • Fanout:转发消息到所有绑定队列

Direct Exchange
Direct Exchange是RabbitMQ默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。

img

第一个 X - Q1 就有一个 binding key,名字为 orange; X - Q2 就有 2 个 binding key,名字为 black 和 green。当消息中的 路由键 和 这个 binding key 对应上的时候,那么就知道了该消息去到哪一个队列中。

Ps:为什么 X 到 Q2 要有 black,green,2个 binding key呢,一个不就行了吗? - 这个主要是因为可能又有 Q3,而Q3只接受 black 的信息,而Q2不仅接受black 的信息,还接受 green 的信息。

Topic Exchange

Topic Exchange 转发消息主要是根据通配符。 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。

在这种交换机模式下:

  • 路由键必须是一串字符,用句号(.) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等。
  • 路由模式必须包含一个 星号(*),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements..b.*,那么就只能匹配路由键是这样子的:第一个单词是 agreements,第四个单词是 b。 井号(#)就表示相当于一个或者多个单词,例如一个匹配模式是agreements.eu.berlin.#,那么,以agreements.eu.berlin开头的路由键都是可以的。

具体代码发送的时候还是一样,第一个参数表示交换机,第二个参数表示routing key,第三个参数即消息。如下:

rabbitTemplate.convertAndSend("testTopicExchange","key1.a.c.key2", " this is  RabbitMQ!");复制代码

topic 和 direct 类似, 只是匹配上支持了”模式”, 在”点分”的 routing_key 形式中, 可以使用两个通配符:

  • *表示一个词.
  • #表示零个或多个词.

Headers Exchange

headers 也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型.
在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列.

Fanout Exchange

Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routing_key会被忽略。

springboot集成RabbitMQ

springboot集成RabbitMQ非常简单,如果只是简单的使用配置非常少,springboot提供了spring-boot-starter-amqp项目对消息各种支持。

简单使用

配置pom.xml

主要是添加spring-boot-starter-amqp的支持

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件

配置rabbitmq的安装地址、端口以及账户信息

rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    # 虚拟空间地址
    virtual-host: /
    # 配置发布消息确认回调
    publisher-confirms: true
    publisher-returns: true

队列枚举类 QueueEnum

package com.example.rabbitMQ;

import lombok.Getter;

/**
 * @author yanzt
 * @date 2018/8/8 13:46
 * @describe
 */
@Getter
public enum QueueEnum {

    HELLO1("exchange","topic.hello1","hello1"),
    HELLO2("exchange","topic.hello2","hello2"),
    REGISTERY1("exchange","topic.registery1","registery1"),
    REGISTERY2("exchange","topic.registery2","registery2"),
    /**
     * 消息通知队列
     */
    MESSAGE_QUEUE("message.center.direct", "message.center.create", "message.center.create"),
    /**
     * 消息通知ttl队列
     */
    MESSAGE_TTL_QUEUE("message.center.topic.ttl", "message.center.create.ttl", "message.center.create.ttl");

    /**
     * 交换名称
     */
    private String exchange;
    /**
     * 队列名称
     */
    private String name;
    /**
     * 路由键
     */
    private String routeKey;

    QueueEnum(String exchange, String name, String routeKey) {
        this.exchange = exchange;
        this.name = name;
        this.routeKey = routeKey;
    }
}

配置类 RabbitMQConfig.java

package com.example.rabbitMQ;

import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author yanzt
 * @date 2018/8/6 11:05
 * @description 队列配置
 */
@Configuration
public class RabbitMQConfig {

    @Bean(name = "hello1")
    public Queue hello1(){
        return new Queue(QueueEnum.HELLO1.getName());
    }

    @Bean(name = "hello2")
    public Queue hello2(){
        return new Queue(QueueEnum.HELLO2.getName());
    }

    @Bean(name = "registery1")
    public Queue registery1(){
        return new Queue(QueueEnum.REGISTERY1.getName());
    }

    @Bean(name = "registery2")
    public Queue registery2(){
        return new Queue(QueueEnum.REGISTERY2.getName());
    }

    @Bean(name = "exchange")
    TopicExchange exchange(){
        //new 一个topic exchange (交换机)
        return new TopicExchange("exchange");
    }

    @Bean
    Binding bindingExchangeMessage(@Qualifier("hello1") Queue queue,@Qualifier("exchange") TopicExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(QueueEnum.HELLO1.getRouteKey());
    }

    @Bean
    Binding bindingExchangeMessage2(@Qualifier("hello2") Queue queue,@Qualifier("exchange") TopicExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(QueueEnum.HELLO2.getRouteKey());
    }

    @Bean
    Binding bindingExchangeMessage3(@Qualifier("registery1") Queue queue,@Qualifier("exchange") TopicExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(QueueEnum.REGISTERY1.getRouteKey());
    }

    @Bean
    Binding bindingExchangeMessage4(@Qualifier("registery2") Queue queue,@Qualifier("exchange") TopicExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(QueueEnum.REGISTERY2.getRouteKey());
    }

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }



    /**
     * 消息中心实际消费队列交换配置
     *
     * @return
     */
    @Bean
    DirectExchange messageDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.MESSAGE_QUEUE.getExchange())
                .durable(true)
                .build();
    }

    /**
     * 消息中心延迟消费交换配置
     *
     * @return
     */
    @Bean
    DirectExchange messageTtlDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.MESSAGE_TTL_QUEUE.getExchange())
                .durable(true)
                .build();
    }

    /**
     * 消息中心实际消费队列配置
     *
     * @return
     */
    @Bean
    public Queue messageQueue() {
        return new Queue(QueueEnum.MESSAGE_QUEUE.getName());
    }


    /**
     * 消息中心TTL队列
     * x-dead-letter-exchange、x-dead-letter-routing-key两个参数,
     * 而这两个参数就是配置延迟队列过期后转发的Exchange、RouteKey,只要在创建队列时对应添加了这两个参数
     * @return
     */
    @Bean
    Queue messageTtlQueue() {
        return QueueBuilder
                .durable(QueueEnum.MESSAGE_TTL_QUEUE.getName())
                // 配置到期后转发的交换
                .withArgument("x-dead-letter-exchange", QueueEnum.MESSAGE_QUEUE.getExchange())
                // 配置到期后转发的路由键
                .withArgument("x-dead-letter-routing-key", QueueEnum.MESSAGE_QUEUE.getRouteKey())
                .build();
    }

    /**
     * 消息中心实际消息交换与队列绑定
     *
     * @param messageDirect 消息中心交换配置
     * @param messageQueue  消息中心队列
     * @return
     */
    @Bean
    Binding messageBinding(DirectExchange messageDirect, Queue messageQueue) {
        return BindingBuilder
                .bind(messageQueue)
                .to(messageDirect)
                .with(QueueEnum.MESSAGE_QUEUE.getRouteKey());
    }

    /**
     * 消息中心TTL绑定实际消息中心实际消费交换机
     *
     * @param messageTtlQueue
     * @param messageTtlDirect
     * @return
     */
    @Bean
    public Binding messageTtlBinding(Queue messageTtlQueue, DirectExchange messageTtlDirect) {
        return BindingBuilder
                .bind(messageTtlQueue)
                .to(messageTtlDirect)
                .with(QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey());
    }

}

生产者

package com.example.rabbitMQ;

import com.example.model.ssm.UserInf;
import com.example.utils.CodecUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.UUID;

/**
 * @author yanzt
 * @date 2018/8/6 11:39
 * @description 生产者
 */
@Component(value = "helloSender")
@Slf4j
public class HelloSender{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(){
        Object context = "hello1" + new Date();
        /*pringboot以及完美的支持对象的发送和接收,不需要格外的配置*/
//        UserInf context = new UserInf();
//        context.setUname("root");
//        context.setPasswd(CodecUtil.BCrypt("123"));
        log.info("Sender1 : " + context);
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        this.rabbitTemplate.convertAndSend(QueueEnum.HELLO1.getExchange(),QueueEnum.HELLO1.getRouteKey(),context,correlationData);
    }

    @PostConstruct
    public void init(){
        this.rabbitTemplate.setConfirmCallback(new msgSendConfirmCallBack());
        this.rabbitTemplate.setReturnCallback(new MsgSendReturnCallback());
    }

    /**
     * 消息回调确认
     */
    public class msgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback{

        /**
         * 当消息发送到交换机(exchange)时,该方法被调用.
         * 1.如果消息没有到exchange,则 ack=false
         * 2.如果消息到达exchange,则 ack=true
         * @param correlationData
         * @param ack
         * @param cause
         * */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            log.info("消息ID : " + correlationData == null ? null:correlationData.getId());
            if (ack) {
                log.info("消息发送成功");
            } else {
                log.info("消息发送失败:" + cause);
            }
        }
    }

    public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback{

        /**
         * 当消息从交换机到队列失败时,该方法被调用。(若成功,则不调用)
         * 需要注意的是:该方法调用后,MsgSendConfirmCallBack中的confirm方法也会被调用,且ack = true
         * */
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            log.info("MsgSendReturnCallback [消息从交换机到队列失败]  message:"+message);
            log.info("message : " + message + "replyCode : " + replyCode + "replyText : " + replyText + "exchange : " + exchange + "routingKey : " + routingKey);
        }
    }

} 

消费者

package com.example.rabbitMQ;

import com.example.model.ssm.UserInf;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author yanzt
 * @date 2018/8/6 11:42
 * @description 消费者
 */
@Component
@RabbitListener(queues = "topic.hello1")
@Slf4j
public class HelloReceiver {

    @RabbitHandler
    public void process(Message message, Channel channel) throws IOException {
        try {
            log.info("Receiver1 : " + message);
            log.info("Receiver1.class : " + message.getClass());
            /**
             * 第一个参数 deliveryTag:就是接受的消息的deliveryTag,可以通过msg.getMessageProperties().getDeliveryTag()获得
             * 第二个参数 multiple:如果为true,确认之前接受到的消息;如果为false,只确认当前消息。
             * 如果为true就表示连续取得多条消息才发会确认,和计算机网络的中tcp协议接受分组的累积确认十分相似,
             * 能够提高效率。
             *
             * 同样的,如果要nack或者拒绝消息(reject)的时候,
             * 也是调用channel里面的basicXXX方法就可以了(要指定tagId)。
             *
             * 注意:如果抛异常或nack(并且requeue为true),消息会重新入队列,
             * 并且会造成消费者不断从队列中读取同一条消息的假象。
             */
            // 确认消息
            // 如果 channel.basicAck   channel.basicNack  channel.basicReject 这三个方法都不执行,消息也会被确认
            // 所以,正常情况下一般不需要执行 channel.basicAck
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        } catch (Exception e) {
            log.error("OrderConsumer  handleMessage {} , error:",message,e);
            /*
             * 消息的标识,false只确认当前一个消息收到,true确认consumer获得的所有消息
             * channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
             *
             * ack返回false,并重新回到队列
             * channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
             *
             * 拒绝消息
             * channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
             *
             */
            // 处理消息失败,将消息重新放回队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
        }
    }

}

发送延迟消息

RabbitMQConfig中配置好messageQueue、messageTtlQueue两个队列及其交换机以及绑定关系

MessageSender.java

package com.example.rabbitMQ.lazy;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.Date;
import java.util.UUID;

/**
 * @author yanzt
 * @date 2018/8/8 14:30
 * @describe
 */
@Component
@Slf4j
public class MessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送延迟消息
     *
     * @param messageContent 消息内容
     * @param exchange       队列交换
     * @param routerKey      队列交换绑定的路由键
     * @param delayTimes     延迟时长,单位:毫秒
     */
    public void send(Object messageContent, String exchange, String routerKey, final long delayTimes){
        if (!StringUtils.isEmpty(exchange)) {
            log.info("延迟:{}毫秒写入消息队列:{},消息内容:{}", delayTimes, routerKey, messageContent);
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            this.rabbitTemplate.convertAndSend(exchange, routerKey, messageContent, message -> {
                // 设置延迟毫秒值
                message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
                return message;
            }, correlationData);
        }else {
            log.error("未找到队列消息:{},所属的交换机", exchange);
        }
    }
}

MessageReceiver.java

package com.example.rabbitMQ.lazy;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author yanzt
 * @date 2018/8/8 14:30
 * @description 消费者
 */
@Component
@RabbitListener(queues = "message.center.create")
@Slf4j
public class MessageReceiver {

    @RabbitHandler
    public void process(String hello){
        log.info("message.center.create---消费内容:{}", hello);
    }
}

测试延迟消息消费

/**
* 测试延迟消息消费
* */
@Test
public void test2(){
    String messageContent = "测试延迟消费,写入时间:" + new Date();
    String exchange = QueueEnum.MESSAGE_TTL_QUEUE.getExchange();
    String routerKey = QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey();
    long delayTimes = 10000;

    messageSender.send(messageContent,exchange,routerKey,delayTimes);

}

AmqpAdmin

package com.example.rabbitMQ;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @author yanzt
 * @date 2018/8/8 16:21
 * @describe Amqp管理工具类
 */
@Component
public class AmqpConfig {

    @Autowired
    private AmqpAdmin amqpAdmin;

    public void declareExchange(String exchange_name){
        //创建exchange
        amqpAdmin.declareExchange(new TopicExchange(exchange_name));
    }

    public void declareQueue(String queue_name){
        //创建queue
        amqpAdmin.declareQueue(new Queue(queue_name));
    }

    public void declareBinding(String exchange_name,String queue_name,String routeKey,Map<String, Object> arguments){
        //绑定队列
        amqpAdmin.declareBinding(new Binding(queue_name, Binding.DestinationType.QUEUE,exchange_name,routeKey,null));
    }

    public void deleteExchange(String exchange_name){
        //删除exchange
        amqpAdmin.deleteExchange(exchange_name);
    }

    public void deleteQueue(String queue_name){
        //删除queue
        amqpAdmin.deleteQueue(queue_name);
    }

}

test

@Test
    public void testAmqp(){
        amqpConfig.declareExchange("test_exchange");
        amqpConfig.declareQueue("test_queue");
        amqpConfig.declareBinding("test_queue", "test_exchange","test",null);
    }

    @Test
    public void testAmqp2(){
        amqpConfig.deleteExchange("test_exchange");
        amqpConfig.deleteQueue("test_queue");
    }

   转载规则


《RabbitMQ消息队列》 yywzt 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
Docker Docker
安装Docker安装linux虚拟机​ 1)、VMWare、VirtualBox(安装); ​ 2)、导入虚拟机文件centos7-atguigu.ova; ​ 3)、双击启动linux虚拟机;使用 root/ 1234
2018-08-16 yywzt
下一篇 
IDEA配置热部署 IDEA配置热部署
IDEA配置热部署Spring Boot Devtools(推荐)– 引入依赖 <dependency> <groupId>org.springframework.boot</groupId> <ar
2018-08-09
  目录