Stream 消息驱动

1、消息驱动 Stream 介绍

1-1、Stream 为什么被引入

常见 MQ(消息中间件):

  • ActiveMQ

  • RabbitMQ

  • RocketMQ
  • Kafka

有没有一种新的技术诞生,让我们不再关注具体 MQ 的细节,我们只需要用一种适配绑定的方式,自动的给我们在各种 MQ 内切换。(类似于 Hibernate)

2、是什么

屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。

官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。

**应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中 binder 对象交互。通过我们配置来 binding(绑定),而 Spring Cloud Stream 的 binder 对象负责与消息中间件交互。**所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。 通过使用 Spring Integration 来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了**发布-订阅、消费组、分区**的三个核心概念。 目前仅支持 RabbitMQ、 Kafka。 ### 1-3、设计思想 #### 标准 MQ ![](https://jnchan-blog.oss-cn-hangzhou.aliyuncs.com/springcloudImages/18stream_1.png) - 生产者/消费者之间靠消息媒介传递信息内容 - 消息必须走特定的通道 - 消息通道 Message Channel - 消息通道里的消息如何被消费呢,谁负责收发处理 - 消息通道 MessageChannel 的子接口 SubscribableChannel,由 MessageHandler 消息处理器所订阅。 #### **为什么用 Cloud Stream** 比方说我们用到了 RabbitMQ 和 Kafka,由于这两个消息中间件的架构上的不同,像 RabbitMQ 有 exchange,kafka 有 Topic 和 Partitions 分区。 ![](https://jnchan-blog.oss-cn-hangzhou.aliyuncs.com/springcloudImages/18stream_2.png) 这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候 Spring Cloud Stream 给我们提供了—种解耦合的方式。 #### Stream 凭什么可以统一底层差异? 在没有绑定器这个概念的情况下,我们的 SpringBoot 应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。 通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。**通过向应用程序暴露统一的 Channel 通道,使得应用程序不需要再考虑各种不同的消息中间件实现。**

一句话:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。

Stream 对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq 切换为 kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程

通过定义绑定器 Binder 作为中间层,实现了应用程序与消息中间件细节之间的隔离。

Binder

Binder 可以生成 Binding,Binding 用来绑定消息容器的生产者和消费者,它有两种类型

  • INPUT 对应于消费者

  • OUTPUT 对应于生产者

Stream 中的消息通信方式遵循了发布-订阅模式

Topic 主题进行广播

  • 在 RabbitMQ 就是 Exchange
  • 在 Kakfa 中就是 Topic

1-4、Stream 标准流程套路

  • Binder:很方便的连接中间件,屏蔽差异
  • Channel:通道,是队列 Queue 的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过 Channel 对队列进行配置
  • Source 和 Sink:简单的可理解为参照对象是 Spring Cloud Stream 自身,从 Stream 发布消息就是输出,接受消息就是输入

编码 API 和常用注解

组成 说明
Middleware 中间件,目前只支持 RabbitMQ 和 Kafka
Binder Binder 是应用与消息中间件之间的封装,目前实行了 Kafka 和 RabbitMQ 的 Binder,通过 Binder 可以很方便的连接中间件,可以动态的改变消息类型(对应于 Kafka 的 topic,RabbitMQ 的 exchange),这些都可以通过配置文件来实现
@Input 注解标识输入通道,通过该输乎通道接收到的消息进入应用程序
@Output 注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener 监听队列,用于消费者的队列的消息接收
@EnableBinding 指信道 channel 和 exchange 绑定在一起

2、案例说明

RabbitMQ 环境已经 OK,工程中新建三个子模块

  • cloud-stream-rabbitmq-provider8801,作为生产者进行发消息模块

  • cloud-stream-rabbitmq-consumer8802,作为消息接收模块

  • cloud-stream-rabbitmq-consumer8803 作为消息接收模块

2-1、消息驱动之生产者 provider8801

cloud-stream-rabbitmq-provider8801

pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springCloud2023</artifactId>
<groupId>com.jcvv.springcloud</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>cloud-stream-rabbitmq-provider8801</artifactId>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!-- stream-rabbit -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

<!--eureka-client 目前,这个不是必须的-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency><!-- 引入自己定义的api通用包,可以使用Payment支付Entity -->
<groupId>com.jcvv.springcloud</groupId>
<artifactId>cloud-api-common</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

</project>

application.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
server:
port: 8801

spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此配置要绑定的rabbitMQ的服务信息
defaultRabbit: # 表示定义的名称,用于和binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
output: # 表示是生产者,向rabbitMQ发送消息
destination: studyExchange # 表示要使用的Exchange名称
content-type: application/json # 设置消息类型,本次是json,文本是 "text/plain"
binder: defaultRabbit # 设置要绑定的消息服务的具体配置

eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka/
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳时间,默认是30秒
lease-expiration-duration-in-seconds: 5 # 最大心跳间隔不能超过5秒,默认90秒
instance-id: send-8801.com # 在信息列表显示主机名称
prefer-ip-address: true # 访问路径变为ip地址

特别注意:⭐⭐⭐

使用了 spring.cloud.stream.binders.defaultRabbit.environment.spring.rabbitmq.xx 来配置 rabbitmq 的环境

如果你是用的其他服务器上的 rabbitmq,比如我使用的我自己的阿里云服务器然后创建 docker 容器来运行 rabbitmq。
按照视频中的配置方式的话,启动时会试图连接两次 rabbitmq 程序
第一次试图连接访问的就是 application.yml 中配置的地址,此时已经订阅成功了
但是程序还会在之后进行第二次连接,此时访问的地址就是 localhost:5672,在我的环境中,我本地没有 rabbitmq 环境,所以直接报 IOException。

所以,如果是使用的自己的服务器来配置,则需要修改配置文件,将 rabbitmq 的配置信息移动到 application.yml 中的 spring 节点下
修改后的配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
server:
port: 8801

spring:
application:
name: cloud-stream-provider
rabbitmq:
host: 119.3.211.104
port: 5672
username: cj
password: 123456
cloud:
stream:
binders: # 在此配置要绑定的rabbitMQ的服务信息
defaultRabbit: # 表示定义的名称,用于和binding整合
type: rabbit # 消息组件类型

bindings: # 服务的整合处理
output: # 表示是生产者,向rabbitMQ发送消息
destination: studyExchange # 表示要使用的Exchange名称
content-type: application/json # 设置消息类型,本次是json,文本是 "text/plain"
binder: { defaultRabbit } # 设置要绑定的消息服务的具体配置

eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka/
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳时间,默认是30秒
lease-expiration-duration-in-seconds: 5 # 最大心跳间隔不能超过5秒,默认90秒
instance-id: send-8801.com # 在信息列表显示主机名称
prefer-ip-address: true # 访问路径变为ip地址

主启动类

1
2
3
4
5
6
@SpringBootApplication
public class StreamMQMain8801 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8801.class,args);
}
}

service 层

发送消息接口

1
2
3
public interface IMessageProvider {
public String send();
}

发送消息接口实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
@EnableBinding(Source.class)//定义消息的推送管道 //不是和controller打交道的service,而是发送消息的推送服务类
public class MessasgeProviderImpl implements IMessageProvider {

@Resource
private MessageChannel output; //消息发送管道
@Override
public String send() {
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("*******serial: "+serial);
return serial;
}
}

controller 层

1
2
3
4
5
6
7
8
9
10
@RestController
public class SendMessageController {
@Resource
private IMessageProvider messageProvider;

@GetMapping(value = "/sendMessage")
public String sendMessage(){
return messageProvider.send();
}
}

测试

2-2、消息驱动之消费者 consumer8802

cloud-stream-rabbitmq-consumer8802

pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springCloud2023</artifactId>
<groupId>com.jcvv.springcloud</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>cloud-stream-rabbitmq-consumer8802</artifactId>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!-- stream-rabbit -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency><!-- 引入自己定义的api通用包,可以使用Payment支付Entity -->
<groupId>com.jcvv.springcloud</groupId>
<artifactId>cloud-api-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

application.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
server:
port: 8802
spring:
application:
name: cloud-stream-provider
rabbitmq:
host: 119.3.211.104
port: 5672
username: cj
password: 123456
cloud:
stream:
binders: # 配置要绑定的rabbitMQ的服务信息
defaultRabbit: # 表示定义的名称,用于和binding整合
type: rabbit # 消息组件类型
bindings: # 服务的整合处理
input: # 表示是消费者,这里是唯一和生产者不同的地方,向rabbitMQ发送消息
destination: studyExchange # 表示要使用的Exchange名称
content-type: application/json # 设置消息类型,本次是json,文本是 "text/plain"
binder: { defaultRabbit } # 设置要绑定的消息服务的具体配置
group: ailuoA
eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka/
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳时间,默认是30秒
lease-expiration-duration-in-seconds: 5 # 最大心跳间隔不能超过5秒,默认90秒
instance-id: receive-8802.com # 在信息列表显示主机名称
prefer-ip-address: true # 访问路径变为ip地址

主启动类

1
2
3
4
5
6
@SpringBootApplication
public class StreamMQMain8802 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8802.class,args);
}
}

controller 类

1
2
3
4
5
6
7
8
9
10
11
12
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;

@StreamListener(Sink.INPUT)
public void input(Message<String> message){
System.out.println("消费者1号,-------->接收到的消息是: "+message.getPayload()+"\t port: "+serverPort);
}

}

测试

2-3、分组消费与持久化

消息重复消费

依照 8802,clone 出来一份运行 8803

启动

  • 分组消费 RabbitMQ

  • 7001:服务注册

  • 8801:消息生产
  • 8802:消息消费
  • 8803:消息消费

运行后有两个问题:

  • 有重复消费问题
  • 消息持久化问题

目前是 8802/8803 同时都收到了,存在重复消费问题

生产实际案例:

比如在如下场景中,订单系统我们做集群部署,都会从 RabbitMQ 中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用 Stream 中的消息分组来解决

解决消息重复消费

原理

微服务应用放置于同一个 group 中,就能够保证消息只会被其中一个应用消费一次。

不同组是可以全面消费的(重复消费),同一组内会发生竞争关系,只有其中一个可以消费。

分组

8002,8003 添加到同一分组即可

application.yml 中添加分组

1
group: ailuoA
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
spring:
application:
name: cloud-stream-provider
rabbitmq:
host: 119.3.211.104
port: 5672
username: cj
password: 123456
cloud:
stream:
binders: # 配置要绑定的rabbitMQ的服务信息
defaultRabbit: # 表示定义的名称,用于和binding整合
type: rabbit # 消息组件类型
bindings: # 服务的整合处理
input: # 表示是消费者,这里是唯一和生产者不同的地方,向rabbitMQ发送消息
destination: studyExchange # 表示要使用的Exchange名称
content-type: application/json # 设置消息类型,本次是json,文本是 "text/plain"
binder: { defaultRabbit } # 设置要绑定的消息服务的具体配置
group: ailuoA

持久化

通过上述,解决了重复消费问题,再看看持久化

  • 停止 8802/8803 并去除掉 8802 的分组 group: ailuoA,8803 的分组 group: ailuoA 没有去掉
  • 8801 先发送 4 条消息到 rabbitmq
  • 先启动 8802,无分组属性配置,后台没有打出来消息
  • 再启动 8803,有分组属性配置,后台打出来了 MQ 上的消息(消息持久化体现)

相关链接:Sleuth分布式请求链路跟踪