Bus 消息总线

1、SpringCloud Bus 介绍

1-1、消息总线的由来

回顾上一篇文章 Config 分布式配置中心存在的问题

  • 假如有多个微服务客户端 3355/3366/3377……,每个微服务都要执行一次 post 请求,手动刷新?

  • 可否广播,一次通知,处处生效?

  • 我们想大范围的自动刷新,求方法

以上就是消息总线的产生由来

Spring Cloud Bus 配合 Spring Cloud Config 使用可以实现配置的动态刷新

1-2、是什么

Spring Cloud Bus 是用来将分布式系统的节点与轻量级消息系统链接起来的框架,它整合了 Java 的事件处理机制和消息中间件的功能。

Spring Cloud Bus 目前支持 RabbitMQ 和 Kafka。

1-3、能干嘛

Spring Cloud Bus 能管理和传播分布式系统间的消息,就像一个分布式执行器,可用于广播状态更改、事件推送等,也可以当作微服务间的通信通道。

1-4、总线

什么是总线

在微服务架构的系统中,通常会使用轻量级的消息代理来构建一个共用的消息主题,并让系统中所有微服务实例都连接上来。由于该主题中产生的消息会被所有实例监听和消费,所以称它为消息总线。

在总线上的各个实例,都可以方便地广播一些需要让其他连接在该主题上的实例都知道的消息。

基本原理

ConfigClient 实例都监听 MQ 中同一个 topic(默认是 springCloudBus)。当一个服务刷新数据的时候,它会把这个信息放入到 Topic 中,这样其它监听同一 Topic 的服务就能得到通知,然后去更新自身的配置。

相关链接:Stream消息驱动

2、RabbitMQ 环境配置

安装 Erlang,下载地址:http://erlang.org/download/otp_win64_21.3.exe

安装 RabbitMQ,下载地址:https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.14/rabbitmq-server-3.7.14.exe

进入 RabbitMQ 安装目录下的 sbin 目录,输入以下命令启动管理功能

1
rabbitmq-plugins enable rabbitmq_management

访问地址查看是否安装成功:http://localhost:15672/

输入账号密码并登录:guest guest

3、动态刷新全局广播

3-1、搭建客户端微服务 3366

必须先具备良好的 RabbitMQ 环境先,演示广播效果,增加复杂度,再以 3355 为模板再制作一个 3366

新建 cloud-config-client-3366

pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springCloud2023</artifactId>
<groupId>com.jcvv.springcloud</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>cloud-config-client-3366</artifactId>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

bootstrap.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
server:
port: 3366

spring:
application:
name: config-client
cloud:
#Config客户端配置
config:
label: master #分支名称
name: config #配置文件名称
profile: test #读取后缀名称 上述3个综合:master分支上config-test.yml的配置文件被读取
uri: http://localhost:3344 #配置中心地址
# 综合上面四个 即读取配置文件地址为: http://config-3344.com:3344/master/config-test.yml

#服务注册到eureka
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka

# 暴露监控端点
management:
endpoints:
web:
exposure:
include: '*'

主启动类

1
2
3
4
5
6
7
@SpringBootApplication
@EnableEurekaClient
public class ConfigClientMain3366 {
public static void main(String[] args) {
SpringApplication.run(ConfigClientMain3366.class,args);
}
}

controller 层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@RestController
@RefreshScope
public class ConfigClientController {
@Value("${server.port}")
private String serverPort;

@Value("${config.info}") //gitee里的yml文件里的内容
private String configInfo;

@RequestMapping("/configInfo")
public String getConfigInfo(){
return "serverPort: "+serverPort+"\t\n\n configInfo: "+configInfo;
}
}

3-2、设计思想

(1)利用消息总线触发一个客户端/bus/refresh,而刷新所有客户端的配置

(2)利用消息总线触发一个服务端 ConfigServer 的/bus/refresh 端点,而刷新所有客户端的配置

图二的架构显然更加适合,图一不适合的原因如下:

  • 打破了微服务的职责单一性,因为微服务本身是业务模块,它本不应该承担配置刷新的职责。
  • 破坏了微服务各节点的对等性。
  • 有一定的局限性。例如,微服务在迁移时,它的网络地址常常会发生变化,此时如果想要做到自动刷新,那就会增加更多的修改

3-3、配置中心 3344 添加消息总线支持

pom.xml

添加消息总线支持

1
2
3
4
5
<!--添加消息总线RabbitMQ支持-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>

application.yml

添加以下内容,配置 rabbitmq 并且暴露端点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring:
#rabbitmq相关配置 15672是Web管理界面的端口;5672是MQ访问的端口
rabbitmq:
host: 119.3.211.104
port: 5672
username: cj
password: 123456

# 暴露bus刷新配置的端点 actuator刷新配置
management:
endpoints:
web:
exposure:
include: 'bus-refresh' #Post /bus/refresh 官网架构图

说明:

因为手动刷新需要自己调用一个类似于健康检查的端点(接口),所以呢,我们需要把这个端点给暴露出来,以便外部可访问。

3-4、客户端 3355,3366 添加消息总线支持

pom.xml

添加消息总线支持

1
2
3
4
5
<!--添加消息总线RabbitMQ支持-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>

bootstrap.yml

添加 Rabbitmq 相关配置

1
2
3
4
5
6
7
spring:
#rabbitmq相关配置 15672是Web管理界面的端口;5672是MQ访问的端口
rabbitmq:
host: 119.3.211.104
port: 5672
username: cj
password: 123456

3-5、测试

修改 Gitee 上配置文件增加版本号

发送 POST 请求

1
curl -X POST "http://localhost:3344/actuator/bus-refresh"

测试结果:一次修改,广播通知,处处生效

4、动态刷新定点通知

不想全部通知,只想定点通知:只通知 3355,不通知 3366

简单一句话:指定具体某一个实例生效而不是全部

公式

1
http://localhost:配置中心的端口号/actuator/bus-refresh/{destination}

/bus-refresh 请求不再发送到具体的服务实例上,而是发给 config server 并通过 destination 参数类指定需要更新配置的服务或实例

4-1、案例

我们这里以刷新运行在 3355 端口上的 config-client 为例:只通知 3355,不通知 3366

1
curl -X POST "http://localhost:3344/actuator/bus-refresh/config-client:3355"

通知总结 All

5、额外内容:Springboot Actuator 的说明

5-1、是什么

监控中心是针对微服务期间

  • 查看服务器内存变化(对内存,线程,日志管理等)

  • 检测服务配置连接池地址是否可用(模拟访问,懒加载)

  • 统计现在有多个 bean(是 Spring 容器中的 bean)
  • 统计 SpringMVC@RequestMapping(统计 http 接口)

使用 Actuator 来查看这些信息,它是没有界面的返回的是 json 格式的数据

AdminUi 底层使用的是 Actuator 实现的,只不过给它加了个可视化界面

5-2、应用场景

生产环境

使用它的原因:它是 springboot 的一个附加功能,可帮助你在应用程序生产环境时监控和管理应用程序,可使用 Http 的各种请求来监管,审计,收集应用的运行情况,特别对于微服务管理十分有意义。

建议使用 springboot2.0.5,因为它里面返回的信息更加全面。

springboot 提供了对项目的监控功能。

5-3、如何使用

导入依赖包

1
2
3
4
5
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<version>2.1.3.RELEASE</version>
</dependency>

配置端点

在 application.properties 中配置端点,

1
2
3
4
5
6
7
8
9
10
11
暴露部分端点

management.endpoints.web.exposure.include=info,health,beans,env

暴露所有端点

management.endpoints.web.exposure.include=*

不暴露beans端点

management.endpoints.web.exposure.exclude=beans

在上述配置中,首先使用 management.endpoints.web.exposure.include 暴露所有的端点,接着使用 management.endpoints.web.exposure.exclud 排除 beans 端点,这样就能够暴露除 beans 外的所有 actuator 端点了。

浏览器访问

1
http://127.0.0.1:8080/actuator/health

访问项目监控需要加前缀 /actuator

5-4.端点说明

5-5、总结

上面利用 SpringCloud Config 刷新客户端配置,以及消息总线触发服务端 ConfigServer 的/bus-refresh 端点,而刷新所有客户端的配置,利用的就是 actuator 刷新配置文件。

因为手动刷新需要自己调用一个类似于健康检查的端点(接口),所以呢,我们需要把这个端点给暴露出来,以便外部可访问。

1
curl -X POST "http://localhost:3355/actuator/refresh"
1
curl -X POST "http://localhost:3344/actuator/bus-refresh"

Actuator 参考文章:https://blog.csdn.net/munangs/article/details/123639480

相关链接:Stream消息驱动