目录

springboot与消息队列

消息概述

  1. 大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力

  2. 消息服务中两个重要概念:

    消息代理(message broker)和目的地(destination);

    当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。

  3. 消息队列应用场景

    异步处理

    http://img.cana.space/picStore/20201026122220.png

    应用解耦

    http://img.cana.space/picStore/20201026122234.png

    流量削锋

    http://img.cana.space/picStore/20201026122248.png

    秒杀,设置消息队列的长度为1万,超过长度则丢弃告诉用户秒杀失败。

  4. 消息队列主要有两种形式的目的地

    1. 队列(queue):点对点消息通信(point-to-point)
    2. 主题(topic):发布(publish)/订阅(subscribe)消息通信
  5. 两种消息通信机制

    1. 点对点式
      • 消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容, 消息读取后被移出队列
      • 消息只有唯一的发送者和接受者,但并不是说只能有一个接收者
    2. 发布订阅式
      • 发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么 就会在消息到达时同时收到消息
  6. 两个消息代理规范

    1. JMS(Java Message Service)JAVA消息服务:

      基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现

    2. AMQP(Advanced Message Queuing Protocol)

      • 高级消息队列协议,也是一个消息代理的规范,兼容JMS
      • RabbitMQ是AMQP的实现
    3. 两者对比

      http://img.cana.space/picStore/20201026122659.png

  7. Spring对消息的支持

    • spring-jms提供了对JMS的支持
    • spring-rabbit提供了对AMQP的支持
    • 需要ConnectionFactory的实现来连接消息代理
    • 提供JmsTemplate、RabbitTemplate来发送消息
    • @JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上监听消息代理发布的消息
    • @EnableJms、@EnableRabbit开启支持
  8. Spring Boot自动配置

    • JmsAutoConfiguration
    • RabbitAutoConfiguration

RabbitMQ简介

简介

RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。

核心概念

http://img.cana.space/picStore/20201026124804.png

Message

消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组 成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出 该消息可能需要持久性存储)等。

eg:topic类型的exchange:queue.exchange;binding是eh.# ;绑定的queue是queue.eh,那么发送消息是提供routing-key是eh.xxx就能发送到queue.eh队列。

Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序。

Exchange

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

在AMQP模型中,Exchange是接受生产者消息并将消息路由到消息队列的关键组件。ExchangeType和Binding决定了消息的路由规则。所以生产者想要发送消息,首先必须要声明一个Exchange和该Exchange对应的Binding。可以通过 ExchangeDeclare和BindingDeclare完成。Exchange在接到该RoutingKey以后,会判断该ExchangeType。

在Rabbit MQ中,声明一个Exchange需要三个参数:ExchangeName,ExchangeType和Durable。ExchangeName是该Exchange的名字,该属性在创建Binding和生产者通过publish推送消息时需要指定。ExchangeType,指Exchange的类型,在RabbitMQ中,有三种类型的Exchange:direct ,fanout和topic,不同的Exchange会表现出不同路由行为。Durable是该Exchange的持久化属性。

声明一个Binding需要提供一个QueueName,ExchangeName和BindingKey。

Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别

Queue

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息 可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

Binding

绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表

Exchange 和Queue的绑定可以是多对多的关系。

Connection

网络连接,比如一个TCP连接。

Channel

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚 拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这 些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所 以引入了信道的概念,以复用一条 TCP 连接。

Consumer

消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

Virtual Host

虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加 密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有 自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定, RabbitMQ 默认的 vhost 是 / 。

Broker

表示消息队列服务器实体

RabbitMQ运行机制

AMQP 中的消息路由

AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别,AMQP 中增加了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被 消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。

http://img.cana.space/picStore/20201026125913.png

Exchange 类型

Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型: direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键, headers 交换器和 direct 交换器完全一致,但性能差很多, 目前几乎用不到了,所以直接看另外三种类型:

Direct

http://img.cana.space/picStore/20201026130018.png

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为 “dog”,则只转发 routing key 标记为“dog”的消息,不会转 发“dog.puppy”,也不会转发“dog.guard”等等。它是完全 匹配、单播的模式。

Fanout

http://img.cana.space/picStore/20201026130134.png

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键, 只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定 的所有队列上。很像子网广播,每台子网内的 主机都获得了一份复制的消息。fanout 类型转发 消息是最快的。

Topic

http://img.cana.space/picStore/20201026130215.png

topic 交换器通过模式匹配分配消息的路由键属 性,将路由键和某个模式进行匹配,此时队列 需要绑定到一个模式上。它将路由键和绑定键 的字符串切分成单词,这些单词之间用点隔开。 它同样也会识别两个通配符:符号“#”和符号 “*”#匹配0个或多个单词,*匹配一个单词。

RabbitMQ整合

rabbitmq安装

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 查看版本
$ dt rabbitmq
"3-alpine"
"3-management-alpine"
"3.8-alpine"
"3.8-management-alpine"
"3.8.9-alpine"
"3.8.9-management-alpine"
"alpine"
"latest"
"management"
"management-alpine"
# 下载
$ docker pull rabbitmq
# 使用镜像
# -p 指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号)
# --hostname 主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名)
# -e 指定环境变量;(RABBITMQ_DEFAULT_VHOST:默认虚拟机名;RABBITMQ_DEFAULT_USER:默认的用户名;RABBITMQ_DEFAULT_PASS:默认用户名的密码)
# 68898be27496为IMAGE ID
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 \
-v /tmp/docker/rabbitmq:/var/lib/rabbitmq \
--hostname eh-rabbit \
-e RABBITMQ_DEFAULT_VHOST=eh-vhost  \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
68898be27496
98b10c4c1ba65d0efa683c4c751e45cb5ed50de871bfa5e905df272ce8568a2a
# 登录管理后台
localhost:15672
用户名/密码: admin/admin

试玩rabbitmq

  1. 登录管理后台,按照下图配置exchange、queue和routingkey(binding),binding可以通过点击exchange或者queue都可以进入设置界面。

    http://img.cana.space/picStore/20201026144123.png

  2. exchange发送消息

    进入到exchange界面设置routing,key发送消息,fanout类型exchange设置不设置都无所谓

    以topic类型为例

    http://img.cana.space/picStore/20201026150225.png

  3. queue接受消息

    http://img.cana.space/picStore/20201026150326.png

springboot整合rabbitmq

快速演示

完整示例地址

引入rabbitmq

1
2
3
4
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

配置连接属性:

1
2
3
4
5
6
7
spring:
  rabbitmq:
    host: localhost
    username: admin
    password: admin
    port: 5672
    virtual-host: eh-vhost

测试:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.eh.springbootamqp;

import lombok.extern.slf4j.Slf4j;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@SpringBootTest
class SpringbootAmqpApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() {
        Map<String, Object> map = new HashMap<>();
        map.put("hello", "你好");
        map.put("world", "世界");
        map.put("names", Lists.newArrayList("张三", "李四"));
        rabbitTemplate.convertAndSend("exchange.direct", "atguigu", map);
    }

    @Test
    void receive() {
        Object o = rabbitTemplate.receiveAndConvert("atguigu");
        log.info("class:{}", o.getClass());
        log.info("o:{}", o);

    }


}

默认使用jdk序列化,消息体在管理界面是乱码

http://img.cana.space/picStore/20201026160726.png

我们也可以自定义序列化

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
package com.eh.springbootamqp.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyAMQPConfig {

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

再来测试

http://img.cana.space/picStore/20201026160949.png

自动配置原理

  • RabbitAutoConfiguration

  • 有自动配置了连接工厂ConnectionFactory

  • RabbitProperties 封装了 RabbitMQ的配置

  • RabbitTemplate :给RabbitMQ发送和接受消息;

  • @EnableRabbit + @RabbitListener 监听消息队列的内容

  • AmqpAdmin : RabbitMQ系统管理功能组件;

    AmqpAdmin:创建和删除 Queue,Exchange,Binding

使用注解监听queue

  1. 开启基于注解的RabbitMQ模式

    1
    2
    3
    4
    5
    6
    7
    8
    
    @EnableRabbit
    @SpringBootApplication
    public class SpringbootAmqpApplication {
       
        public static void main(String[] args) {
            SpringApplication.run(SpringbootAmqpApplication.class, args);
        }
    }
    
  2. 监听queue

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    
    package com.eh.springbootamqp.service;
       
    import com.eh.springbootamqp.bean.Book;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
       
    @Component
    public class BookListener {
       
        @RabbitListener(queues = "anguigu.new")
        public void receive(Book book) {
            System.out.println("收到消息:" + book);
        }
       
        @RabbitListener(queues = "atguigu")
        public void receive02(Message message) {
            System.out.println(message.getBody());
            System.out.println(message.getMessageProperties());
        }
    }
    

AmqpAdmin

RabbitMQ系统管理功能组件; AmqpAdmin:创建和删除 Queue,Exchange,Binding

测试程序:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.eh.springbootamqp;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;


@Slf4j
@SpringBootTest
class SpringbootAmqpApplicationTests {

    @Autowired
    private AmqpAdmin amqpAdmin;


    /**
     * 测试发送消息
     */
    @Test
    void contextLoads() {
        // 创建exchange
        amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
        // 创建queue
        amqpAdmin.declareQueue(new Queue("amqpadmin.queue", true));
        // 创建binding
        amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE,
                "amqpadmin.exchange", "amqpadmin.routingKey", null));
        // 解绑
        amqpAdmin.removeBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE,
                "amqpadmin.exchange", "amqpadmin.routingKey", null));
        // 删除exchange
        amqpAdmin.deleteExchange("amqpadmin.exchange");
        // 删除queue
        amqpAdmin.deleteQueue("amqpadmin.queue");

    }
}