使用Python将MongoDB数据同步到Elasticsearch
发布日期:2022-02-27 02:38:02 浏览次数:59 分类:技术文章

本文共 5354 字,大约阅读时间需要 17 分钟。

使用Python将MongoDB数据同步到Elasticsearch

版本说明:Python 3.7,PyMongo 3.11.0,Elasticsearch 5.5.3

话不多说,直接上代码(如遇到问题,欢迎留言讨论):

# coding:utf8
# Sync the documents of one MongoDB collection into an Elasticsearch index.
# Written against Python 3.7, PyMongo 3.11 and Elasticsearch 5.5: the bulk
# actions below still carry a mapping type ("_type"), which newer Elasticsearch
# versions deprecate/remove.
from pymongo import MongoClient
from elasticsearch import Elasticsearch, helpers
import json
import logging

# MongoDB connection settings (replace with your own)
CONN_ADDR1 = '更换为自己的MongoDB地址'
USERNAME = 'MongoDB用户名'
PASSWORD = 'MongoDB密码'
DB = "MongoDB的库"
COLLECTION = "MongoDB集合"

# Elasticsearch credentials; set ES_USERNAME to None for a cluster without auth.
ES_USERNAME = 'ES用户名'
ES_PASSWORD = 'ES密码'

# Fields written to each Elasticsearch document, in this order.
# "corporateAnnualInfoId" is also used as the document _id.
DATA_NAME = ["corporateAnnualInfoId", "companyId", "nbYear", "annualCount",
             "basicInfo", "onlineStoreInfo", "shareContributive",
             "frimAssetInfo", "socialInfo"]

logger = logging.getLogger(__name__)


class ElasticObj:
    """Target Elasticsearch index for the MongoDB -> Elasticsearch sync."""

    def __init__(self, index_name, index_type, ip):
        """
        :param index_name: name of the target index
        :param index_type: mapping type inside the index (Elasticsearch 5.x style)
        :param ip: Elasticsearch host; the client is created with port=9200
        """
        self.index_name = index_name
        self.index_type = index_type
        if ES_USERNAME is None:
            # Cluster without username/password
            self.es = Elasticsearch([ip], port=9200)
        else:
            self.es = Elasticsearch([ip], http_auth=(ES_USERNAME, ES_PASSWORD), port=9200)

    def create_index(self):
        """Create ``self.index_name`` with an explicit mapping unless it already exists.

        Prints the Elasticsearch response when a create request is sent.
        """
        # NOTE(review): these example fields (name/password/birthplace) do not
        # match the fields written by insert_data; adjust them to your data.
        _index_mappings = {
            "mappings": {
                self.index_type: {
                    "properties": {
                        "name": {'type': 'text'},
                        "password": {'type': 'text'},
                        "birthplace": {'type': 'text'},
                    }
                }
            }
        }
        if not self.es.indices.exists(index=self.index_name):
            # ignore=400 makes the client return the error body instead of
            # raising on HTTP 400 (e.g. the index was created concurrently).
            res = self.es.indices.create(index=self.index_name, body=_index_mappings, ignore=400)
            print(res)

    def detailedlog(self):
        """Configure root logging at DEBUG level, including source file and line."""
        logging.basicConfig(
            format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
            level=logging.DEBUG)

    def build_values(self, record, data_name):
        """Return the values for ``data_name``, in order, for one MongoDB record.

        This is where your own per-record processing logic goes.  The default
        simply looks each name up in the record (missing fields become None).
        That is an assumption about the document shape; override or edit it
        if your collection is structured differently.
        """
        return [record.get(name) for name in data_name]

    def _build_action(self, doc):
        """Turn one ``{field: value}`` dict into a bulk "index" action."""
        action = {
            "_index": self.index_name,
            "_type": self.index_type,
            "_source": doc,
        }
        # Use the business key as _id so re-running the sync overwrites rather
        # than duplicates; without one, Elasticsearch generates an id itself.
        doc_id = doc.get("corporateAnnualInfoId")
        if doc_id is not None:
            action["_id"] = doc_id
        return action

    def _flush(self, actions, batch_no):
        """Send ``actions`` in one bulk request, empty the list, return the success count."""
        size = len(actions)
        success, _ = helpers.bulk(self.es, actions, index=self.index_name, raise_on_error=True)
        actions.clear()
        print('插入第%d批数据: %d条, 成功%d条' % (batch_no, size, success))
        return success

    def insert_data(self, p=0, bulk_num=200):
        """Stream every document of DB.COLLECTION into the index with bulk requests.

        :param p: unused; kept only so existing callers that pass it still work
            (the previous implementation overwrote it immediately).
        :param bulk_num: number of actions sent per bulk request
        """
        actions = []
        batch_no = 0
        total = 0
        # Credentials are given to the constructor and checked against the
        # admin database, replacing client.admin.authenticate(), which is
        # deprecated in PyMongo 3.x and removed in 4.0.
        client = MongoClient(CONN_ADDR1, port=27017, username=USERNAME,
                             password=PASSWORD, authSource='admin')
        try:
            mycol = client[DB][COLLECTION]
            # find() returns a cursor that pages through the whole collection in
            # batches (the first batch holds up to 101 documents).
            # no_cursor_timeout=True stops the server from reaping the cursor
            # during slow processing, so it must be closed explicitly.
            cursor = mycol.find({}, no_cursor_timeout=True)
            try:
                for n, mongo_record in enumerate(cursor, start=1):
                    logger.debug("开始处理数据%d条", n)
                    values = self.build_values(mongo_record, DATA_NAME)
                    actions.append(self._build_action(dict(zip(DATA_NAME, values))))
                    if len(actions) >= bulk_num:
                        batch_no += 1
                        total += self._flush(actions, batch_no)
            finally:
                cursor.close()
        finally:
            client.close()
        # Send whatever is left over after the last full batch.
        if actions:
            batch_no += 1
            total += self._flush(actions, batch_no)
        print('Performed %d actions' % total)


if __name__ == '__main__':
    # Replace with your own Elasticsearch host
    obj = ElasticObj("IndexName", "IndexType", ip="es-cn-********.com")
    # Optionally create the index with an explicit mapping first
    # (the author only tested this step)
    # obj.create_index()
    obj.detailedlog()
    # Run the sync
    obj.insert_data()

转载地址:https://blog.csdn.net/weixin_43401381/article/details/112003930 如侵犯了您的版权,请留言告知原文地址,我们会删除此文章。给您带来不便,敬请谅解!

上一篇:PyHive(遇到的坑)以及一些ETl使用函数
下一篇:PyMongo2Es常见问题汇总

发表评论

最新留言

初次前来,多多关照!
[***.217.46.12]2024年04月14日 21时41分25秒