spirngcloud消息驱动stream
概述
Spring Cloud Stream Reference Documentation
Why Spring Cloud Stream消息驱动
屏蔽底层消息中间件的差异,降低切换和维护成本,统一消息的编程模型
What Spring Cloud Stream 消息驱动
Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.
The framework provides a flexible programming model built on already established and familiar Spring idioms and best practices, including support for persistent pub/sub semantics, consumer groups, and stateful partitions.
Spring Cloud Stream 是一个用来构建连接共享消息系统的高伸缩的基于事件驱动的微服务应用框架,该框架提供了一个灵活的编程模型,它建立在已经建立和熟悉的Spring方言和最佳实践上,包括对持久化的发布/订阅语义,消费分组和有状态的分区的支持。
Stream设计思想
标准MQ结构图
生产者/消费者
之间通过 消息媒介
传递消息内容
Spring Cloud Stream结构图
比如说我们用到了 RabbitMQ 和 Kafka,由于这两个消息中间件的架构上的不同。像 RabbitMQ 有 exchange
,Kafka 有 Topic
和 Partions
分区的概念。
这些中间件的差异性,给我们实际项目的开发造成了一定的困扰。我们如果用了两个消息队列中的其中一个,后面的业务需求如果向往另外一种消息队列进行迁移,这需求简直是灾难性的。因为它们之间的耦合性过高,导致一大堆东西都要重新推到来做,这时候 Spring Cloud Stream 无疑是一个好的选择,它为我们提供了一种解耦合的方式。
Stream为什么可以统一底层差异
在没有绑定器这个概念的情况下,我们的 Spring Boot 应用直接与消息中间件进行信息交互时,由于个消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。
通过定义绑定器(Binder)作为中间层,就可以完美的实现应用程序与消息中间件细节的隔离。
Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以更多的关注自己的业务流程。
Binder
- 生产者 -> output -> Binder -> input -> 消费者
Spring Cloud Stream 处理架构
通过定义绑定器(Binder)作为中间层,实现了应用程序与消息中间件细节的隔离
Stream中的消息通信方式遵循了发布-订阅模式
使用topic主题进行广播,在rabbitmq中就是exchange,在kafka中就是topic
Spring Cloud Stream 标准流程
Binder
绑定器,很方便的 连接中间件
,屏蔽 MQ 之间的差异
Channel
通道,是队列 Queue 的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel 对队列进行配置;
Source/Sink
- Source 对应 output,生产者输出消息到Source通道
- Sink 对应Input,Sink通道输入消息到消费者
编程API和常用注解
案例演示
案例说明
新建3个子模块
- cloud-stream-rabbitmq-provider8801 作为生产者进行发消息模块
- cloud-stream-rabbitmq-provider8802 作为消息接收者模块
- cloud-stream-rabbitmq-provider8803 作为消息接收者模块
消息驱动之生产者
-
新建生产者moudle cloud-stream-rabbitmq-provider8801
-
pom
引入stream-rabbit依赖
1 2 3 4 5
<!--引入stream-rabbit依赖--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
完整依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
<dependencies> <!--引入stream-rabbit依赖--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> </dependencies>
-
yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
server: port: 8801 spring: application: name: cloud-stream-rabbitmq-provider8801 # 设置rabbitmq的相关的环境配置 rabbitmq: host: localhost port: 5672 username: admin password: admin virtual-host: eh-vhost cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; eh-rabbit: # 表示定义的绑定器名称,用于和binding整合 type: rabbit # 消息组件类型 bindings: # 服务的整合处理 output: # 这个名字是一个通道的名称,对应org.springframework.cloud.stream.messaging.Source.OUTPUT destination: exchange.eh.test # 表示要使用的exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” binder: eh-rabbit # 设置要绑定的消息服务的具体设置,需与自定义名称一致,飘红:Settings->Editor->Inspections->Spring->Spring Boot->Spring Boot application.yml 对勾去掉
-
主启动类
-
业务类
消息发送服务接口
1 2 3 4 5
package com.eh.cloud.stream.rabbit.provider.message; public interface MessageSender { void send(String msg); }
消息发送服务实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
package com.eh.cloud.stream.rabbit.provider.message; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel; @Slf4j // 绑定一个输出通道,对应于spring.cloud.stream.bindings.output @EnableBinding(Source.class) public class MessageSenderImpl implements MessageSender { // 注入消息发送管道 @Autowired @Qualifier("output") private MessageChannel messageChannel; @Override public void send(String msg) { messageChannel.send(MessageBuilder.withPayload(msg).build()); log.info("成功发送消息:" + msg); } }
controller层测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
package com.eh.cloud.stream.rabbit.provider.controller; import com.eh.cloud.stream.rabbit.provider.message.MessageSender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.UUID; @RestController public class MessageSendController { @Autowired private MessageSender messageSender; @GetMapping("/sendMsg") public String sendMessage() { String msg = UUID.randomUUID().toString(); messageSender.send(msg); return msg; } }
-
测试
访问:http://localhost:8801/sendMsg 查看后台打印日志
成功发送消息:908ed725-5b8b-424f-bd44-6e92b364618c
rabbitmq管理后台会新建一个exchange,名字和配置文件中spring.cloud.stream.bindings.output.destination对应
消息驱动之消费者
-
新建消费者 moudle cloud-stream-rabbitmq-consumer8802
-
pom
1 2 3 4 5 6 7 8 9 10 11
<dependencies> <!--引入stream-rabbit依赖--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies>
-
yml
和provider不同的地方是通道名字使用的input,与自动配置保持一致
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
server: port: 8802 spring: application: name: cloud-stream-rabbitmq-consumer8802 # 设置rabbitmq的相关的环境配置 rabbitmq: host: localhost port: 5672 username: admin password: admin virtual-host: eh-vhost cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; eh-rabbit: # 表示定义的绑定器名称,用于和binding整合 type: rabbit # 消息组件类型 bindings: # 服务的整合处理 input: # 这个名字是一个通道的名称,对应org.springframework.cloud.stream.messaging.Source.INPUT destination: exchange.eh.test # 表示要使用的exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” binder: eh-rabbit # 设置要绑定的消息服务的具体设置,需与自定义名称一致,飘红:Settings->Editor->Inspections->Spring->Spring Boot->Spring Boot application.yml 对勾去掉
-
主启动类
-
业务类
消息接收者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
package com.eh.cloud.steram.rabbitmq.consumer.message; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; @Slf4j @EnableBinding(Sink.class) public class MessageReceiver { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void receive(Message<String> msg) { log.info("消费者{},接收到消息:{}", serverPort, msg.getPayload()); } }
-
测试8801发送消息,8802能否接收到
访问:http://localhost:8801/sendMsg , 查看消费端日志,输出如下:
1
2020-11-13 15:33:41.026 INFO 94784 --- [vmFi8aLOFviNQ-1] c.e.c.s.r.c.message.MessageReceiver : 消费者8802,接收到消息:6374befa-1a28-4c56-8d18-3f9fb5914493
查看rabbitmq管理后台会发现多了一个queue,并且绑定到生产者的exchange
exchange.eh.test
上
问题和解决
重复消费问题
- 拷贝cloud-stream-rabbitmq-consumer8802,新建cloud-stream-rabbitmq-consumer8803
- 8801发送消息之后同时被8802和8803消费了
使用分组解决重复消费
微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费。
8802和8803修改yml,设置成相同的分组
|
|
重新启动8802和8803,8801测试发送消息,此时可以看到消息只被一个客户端消费了。
查看管理后台exchange绑定的queue变化情况:
显然,分组就是绑定queue,不同组广播,同组轮询。
消息丢失问题
加了group自动支持持久化,因为如果不加group属性,每次应用重启系统自动分配一个随机的group,也就是queue,rabbitmq当然不会推送原先queue中的消息给新的queue。设置了分组以后,重启后还是相同的分组就能接收到消息推送了。