返回 登录
0

共享行业的分布式MQTT设计

阅读2964

背景
随着移动互联网慢慢进入后半场,越来越多的公司将注意力转移到物联网,希望通过早期布局来占领这个行业的制高点,比如目前流行的摩拜单车和OFO单车都是典型的物联网应用。物联网本身并不是什么新概念,随着大数据、AI等技术的发展,大家意识到传统的物联网通过一定改造,借助大数据以及AI技术可以获得很多额外的价值。这里主要介绍物联网的接入服务,物联网主流接入协议分为MQTT,CoaP,Http,XMPP等几种,本文主要是介绍MQTT协议的优缺点以及如何实现MQTT的分布式框架,至于各个协议之间的比较就不再这里详细介绍,大家可以百度相关资料去做详细了解。

MQTT协议主要特点
MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:
1、使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
2、对负载内容屏蔽的消息传输;
3、使用 TCP/IP 提供网络连接;
4、有三种消息发布服务质量:
“至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次记录无所谓,因为不久后还会有第二次发送。
“至少一次”,确保消息到达,但消息重复可能会发生。
“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
5、小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;
6、使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制

MQTT主要应用场景
1.车联网
2.工业物联网
3.智能家居
4.视频直播弹幕
5.IM实时聊天 (一对一聊天,群组聊天)
6.推送服务,比如推送实时新闻
7.金融交易数据订阅推送

整体架构
单机版本的MQTT存在并发连接数上限以及处理能力的限制,主流的单机版本的MQTT服务包括ActiveMQ, RabbitMQ,Apollo,Mosquitto,分布式的MQTT服务包括知名的EMQ, VerneMQ都是采用Erlang实现的。

分布式版本的MQTT相对于单机版本最大的难点在于Session的管理,特别是持久化session,MQTT协议定义了两种Session,其中一种是transient Session,另外一种是Persistent Session,用户可以通过在发送连接协议包的时候设置clean session这个状态位来决定采用哪种session。另外一个难点就是集群的管理,这里设计的框架是每个broker都是对等,他们之间不存在什么主从关系,所以我们直接AKKA Cluster这个框架作为集群管理,每个broker都需要注册监听的时间包括MemberUp,MemberDown,MemberUnreachable,ClusterMemberState等事件,这样每个broker就很可以很好的感知其他节点的状态,对内部的session做相应的处理,broker和broker之间的消息通知采用Akka actor来实现。

图片描述

Broker内部服务框架
为了管理,以及设计方便,我们将内部服务抽象成为很多独立服务,这些服务包括:
1.Authentication and authorization service
a)该服务负责用户名,密码等认证方式的鉴权,以及每个client对于那些主题有权限进行读和写,后台数据全部保存在Mysql,通过redis做cache加速,当然也做in memory的cache加速,cache回收机制采用LRU策略
2.Session Manager
a)持久化session管理,包括session订阅什么主题,以及对应的persistent queue,该session需要在每个broker都同步一份,这样可以有效解决高可用性的问题,比如crash之后,不会受到影响
b)非持久化session管理,包括session订阅什么主题,以及对应的transient queue,该session只需要在连接机器上保持,不需要同步到其他的broker上,如果对应的client和broker失去连接之后,对应session信息就会被清除掉
3.Event Service
a)负责将连接,订阅等事件发送给每个broker,对于每个连接事件,我们都需要将该消息推送给event service,还有就是每个client的订阅主题,取消订阅主题的事件,目前event service的后端实现采用Kafka做的,当然也可以通过Akka本身提供的功能来做,考虑到需要持久化,所以采用了Kafka,后期我们减少对Kafka的依赖
4.Session State metadata service
a)负责持久化session metadata数据存储,该服务从Event Service订阅数据,然后决定哪些数据需要持久化到后端存储(采用Hbase做持久化存储),目前主要是存储持久化session相关的信息
5.Queue Service
a)管理以及分配queue,这里的queue分为两种,一种是transient queue,一种是persistent queue,transient queue是采用in memory的方式实现,persistent queue是采用Hbase实现。Transient queue是为transient session创建的,persistent queue是为persistent session创建。Persistent session的特点就是即使该session对应的连接断开了,我们也需维护该session,以及该session订阅的数据,以便下次这个client重新连接上来之后,自动恢复session的状态,还有下发没有处理完的订阅数据
6.Quota Service
a)管理包括并发连接数,上行带宽,下行带宽的限制
7.Metric Service
a)监控服务的并发连接数,并发消息数,当前流量,服务运行指标,包括CPU,memory,网络等相关指标

图片描述

MQTT Codec Stack结构

连接层采用Netty NIO框架,关于Netty NIO的详细设计,这里我们就不做介绍了。支持4种形式的接入方式,TCP,TLS,websocket over TLS,以及websocket,各个接入方式的codec层级关系可以参考下图。

图片描述

持久化Session

对于持久化session,需要将该session信息同步到每台机器,每台机器都有所有持久化session信息的全集,这样做的好处就是当某台broker无法工作了,连接在这个异常broker上的client不会丢失消息,每条publish的消息都是直接写入hbase的,当broker恢复,或者client连接到其他broker之后,可以继续从hbase获取数据,然后发送给订阅的client。

订阅消息处理流程
订阅消息会发往event service,每个broker都会订阅来自event service的数据,对于持久化session,每个broker都会创建对应session的订阅信息以及virtual queue,这个virtual queue分为client和server两部分,client端的virtual queue负责保证写入顺序,以及批量写入(提升效率),server这边的queue保证来自不同broker的消息的有序性.

图片描述

发送消息流程

C2 往C1订阅的主题发送一条数据,router会直接将数据写入C1对应的hbase queue,然后通知C1,告诉他有新的数据可以消费了,这个时候broker直接从hbase读取数据,然后发往C1

图片描述

如果Mqtt Broker 2出现crash了,比如这个进程挂了,或者Mqtt Broker 2所在的机器断电了,或者网络出现故障了,C1本来应该收到的数据并不会减少,由于Mqtt Broker 1会继续往Hbase写入数据,等C1重新连接之后,可以继续从Hbase消费数据

图片描述

Event service数据的Compaction

考虑持久化session相关的数据都是写入到kafka的,如果一个新的broker加入集群,首先就需要将持久化session的信息全部加载,如果加载都是从kafka主题的头部开始消费数据的话,可能会花费很久,为此我们需要将kafka的数据做compaction,这些compaction的数据写入到hbase,如何加载全量信息了,全量信息就是hbase数据的集合和备份checkpoint之后kafka数据集合merge结果就是最终的全量信息。

图片描述

非持久化Session

当非持久化session的client连接上来的时候,如果订阅主题,我们会直接在改client所在机器创建session以及session对应的queue
订阅消息流程:

发送消息流程:
当C2发送一条消息的时候,broker 1会把消息转发给broker 2, broker 2会先把消息写入到C1对应的in memory queue,然后发送一个有数据的event给C1,这个时候broker 2会从queue读取数据,然后发往给C1

图片描述

基于HBase的分布式消息队列

Hbase本身不提供queue这个功能,但是我们可以利用hbase特性来实现virtual queue的概念,通过设计好rowkey来保证消息的有序性,然后将数据的读取转化为scan操作,下图有4个client,我们为每个client分配一个unique的queue ID,然后每个queue的数据通过queue ID和单调递增的ID来组合成为一个unique的rowkey,为了保证写入的均匀性,我们需要合理设计unique ID的prefix来保证将这些rowkey均匀的分布到不同的region。

为了实现queue的功能,我们在Hbase上定义了一个新的coprocessor,这个coprocessor用来创建queue,管理queue的数据,以及删除queue,同时还可以修改queue的配额等等。下图是我们的一个事例,我们有4个client,每个client都有自己的queue,通过算法把这些queue均匀的分布到不同的region上使用定制region split算法。
定义queue name为reverse{clientId}_tenantId,这里的clientID是系统生成的,是64bit的long,我们为每个client生成一个ID,这个ID是单调递增的,加入我们预期region的数目为128个的话,那么我们就取reverse{clientId}的头8bit作为region分割的条件,这样我们就可以把不同的queue均匀分布到不同的region上了,然后对region做balance。

图片描述

保证写入消息的有序性

对于持久化消息队列,需要在每个broker上都建立一个virtual queue,该virtual queue对应hbase的真实queue,每次virtual queue的数据都是batch写入hbase,假设这个queue的名字为Q的话,我们会为每个写到hbase的消息分配一个unique的ID,该ID是Q_(ID),ID是一个单调递增的数字,采用64bit的long表示,每个batch写入到hbase的coprocessor之后,需要先获取该queue的lock,然后分配ID,然后将数据写入hbase,最后释放lock,这样下一个request就可以继续写入,这里lock的粒度是queue级别,就是每个queue都会有自己的一个lock,这样可以保证并发性。

读取queue的数据

我们会为每个queue保存该queue在Hbase的最小ID,以及最大ID,如果该queue的最小ID和最大ID由于cache失效,导致内存不存在的话,我们就通过hbase的scan操作,来获取最小的ID,以及最大ID,然后将数据保留到cache里面,这样可以加速下次查找,每次读取特定长度的数据,下次计算便于继续读取,读数据的时候并不需要获取锁,由于读数据只会来自一台机器的一个client,就是任何时刻只有一个client在读数据

删除queue的数据

这里的删除已经读取的数据,由于我们的数据都是有序的,所以删除的时候,只需要告诉queue需要删除多长的数据即可,然后我们根据最小ID,以及offset可以算出需要删除rowkey的ID,然后执行一个batch delete操作,这样就可以将数据删除了,删除数据也不会需要获取锁,由于删除请求只会来自一台机器的一个client,就是任何时刻只有一个client在删除数据

图片描述

Notes
同时由于Hbase目前并不存在官方的async的library来往hbase写入数据,或者读取数据,目前只有opentsdb提供一个版本,考虑我们是利用coprocessor增加了一个新的endpoint,但是opentsdb的async library并不支持coprocessor,为了我们需要扩展async的library,这样就可以async library的coprocessor库来处理数据。

优化
如何利用in memory compaction来优化hbase queue的性能指标,由于mqtt的消息写入hbase之后,基本马上就会被读取出来,然后发送给client,所以说mqtt的消息都是属于short lived的数据,如果这些数据都在in memory做compaction的话,那就意味我们不需要将这些数据写入HFile,只需要写WAL日志,这样可以极大的降低HDFS文件系统的IO,对于我们这种场景的话,Hbase的瓶颈就出在HDFS文件系统的读写上,目前in memory compaction已经在hbase 2.0上实现,不过没有正式release。

更多in memory compaction的资料可以参考:
Accordion: HBase Breathes with In-Memory Compaction
https://blogs.apache.org/hbase/entry/accordion-hbase-breathes-with-in
Internal design:
https://blogs.apache.org/hbase/entry/accordion-developer-view-of-in

更多queue插件

每种queue都有自己的优缺点,为此我们提供了多种queue可以供用户选择,额外提供redis以及kafka的queue,kafka的queue是一种很popular的方式,主要是用在大规模扇入场景,比如说100w个client都往同一个主题发送消息,如果采用in memory的queue或者hbase的queue,那么瓶颈就会出在订阅端(只有一个TCP连接来处理数据),如果采用kafka queue,可以将数据发往kafka的主题,然后调用kafka的client来消费数据,这样就可以完美解决大扇入的场景。

多租户架构

目前MQTT服务是一个分布式多租户的服务,一个IotHub上面会有很多租户的MQTT Broker,每个MQTT broker对应一个tenant,每个broker有自己的authentication service, session manager, Queue service,以及很多其他服务,包括unique Id generator,backend storage service,以及router服务,当一个client的通过TCP和我们的服务建立连接之后,首先我们会为该client创建一个session,这个session会检查该client是否合法,包括tenant名字,用户名,密码,如果所有的都合法的话,我们会把这个client的session添加到session manager,如果不是合法的,我们会直接把这个client的连接给断开。

MQTT采用TCP的方式和云端建立连接,我们通过用户名来区分这个client对应的是那个tenant,所以我们对用户名有严格的规定,用户名必须是{tenant Name}/{clientName},拿到用户名和密码之后,我们先算出该client对应的tenant name,然后获取该tenant对应broker实例,后去该broker的auth服务来认证用户名和密码组合。

图片描述

测试数据
Baidu IoT Hub vs EMQTT
MPS: message per seconds
消息payload大小: 1024 bytes
场景:一半pub和一半sub,每一个pub对应一个sub,也就是说通过唯一主题关联起来,这种场景是对MQTT协议最严格的考验,其他场景相对来说CPU消耗会少一些
测试Queue类型:In memory queue

Notes:
由于Pub和Sub是一一对应的,所这里的MPS是指PUB的QPS,所以实际QPS 是这个数字的两倍。

图片描述

可用MPS(无丢包,latency小于0.5s):
图片描述

图片描述

结论:同等连接数下,IoT Hub的可用最大吞吐量在EMQTT的1~2倍之间。

部署broker机器配置信息:
vendor_id : GenuineIntel
cpu family : 6
model : 45
model name : Intel(R) Xeon(R) CPU E5-2620 0 @ 2.00GHz
core: : 12

Memory:
MemTotal: 132137288 kB

更多关于百度IoT Hub使用信息可以访问官网:
https://cloud.baidu.com/product/iot.html

评论