一、安装
pip install elasticsearch
二、一个小封装类
#索引类
class ElasticSearchClient(object):
# TODO:实例和事务化单个node,若需要多个node,需要重构代码
def __init__(self, filepath="app/conf/conf.ini"):
#读取es配置
conf=configparser.ConfigParser()
conf.read(filepath,encoding='utf-8')
# TODO:传参
self.es_servers = [{
"host": conf.get('Elasticsearch','url'),
"port": conf.get('Elasticsearch','port')
}]
# http_auth是对设置了安全机制的es库需要写入 账号与密码,如果没有设置则不用写这个参数
self.es_client = elasticsearch.Elasticsearch(hosts=self.es_servers,http_auth=("xxx", "xxxxx"))
# TODO:进行创建一个数据库,即index
def create_index(self, index_name):
self.es_client.indices.create(index=index_name)
# TODO:指定map创建一个数据库
def createindex_by_map(self,index_name,map):
self.es_client.indices.create(index=index_name,body=map)
# TODO:进行删除一个数据库,即index
def delete_es_index(self, index_name):
self.es_client.indices.delete(index=index_name)
# 数据库不用进入,也不用退出。
class LoadElasticSearch(object):
# TODO:对单个index进行增删改查
def __init__(self, index, doc_type='docx'):
# TODO:输入单个index的名称
self.index = index
self.doc_type = doc_type
try:
self.es_client = ElasticSearchClient().es_client
except Exception as e:
print(e)
print('连接es失败,请查看是否连接。')
if not self.es_client.indices.exists(index=index):
# 创建Index
self.es_client.indices.create(index=self.index)
def set_index_mapping(self, set_mappings):
# TODO:设置mapping结构
"""
设置index的mapping,类似于表结构。
注意!!!!现在仅仅对mapping中的properties参数,其他的参数还很多
前提为:已有index,并且已自定义分词器,详情见https://blog.csdn.net/u013905744/article/details/80935846
输入参数举例说明:
set_mappings = {"answer": {
"type": "string",
"index": "not_analyzed"
},
"answerAuthor": {
"type": "string"
},
"answerDate": {
"type": "date",
"format": "strict_date_optional_time||epoch_millis"//这里出现了复合类型
},
...
{...
}
}
"""
mapping = {
self.doc_type: {
"properties": set_mappings
}
}
self.es_client.indices.put_mapping(index=self.index, doc_type=self.doc_type, body=mapping)
def add_date(self, row_obj):
"""
单条插入ES
"""
self.es_client.index(index=self.index, doc_type=self.doc_type, body=row_obj)
def add_date_bulk(self, row_obj_list):
"""
批量插入ES,输入文本格式为单条插入的list格式
"""
load_data = []
i = 1
bulk_num = 2000 # 2000条为一批
for row_obj in row_obj_list:
action = {
"_index": self.index,
"_type": self.doc_type,
"_source": row_obj
}
load_data.append(action)
i += 1
# 批量处理
if len(load_data) == bulk_num:
print('插入', i / bulk_num, '批数据')
print(len(load_data))
success, failed = bulk(self.es_client, load_data, index=self.index, raise_on_error=True)
del load_data[0:len(load_data)]
print(success, failed)
if len(load_data) > 0:
success, failed = bulk(self.es_client, load_data, index=self.index, raise_on_error=True)
del load_data[0:len(load_data)]
print(success, failed)
def update_by_id(self, row_obj):
"""
根据给定的_id,更新ES文档
:return:
"""
_id = row_obj.get("_id", 1)
row_obj.pop("_id")
self.es_client.update(index=self.index, doc_type=self.doc_type, body={"doc": row_obj}, id=_id)
def delete_by_id(self, _id):
"""
根据给定的id,删除文档
:return:
"""
self.es_client.delete(index=self.index, doc_type=self.doc_type, id=_id)
def search_by_query(self, body):
'''
根据查询的query语句,来搜索查询内容
'''
search_result = self.es_client.search(index=self.index, doc_type=self.doc_type, body=body)
return search_result
三、如何使用
1.创建索引时指定Mapping
我们在创建索引时,需要给创建的索引指定Mapping,我将Mapping文件放入了一个xxx.json文件中
{
"settings": {
#设置副本数
"number_of_replicas": 1,
#设置分片
"number_of_shards": 4,
#设置分析器 我们采用ik作为tokenizer pinyin作为filter
"analysis": {
"analyzer": {
"my_analyzer":{
"type":"custom",
"tokenizer":"ik_max_word",
"filter":["pinyin_first_letter_and_full_pinyin_filter"]
}
},
"filter": {
"pinyin_first_letter_and_full_pinyin_filter": {
"type" : "pinyin",
"keep_first_letter" : "true",
"keep_full_pinyin" : "false",
"keep_none_chinese" : "true",
"keep_original" : "false",
"limit_first_letter_length" : 16,
"lowercase" : "true",
"trim_whitespace" : "true",
"keep_none_chinese_in_first_letter" : "true"
}
}
}
},
"mappings": {
"dynamic_templates": [
{
"strings":{
#设定读取到索引中是String类型就设置type为text字段采用我自己设置的分析器,并增加 keyword字段
"match_mapping_type":"string",
"mapping":{
"type":"text",
"analyzer":"my_analyzer",
"fields":{
"raw":{
"type":"keyword"
}
}
}
}
}
]
}
}
创建代码
mappath="xxxx/xxxx.json"
f=open(mappath,'r',encoding='utf-8')
#读取map
map=json.load(f)
es=ElasticSearchClient()
#创建索引
es.createindex_by_map(indexname,map=map)
2.查询
es_client = LoadElasticSearch(indexname)
search={"query":xxxx}
res = es_client.search_by_query(one_body)