当前位置:首页 > 日记本 > 正文内容

python 从数据库导入数据到elasticsearch title有则更新,无则添加

zhangchap1年前 (2023-03-29)日记本159

1、常规方法,速度较慢

import mysql.connector
from elasticsearch import Elasticsearch
import time
# 连接 MySQL 数据库
mysql_conn = mysql.connector.connect(
    host="localhost",
    user="root",
    password="FiroRegePUE0000idB3",
    database="car"
)

# 连接 Elasticsearch
es = Elasticsearch(['http://localhost:9200'])

# 定义 MySQL 查询语句
query = 'SELECT title, content FROM yj_ask_1'

# 执行查询
cursor = None
while cursor is None:
    try:
        cursor = mysql_conn.cursor()
        cursor.execute(query)
    except mysql.connector.errors.OperationalError as e:
        if e.errno == mysql.connector.errorcode.CR_SERVER_LOST or e.errno == mysql.connector.errorcode.CR_CONN_HOST_ERROR:
            print("Reconnecting...")
            time.sleep(1)
            mysql_conn.ping(True)
        else:
            raise

# 遍历查询结果
for title, content in cursor:

    # 查询 Elasticsearch 中是否已存在该文章
    es_query = {
        "query": {
            "term": {"title.keyword": title}
        }
    }
    es_results = es.search(index='articles', body=es_query, request_timeout=30)['hits']['hits']

    if len(es_results) > 0:
        # 更新 Elasticsearch 中的文章数据
        es_id = es_results[0]['_id']
        es_body = {
            "content": content
        }
        es.update(index='articles', id=es_id, body={"doc": es_body}, request_timeout=30)
    else:
        # 在 Elasticsearch 中追加文章数据
        es_body = {
            "title": title,
            "content": content
        }
        es.index(index='articles', body=es_body, request_timeout=30)

# 关闭 MySQL 连接和 Elasticsearch 连接
cursor.close()
mysql_conn.close()
es.transport.close()

2、异步,速度有所提高

import mysql.connector
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import asyncio


async def update_article(title, content, es):
    # 查询 Elasticsearch 中是否已存在该文章
    es_query = {
        "query": {
            "term": {"title.keyword": title}
        }
    }
    es_results = es.search(index='articles', body=es_query, request_timeout=30)['hits']['hits']

    if len(es_results) > 0:
        # 更新 Elasticsearch 中的文章数据
        es_id = es_results[0]['_id']
        es_body = {
            "content": content
        }
        await es.update(index='articles', id=es_id, body={"doc": es_body}, request_timeout=30)
    else:
        # 在 Elasticsearch 中追加文章数据
        es_body = {
            "title": title,
            "content": content
        }
        await es.index(index='articles', body=es_body, request_timeout=30)


async def import_data():
    # 连接 MySQL 数据库
    mysql_conn = mysql.connector.connect(
        host="localhost",
        user="root",
        password="FiroRgePUE00idB3",
        database="car"
    )

    # 连接 Elasticsearch
    es = Elasticsearch(['http://localhost:9200'])

    # 定义 MySQL 查询语句
    query = 'SELECT title, content FROM yj_ask_1'

    # 执行查询
    cursor = mysql_conn.cursor()
    cursor.execute(query)

    # 将查询结果转换为 Elasticsearch 文档格式
    actions = []
    for title, content in cursor:
        action = {
            '_op_type': 'update',
            '_index': 'articles',
            '_id': title,
            'doc': {'content': content},
            'doc_as_upsert': True
        }
        actions.append(action)

    # 使用 Elasticsearch 的 bulk API 一次性索引或更新多个文档
    tasks = [update_article(action['_id'], action['doc']['content'], es) for action in actions]
    await asyncio.gather(*tasks)

    # 关闭 MySQL 连接和 Elasticsearch 连接
    cursor.close()
    mysql_conn.close()
    es.transport.close()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(import_data())


分享给朋友:

相关文章

python 随机生成时间戳写入txt文件/运行sql语句

import time from random import randint with open('time.txt', ...

python jieba分词

import jieba from jieba.analyse import tfidf words = jieba.lcut('...

python 发布文章 随机分类(choice)

from random import choice catid = choice([5,6]) #choice 函数从列表中随机提取...

python读取txt文件放到Queue队列

from queue import Queue with open('kw.txt',encoding='utf-8')&nb...

python fake_useragent 模块用法

我们每次发送requests请求时通过random从中随机获取一个随机UserAgent,两行代码即可完成UserAgent的不停更换 from fake_useragent i...

如何为精简的 CSS 文件删除未使用的 CSS

如何为精简的 CSS 文件删除未使用的 CSS

精简的网站比臃肿的网站运行得更快,这已经不是什么秘密了。不要让不必要的 CSS 拖累您的 Web 项目;使用下面描述的工具和技术来帮助您删除未使用的 CSS 并提高您网站的整体性能。什么是未使用的 C...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。