Python操作ElasticSearch
发布日期:2021-08-17 10:07:54 浏览次数:45 分类:技术文章

本文共 6136 字,大约阅读时间需要 20 分钟。

Python批量向ElasticSearch插入数据

Python 2的多进程不能序列化类方法, 所以改为函数的形式.

直接上代码:

#!/usr/bin/python# -*- coding:utf-8 -*-import osimport reimport jsonimport timeimport elasticsearchfrom elasticsearch.helpers import bulkfrom multiprocessing import Pooldef write_file(doc_type, action_list):    """"""    with open("/home/{}_error.json".format(doc_type), "a") as f:        for i in action_list:            f.write(str(i))def add_one(file_path, doc_type, index):    """准备插入一条"""    print doc_type, index    es_client = elasticsearch.Elasticsearch(hosts=[{"host": "localhost", "port": "9200"}])    with open(file_path, "r") as f:        for line in f:            try:                line = re.sub("\n", "", line)                dict_obj = json.loads(line)                es_client.index(index=index, doc_type=doc_type, body=dict_obj)            except Exception as e:                print "出错了, 错误信息: {}".format(e)def add_bulk(doc_type, file_path, bulk_num, index):    """"""    es_client = elasticsearch.Elasticsearch(hosts=[{"host": "localhost", "port": "9200"}])    action_list = []    # 文件过大, 先插入5000万试水    total = 50000000    num = 0    with open(file_path, "r") as f:        for line in f:            num += 0            if num >= total:                break                        # 去除每一行数据中的"\n"字符, 也可以替换为"\\n"            line = line.replace("\n", "")            dict_obj = json.loads(line)            # 根据bulk_num的值发送一个批量插入请求            # action = {            #     "_index": index,            #     "_type": doc_type,            #     "_source": {            #         "ip": dict_obj.get("ip", "None"),            #         "data": str(dict_obj.get("data", "None"))            #     }            # }            # 如果动态插入,字段过长,会报错,导致插不进去, 转为字符串就可以            action = {                '_op_type': 'index',                "_index": index,                "_type": doc_type,                "_source": dict_obj            }            action_list.append(action)            if len(action_list) >= bulk_num:                try:                    print "Start Bulk {}...".format(doc_type)                    success, failed = bulk(es_client, action_list, index=index, raise_on_error=True)                    print "End Bulk {}...".format(doc_type)                except Exception as e:                    print "出错了, Type:{}, 错误信息:{}".format(doc_type, e[0])                    write_file(doc_type, action_list)                finally:                    del action_list[0:len(action_list)]        # 如果不是bulk_num的等值, 那么就判断列表是否为空, 再次发送一次请求        if len(action_list) > 0:                try:                    success, failed = bulk(es_client, action_list, index=index, raise_on_error=True)                except Exception as e:                    print "出错了, Type:{}, 错误信息:{}".format(doc_type, e[0])                    write_file(doc_type, action_list)                finally:                    del action_list[0:len(action_list)]def mulit_process(path, index, bulk_num, data):    """"""    # 多进程执行    pool = Pool(10)    results = []    for i in data:        doc_type = i["doc_type"]        file_path = i["file_path"]        result = pool.apply_async(add_bulk, args=(doc_type, file_path, bulk_num, index))        results.append(result)        pool.close()    pool.join()def all_info(path):    data = []    for i in os.listdir(path):        file_dict = {}        if i.endswith(".json"):            doc_type = i.split("_")[0]            file_path = path + i            if doc_type == "443":                continue            file_dict["doc_type"] = doc_type            file_dict["file_path"] = file_path            data.append(file_dict)    return datadef es_insert(process_func=None):    """"""    # 库    index = "test"    # 文件路径    path="/home/data/"        # 批量插入的数量, 如果是json整条数据插入的话, 可能会出现字段过长的问题, 导致插不进去, 适当调整bulk_num的值    bulk_num = 5000    if not path.endswith("/"):        path += "/"    data = all_info(path)    if process_func == "bulk":        # 插入多条, doc_type, file_path, bulk_num, index        add_bulk("80", path + "80_result.json", bulk_num, index)    elif process_func == "one":        # 插入单条file_path, doc_type, index        add_one(path + "80_result.json", "80", index)    else:        # 多进程        mulit_process(path, index, bulk_num, data)if __name__ == "__main__":    # 计算脚本执行时间    start_time = time.time()    if not os.path.exists("/home/test"):        os.makedirs("/home/test")    # 插入数据    es_insert()    # 计算脚本执行时间    end_time = time.time()    print end_time - start_time

Python搜索ElasticSearch

示例:

#!/usr/bin/python# -*- coding:utf -*-import jsonimport elasticsearchdef es_login(host="localhost", port="9200"):    """连接es"""    return elasticsearch.Elasticsearch(hosts=[{"host": host, "port": port}])def get(es_client, _id):    """获取一条内容"""    # result = es_client.get(index="test", doc_type="80", id=_id)    result = es_client.get(index="test", id=_id)    return json.dumps(result)def search(es_client, query, field="_all"):    """聚合搜索内容"""    result = es_client.search(index="test", body={        "query": {            "bool": {                "must": [                    {                        "query_string": {                            # 指定字段                            "default_field": field,                            # 查询字段                            "query": query                        }                    },                     {                        "match_all": {}                    }                ],                "must_not": [],                "should": []            }        },        "from": 0,        "size": 10,        "sort": [],        # 聚合        "aggs": {            # "all_interests":{            #     "terms":{            #         "field":"interests"            #     }            # }        }    })    return json.dumps(result)def main():    """入口"""    # 连接es    es_client = es_login()    # result = search(es_client, query="123.125.115.110", field="_all")    result = get(es_client, "AWTv-ROzCxZ1gYRliWhu")    print resultif __name__ == "__main__":    main()

删除ElasticSearch全部数据

curl -X DELETE localhost:9200/test, test为自己的index名称

转载于:https://www.cnblogs.com/zzhaolei/p/11068106.html

转载地址:https://blog.csdn.net/weixin_30846599/article/details/99220575 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:Krusal算法
下一篇:ASP.NET 5 (vNext) Linux部署

发表评论

最新留言

表示我来过!
[***.240.166.169]2024年03月29日 21时40分29秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

boost::function_types::is_function_reference的测试程序 2019-04-26
boost::function_types::is_function用法的测试程序 2019-04-26
boost::function_types::is_member_function_pointer用法的测试程序 2019-04-26
boost::geometry::clear用法的测试程序 2019-04-26
asp 指定读取前几条记录 2019-04-26
大数据_Hbase-API访问_Java操作Hbase_MR-数据迁移-代码测试---Hbase工作笔记0017 2019-04-26
大数据_Hbase-内容回顾和补充---Hbase工作笔记0018 2019-04-26
大数据_Hbase-内容回顾_知识点补充_线程安全与wait的区别---Hbase工作笔记0019 2019-04-26
大数据_Hbase-Filter & 索引(优化)_根据column查询---Hbase工作笔记0020 2019-04-26
大数据_MapperReduce_从CSV文件中读取数据到Hbase_自己动手实现Mapper和Reducer---Hbase工作笔记0021 2019-04-26
大数据_MapperReduce_协处理器_类似Mysql的触发器---Hbase工作笔记0024 2019-04-26
大数据_MapperReduce_Hbase的优化_存数据_自动计算分区号 & 自动计算分区键---Hbase工作笔记0027 2019-04-26
大数据_MapperReduce_Hbase的优化_RowKey设计原则---Hbase工作笔记0028 2019-04-26
大数据_MapperReduce_Hbase的优化和Hbase相关面试题_以及hbase的javaapi的一部分源码---Hbase工作笔记0029 2019-04-26
大数据_MapperReduce_Hbase配置参数说明_以及部分源码说明---Hbase工作笔记0031 2019-04-26
Vue介绍---vue工作笔记0001 2019-04-26
Vue基本使用---vue工作笔记0002 2019-04-26
微信公众号介绍_以及注册订阅号---微信公众号开发工作笔记0001 2019-04-26
Vue模板语法---vue工作笔记0003 2019-04-26
Vue计算属性之基本使用---vue工作笔记0004 2019-04-26