es配合grafana监控(python)
es配合grafana监控 $ cat es_grafana_monitor.py #!/usr/bin/env python import datetime import time import json # import urllib2 import os import sys import re from elasticsearch import helpers from elasticsearch import
·
简介:将获取的状态数据写入es,再由grafana去查询es数据。
import datetime
from elasticsearch6 import helpers
from elasticsearch6 import Elasticsearch
# from cat_file import get_net
class ES_actions():
    """Base helper that turns Elasticsearch status data into bulk actions.

    Attributes:
        read_es: client used to query the monitored cluster.
        write_es: client handed to the bulk helper when metrics are written.
        write_index: name of the index every generated action targets.
        actions: list of pending bulk actions owned by this instance.
    """

    # Class-level default kept so ``ES_actions.actions`` still resolves;
    # __init__ gives every instance its own list so instances never share
    # pending documents.
    actions = []

    # Numeric code stored next to the textual cluster status.
    _STATUS_CODES = {'green': 0, 'yellow': 1, 'red': 2}

    def __init__(self, read_es, write_es, write_index):
        self.write_index = write_index
        self.read_es = read_es
        self.write_es = write_es
        self.actions = []

    def set_actions(self, inData):
        """Wrap *inData* in a bulk action aimed at ``self.write_index``."""
        return {
            "_index": self.write_index,
            "_type": '_doc',
            "_source": inData,
        }

    def get_nodes(self):
        """Return the distinct ``host`` values reported by the node stats.

        The order of the returned list is not defined.
        """
        node_stats = self.read_es.nodes.stats()
        hosts = {info['host'] for info in node_stats['nodes'].values()}
        return list(hosts)

    def fetch_clusterhealth(self):
        """Queue the cluster health document and return the cluster name.

        The health document is stamped with ``@timestamp`` and, when the
        status is green/yellow/red, with ``status_code`` 0/1/2.

        Returns:
            The ``cluster_name`` from the health response, or ``"unknown"``
            when the health request raises ``IOError``.
        """
        try:
            health = self.read_es.cluster.health()
        except IOError:
            # NOTE(review): only IOError is handled here; confirm that the
            # client's connection failures are actually raised as IOError.
            print("IOError: Maybe can't connect to elasticsearch.")
            return "unknown"

        cluster_name = health['cluster_name']
        # The original read the undefined attribute ``self.timestamps``.
        health['@timestamp'] = self.get_timestamp()
        status_code = self._STATUS_CODES.get(health['status'])
        if status_code is not None:
            health['status_code'] = status_code
        self.actions.append(self.set_actions(health))
        return cluster_name

    def get_timestamp(self):
        """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SS.mmm``."""
        utc_now = datetime.datetime.utcnow()
        # %f yields microseconds; dropping three digits leaves milliseconds.
        return utc_now.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3]
class ES_monitor(ES_actions):
    """Collect cluster, node and index statistics and bulk-write them.

    Each ``fetch_*``/``index_status`` method queries ``self.read_es``, stamps
    the documents with ``@timestamp`` and ``cluster_name`` and appends bulk
    actions to ``self.actions``; ``post_data`` sends them to ``self.write_es``.
    """

    # (name in the cat-indices row, name stored in the metric document)
    _CAT_FIELD_RENAMES = (
        ('docs.count', 'docs_count'),
        ('docs.deleted', 'docs_deleted'),
        ('store.size', 'store_size'),
        ('pri.store.size', 'pri_store_size'),
        ('pri', 'primary'),
        ('rep', 'replica'),
    )

    def __init__(self, read_es, write_es, write_index):
        super(ES_monitor, self).__init__(read_es, write_es, write_index)

    def _today(self):
        """Return the date part (text before ``T``) of the current timestamp."""
        return self.get_timestamp().split('T')[0]

    def fetch_clusterstats(self, clusterName):
        """Queue one document holding the cluster-wide statistics."""
        stats = self.read_es.cluster.stats()
        stats['@timestamp'] = self.get_timestamp()
        stats['cluster_name'] = clusterName
        # The store section is re-nested under a "clusters" key.
        # NOTE(review): presumably to avoid a mapping clash with other
        # documents in the same index - confirm.
        stats['indices']['store'] = {"clusters": stats['indices']['store']}
        self.actions.append(self.set_actions(stats))

    def fetch_nodestats(self, clusterName):
        """Queue one document per node found in the node statistics."""
        stats = self.read_es.nodes.stats()
        for node_stats in stats['nodes'].values():
            node_stats['@timestamp'] = self.get_timestamp()
            node_stats['cluster_name'] = clusterName
            self.actions.append(self.set_actions(node_stats))

    def fetch_indexstats(self, clusterName):
        """Queue statistics for indices named after today plus the ``_all`` total.

        Only indices whose name contains today's UTC date (``YYYY-MM-DD``)
        are queued individually; the ``_all`` aggregate is always queued.
        """
        stats = self.read_es.indices.stats(format='json')
        today = self._today()  # computed once instead of per index
        for index_name, index_stats in stats['indices'].items():
            if today not in index_name:
                continue
            index_stats['@timestamp'] = self.get_timestamp()
            index_stats['cluster_name'] = clusterName
            index_stats['index_name'] = index_name
            self.actions.append(self.set_actions(index_stats))

        overall = stats['_all']
        overall['@timestamp'] = self.get_timestamp()
        overall['cluster_name'] = clusterName
        self.actions.append(self.set_actions(overall))

    def index_status(self, clusterName):
        """Queue the cat-indices row of every index named after today.

        Dotted column names are renamed to underscore form, and the action
        gets the fixed id ``<clusterName>_<index>`` so a later write for the
        same index replaces the earlier document.

        The original paired these rows with ``docs_avg()`` through ``zip``
        without ever using the average. Because ``docs_avg()`` only covers
        green indices, the pairing silently dropped trailing rows whenever a
        non-green index existed. Rows are now iterated on their own.
        """
        rows = self.read_es.cat.indices(format='json')
        today = self._today()
        for row in rows:
            if today not in row['index']:
                continue
            row['cluster_name'] = clusterName
            for cat_name, doc_name in self._CAT_FIELD_RENAMES:
                row[doc_name] = row.pop(cat_name)
            row['@timestamp'] = self.get_timestamp()
            self.actions.append({
                "_index": self.write_index,
                "_type": '_doc',
                "_id": "{}_{}".format(clusterName, row['index']),
                "_source": row,
            })

    def get_nets(self, clusterName):
        """Queue the documents produced by ``get_net`` for every node host.

        NOTE(review): ``get_net`` is neither defined nor imported in this
        file; the only trace is a commented-out ``from cat_file import
        get_net``. Calling this method raises NameError until that helper is
        made available.
        """
        for host in self.get_nodes():
            for net_doc in get_net(host):
                net_doc['cluster_name'] = clusterName
                net_doc['@timestamp'] = self.get_timestamp()
                # The original called ``self.actions(...)`` on the list itself.
                self.actions.append(self.set_actions(net_doc))

    def docs_avg(self):
        """Yield ``(average_kb_per_doc, index_name)`` for every green index.

        The average is primary store size in kb divided by the document
        count, rounded to two decimals; indices without documents yield 0.
        """
        rows = self.read_es.cat.indices(
            format='json',
            health='green',
            h='pri.store.size,docs.count,index',
            bytes='kb',
        )
        for row in rows:
            doc_count = int(row['docs.count'])
            if doc_count != 0:
                average = int(row['pri.store.size']) / doc_count
                yield round(average, 2), row['index']
            else:
                yield doc_count, row['index']

    def post_data(self):
        """Bulk-write the queued actions to ``self.write_es`` and empty the queue."""
        helpers.bulk(client=self.write_es, actions=self.actions)
        # Without this the queue kept growing and every cycle re-sent all
        # earlier documents. Cleared in place so other references to the
        # list stay valid.
        del self.actions[:]
import time
import os
import sys
from elasticsearch6 import Elasticsearch
from grafana_es_monitor2 import ES_monitor
# Seconds between two collection cycles; read from the ES_METRICS_INTERVAL
# environment variable, falling back to 60.
interval = int(os.getenv('ES_METRICS_INTERVAL', '60'))
def exec_es(*args):
    """Run the collection loop for one monitored cluster until interrupted.

    Positional arguments, in order:
        0: URL/host of the cluster to read from.
        1: URL/host of the cluster to write metrics to.
        2: user name for the read cluster.
        3: password for the read cluster.
        4: port of the read cluster.
        5: name of the index the metrics are written to.

    The write cluster's user, password and port default to the values that
    used to be hard-coded and can be overridden with ES_WRITE_USER,
    ES_WRITE_PASSWORD and ES_WRITE_PORT.
    """
    read_host = args[0]
    write_host = args[1]
    read_user = args[2]
    read_password = args[3]
    read_port = args[4]
    write_index = args[5]

    # SECURITY: a real password is embedded below as the fallback. Supply
    # ES_WRITE_PASSWORD in the environment instead of relying on it.
    write_user = os.environ.get('ES_WRITE_USER', 'elastic')
    write_password = os.environ.get('ES_WRITE_PASSWORD', '1qaz@WSX')
    write_port = int(os.environ.get('ES_WRITE_PORT', '9250'))

    read_es = Elasticsearch(read_host, http_auth=(read_user, read_password),
                            port=read_port, timeout=300)
    write_es = Elasticsearch(write_host, http_auth=(write_user, write_password),
                             port=write_port)
    # Result is discarded. NOTE(review): presumably an early connectivity
    # check against the read cluster - confirm.
    read_es.cat.nodes(h='ip')
    Data = ES_monitor(read_es, write_es, write_index)
    try:
        nextRun = 0
        while True:
            if time.time() >= nextRun:
                nextRun = time.time() + interval
                started = time.time()
                clusterName = Data.fetch_clusterhealth()
                # "unknown" means the health request failed; skip this cycle.
                if clusterName != "unknown":
                    Data.fetch_clusterstats(clusterName)
                    Data.fetch_nodestats(clusterName)
                    Data.fetch_indexstats(clusterName)
                    Data.index_status(clusterName)
                    Data.post_data()
                elapsed = time.time() - started
                print("ES connect: {},Total Elapsed Time: {}".format(read_host, elapsed))
                # Sleep only for what is left of the interval; a cycle that
                # overran starts the next one immediately.
                timeDiff = nextRun - time.time()
                if timeDiff >= 0:
                    time.sleep(timeDiff)
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)
if __name__ == '__main__':
    # Multi-process: one worker process per monitored cluster, each running
    # exec_es until interrupted.
    from multiprocessing import Process

    write_index = "elasticsearch_metrics_40"
    write_es = "http://ip"
    read_es_list = ["http://ip:9200", "http://ip:9200", "http://ip:9200", "http://ip:9200"]
    user = 'elastic'
    pwd = 'elastic'

    # Argument tuple layout expected by exec_es:
    # (read url, write url, user, password, read port, write index)
    connect_list = [
        (read_es, write_es, user, pwd, 9200, write_index)
        for read_es in read_es_list
    ]

    workers = [Process(target=exec_es, args=conn) for conn in connect_list]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
更多推荐
已为社区贡献2条内容
所有评论(0)