Python Elasticsearch Client 使用Bulk API把geojson数据导入es
geojson数据格式由GDAL通过ogr2ogr工具转换而来，其格式并不满足es bulk api对json的格式要求（参见 BULK API）。下面用py_es client来解析geojson，并使用bulk api导入es。py_es client：https://elasticsearch-py.readthedocs.io/en/7.6.0/ 。导入速度大概是：1500条/秒。
·
参考一个大牛的博客:https://cloud.tencent.com/developer/article/1050321
geojson数据格式由GDAL 通过ogr2ogr工具转换而来
geojson格式并不满足es bulk api对json的格式要求：BULK API 要求按行分隔的 JSON（每个文档先写一行操作元数据，再写一行文档内容），而 geojson 是一个整体的 FeatureCollection，因此需要先解析再逐条转换后导入。
下面用py_es client来解析geojson,并使用bulk api 导入es
py_es client : https://elasticsearch-py.readthedocs.io/en/7.6.0/
es geo 类型字段参考:https://www.elastic.co/guide/en/elasticsearch/reference/current/geo-point.html
导入速度大概是:1500条/秒
# aganliang 20200519
# 使用ES python api插入geojson点数据
import json
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
def set_mapping(es, index_name="content_engine"):
    """Recreate *index_name* with the mapping used for the imported GeoJSON points.

    Any existing index with the same name is deleted first, so its documents
    are lost.

    Args:
        es: an ``elasticsearch.Elasticsearch`` client (7.x-style API).
        index_name: name of the index to (re)create.

    Returns:
        True if both the index creation and the mapping update responses carry
        ``"acknowledged": True``; False otherwise (a message is also printed).
    """
    my_mapping = {
        "properties": {
            # geo_point suits point data; line/polygon data needs geo_shape.
            "location": {"type": "geo_point"},
            "name": {
                "type": "text",
                # Keyword sub-field for exact matching; strings longer than
                # 256 characters are not indexed in it.
                "fields": {
                    "keyword": {"type": "keyword", "ignore_above": 256},
                },
            },
            # Kept in _source but not indexed (not searchable).
            "typecode": {"type": "text", "index": False},
            "address": {"type": "text"},
        }
    }
    # Drop the index if it already exists; 400/404 errors are ignored.
    es.indices.delete(index=index_name, ignore=[400, 404])
    print("delete_index")
    # ignore=400 tolerates IndexAlreadyExistsException. In that case the client
    # returns the error body, which has no "acknowledged" key, so use .get()
    # rather than indexing (which would raise KeyError).
    create_index = es.indices.create(index=index_name, ignore=400)
    mapping_index = es.indices.put_mapping(index=index_name, body=my_mapping)
    acknowledged = (
        create_index.get("acknowledged") is True
        and mapping_index.get("acknowledged") is True
    )
    if not acknowledged:
        print("Index creation failed...")
    return acknowledged
def _feature_to_action(feature, index_name):
    """Build one bulk "index" action for *index_name* from a GeoJSON feature.

    The feature's ``properties`` must contain ``name``, ``typecode`` and
    ``address`` (a KeyError is raised otherwise).

    Point geometries become a ``{"lat": ..., "lon": ...}`` object, which a
    ``geo_point`` field accepts. Any other geometry is passed through as a
    GeoJSON object with its own ``type`` and ``coordinates``; such values
    require a ``geo_shape`` mapping rather than ``geo_point``.
    """
    geometry = feature["geometry"]
    properties = feature["properties"]
    if geometry["type"] == "Point":
        # GeoJSON orders coordinates as [lon, lat].
        coordinates = geometry["coordinates"]
        location = {"lat": coordinates[1], "lon": coordinates[0]}
    else:
        # Keep the geometry's real type (e.g. "MultiPolygon"); labelling it
        # "point" would misdescribe the nested coordinate arrays.
        location = {
            "type": geometry["type"],
            "coordinates": geometry["coordinates"],
        }
    return {
        "_index": index_name,
        "_source": {
            "name": properties["name"],
            "typecode": properties["typecode"],
            "address": properties["address"],
            "location": location,
        },
    }


def set_data(es, input_file, index_name, batch_size=5):
    """Bulk-index every feature of a GeoJSON FeatureCollection file.

    Args:
        es: an ``elasticsearch.Elasticsearch`` client.
        input_file: path to a UTF-8 GeoJSON file (e.g. ogr2ogr output) whose
            top-level object has a ``features`` list.
        index_name: target index.
        batch_size: number of actions sent per ``bulk`` request. The default
            of 5 matches the previous hard-coded value; larger batches mean
            fewer requests.

    Returns:
        Total number of documents ``bulk`` reported as successfully indexed.

    Raises:
        Whatever ``elasticsearch.helpers.bulk`` raises with
        ``raise_on_error=True`` when documents fail to index.
    """
    # The whole file is parsed into memory at once.
    with open(input_file, "r", encoding="UTF-8") as f:
        features = json.load(f)["features"]

    pending = []
    count = 0
    for feature in features:
        action = _feature_to_action(feature, index_name)
        print("prepare insert: %s" % feature["properties"]["name"])
        print("type: %s" % feature["geometry"]["type"])
        pending.append(action)
        if len(pending) >= batch_size:
            success, _ = bulk(es, pending, index=index_name, raise_on_error=True)
            count += success
            pending = []
            print("insert %s lines" % count)
    # Flush the final, partially filled batch; skip the request if it is empty.
    if pending:
        success, _ = bulk(es, pending, index=index_name, raise_on_error=True)
        count += success
    print("insert %s lines" % count)
    return count
if __name__ == "__main__":
    # Index used both for the mapping step and the data load.
    target_index = "gis"
    # On a secured cluster, pass credentials instead, e.g.
    # Elasticsearch(hosts=["127.0.0.1:9200"], http_auth=("elastic", "changeme"), timeout=5000)
    es = Elasticsearch(hosts=["192.168.1.9:9200"], timeout=5000)
    # Recreate the index (existing data is deleted) with its geo_point mapping.
    set_mapping(es, index_name=target_index)
    # Input is GeoJSON generated by ogr2ogr; this file holds point features.
    set_data(es, input_file="./data/ppp.json", index_name=target_index)
更多推荐
所有评论(0)