Python:从 MySQL 数据库导入数据到 Elasticsearch(按 title 判断:已存在则更新,不存在则添加)
1、常规方法,速度较慢
# Copy every (title, content) row of the MySQL table `yj_ask_1` into the
# Elasticsearch index `articles`. A row whose title already exists in the index
# has its content updated; any other row is indexed as a new document.
import time

import mysql.connector
from elasticsearch import Elasticsearch
# Imported explicitly so the CR_* constants below resolve without depending on
# `import mysql.connector` having loaded the `errorcode` submodule as a side
# effect.
from mysql.connector import errorcode

# Client-side error numbers for which the script reconnects and retries.
RETRYABLE_ERRNOS = (errorcode.CR_SERVER_LOST, errorcode.CR_CONN_HOST_ERROR)

# Connect to the MySQL database.
# NOTE(review): the credentials are hard-coded; consider reading them from the
# environment or a config file instead.
mysql_conn = mysql.connector.connect(
    host="localhost",
    user="root",
    password="FiroRegePUE0000idB3",
    database="car",
)

# Connect to Elasticsearch.
es = Elasticsearch(['http://localhost:9200'])

# MySQL query that selects the rows to import.
query = 'SELECT title, content FROM yj_ask_1'

# Run the query, reconnecting and retrying when the connection was lost.
cursor = None
while cursor is None:
    try:
        cursor = mysql_conn.cursor()
        cursor.execute(query)
    except mysql.connector.errors.OperationalError as e:
        if e.errno not in RETRYABLE_ERRNOS:
            raise
        print("Reconnecting...")
        time.sleep(1)
        mysql_conn.ping(True)
        # cursor() may have succeeded before execute() failed. Reset it so the
        # loop actually re-runs the query instead of exiting with a cursor
        # that never executed anything.
        cursor = None

# Walk the result set row by row.
for title, content in cursor:
    # Look for an existing document with exactly this title.
    es_query = {
        "query": {
            "term": {"title.keyword": title}
        }
    }
    es_results = es.search(
        index='articles', body=es_query, request_timeout=30
    )['hits']['hits']

    if es_results:
        # Title already indexed: update the content of the first match.
        es_id = es_results[0]['_id']
        es_body = {"content": content}
        es.update(
            index='articles', id=es_id, body={"doc": es_body},
            request_timeout=30,
        )
    else:
        # Title not indexed yet: add a new document.
        es_body = {"title": title, "content": content}
        es.index(index='articles', body=es_body, request_timeout=30)

# Close the MySQL cursor/connection and the Elasticsearch transport.
cursor.close()
mysql_conn.close()
es.transport.close()
2、异步,速度有所提高
# Concurrent variant: copy every (title, content) row of the MySQL table
# `yj_ask_1` into the Elasticsearch index `articles`, updating documents whose
# title already exists and indexing the rest.
import asyncio
import functools

import mysql.connector
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk


def _run_blocking(func, **kwargs):
    """Run ``func(**kwargs)`` in the running loop's default thread pool.

    Must be called from inside a coroutine. Returns an awaitable future that
    resolves to the call's return value.
    """
    loop = asyncio.get_running_loop()
    return loop.run_in_executor(None, functools.partial(func, **kwargs))


async def update_article(title, content, es):
    """Update the article titled *title* in Elasticsearch, or index it if absent.

    Args:
        title: Article title; matched exactly against ``title.keyword``.
        content: Article content to store.
        es: A synchronous ``Elasticsearch`` client. Its methods block and are
            not awaitable, so every call is pushed to a worker thread rather
            than awaited directly.
    """
    # Look for an existing document with exactly this title.
    es_query = {
        "query": {
            "term": {"title.keyword": title}
        }
    }
    response = await _run_blocking(
        es.search, index='articles', body=es_query, request_timeout=30
    )
    es_results = response['hits']['hits']

    if es_results:
        # Title already indexed: update the content of the first match.
        es_id = es_results[0]['_id']
        es_body = {"content": content}
        await _run_blocking(
            es.update, index='articles', id=es_id, body={"doc": es_body},
            request_timeout=30,
        )
    else:
        # Title not indexed yet: add a new document.
        es_body = {"title": title, "content": content}
        await _run_blocking(
            es.index, index='articles', body=es_body, request_timeout=30
        )


async def import_data():
    """Read all rows from MySQL and upsert them into Elasticsearch concurrently.

    The MySQL connection and the Elasticsearch transport are closed even when
    the import fails part-way.
    """
    # Connect to the MySQL database.
    # NOTE(review): the credentials are hard-coded; consider reading them from
    # the environment or a config file instead.
    mysql_conn = mysql.connector.connect(
        host="localhost",
        user="root",
        password="FiroRgePUE00idB3",
        database="car",
    )
    try:
        # Connect to Elasticsearch.
        es = Elasticsearch(['http://localhost:9200'])
        try:
            # MySQL query that selects the rows to import.
            query = 'SELECT title, content FROM yj_ask_1'

            cursor = mysql_conn.cursor()
            try:
                cursor.execute(query)
                # Fetch everything up front so the cursor can be released
                # before the Elasticsearch work starts.
                rows = cursor.fetchall()
            finally:
                cursor.close()

            # One coroutine per row; how many run at once is limited by the
            # size of the default thread pool.
            # NOTE(review): rows sharing a title may race (both search, both
            # index) and create duplicates -- confirm titles are unique.
            tasks = [update_article(title, content, es) for title, content in rows]
            await asyncio.gather(*tasks)
        finally:
            es.transport.close()
    finally:
        mysql_conn.close()


if __name__ == '__main__':
    asyncio.run(import_data())