Overview: the collector below gathers Elasticsearch status data and writes it into a metrics index in ES; Grafana then queries that index to render the dashboards.
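Before wiring up Grafana, it helps to confirm that documents are actually landing in the metrics index. A minimal sketch, assuming the write cluster and index name used in the scripts below (host, port, and credentials are placeholders):

from elasticsearch6 import Elasticsearch

# host/port/credentials are placeholders; the index name matches the scripts below
es = Elasticsearch("http://ip", port=9250, http_auth=("elastic", "password"))
resp = es.search(index="elasticsearch_metrics_40",
                 body={"size": 1, "sort": [{"@timestamp": {"order": "desc"}}]})
print(resp["hits"]["hits"])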

import datetime

from elasticsearch6 import Elasticsearch, exceptions, helpers
# get_net (per-host network stats) comes from a separate local module;
# uncomment this import if ES_monitor.get_nets() is used
# from cat_file import get_net


class ES_actions:
    def __init__(self, read_es, write_es, write_index):
        self.write_index = write_index
        self.read_es = read_es
        self.write_es = write_es
        # per-instance buffer of bulk actions; a class-level list would be
        # shared by every instance
        self.actions = []

    def set_actions(self, inData):
        # wrap a document into a bulk-API action targeting the metrics index
        action = {
            "_index": self.write_index,
            "_type": '_doc',
            "_source": inData
        }
        return action

    def get_nodes(self):
        # unique host addresses of every node in the monitored cluster
        nodes = []
        jsonData = self.read_es.nodes.stats()
        for node in jsonData['nodes'].keys():
            nodes.append(jsonData['nodes'][node]['host'])
        return list(set(nodes))

    def fetch_clusterhealth(self):
        try:
            jsonData = self.read_es.cluster.health()
            clusterName = jsonData['cluster_name']
            jsonData['@timestamp'] = self.get_timestamp()
            # numeric status so Grafana can threshold on it:
            # green=0, yellow=1, red=2
            jsonData['status_code'] = {'green': 0, 'yellow': 1, 'red': 2}.get(jsonData['status'])
            self.actions.append(self.set_actions(jsonData))
            return clusterName
        except (IOError, exceptions.TransportError) as err:
            # the ES client raises its own TransportError subclasses for
            # connection problems, so catching IOError alone would miss them
            print("Can't connect to elasticsearch: {}".format(err))
            return "unknown"

    def get_timestamp(self):
        # UTC ISO-8601 timestamp truncated to milliseconds,
        # e.g. 2024-01-01T12:00:00.000
        utc_datetime = datetime.datetime.utcnow()
        return utc_datetime.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3]

class ES_monitor(ES_actions):
    def __init__(self, read_es, write_es, write_index):
        super(ES_monitor, self).__init__(read_es, write_es, write_index)

    def fetch_clusterstats(self, clusterName):
        jsonData = self.read_es.cluster.stats()
        jsonData['@timestamp'] = self.get_timestamp()
        jsonData['cluster_name'] = clusterName
        # nest indices.store one level deeper, presumably to avoid mapping
        # collisions with other document types in the shared metrics index
        jsonData['indices']['store'] = {"clusters": jsonData['indices']['store']}
        action = self.set_actions(jsonData)
        self.actions.append(action)

    def fetch_nodestats(self, clusterName):
        # one document per node, tagged with cluster name and timestamp
        jsonData = self.read_es.nodes.stats()
        for node in jsonData['nodes'].keys():
            jsonData['nodes'][node]['@timestamp'] = self.get_timestamp()
            jsonData['nodes'][node]['cluster_name'] = clusterName
            newJsonData = jsonData['nodes'][node]
            action = self.set_actions(newJsonData)
            self.actions.append(action)

    def fetch_indexstats(self, clusterName):
        jsonData = self.read_es.indices.stats(format='json')
        dates = self.get_timestamp().split('T')[0]
        for ID in jsonData['indices'].keys():
            # only emit per-index stats for today's date-suffixed indices
            if dates in ID:
                jsonData['indices'][ID]['@timestamp'] = self.get_timestamp()
                jsonData['indices'][ID]['cluster_name'] = clusterName
                jsonData['indices'][ID]['index_name'] = ID
                indexData = jsonData['indices'][ID]
                action = self.set_actions(indexData)
                self.actions.append(action)
        # plus one summary document for the whole cluster (_all)
        jsonData['_all']['@timestamp'] = self.get_timestamp()
        jsonData['_all']['cluster_name'] = clusterName
        cluster_data = jsonData['_all']
        action = self.set_actions(cluster_data)
        self.actions.append(action)

    def index_status(self, clusterName):
        indices_mes = self.read_es.cat.indices(format='json')
        # look up the average doc size by index name; zipping the two
        # cat.indices responses positionally would misalign, since
        # docs_avg() only covers green indices
        avg_by_index = {name: avg for avg, name in self.docs_avg()}
        for indices in indices_mes:
            indices['cluster_name'] = clusterName
            indices['docs_count'] = indices.pop('docs.count')
            indices['docs_deleted'] = indices.pop('docs.deleted')
            indices['store_size'] = indices.pop('store.size')
            indices['pri_store_size'] = indices.pop('pri.store.size')
            indices['primary'] = indices.pop('pri')
            indices['replica'] = indices.pop('rep')
            # 'docs_avg_kb' is a field name chosen here; the original code
            # computed this average but never stored it
            indices['docs_avg_kb'] = avg_by_index.get(indices['index'])
            indices['@timestamp'] = self.get_timestamp()
            dates = self.get_timestamp().split('T')[0]
            if dates in indices['index']:
                # stable _id so each run overwrites today's status document
                # instead of accumulating duplicates
                action = {
                    "_index": self.write_index,
                    "_type": '_doc',
                    "_id": "{}_{}".format(clusterName, indices['index']),
                    "_source": indices
                }
                self.actions.append(action)

    def get_nets(self, clusterName):
        # per-host network stats; requires get_net from the cat_file module
        # (see the commented import at the top)
        host_lists = self.get_nodes()
        for host in host_lists:
            for jsonData in get_net(host):
                jsonData['cluster_name'] = clusterName
                jsonData['@timestamp'] = self.get_timestamp()
                self.actions.append(self.set_actions(jsonData))

    def docs_avg(self):
        # yields (avg_primary_store_kb_per_doc, index_name) for green indices
        indices_docs_mes = self.read_es.cat.indices(format='json', health='green',
                                                    h='pri.store.size,docs.count,index',
                                                    bytes='kb')
        for docs in indices_docs_mes:
            if int(docs['docs.count']) != 0:
                avg_docs = int(docs['pri.store.size']) / int(docs['docs.count'])
                yield round(avg_docs, 2), docs['index']
            else:
                yield 0, docs['index']

    def post_data(self):
        # bulk-write the buffered actions to the target ES, then clear the
        # buffer so the next interval doesn't re-send the same documents
        helpers.bulk(client=self.write_es, actions=self.actions)
        self.actions.clear()
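
The runner script below drives the collector. It assumes the class above is saved as grafana_es_monitor2.py (matching the import), spawns one process per monitored cluster, and loops on a fixed interval (ES_METRICS_INTERVAL, default 60 seconds):
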
import time
import os
import sys
from elasticsearch6 import Elasticsearch
from grafana_es_monitor2 import ES_monitor

interval = int(os.environ.get('ES_METRICS_INTERVAL', '60'))


def exec_es(*args):
    # args: (read_host, write_host, user, password, read_port, write_index)
    read_es = Elasticsearch(args[0], http_auth=(args[2], args[3]), port=args[4], timeout=300)
    write_es = Elasticsearch(args[1], http_auth=("elastic", "1qaz@WSX"), port=9250)
    read_es.cat.nodes(h='ip')  # smoke-test the read connection before looping
    Data = ES_monitor(read_es, write_es, args[5])
    try:
        nextRun = 0
        while True:
            if time.time() >= nextRun:
                nextRun = time.time() + interval
                now = time.time()
                clusterName = Data.fetch_clusterhealth()
                if clusterName != "unknown":
                    Data.fetch_clusterstats(clusterName)
                    Data.fetch_nodestats(clusterName)
                    Data.fetch_indexstats(clusterName)
                    Data.index_status(clusterName)
                    Data.post_data()
                    elapsed = time.time() - now
                    print("ES connect: {}, Total Elapsed Time: {:.2f}s".format(args[0], elapsed))
                    timeDiff = nextRun - time.time()
                    if timeDiff >= 0:
                        time.sleep(timeDiff)
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

if __name__ == '__main__':
    # one worker process per monitored cluster
    from multiprocessing import Process

    write_index = "elasticsearch_metrics_40"
    write_es = "http://ip"
    read_es_list = ["http://ip:9200", "http://ip:9200", "http://ip:9200", "http://ip:9200"]
    user = 'elastic'
    pwd = 'elastic'
    connect_list = []
    for read_es in read_es_list:
        connect_list.append((read_es, write_es, user, pwd, 9200, write_index))
    processes = [Process(target=exec_es, args=n) for n in connect_list]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
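
On the Grafana side, the matching setup is an Elasticsearch data source pointing at the write cluster, with elasticsearch_metrics_40 as the index name and @timestamp as the time field; dashboard panels can then graph the fields collected above, such as status_code, docs_count, or the per-node stats.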