电子商务网站建设规划报告书,wordpress模板导航,做头像的网站有哪些,优化关键词排名推广引言
在当前的微服务架构下#xff0c;使用消息队列#xff08;MQ#xff09;技术是实现服务解耦和削峰填谷的重要策略。为了保证系统的灵活性和可替换性#xff0c;我们需要避免对单一开源技术的依赖。
市面上有多种消息队列技术#xff0c;如 Kafka、RocketMQ、Rabbit…引言
在当前的微服务架构下使用消息队列MQ技术是实现服务解耦和削峰填谷的重要策略。为了保证系统的灵活性和可替换性我们需要避免对单一开源技术的依赖。
市面上有多种消息队列技术如 Kafka、RocketMQ、RabbitMQ 等。关键在于如何在微服务体系中实现这些MQ组件的无缝切换以减少代码修改需求。
Spring Cloud Stream 通过其与主流消息中间件的灵活集成实现了通过仅修改配置文件的方式来切换不同的MQ实现从而提高了系统的适应性和可维护性。 什么是 Spring Cloud Stream
Spring Cloud Stream 是一个用于构建消息驱动的微服务应用程序的框架。
基于 Spring Boot 构建用于创建独立的生产级 Spring 应用程序并使用 Spring Integration 提供与消息代理的连接。它提供了来自多个供应商的中间件的固定配置引入了持久发布-订阅语义、消费者组和分区的概念。
简单来说 Spring Cloud Stream 是对 Spring Integration 和 Spring Boot 的合并。 图一 主要概念
1. application model应用模型 图二.Spring Cloud Stream 应用程序 由中间件提供的 Binder 来处理绑定。 应用程序通过绑定这个 Binder 与其建立联系发送消息时应用程序通过 outputs 通道将消息传递给 BinderBinder 再把消息给消息中间件。接收消息时消息中间件将消息传递给 BinderBinder 再把消息通过 inputs 通道传递给应用程序。
比如 Kafka Binder 依赖如下图 图三 spring cloud stream kafka依赖 2. The Binder AbstractionBinder抽象
Binder 抽象使 Spring Cloud Stream 应用程序能够灵活地连接中间件。
Spring Cloud Stream 为 Kafka 和 RabbitMQ 提供了 Binder 实现。 RocketMQ Binder 已由 Spring Cloud Alibaba 实现。
Binder 抽象也是该框架的扩展点之一我们可以在 Spring Cloud Stream 之上实现自定义 Binder。 3. Programming Model编程模型
核心概念 Destination Binders目标绑定器负责提供与外部消息传递系统集成的组件。 Bindings绑定外部消息系统和应用程序之间的桥梁提供消息的生产者和消费者由目标绑定器创建。 Message消息生产者和消费者用于与目标绑定器以及通过外部消息系统与其他应用程序通信的规范数据结构。 图四 环境搭建
本文环境 Java17 Spring Boot3.0.2 Spring Cloud2022.0.2 Spring Cloud Alibaba2022.0.0.0 maven依赖配置
pom.xml依赖如下
消息驱动jar用哪个mq引入哪个即可。dependenciesdependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-stream-rocketmq/artifactId/dependencydependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-stream-rabbit/artifactId/dependencydependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-stream-kafka/artifactId/dependency
/dependenciesdependencyManagementdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-dependencies/artifactIdversion${spring-boot.version}/versiontypepom/typescopeimport/scope/dependencydependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-alibaba-dependencies/artifactIdversion${spring-cloud-alibaba.version}/versiontypepom/typescopeimport/scope/dependencydependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-dependencies/artifactIdversion${spring-cloud.version}/versiontypepom/typescopeimport/scope/dependency/dependencies
/dependencyManagement配置文件
application.yml RocketMq 配置信息
spring:cloud:stream:stream:rocketmq:binder:name-server: 127.0.0.1:9876;127.0.0.1:9877function:# 组装和绑定definition: myTopicCbinders:default:type: rocketmqbindings:## 生产者 新版本固定格式 函数名-{out/in}-{index}demoChannel-out-0:destination: boot-mq-topic## 消费者 新版本固定格式 函数名字-{out/in}-{index}demoChannel-in-0:destination: boot-mq-topicapplication.yml Kafka 配置信息
spring:cloud:stream:stream:kafka:binder:brokers: 127.0.0.1:9092function:# 组装和绑定definition: myTopicCbinders:default:type: kafkabindings:## 生产者 新版本固定格式 函数名-{out/in}-{index}demoChannel-out-0:destination: boot-mq-topic## 消费者 新版本固定格式 函数名字-{out/in}-{index}demoChannel-in-0:destination: boot-mq-topic消息生产者
创建一个简单的消息生产者
RestController
Slf4j
public class ProducerStream {Autowiredprivate StreamBridge streamBridge;GetMapping(/test-stream)public String testStream() {streamBridge.send(demoChannel-out-0,MessageBuilder.withPayload(消息体).build());return success;}
}消息消费者
创建一个消息消费者来接收消息
Slf4j
Configuration
public class TestStreamConsumer {Beanpublic ConsumerString demoChannel() {return message - {log.info(demoChannel接到消息{}, message);};}
}假如需要从 Kafka 替换成 RocketMq 只需要修改pom文件和配置文件即可。 在之前的 Spring Cloud Stream 版本中是采用注解的方式来实现绑定在新版本中是通过函数式编程模型来绑定名称。采用约定大于配置的思想简化了应用程序配置。 具体可见官方文档https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_functional_binding_names Spring Cloud Stream 发送消息流程 图五 spring cloud stream消息流程图 消息模型
通过图三可以看到 Sping Cloud Stream 的依赖关系。 Sping Cloud Stream - Spring Integration - Spring Messaging 可以看出来 Sping Cloud Stream 是基于 Spring Integration 做了一层封装是依赖于 Spring Integration 这个组件的而 Spring Integration 则依赖于 Spring Messaging 组件来实现消息处理机制的基础设施。 Spring Integration 是对 Spring Messaging 的扩展设计目标是系统集成因此内部提供了大量的集成化端点方便应用程序直接使用。各个异构系统相互集成时Spring Integration 通过通道之间的消息传递让我们可以在消息的入口和出口使用通道适配器和消息网关这两种典型的端点对消息进行同构化处理。Spring Messaging 是 Spring 框架中的一个底层模块用于提供统一的消息编程模型。 消息 Message 接口定义
public interface MessageT {//消息体T getPayload();//消息头MessageHeaders getHeaders();
}消息通道 MessageChannel 接口定义
FunctionalInterface
public interface MessageChannel {long INDEFINITE_TIMEOUT -1;//发送消息无限期阻塞default boolean send(Message? message) {return send(message, INDEFINITE_TIMEOUT);}//发送消息阻塞直到到达指定超时时间boolean send(Message? message, long timeout);
}消息通道 MessageChannel 接收消息调用send()方法将消息发送至该消息通道。
消息通道可简单理解为对队列的一种抽象。通道的名称对应队列的名称。 Spring message 把通道抽象成两种基本表现形式 支持轮询的 PollableChannel 实现发布-订阅模式的 SubscribableChannel
这两个通道都继承自具有消息发送功能的 MessageChannel。
public interface SubscribableChannel extends MessageChannel {//通过注册回调函数MessageHandler来实现事件响应//注册消息处理器boolean subscribe(MessageHandler handler);//取消注册消息处理器boolean unsubscribe(MessageHandler handler);
}public interface PollableChannel extends MessageChannel {//通过轮询操作主动获取消息//从通道中接收消息NullableMessage? receive();//指定超时时间从通道中接收消息NullableMessage? receive(long timeout);
}MessageHandler接口定义
FunctionalInterface
public interface MessageHandler {//处理消息方法void handleMessage(Message? message) throws MessagingException;
}再回到图五流程图中我们最终可以看到 Kafka 和 RocketMQ 通过继承 AbstractMessageHandler 抽象类 AbstractMessageHandler 抽象类是实现了 MessageHandler 接口来实现不同中间件的消息发送操作。而这些都是封装在各自中间件对应的 Binder 代码中来实现。 结论
回到我们的主题Spring Cloud Stream 如何屏蔽不同 MQ 带来的差异性 统一的编程模型发送和接收代码一致开发者专注于业务逻辑即可。不用管底层消息中间件的实现。 Binder 抽象封装与消息队列的交互逻辑每种队列有自己的 Binder 实现。 自动配置和约定优于配置采用约定大于配置的思想极少的改动配置文件实现消息队列的切换而代码不用变动。 高级特性的抽象如分区、消息分组、持久性订阅等高级特性Spring Cloud Stream 提供了抽象层由不同的消息队列去实现。 参考资料 官方文档Spring Cloud Stream Reference Guide 《Spring核心技术和案例实战》