参考一个大牛的博客:https://cloud.tencent.com/developer/article/1050321

本文使用的 GeoJSON 数据由 GDAL 的 ogr2ogr 工具转换而来

GeoJSON 格式并不满足 ES Bulk API 对 JSON 的格式要求(参见 ES 官方文档中的 Bulk API 部分),因此不能把 GeoJSON 文件直接提交给 Bulk API

下面用 elasticsearch-py 客户端来解析 GeoJSON,并使用 Bulk API 导入 ES

py_es client : https://elasticsearch-py.readthedocs.io/en/7.6.0/

es geo 类型字段参考:https://www.elastic.co/guide/en/elasticsearch/reference/current/geo-point.html

导入速度大概是:1500条/秒

 

# aganliang 20200519
# Insert GeoJSON point data into Elasticsearch using the ES Python API

import json
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

def set_mapping(es, index_name="content_engine"):
    """Recreate ``index_name`` and apply the point-data field mapping.

    Any existing index with that name is deleted first, so documents
    previously stored in it are lost.

    Args:
        es: Elasticsearch client. Only ``es.indices.delete``,
            ``es.indices.create`` and ``es.indices.put_mapping`` are called.
        index_name: Name of the index to (re)create.

    Returns:
        True when both the index creation and the mapping update were
        acknowledged, False otherwise (a failure notice is printed as well).
    """
    my_mapping = {
        "properties": {
            "location": {
                "type": "geo_point",  # line and polygon data would use geo_shape
            },
            "name": {
                "type": "text",
                "fields": {
                    "keyword": {
                        "type": "keyword",
                        "ignore_above": 256,
                    },
                },
            },
            "typecode": {
                "type": "text",
                "index": False,
            },
            "address": {
                "type": "text",
            },
        },
    }

    # Drop the index if it already exists; 400/404 responses are ignored so
    # a missing index is not an error.
    es.indices.delete(index=index_name, ignore=[400, 404])
    print("delete_index")

    # A 400 response from the create call is ignored rather than raised.
    create_index = es.indices.create(index=index_name, ignore=400)
    mapping_index = es.indices.put_mapping(index=index_name, body=my_mapping)

    # Because 400 is ignored above, a rejected create returns the error body,
    # which has no "acknowledged" key. Read the flag with .get() so the
    # failure is reported instead of raising KeyError.
    acknowledged = all(
        response.get("acknowledged") is True
        for response in (create_index, mapping_index)
    )
    if not acknowledged:
        print("Index creation failed...")
    return acknowledged


def _feature_to_action(feature, index_name):
    """Build the bulk-helper action dict for a single GeoJSON feature."""
    geometry = feature["geometry"]
    properties = feature["properties"]

    if geometry["type"] == "Point":
        # GeoJSON stores a position as [lon, lat]; the location is sent as
        # an object with explicit "lat" / "lon" keys.
        location = {
            "lat": geometry["coordinates"][1],
            "lon": geometry["coordinates"][0],
        }
    else:
        # Non-point geometries keep their own type alongside their
        # coordinates instead of being mislabelled as "point".
        location = {
            "type": geometry["type"],
            "coordinates": geometry["coordinates"],
        }

    return {
        "_index": index_name,
        "_source": {
            "name": properties["name"],
            "typecode": properties["typecode"],
            "address": properties["address"],
            "location": location,
        },
    }


def set_data(es, input_file, index_name, batch_size=5):
    """Load a GeoJSON file and index its features through the bulk helper.

    Args:
        es: Elasticsearch client handed to ``bulk``.
        input_file: Path to a UTF-8 GeoJSON file with a top-level
            ``"features"`` list; each feature's ``properties`` must contain
            ``name``, ``typecode`` and ``address``.
        index_name: Index that receives the documents.
        batch_size: Number of actions sent per ``bulk`` call. The default
            of 5 matches the previously hard-coded value.

    Returns:
        The total number of documents ``bulk`` reported as successfully
        indexed.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    with open(input_file, 'r', encoding='UTF-8') as f:
        data = json.load(f)

    actions = []
    count = 0

    for feature in data["features"]:
        # A GeoJSON feature may carry "geometry": null; it has nothing to
        # index as a location, so skip it rather than crash.
        if feature.get("geometry") is None:
            print("skip feature without geometry")
            continue

        action = _feature_to_action(feature, index_name)
        print("prepare insert:  %s" % feature["properties"]["name"])
        print("type:  %s" % feature["geometry"]["type"])
        actions.append(action)

        if len(actions) >= batch_size:
            success, _ = bulk(es, actions, index=index_name, raise_on_error=True)
            count += success
            actions = []
            print("insert %s lines" % count)

    # Flush whatever is left over from the last, partially filled batch.
    if actions:
        success, _ = bulk(es, actions, index=index_name, raise_on_error=True)
        count += success
    print("insert %s lines" % count)
    return count


if __name__ == '__main__':
    # Script entry point: rebuild the "gis" index, then load the point data.
    # Variant with basic auth against a local node:
    # es = Elasticsearch(hosts=["127.0.0.1:9200"], http_auth=('elastic','changeme'),timeout=5000)
    target_index = "gis"
    client = Elasticsearch(hosts=["192.168.1.9:9200"], timeout=5000)

    set_mapping(client, target_index)

    # The GeoJSON file is in the format produced by ogr2ogr.
    set_data(client, "./data/ppp.json", target_index)  # point

 

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐