Spring Cloud Stream-核心概念
主要概念Spring Cloud Stream提供了许多抽象和原语,简化了消息驱动的微服务应用程序的编写。本节概述了以下内容:Spring Cloud Stream的应用程序模型Binder抽象持久的发布 - 订阅支持消费者群体支持分区支持可插拔的Binder API1.应用模型应用程序通过 inputs 或者 ou...
主要概念
Spring Cloud Stream提供了许多抽象和原语,简化了消息驱动的微服务应用程序的编写。本节概述了以下内容:
-
Spring Cloud Stream的应用程序模型
-
Binder抽象
-
持久的发布 - 订阅支持
-
消费者群体支持
-
分区支持
-
可插拔的Binder API
1.应用模型
应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中Binder 交互,通过我们配置来绑定,而 Spring Cloud Stream 的 Binder 负责与中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。
2.绑定器
Spring Cloud Stream为Kafka和Rabbit MQ提供Binder实现。Spring Cloud Stream还包括一个TestSupportBinder,它保留了一个未经修改的通道,以便测试可以直接与通道交互,并可靠地断言收到的内容。您可以使用可扩展的API编写自己的Binder。
Spring Cloud Stream使用Spring Boot进行配置,Binder抽象使Spring Cloud Stream应用程序可以灵活地连接到中间件。例如,部署者可以在运行时动态地选择通道连接的目的地(例如,Kafka主题或RabbitMQ交换)。可以通过外部配置属性以及Spring Boot支持的任何形式(包括应用程序参数,环境变量和/ application.yml
或application.properties
文件)来提供此类配置。在“ 引入Spring云流”部分的接收器示例中,将application属性设置spring.cloud.stream.bindings.input.destination
为raw-sensor-data
将使其从raw-sensor-data
Kafka主题或绑定到raw-sensor-data
RabbitMQ交换的队列中读取。
Spring Cloud Stream会自动检测并使用类路径中找到的活页夹。您可以使用相同的代码轻松使用不同类型的中间件:只需在构建时包含不同的绑定器。对于更复杂的用例,您还可以将多个绑定器与应用程序打包在一起,让它选择绑定器,甚至是在运行时是否为不同的通道使用不同的绑定器。
3. 发布-订阅
如下图是经典的Spring Cloud Stream的 发布-订阅 模型,生产者 生产消息发布在shared topic(共享主题)上,然后 消费者 通过订阅这个topic来获取消息
其中topic对应于Spring Cloud Stream中的destinations(Kafka 的topic,RabbitMQ的 exchanges)
4.消费组
尽管发布-订阅 模型通过共享的topic连接应用变得很容易,但是通过创建特定应用的多个实例的来扩展服务的能力同样重要,但是如果这些实例都去消费这条数据,那么很可能会出现重复消费的问题,我们只需要同一应用中只有一个实例消费该消息,这时我们可以通过消费组来解决这种应用场景, 当一个应用程序不同实例放置在一个具有竞争关系的消费组中,组里面的实例中只有一个能够消费消息
设置消费组的配置为spring.cloud.stream.bindings.<channelName>.group
下面举一个例子:
下图中,通过网络传递过来的消息通过主题,按照分组名进行传递到消费者组中
此时可以通过spring.cloud.stream.bindings.input.group=Group-A
或spring.cloud.stream.bindings.input.group=Group-B
进行指定消费组
- 消费者类型
支持有两种消费者类型:
Message-driven (消息驱动型,有时简称为异步)
Polled (轮询型,有时简称为 同步)
在Spring Cloud 2.0版本前只支持 Message-driven这种异步类型的消费者,消息一旦可用就会传递,并且有一个线程可以处理它;当你想控制消息的处理速度时,可能需要用到同步消费者类型。
- 消息持久化
一般来说所有拥有订阅主题的消费组都是持久化的,除了匿名消费组。 Binder的实现确保了所有订阅关系的消费订阅是持久的,一个消费组中至少有一个订阅了主题,那么被订阅主题的消息就会进入这个组中,无论组内是否停止。
注意: 匿名订阅本身是非持久化的,但是有一些Binder的实现(比如RabbitMQ)则可以创建非持久化的组订阅
通常情况下,当有一个应用绑定到目的地的时候,最好指定消费消费组。扩展Spring Cloud Stream应用程序时,必须为每个输入绑定指定一个使用者组。这样做可以防止应用程序的实例接收重复的消息(除非需要这种行为,这是不寻常的)。
5.消息分区
Spring Cloud Stream支持在给定应用程序的多个实例之间对数据进行分区。在分区场景中,物理通信介质(例如,代理主题)被视为被构造为多个分区。一个或多个生产者应用程序实例将数据发送到多个消费者应用程序实例,并确保由共同特征标识的数据由同一个消费者实例处理。
Spring Cloud Stream提供了一种通用抽象,用于以统一的方式实现分区处理用例。因此,无论代理本身是否被自然分区(例如,Kafka)(例如,RabbitMQ),都可以使用分区。
分区是有状态处理中的一个关键概念,由于性能或一致性原因,确保所有相关数据一起处理是至关重要的。例如,在时间窗口平均值计算示例中,重要的是来自任何给定传感器的所有测量值都由同一应用程序实例处理。
要设置分区处理方案,您必须配置数据生成和数据消耗两端。
更多推荐
所有评论(0)