目录

引言

生产者消费者模型作用

消息队列核心概念

Broker Server 内部关键概念

Broker Server 核心 API

交换机(Exchange)类型

关于持久化

关于网络通信

总结


引言

问题:

  • 什么是消息队列(Message Queue / MQ)?

回答:

  • 阻塞队列(Blocking Queue)-> 生产者消费者模型(是在一个进程内部进行的)
  • 消息队列就是将阻塞队列这样的数据结构,单独提取成了一个程序进行独立部署 -> 生产者消费者模型(进程和进程之间 / 服务和服务之间)

注意:

  • 分布式系统 整个服务器程序不是一个单一的程序,而是由一组服务器构成的 集群

生产者消费者模型作用

1、实现了发送方和接收方之间的 解耦

  • 如上图所示,服务器A 调用服务器B
  • A 将请求转发给 B 处理,B 处理完将结果反回给 A,即 A 和 B 之间的耦合是比较大的
  • 如果 A 要调用 B ,则 A 务必要知道 B 的存在
  • 如果 B 挂了,则很容易引起 A 的 bug

  • 如果要是再加一个服务器C,此时也需要对服务器A 进行修改
  • 因此就需要针对 A 重新修改代码、重新测试、重新发布、重新部署 等,十分麻烦! 

  • 引入消息队列后,A 将请求发给消息队列,B 再从消息队列中获取到请求
  • 此时 A 和 B 之间的耦合就降低了很多
  • A 并不知道 B,A 只知道队列,即 A 中的代码没有任何一行与 B 相关
  • B 也不知道 A,B 只知道队列,即 B 中的代码没有任何一行与 A 相关
  • 如果 B 挂了,对 A 没有任何影响,因为只要队列在,A 仍可以继续给队列插入元素,如果队列满了,直接阻塞就行了
  • 如果 A 挂了,对 B 没有任何影响,因为只要队列在,B 仍可以从队列中取元素,如果队列空了,直接阻塞就行了
  • 同时当我们像要新增一个服务器C 来作为消费者时,对于 A 来说是无感知的

2、可以做到 削峰填谷,保证系统的稳定性

  • 我们进行服务器开发,也和上述这个模型是非常相似的
  • 上游就是用户发送的请求,下游就是一些执行具体业务的服务器
  • 用户发多少请求是不可控的

具体理解:

  • 比如 A 为入口服务器,A 调用 B 完成一些具体的任务
  • 如果 A 与 B 直接通信,且 A 突然收到一组用户请求的峰值,此时 B 将随之感受到峰值
  • 引入消息队列后,A 将请求发给队列,B 从队列中获取请求
  • 虽然 A 收到的请求很多,队列收到的请求也不少,但是 B 仍可按照原有节奏来取请求,不至于说一下就收到太多的并发量

  • 市面上一些比较知名的 mq:RabbitMQ、Kafka、RocketMQ、ActiveMQ

注意:

  • 这些 mq 大同小异

消息队列核心概念

一个生产者 + 一个消费者


多个生产者 + 多个消费者

  • 生产者(Producer):发布消息的客户端应用程序
  • 消费者(Consumer):订阅消息的客户端应用程序,用于处理生产者的消息
  • 中间人(Broker):消费者拿生产者的消息时,需经过中间人
  • 发布(Publish):生产者向中间人投递消息的过程
  • 订阅(Subscribe):消费者从中间人获取消息的前提为 先订阅消息
  • 消费(Consume):消费者从中间人这里取数据的操作

注意:

  • 图上画均为服务器!

Broker Server 内部关键概念


虚拟主机(Virtual Host)

  • 类似于 MySQL 中的 database,算是一个 "逻辑" 上的数据集合
  • 实际开发中,一个 Broker Server 上可组织多种不同类别的数据,即可能同时管理多组 业务线上的数据
  • 此时便可以使用 Virtual Host 做出逻辑上的区分

交换机(Exchange)

  • 实际上,当生产者将消息投递给 Broker Server 时,是先将消息交给了 Broker Server 上的某个交换机,再由交换机把消息转发给对应的队列

队列(Queue)

  • 真正用来存储处理消息的实体,后续消费者也是从对应的队列中取数据
  • 一个大的消息队列中,可以有很多具体的小的队列

绑定(Binding)

  • 将交换机和队列之间建立起关联关系
  • 可以把交换机和队列视为是 类似于 数据库 中的 多对多 这样的关系
  • 一个交换机,可以对应到多个队列
  • 一个队列,也可以对应多个交换机
  • 在数据库中,标识多对多关系,会使用一个中间表 / 关联表
  • 而在 mq 中,也是有一个这样的中间表的,所谓的 绑定 其实就是中间表中的一项

消息(Message)

  • 具体来说,可以认为 服务器A 给 B 发送的请求(通过 mq 转发),就是一个消息
  • 服务器B 给 A 返回的响应(通过 mq 转发),也是一个消息
  • 一个消息可视为是一个 字符串(二进制数据)
  • 消息中具体包含啥样的数据,都是程序员自定义的

  • RabbitMQ 就是按照上述概念来组织的(基于 AMQP 协议)

Broker Server 核心 API

  1. 创建队列(queueDeclare)
  2. 销毁队列(queueDelete)
  3. 创建交换机(exchangeDeclare)
  4. 销毁交换机(exchangeDelete)
  5. 创建绑定(queueBind)
  6. 解除绑定(queueUnbind)
  7. 发布消息(basicPublish)
  8. 订阅消息(basicConsume)
  9. 确认消息(basicAck)

问题一:

  • 创建 为啥不使用 Create 这样的术语,而是使用 Declare ?

回答:

  • Create 仅表示单纯的创建
  • Declare 起到的效果为 不存在则创建,存在就啥都不做了

问题二:

  • 我们为啥不要搞一个 api,叫做 "消费消息" 呢?
  • 让消费者通过该 api 从服务器上取走消息

回答:

  • 在当前项目中,并不打算搞一个 "消费消息" 的 api
  • mq 与 消费者之间有两种工作模式:
  • Push(推):Broker 将收到的数据主动发送给订阅的消费者
  • Pull(拉):消费者主动调用 Broker 的 api 获取数据
  • 而 RabbitMQ 仅支持 Push 的方式(Kafka 就能支持 Pull)

注意点一:

  • 确认消息(basicAck) 所起到的效果,是可以让消费者显式的告诉 Broker Server,这个消息我已经处理完毕了
  • 用于提高整个系统的可靠性,以保证消息处理没有遗漏


注意点二:

  • 消息应答模式有两种
  1. 自动应答:消费者将该消息取走了,便算作应答(相当于没应答)
  2. 手动应答:basicAck 方法属于手动应答(消费者需要主动调用这个 api 来进行应答)

注意点三:

  • 对于 RabbitMQ 来说,除了提供肯定的确认,还提供了否定的确认
  • 但此处我们主要实现肯定确认,就不实现否定确认了

注意点三:

  • 此处的项目是以 RabbitMQ 作为蓝本的
  • 即上述 API 名称以及用法均参考的 RabbitMQ

交换机(Exchange)类型

  • 交换机在转发消息时,是有着一套转发规则的!
  • 此处提供了几种不同的 交换机类型(Exchange Type)来描述这里的不同的转发规则
  • RabbitMQ 主要实现了 四种交换机类型(也是 AMQP 协议定义的)

直接交换机(Direct )

  • 生产者发送消息时,会先指定一个 目标队列 的名字
  • 交换机收到之后,就看看绑定的队列里,有没有能够匹配的队列
  • 如果有,则将消息塞进对应的队列中转发过去
  • 如果没有,则直接丢弃掉该消息

扇出交换机(Fanout )


主题交换机(Topic )

  • 有两个关键概念
  1. bindingKey:把队列和交换机绑定的时候,指定一个单词(像是个暗号一样)
  2. routingKey:生产者发送消息的时候,也指定一个单词
  • 如果当前 routingKey 和 bindingKey 能够对上暗号了
  • 此时就可把这个消息转发到对应的队列中了

  • 咱们项目仅实现上述这三种交换机类型

关于持久化

  • 虚拟主机、交换机、队列、绑定、消息等,这些概念对应的数据都需要让 Broker Server 组织管理并存储起来
  • 此时内存和硬盘上都会各自存储一份,以内存为主,硬盘为辅

在内存中存储的原因:

  • 对于 mq 来说,能够高效的转发处理数据,是非常关键的指标
  • 因此使用内存来组织上述数据得到的效率,就比放硬盘中要高很多!

在硬盘中存储的原因:

  • 为了防止内存中的数据随着 进程重启 或 主机重启 而丢失

注意:

  • 硬盘上是能持久存储,但这个持久是相对于 内存 的
  • 对于一个硬盘来说存储消息的寿命,一般为 几年到十几年(一直不通电的情况下)

关于网络通信

  • 其他的服务器(生产者 / 消费者)通过网络与 Broker Server 进行交互
  • 此处设定使用 TCP + 自定义的应用层协议 实现生产者 / 消费者和 Broker Server 之间的交互工作
  • 自定义的应用层协议 要做的主要工作就是让客户端可以通过网络调用 Broker Server 提供的编程接口
  • 因此,在客户端这边也需要提供对应的上述这些方法

  • 如上图所示,服务器版本的方法为真正干实事的,即 将管理数据进行调整
  • 而客户端版本的方法,则只是发送请求 / 接收响应的

具体理解:

  • 此处客户端调用了一个本地方法,结果该方法的背后,又给服务器发来一系列消息,由服务器完成了一系列工作
  • 站在调用者的角度,只是看到了这个功能已经完成了,却不知道这背后的细节
  • 虽然调用的是一个本地方法,但实际上好像调用另一个远端服务器的方法一样
  • 此处可视为是编写客户端服务器程序,通信过程的一种设计思想,即 远程过程调用(RPC)

  • 客户端除了提供下方这 9 个和服务器对应的方法外

  • 客户端还需再提供 4个方法来支撑其他工作
  1. 创建 Connection
  2. 关闭 Connection
  3. 创建 Channel
  4. 关闭 Channel

注意点一:

  • 一个 Connection 对象,就代表一个 TCP 连接

注意点二:

  • Channel ——> 通道/信道
  • 一个 Connection 里面可以包含多个 Channel
  • 每个 Channel 上面传输的数据均 互不相干

问题:

  • 为什么有了 Connection,还要搞一个 Channel ?

回答:

  • TCP 建立 / 断开一个连接的成本,其实还是比较高的
  • 因此,很多时候并不希望频繁的建立 / 断开 TCP 连接
  • 此处的 Channel 仅为一个逻辑上的概念,比起 TCP 连接的建立和断开 要轻量得多
  • Connection 和 Channel 之间的关系就如图网线一样

总结

  • 细化一下具体要做哪些工作?
  1. 需实现 生产者、Broker Server、消费者 这三个部分
  2. 针对生产者和消费者来说,主要编写的是 客户端和服务器 的网络通信部分,给客户端提供一组 api 让客户端的业务代码来调用,从而通过网络通信的方式远程调用 Broker Server 上的方法
  3. 实现 Broker Server 以及 Broker Server 内部的一些基本概念和核心 api【重点】
  4. 上述这些关键数据,如何在硬盘中存储?以啥格式存储?是存储在数据库还是文件中?后续服务器重启了,如何读取上述数据,并将内存中的内容给恢复回来呢?【持久化】

注意:

  • 生产者 的数据从哪来?消费者取到数据之后要干啥?
  • 生产者与消费者具体的业务逻辑都是通用的,无需太多关心

  • 上述工作的最终目标,就是实现一个 分布式系统下 的生产者消费者模型
  • 但是此处我们的 Broker Server 并不支持分布式部署(集群模式)
  • 我们此处实现的仅为一个能够给多个生产者消费者提供服务的单机 Broker Server
  • 但是专业的 mq ,如 RabbitMQ、kafka 这些均支持集群模式,可用性更高、可处理更高的并发、数据能够相互备份
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐