Python实战案例:elasticsearch与数据库mysql的同步(下)


五、Python实现kafka生产者和消费者的逻辑

使用python操作kafka目前比较常用的库是kafka-python库。

Kafka-python库的安装可以使用下述指令。

pip3 install kafka-python

安装结束后,在kafka的生产者使用KafkaProducer类来实现,实例化时可以传入参数bootstrap_servers,也就是kafka服务器的地址。实例化KafkaProducer之后就可以send发送信息。代码如下。

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers="localhost:9092")
msg = "Hello World".encode('utf-8')
producer.send('test', msg)

上面代码中这里我们使用producer发送了一个msg信息,信息中就是“Hello World”信息,现在结合前面的bin-log读取的信息,就完成了kafka生产者结合数据库日志的信息发送。代码如下。

from kafka import KafkaProducer
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent
)
MYSQL_SETTINGS={
    "host":"localhost",
    "user":"root",
    "password":"admin"
}
import json
import sys
producer=KafkaProducer(bootstrap_servers=["localhost:9092"])
stream=BinLogStreamReader(connection_settings=MYSQL_SETTINGS,server_id=3,blocking=True,only_schemas=["booksme"],only_events=[DeleteRowsEvent,WriteRowsEvent,UpdateRowsEvent])
for binlogevent in stream:
    for row in binlogevent.rows:
        event={"schema":binlogevent.schema,"table":binlogevent.table}
        if isinstance(binlogevent,DeleteRowsEvent):
            event["action"]="delete"
            event["data"]=row["values"]
        elif isinstance(binlogevent,WriteRowsEvent):
            event["action"]="insert"
            event["data"]=row["values"]
        elif isinstance(binlogevent,UpdateRowsEvent):
            event["action"]="update"
            event["data"]=row["values"]
        json_response=json.dumps(event,ensure_ascii=False).encode()
        print(json_response)
        producer.send("message",json_response)
        sys.stdout.flush()

生产者发送信息后,消费者进行信息的消费。kafka模块是通过KafkaConsumer来完成该功能的。实例化KafkaConsumer类,对producer发送过来的键名做为KafkaConsumer的第一个参数,第二个参数就是bootstrap_server的地址,也是本台电脑。当实例化KafkaConsumer后,数据的信息就存储在实例化的变量中。因为数据的变化不可能是一行,有可能是多行,所以数据的信息也是多个数据的集合,需要遍历,然后调用每个数据的value属性输出结果信息,代码如下。

from kafka import KafkaConsumer
consumer = KafkaConsumer(“message”, bootstrap_servers=['localhost:9092'])
For mess in consumer:
    print(mess.value)

现在运行生产者producer端的程序结果如下图所示。

现在运行消费者consumer端的程序结果如下图所示。

发现信息由producer发送后,consumer订阅的信息在编码上存在问题,可以在consumer端对信息进行解码。consumer消费端代码修改如下。

from kafka import KafkaConsumer
consumer=KafkaConsumer("message",bootstrap_servers=["localhost:9092"])
for mess in consumer:
    print(mess.value.decode("utf8"))

代码中把最终的输出结果decode解码成utf8编码。

最终修改后重启生产者端和消费者端程序后,消费者端输出结果如下。

从图中可以看出结果中关于《聊斋》这本书的一系列操作都被记录了下来,因为这是日志存在的。这里,《聊斋》这本书也被删除了添加,添加了删除来不断验证信息的实时传输。

六、elasticsearch获取数据库中的记录

前面已经完成了kafka实时生产和消费数据库日志中的变化信息,现在只要把这个实时的信息写入到elasticsearch中就完成了mysql与elasticsearch的同步。

现在,首先需要elasticsearch中先预先获取数据库mysql中的所有数据,然后再根据mysql中的数据变化,elasticsearch中的数据也要随时进行变化。

这里安装pymysql模块,利用pymysql模块的connect连接mysql数据库,然后获取cursor游标的位置,执行获取全部记录的sql语句,利用pymysql封装的fetchall方法来获取cursor执行sql语句后的结果集。pymysql获取数据的代码如下。

from elasticsearch import Elasticsearch
def get_data():
    conn=pymysql.connect(host="localhost",port=3306,user="root",password="admin",database="booksme")
    cursor=conn.cursor()
    sql="select * from book"
    cursor.execute(sql)
    results=cursor.fetchall()
    conn.close()
    return results
if __name__=="__main__":
    print(get_data())

上述代码的运行结果如下图所示。

接下来,把数据存储到elasticsearch中,python完成对elasticsearch的操作需要安装elasticsearch模块,命令如下 。

pip3 install elasticsearch

elasticsearch模块安装成功后,可以直接实例化elasticsearch,实例化后,调用其变量的index方法完成elasticsearch的数据添加。在Elasticsearch中存储数据的行为就叫做索引(indexing),在index方法中传入参数index索引名称,同时还要指定文档归属于一种类型(type)。这种类型也可以自己定义。body参数中传入具体存储的数据。这样,就可以调用前面pymysql读取数据方法,将其利用实例化的Elasticsearch类的index方法加入到elasticsearch中。代码如下。

import pymysql
from elasticsearch import Elasticsearch
def get_data():
    conn=pymysql.connect(host="localhost",port=3306,user="root",password="admin",database="booksme")
    cursor=conn.cursor()
    sql="select * from book"
    cursor.execute(sql)
    results=cursor.fetchall()
    conn.close()
    return results

def create_es_data():
    es=Elasticsearch()
    try:
        results=get_data()
        for row in results:
            print(row)
            message={
                "name":row[0],
                "price":row[1]
            }
            es.index(index="bookstores",doc_type="test-type",body=message)
    except Exception as e:
        print("Error:"+str(e))
if __name__=="__main__":
    create_es_data()

上述代码中es.index建立elasticsearch数据时index参数传入"bookstores",doc_type传入自定义的“test-type”,body传入了读取的数据库数据信息字典。

程序执行后,就把数据写入到了elasticsearch中,可以通过Chrome浏览器安装elasticsearch-header扩展程序,elasticsearch-header可以从网络上下载,下载后,安装的过程第一步从Chrome浏览器中找到“扩展程序”。如下图所示。

第二步 点击出现的“加载已解压的程序”,如下图所示。

加载后,扩展程序中就会有”elasticsearch-header”,具体结果如下图所示。

七、Elasticsearch增删改的封装

对于一个Elastricsearch的索引文档来说,实现与数据库mysql同步也就是完成数据库的增删改影响了Elastricsearch的增删改。Elasticsearch作为全文检索,是指计算机索引程序通过扫描文章中的每一个词,对每一个词建立一个索引,指明该词在文章中出现的次数和位置,当用户查询时,检索程序就根据事先建立的索引进行查找,并将查找的结果反馈给用户的检索方式。这个过程类似于通过字典中的检索字表查字的过程。

当数据库mysql中的数据添加一条,相当于也会在Elastricsearch索引数据中添加一条数据。通过前面对Elastricsearch插入索引数据时,实例化Elastricsearch类,调用index方法就可以插入一条数据,具体格式如下。

es.index(index="area_index",doc_type="test_type",id=0,body={"name":"python","addr":"深圳"})

这里的index就是索引名字,doc_type指明Elasticsearch文档类型,id的指明可以唯一标识一条数据。在这个索引名字中,还可以继续添加新的id值的指向另外一条数据。代码如下。

es.index(index="area_index",doc_type="test_type",id=1,body={"name":"python","addr":"深圳"})

如果对建立的数据进行删除,可以指定index,type和id使用delete方法进行删除单条数据,代码如下:

es.delete(index='area_index', doc_type='test_type', id=1)

也可以直接使用index的索引名字,直接删除index索引下的所有数据。代码语句如下。

es.indices.delete(index='news', ignore=[400, 404])

对elasticsearch的数据进行更新需要指定index,type,id参数去更新指定的单条数据,update方法可以完成这样的更新操作。代码如下。

es.update(index="area_index",doc_type="test_type",id=1,body={"doc":{"name":"python1","addr":"深圳1"}})

增删改是对elasticsearch中的数据进行变动的操作,现在可以把这几种操作进行面向对象的封装,在kafka客户端进行消费时,直接调用封装类的具体方法,把实际的Elasticsearch的实现进行隐藏。这样,实现elasticsearch增删改封装的类具体代码如下。

from elasticsearch import Elasticsearch
class MyElasticSearch():
    def __init__(self,index_name,index_type):
        self.es=Elasticsearch();
        self.index_name=index_name
        self.index_type=index_type
    def insert_one(self,doc):
        self.es.index(index=self.index_name,doc_type=self.index_type,body=doc)
    def insert_array(self,docs):
        for doc in docs:
            self.es.index(index=self.index_name,doc_type=self.index_type,body=doc)
    def update_one(self,doc,uid):
        self.es.update(index=self.index_name,doc_type=self.index_type,id=uid,body=doc)
    def delete_one(self,uid):
        self.es.delete(index=self.index_name,doc_type=self.index_type,id=uid)

从代码上看,定义了一个类MyElasticSearch类,该类中有init初始化方法,实例化该类时指明具体的索引名字和索引类型,这也是因为elasticsearch的增删改都是围绕着索引名称index和类型doctype来进行的。接下来对es操作的index,update,delete等方法进行封装,实现了insertone插入一条Elasticsearch数据的方法,updateone更新一条Elasticsearch数据的方法,deleteone删除一条Elasticsearch数据的方法,如果需要插入多条数据就可以使用insert_array方法来实现,把每条数据遍历,分别调用index方法进行数据的添加。

八、mysql数据库同步到elasticsearch中

把Elasticsearch的增删改操作封装到新建类MyElasticSearch中后,就可以在Kafka消费者代码添加对elasticsearch全文检索的操作了。把KafkaConsumer对象的实例化后的结果进行遍历,对遍历的每一个结果value数据值进行获取并解码,继而转换成为json数据,json数据中的“action”键对应了KafkaProducer类对象生成数据时数据库发生的增删改信号,根据不同的信号值,“insert”信号对应的就是elasticsearch中的添加操作,“update”信号对应的就是elasticsearch中的更新操作,“delete”信号对应的就是elsticsearch中的删除操作。具体代码如下。

from kafka import KafkaConsumer
from elasticsearch_class import MyElasticSearch
import json
consumer=KafkaConsumer("message",bootstrap_servers=["localhost:9092"])
es=MyElasticSearch("bookstores","test-type")
for mess in consumer:
    mess_str=mess.value.decode()
    mess_str=json.loads(mess_str)
    print(type(mess_str))
    flag=mess_str["action"]
    if flag=="insert":
        print(mess_str["data"])
        es.insert_one(mess_str["data"])
    elif flag=="delete":
        es.delete_one(mess_str["data"])
    elif flag=="update":
        es.update_one(mess_str["data"])

这样,KafkaProducer生产者端对应的是binlog日志的变动监控,当有数据发生变化时,KafkaConsumer消费者端接收日志的变化数据,根据信号的不同,调用封装好的elasticsearch全文检索中的增删改对应的操作。

调试程序可以在确保服务器启动的情况下,也就是zookeeper服务器和kafka服务器均在启动状态,启动KafkaProducer代码程序,再启动KafkaConsumer代码程序。

然后在数据库中执行一句insert语句的操作。如下图示。

由图可知,数据库端发生了插入数据的操作,在KafkaProducer端产生数据如下图所示。

图中的数据没有被正确解码,仍然是unicode数据,这是因为生产者这一端不需要进行解码,只是查看了一下生产者端产生的json数据格式。在消费者这一端最终会被正确解码。在消费者这一端最终产生结果如下。

由图中可以看出,数据库添加的数据被传递到了Kafka消费者这一端,kafka消费者也会把数据写入到elasticsearch中,可以使用Chrom浏览器通过elasticsearch-header插件查找elasticsearch中的数据。如下图所示。

由上图中可以看到elasticsearc中同步了一个“笑傲江湖”的记录。

这就实现了mysql数据库与elasticsearch全文索引的同步。

代码对应的github地址:https://github.com/wawacode/mysqlelasticsearchsynchronism

教程对应的视频地址:

      mysql与elasticsearch同步2-kafka生产者消费者模式消费binlog

           https://www.bilibili.com/video/BV1cV411e7F9/
     
mysql与elasticsearch同步3-elasticsearch的增删改同步数据库

           https://www.bilibili.com/video/BV1ty4y1E7n6/

 

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐