# SpringCloud Bus 消息总线
作者:小傅哥
博客:https://bugstack.cn (opens new window)
沉淀、分享、成长,让自己和他人都能有所收获!😄
大家好,我是技术UP主小傅哥。
在互联网公司中开发的项目经常有一种场景,是在不重启应用的情况下,变更应用中某个属性信息的值。比如,我们为系统新增加允许外部调用接入的SC渠道值,测试阶段验证名单PIN、持续发布上线后的切量。这些东西都是不重启应用的情况下,动态做配置变更,那这样的东西在 SpringCloud 有什么现成的组件可以使用呢?
在大厂这个组件叫什么?
这个东西它不同于 Redis,而是把配置写到本地类对应的属性上。而不是像 Redis 那样从一个统一的地方每次去取使用。在大厂中我们管这个组件叫统一配置中,专门应对分布式工程中类对应属性的值的控制。
在小傅哥的大营销项目中,也带着大家实现过这样一款组件,叫DCC (opens new window),基于 Zookeeper + AOP 切面实现。
那么我们本节来看看 SpringCloud 是如何来处理这样一个场景的。
# 一、组件介绍
Spring Cloud Bus 将分布式系统的节点与轻量级消息代理相链接。然后可以使用它来广播状态更改(例如配置更改)或其他管理指令。该项目包含 AMQP 和 Kafka 代理实现。
在微服务架构中,通常使用轻量级的消息代理来创建一个共享的消息主题,让所有微服务实例都可以连接到这个主题上。因为这个主题中的消息会被所有实例监听和消费,因此通常称之为“消息总线”。连接到总线的每个实例都可以轻松地广播消息,以便所有其他连接该主题的实例能够接收到这些信息。
# 二、测试工程
小傅哥这里搭建了一套用于测试验证 SpringCloud Bus 消息总线的服务。
**工程**:[https://github.com/fuzhengwei/xfg-dev-tech-springcloud-bus](https://github.com/fuzhengwei/xfg-dev-tech-springcloud-bus) - `你可以把工程fork到你的github仓库,之后做后面的操作。`- 环境要求;jdk 1.8、maven 3.8.x、kafka - 提供了 docker 安装脚本在 docs 下。之后还有一个 natapp (opens new window) 做内网穿透。
- 模块职责;config-bus 配置了整套消息总线所需的服务模块,一个是 eureka 的 registry 注册中心,一个是 SpringCloud Bus 消息总线的服务 server。kafka 是通用的模块,便于统一配置。xfg-dev-tech-app 是测试工程模块。
# 三、环境安装
本节的案例工程会需要用到 Kafka、RabbitMQ,所以需要安装这两套环境。
- Mac 电脑会比较好安装一些,直接在 IntelliJ IDEA 点击小绿色按钮即可完成安装。安装完成后进入 http://localhost:9000/#!/2/docker/containers (opens new window)- 可看到 Kafka、RabbitMQ 运行。
- Windows 需要开启 wsl2 在安装 Docker 之后就可以安装 docker 使用了。
- 如果本机电脑配合低或者比较旧不好安装,推荐使用云服务器进行操作。云服务器就相当于你的一个远程电脑了,非常适合部署这些环境,同时怎么这套都不会影响你的本地环境。[https://618.gaga.plus - 推荐2c4g云服务。
# 四、功能实现
# 1. config-bus-kafka
@Configuration
@PropertySource("classpath:system.properties")
public class KafkaConfig {
}
2
3
4
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=springcloud-config-bus-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
2
3
4
5
6
7
8
9
10
11
12
13
14
- 做一个统一的 kafka 配置 model,让其他模块引入。
# 2. config-bus-registry
package cn.bugstack.xfg.dev.tech;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;
@SpringBootApplication
@EnableEurekaServer
public class ConfigBusRegistryApplication {
public static void main(String[] args) {
SpringApplication.run( ConfigBusRegistryApplication.class, args );
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
server:
port: 7397
spring:
application:
name: eureka-server
eureka:
instance:
# 使用 ip 代替实例名
prefer-ip-address: true
# 实例的主机名
hostname: ${spring.cloud.client.ip-address}
# 实例的 ID 规则
instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port}
client:
# 是否向注册中心注册自己
registerWithEureka: false
# 是否向注册中心获取注册信息
fetchRegistry: false
serviceUrl:
# 注册中心地址
defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
- 这部分是一个 eureka 的服务端,让注册中心和客户端,都被 eureka 管理。
# 3. config-bus-server
package cn.bugstack.xfg.dev.tech;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.config.server.EnableConfigServer;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
@SpringBootApplication
@EnableConfigServer
@EnableEurekaClient
public class ConfigBusServerApplication {
public static void main(String[] args) {
SpringApplication.run(ConfigBusServerApplication.class, args);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 端口
server:
port: 8000
spring:
application:
name: config-bus-server
cloud:
config:
server:
git:
# 仓库地址
uri: https://github.com/fuzhengwei/xfg-dev-tech-springcloud-bus
# 对应 {label} 部分,即 Git 的分支
label: master
# 仓库文件夹名称,多个以逗号分隔
search-paths: config-bus/config-repo
# git 仓库用户名(公开库可以不用填写)
username:
# git 仓库密码(公开库可以不用填写)
password:
bus:
# 开启消息跟踪
enabled: true
trace:
enabled: true
kafka:
consumer:
group-id: config-bus-server-group
eureka:
instance:
prefer-ip-address: true
hostname: ${spring.cloud.client.ip-address}
instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port}
client:
serviceUrl:
defaultZone: http://${eureka.instance.hostname}:7397/eureka/
management:
endpoints:
web:
exposure:
# 开启刷新端点
include: bus-refresh
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
- git 部分的配置,如注释说明。之后你要修改为自己的 Github 地址,这样你在修改配置时候,才能做 webhook 调用变更。
- kafka 是默认的消费id,不需要修改。
- management 需要开启 bus-refresh 刷新断点。
# 4. config-repo
system-dev.properties
hello=I'm xfg dev config 09
hi=I'm xfg dev config 08
2
- 这一层是配置文件,后面在你提交代码修改的时候,工程里也会一起修改。
# 5. xfg-dev-tech-app
# 5.1 动态配置
@RestController
@RefreshScope
public class ConfigClientController {
@Value("${hello}")
private String hi;
@RequestMapping("/hi")
public String hi() {
return this.hi;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
验证时访问地址;http://127.0.0.1:9000/hi (opens new window)
# 5.2 刷新配置
package cn.bugstack.xfg.dev.tech.trigger;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
@Slf4j
@RestController
public class GitHubWebhookController {
@PostMapping("/webhook")
public String handleGitWebhook(@RequestBody String payload) {
try {
ObjectMapper mapper = new ObjectMapper();
JsonNode content = mapper.readTree(payload);
log.info("收到 webhook {} 更新配置通知", content.get("pusher"));
// 创建URL对象
URL url = new URL("http://127.0.0.1:8000/actuator/bus-refresh");
// 打开连接
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
// 设置请求方法为POST
connection.setRequestMethod("POST");
// 开启输入输出流
connection.setDoOutput(true);
// 设置请求头,如果需要,可以设置Content-Type等
connection.setRequestProperty("Content-Type", "application/json");
// 获取输出流
try (OutputStream os = connection.getOutputStream()) {
// 如果有请求体数据,也可以在这里写入
// String jsonInputString = "{\"key\": \"value\"}";
// os.write(jsonInputString.getBytes("utf-8"));
os.flush();
}
// 发送请求并获取响应码
int responseCode = connection.getResponseCode();
log.info("调用 actuator/bus-refresh 更新全局配置完成 code:{}", responseCode);
} catch (Exception e) {
e.printStackTrace();
}
return "done";
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
- 做一个 webhook 接口,github 回调后,调用
http://127.0.0.1:8000/actuator/bus-refresh
刷新配置。 - 也可以手动访问
http://127.0.0.1:8000/actuator/bus-refresh
自己刷新配置验证。
# 五、功能验证
# 1. 前置配置
# 1.1 内网穿透
获取 natapp 免费隧道 authtoken,https://natapp.cn/tunnel/lists (opens new window) 配置到工程中。
- 注意:免费隧道配置端口为9000,因为是要把本地这个 9000 端口的服务,映射出去。
- 启动 natapp 后,会得到一个公网域名地址。这个地址免费的会不断地变化,测试的时候注意。
# 1.2 webhook 配置
进入 GitHub 工程中,Settings -> Webhooks 页面。地址:https://github.com/fuzhengwei/xfg-dev-tech-springcloud-bus/settings/hooks/517530722 (opens new window) - 你的和我的不同
- 拿到公网地址后,配置 webhook。如图配置完点击下面完成。更新的时候点击 update webhook。
# 2. 启动服务
陆续的启动;config-bus-registry、config-bus-server、xfg-dev-tech-app。
# 3. 服务测试
# 3.1 第1次,访问配置接口
地址:http://127.0.0.1:9000/hi (opens new window)
I'm xfg dev config 09
# 3.2 更新线上配置
- 你可以在线更新配置,也可以本地更新配置后提交代码到 github。
- 变更后点击 commit changes
- 查看到 webhook 推送的记录。是成功的。
- 查看日志变更记录。
webhook {"name":"fuzhengwei","email":"fuzhengwei@users.noreply.github.com"} 更新配置通知
# 3.2 第2次,访问配置接口
地址:http://127.0.0.1:9000/hi (opens new window)
I'm xfg dev config 10
- 配置已经从09变更为10,代表测试成功了。
- 另外你还可以访问 http://127.0.0.1:8000/system/dev (opens new window) 查看整体的配置信息。
{
"name": "system",
"profiles": [
"dev"
],
"label": null,
"version": "fccaf3233af6d0ae16571d2c907ff87eaf1c8946",
"state": null,
"propertySources": [
{
"name": "https://github.com/fuzhengwei/xfg-dev-tech-springcloud-bus/config-bus/config-repo/system-dev.properties",
"source": {
"hello": "I'm xfg dev config 10",
"hi": "I'm xfg dev config 08"
}
}
]
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18