首页>>后端>>Spring->SpringCloud微服务实战——搭建企业级开发框架(三十七):使用Spring Cloud Stream实现可灵活配置消息中间件的功能

SpringCloud微服务实战——搭建企业级开发框架(三十七):使用Spring Cloud Stream实现可灵活配置消息中间件的功能

时间:2023-11-30 本站 点击:0

  在以往消息队列的使用中,我们通常使用集成消息中间件开源包来实现对应功能,而消息中间件的实现又有多种,比如目前比较主流的ActiveMQ、RocketMQ、RabbitMQ、Kafka,Stream等,这些消息中间件的实现都各有优劣。   在进行框架设计的时候,我们考虑是否能够和之前实现的短信发送、分布式存储等功能一样,抽象统一消息接口,屏蔽底层实现,在用到消息队列时,使用统一的接口代码,然后在根据自己业务需要选择不同消息中间件时,只需要通过配置就可以实现灵活切换使用哪种消息中间件。Spring Cloud Stream已经实现了这样的功能,下面我们在框架中集成并测试消息中间件的功能。

目前spring-cloud-stream官网显示已支持以下消息中间件,我们使用RabbitMQ和Apache Kafka来集成测试:

RabbitMQ

Apache Kafka

Kafka Streams

Amazon Kinesis

Google PubSub (partner maintained)

Solace PubSub+ (partner maintained)

Azure Event Hubs (partner maintained)

AWS SQS (partner maintained)

AWS SNS (partner maintained)

Apache RocketMQ (partner maintained)

一、集成RabbitMQ并测试消息收发

  RabbitMQ是使用Erlang语言实现的,这里安装需要安装Erlang的依赖等,这里为了快速安装测试,所以使用Docker安装单机版RabbitMQ。

1、拉取RabbitMQ的Docker镜像,后缀带management的是带web管理界面的镜像

docker pull rabbitmq:3.9.13-management

2、创建和启动RabbitMQ容器

docker run -d\ -e RABBITMQ_DEFAULT_USER=admin\ -e RABBITMQ_DEFAULT_PASS=123456\ --name rabbitmq\ -p 15672:15672\ -p 5672:5672\ -v `pwd`/bigdata:/var/lib/rabbitmq\ rabbitmq:3.9.13-management

3、查看RabbitMQ是否启动

[root@localhost ~]# docker psCONTAINER ID        IMAGE                             COMMAND                  CREATED              STATUS                          PORTS                                                                                                                                                 NAMESff1922cc6b73        rabbitmq:3.9.13-management        "docker-entrypoint.s…"   About a minute ago   Up About a minute               4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp   rabbitmq

4、访问管理控制台http://172.16.20.225:15672 ,输入设置的用户名密码 admin/123456登录。如果管理台不能访问,可以尝试使用一下命令启动:

docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management

5、Nacos添加配置,我们以操作日志和API日志为示例,说明自定义输入和输出通道进行消息收发,operation-log为操作日志,api-log为API日志。注意,官网有文档说明:使用multiple RabbitMQ binders 时需要排除RabbitAutoConfiguration,实际应用过程中,如果不排除,也不直接配置RabbitMQ的连接,那么RabbitMQ健康检查会默认去连接127.0.0.1:5672,导致后台一直报错。

spring:  autoconfigure:    # 使用multiple RabbitMQ binders 时需要排除RabbitAutoConfiguration    exclude:       - org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration  cloud:    stream:      binders:         defaultRabbit:           type: rabbit          environment: #配置rabbimq连接环境            spring:               rabbitmq:                host: 172.16.20.225                username: admin                password: 123456                virtual-host: /       bindings:         output_operation_log:          destination: operation-log  #exchange名称,交换模式默认是topic          content-type: application/json          binder: defaultRabbit        output_api_log:          destination: api-log  #exchange名称,交换模式默认是topic          content-type: application/json          binder: defaultRabbit        input_operation_log:           destination: operation-log          content-type: application/json          binder: defaultRabbit          group: ${spring.application.name}          consumer:            concurrency: 2 # 初始/最少/空闲时 消费者数量,默认1        input_api_log:           destination: api-log          content-type: application/json          binder: defaultRabbit          group: ${spring.application.name}          consumer:            concurrency: 2 # 初始/最少/空闲时 消费者数量,默认1

6、在gitegg-service-bigdata中添加spring-cloud-starter-stream-rabbit依赖,这里注意,只需要在具体使用消息中间件的微服务上引入,不需要统一引入,并不是每个微服务都会用到消息中间件,况且可能不同的微服务使用不同的消息中间件。

        <dependency>            <groupId>org.springframework.cloud</groupId>            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>        </dependency>

7、自定义日志输出通道LogSink.java

/** * @author GitEgg */public interface LogSink {    String INPUT_OPERATION_LOG = "output_operation_log";    String INPUT_API_LOG = "output_api_log";    /**     * 操作日志自定义输入通道     * @return     */    @Input(INPUT_OPERATION_LOG)    SubscribableChannel inputOperationLog();    /**     * API日志自定义输入通道     * @return     */    @Input(INPUT_API_LOG)    SubscribableChannel inputApiLog();}

8、自定义日志输入通道LogSource.java

/** * 自定义Stream输出通道 * @author GitEgg */public interface LogSource {    String OUTPUT_OPERATION_LOG = "input_operation_log";    String OUTPUT_API_LOG = "input_api_log";    /**     * 操作日志自定义输出通道     * @return     */    @Output(OUTPUT_OPERATION_LOG)    MessageChannel outputOperationLog();    /**     * API日志自定义输出通道     * @return     */    @Output(OUTPUT_API_LOG)    MessageChannel outputApiLog();}

9、实现日志推送接口的调用, @Scheduled(fixedRate = 3000)是为了测试推送消息,每隔3秒执行一次定时任务,注意:要使定时任务执行,还需要在Application启动类添加@EnableScheduling注解。 ILogSendService.java

/** * @author GitEgg */public interface ILogSendService {    /**     * 发送操作日志消息     * @return     */    void sendOperationLog();    /**     * 发送api日志消息     * @return     */    void sendApiLog();}

LogSendImpl.java

/** * @author GitEgg */@EnableBinding(value = { LogSource.class })@Slf4j@Component@RequiredArgsConstructor(onConstructor_ = @Autowired)public class LogSendImpl implements ILogSendService {    private final LogSource logSource;    @Scheduled(fixedRate = 3000)    @Override    public void sendOperationLog() {        log.info("推送操作日志-------开始------");        logSource.outputOperationLog()                .send(MessageBuilder.withPayload(UUID.randomUUID().toString()).build());        log.info("推送操作日志-------结束------");    }    @Scheduled(fixedRate = 3000)    @Override    public void sendApiLog() {        log.info("推送API日志-------开始------");        logSource.outputApiLog()                .send(MessageBuilder.withPayload(UUID.randomUUID().toString()).build());        log.info("推送API日志-------结束------");    }}

10、实现日志消息接收接口

ILogReceiveService.java

docker run -d\ -e RABBITMQ_DEFAULT_USER=admin\ -e RABBITMQ_DEFAULT_PASS=123456\ --name rabbitmq\ -p 15672:15672\ -p 5672:5672\ -v `pwd`/bigdata:/var/lib/rabbitmq\ rabbitmq:3.9.13-management0

LogReceiveImpl.java

docker run -d\ -e RABBITMQ_DEFAULT_USER=admin\ -e RABBITMQ_DEFAULT_PASS=123456\ --name rabbitmq\ -p 15672:15672\ -p 5672:5672\ -v `pwd`/bigdata:/var/lib/rabbitmq\ rabbitmq:3.9.13-management1

10、启动微服务,可以看到日志打印推送和接收消息已经执行的情况

二、集成Kafka测试消息收发并测试消息中间件切换

&emsp;&emsp;使用Spring Cloud Stream的其中一项优势就是方便切换消息中间件又不需要改动代码,那么下面我们测试在Nacos的Spring Cloud Stream配置中同时添加Kafka配置,并且API日志继续使用RabbitMQ,操作日志使用Kafka,查看是否能够同时运行。这里先将配置测试放在前面方便对比,Kafka集群搭建放在后面说明。

1、Nacos添加Kafka配置,并且将operation_log的binder改为Kafka

docker run -d\ -e RABBITMQ_DEFAULT_USER=admin\ -e RABBITMQ_DEFAULT_PASS=123456\ --name rabbitmq\ -p 15672:15672\ -p 5672:5672\ -v `pwd`/bigdata:/var/lib/rabbitmq\ rabbitmq:3.9.13-management2

2、登录Kafka服务器,切换到Kafka的bin目录下启动一个消费operation-log主题的消费者

docker run -d\ -e RABBITMQ_DEFAULT_USER=admin\ -e RABBITMQ_DEFAULT_PASS=123456\ --name rabbitmq\ -p 15672:15672\ -p 5672:5672\ -v `pwd`/bigdata:/var/lib/rabbitmq\ rabbitmq:3.9.13-management3

3、启动微服务,查看RabbitMQ和Kafka的日志推送和接收是否能够正常运行

微服务后台日志显示能够正常推送和接收消息:

Kafka服务器显示收到了操作日志消息

三、Kafka集群搭建

1、环境准备:

&emsp;&emsp;首先准备好三台CentOS系统的主机,设置ip为:172.16.20.220、172.16.20.221、172.16.20.222。 &emsp;&emsp;Kafka会使用大量文件和网络socket,Linux默认配置的File descriptors(文件描述符)不能够满足Kafka高吞吐量的要求,所以这里需要调整(更多性能优化,请查看Kafka官方文档):

docker run -d\ -e RABBITMQ_DEFAULT_USER=admin\ -e RABBITMQ_DEFAULT_PASS=123456\ --name rabbitmq\ -p 15672:15672\ -p 5672:5672\ -v `pwd`/bigdata:/var/lib/rabbitmq\ rabbitmq:3.9.13-management4

&emsp;&emsp;新建kafka的日志目录和zookeeper数据目录,因为这两项默认放在tmp目录,而tmp目录中内容会随重启而丢失,所以我们自定义以下目录:

docker run -d\ -e RABBITMQ_DEFAULT_USER=admin\ -e RABBITMQ_DEFAULT_PASS=123456\ --name rabbitmq\ -p 15672:15672\ -p 5672:5672\ -v `pwd`/bigdata:/var/lib/rabbitmq\ rabbitmq:3.9.13-management5

2、zookeeper.properties配置

docker run -d\ -e RABBITMQ_DEFAULT_USER=admin\ -e RABBITMQ_DEFAULT_PASS=123456\ --name rabbitmq\ -p 15672:15672\ -p 5672:5672\ -v `pwd`/bigdata:/var/lib/rabbitmq\ rabbitmq:3.9.13-management6

修改如下:

docker run -d\ -e RABBITMQ_DEFAULT_USER=admin\ -e RABBITMQ_DEFAULT_PASS=123456\ --name rabbitmq\ -p 15672:15672\ -p 5672:5672\ -v `pwd`/bigdata:/var/lib/rabbitmq\ rabbitmq:3.9.13-management7

3、在各台服务器的zookeeper数据目录/data/zookeeper/data添加myid文件,写入服务broker.id属性值 在data文件夹中新建myid文件,myid文件的内容为1(一句话创建:echo 1 > myid)

docker run -d\ -e RABBITMQ_DEFAULT_USER=admin\ -e RABBITMQ_DEFAULT_PASS=123456\ --name rabbitmq\ -p 15672:15672\ -p 5672:5672\ -v `pwd`/bigdata:/var/lib/rabbitmq\ rabbitmq:3.9.13-management8

4、kafka配置,进入config目录下,修改server.properties文件

docker run -d\ -e RABBITMQ_DEFAULT_USER=admin\ -e RABBITMQ_DEFAULT_PASS=123456\ --name rabbitmq\ -p 15672:15672\ -p 5672:5672\ -v `pwd`/bigdata:/var/lib/rabbitmq\ rabbitmq:3.9.13-management9

[root@localhost ~]# docker psCONTAINER ID        IMAGE                             COMMAND                  CREATED              STATUS                          PORTS                                                                                                                                                 NAMESff1922cc6b73        rabbitmq:3.9.13-management        "docker-entrypoint.s…"   About a minute ago   Up About a minute               4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp   rabbitmq0

5、Kafka启动 kafka启动时先启动zookeeper,再启动kafka;关闭时相反,先关闭kafka,再关闭zookeeper。

zookeeper启动命令

[root@localhost ~]# docker psCONTAINER ID        IMAGE                             COMMAND                  CREATED              STATUS                          PORTS                                                                                                                                                 NAMESff1922cc6b73        rabbitmq:3.9.13-management        "docker-entrypoint.s…"   About a minute ago   Up About a minute               4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp   rabbitmq1

后台运行启动命令:

[root@localhost ~]# docker psCONTAINER ID        IMAGE                             COMMAND                  CREATED              STATUS                          PORTS                                                                                                                                                 NAMESff1922cc6b73        rabbitmq:3.9.13-management        "docker-entrypoint.s…"   About a minute ago   Up About a minute               4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp   rabbitmq2

或者

[root@localhost ~]# docker psCONTAINER ID        IMAGE                             COMMAND                  CREATED              STATUS                          PORTS                                                                                                                                                 NAMESff1922cc6b73        rabbitmq:3.9.13-management        "docker-entrypoint.s…"   About a minute ago   Up About a minute               4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp   rabbitmq3

查看集群状态:

[root@localhost ~]# docker psCONTAINER ID        IMAGE                             COMMAND                  CREATED              STATUS                          PORTS                                                                                                                                                 NAMESff1922cc6b73        rabbitmq:3.9.13-management        "docker-entrypoint.s…"   About a minute ago   Up About a minute               4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp   rabbitmq4

kafka启动命令

[root@localhost ~]# docker psCONTAINER ID        IMAGE                             COMMAND                  CREATED              STATUS                          PORTS                                                                                                                                                 NAMESff1922cc6b73        rabbitmq:3.9.13-management        "docker-entrypoint.s…"   About a minute ago   Up About a minute               4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp   rabbitmq5

后台运行启动命令:

[root@localhost ~]# docker psCONTAINER ID        IMAGE                             COMMAND                  CREATED              STATUS                          PORTS                                                                                                                                                 NAMESff1922cc6b73        rabbitmq:3.9.13-management        "docker-entrypoint.s…"   About a minute ago   Up About a minute               4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp   rabbitmq6

或者

[root@localhost ~]# docker psCONTAINER ID        IMAGE                             COMMAND                  CREATED              STATUS                          PORTS                                                                                                                                                 NAMESff1922cc6b73        rabbitmq:3.9.13-management        "docker-entrypoint.s…"   About a minute ago   Up About a minute               4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp   rabbitmq7

创建topic,最新版本已经不需要使用zookeeper参数创建。

[root@localhost ~]# docker psCONTAINER ID        IMAGE                             COMMAND                  CREATED              STATUS                          PORTS                                                                                                                                                 NAMESff1922cc6b73        rabbitmq:3.9.13-management        "docker-entrypoint.s…"   About a minute ago   Up About a minute               4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp   rabbitmq8

参数解释: 复制两份   --replication-factor 2 创建1个分区   --partitions 1 topic 名称   --topic test

查看已经存在的topic(三台设备都执行时可以看到)

[root@localhost ~]# docker psCONTAINER ID        IMAGE                             COMMAND                  CREATED              STATUS                          PORTS                                                                                                                                                 NAMESff1922cc6b73        rabbitmq:3.9.13-management        "docker-entrypoint.s…"   About a minute ago   Up About a minute               4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp   rabbitmq9

启动生产者:

docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management0

启动消费者:

docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management1

添加参数 --from-beginning 从开始位置消费,不是从最新消息

docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management2

测试:在生产者输入test,可以在消费者的两台服务器上看到同样的字符test,说明Kafka服务器集群已搭建成功。

四、完整的Nacos配置
docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management3

源码地址: 

Gitee: https://gitee.com/wmz1930/GitEgg

GitHub: https://github.com/wmz1930/GitEgg

原文:https://juejin.cn/post/7103338440300691486


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/Spring/4505.html