python下elasticsearch搜索接口封装实现
# -*- coding:utf-8 -*-
from elasticsearch import Elasticsearch,TransportError
from typing import Any
def search(es:Elasticsearch,index:Any,word:str,size:int = 10):
try:
result = es.search(index=index,q=word,size=size,ignore=[400,401,403,409])
except TransportError as e:
print(f'搜索{word}出错:{e.error}')
return None,0
try:
hits = result['hits']['hits']
total = result['hits']['total']['value']
# 以下代码可以用列表推导式实现
# result_items = []
# for item in hits:
# source = item['_source']
# title = item['_source']['title']
# content = item['_source']['content']
# score = item['_score']
# result_items.append(title)
except KeyError:
return None,0
return [item['_source'] for item in hits],total
if __name__ == '__main__':
es_client = Elasticsearch()
index_name = 'test-demo0'
keyword = '互联网'
items,count = search(es_client,index_name,keyword,20)
print(count)