一、安装
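pip install elasticsearch
(注意:下文代码在各个请求中使用了 doc_type / _type 参数,elasticsearch-py 8.x 客户端已不再支持这些参数,建议安装 8 以下的版本,例如 pip install "elasticsearch<8",并与服务端的大版本保持一致。)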
二、一个小封装类(代码依赖以下导入:import configparser、import elasticsearch、from elasticsearch.helpers import bulk)
# Index-level client wrapper: builds the Elasticsearch connection and manages indices.
class ElasticSearchClient(object):
    """Build an ``elasticsearch.Elasticsearch`` client from an INI config file.

    TODO: only a single node is configured; supporting several nodes needs a refactor.
    """

    def __init__(self, filepath="app/conf/conf.ini"):
        """Read connection settings from the ``[Elasticsearch]`` section of *filepath*.

        Required keys: ``url`` (host name) and ``port`` (must be an integer).
        Optional keys: ``username`` / ``password`` for HTTP basic auth; when they
        are absent the placeholder credentials ``("xxx", "xxxxx")`` are sent,
        exactly as before.

        :param filepath: path of the INI file, read as UTF-8.
        :raises configparser.Error: if the section or a required key is missing.
        :raises ValueError: if ``port`` is not an integer.
        """
        conf = configparser.ConfigParser()
        conf.read(filepath, encoding='utf-8')

        self.es_servers = [{
            "host": conf.get('Elasticsearch', 'url'),
            # ConfigParser.get returns a str; the client's host dict takes an int port.
            "port": conf.getint('Elasticsearch', 'port'),
        }]
        # http_auth is only required by clusters with security enabled; put the
        # real account into the config file instead of editing the code.
        http_auth = (
            conf.get('Elasticsearch', 'username', fallback="xxx"),
            conf.get('Elasticsearch', 'password', fallback="xxxxx"),
        )
        self.es_client = elasticsearch.Elasticsearch(hosts=self.es_servers, http_auth=http_auth)

    def create_index(self, index_name):
        """Create the index *index_name* with default settings."""
        self.es_client.indices.create(index=index_name)

    def createindex_by_map(self, index_name, map):
        """Create the index *index_name* using *map* as the request body.

        :param map: dict with ``settings`` and/or ``mappings`` (name kept for
            backward compatibility with keyword callers, although it shadows
            the builtin).
        """
        self.es_client.indices.create(index=index_name, body=map)

    def delete_es_index(self, index_name):
        """Delete the index *index_name*."""
        self.es_client.indices.delete(index=index_name)

    # No explicit open/close step is needed for an index.


class LoadElasticSearch(object):
    """Create/read/update/delete helper bound to a single Elasticsearch index.

    The index is created with default settings on construction if it does not
    exist yet.
    """

    def __init__(self, index, doc_type='docx'):
        """Connect to Elasticsearch and make sure *index* exists.

        :param index: name of the index this helper operates on.
        :param doc_type: type name passed as ``doc_type`` / ``_type`` on every request.
        :raises Exception: re-raises whatever ``ElasticSearchClient()`` raised,
            after printing a hint.
        """
        self.index = index
        self.doc_type = doc_type
        try:
            self.es_client = ElasticSearchClient().es_client
        except Exception as e:
            print(e)
            print('连接es失败,请查看是否连接。')
            # Nothing below can work without a client; surface the real cause
            # instead of failing later with an AttributeError on self.es_client.
            raise

        if not self.es_client.indices.exists(index=index):
            # Create the index with default settings.
            self.es_client.indices.create(index=self.index)

    def set_index_mapping(self, set_mappings):
        """Set the mapping (roughly a table schema) of the index.

        Only the ``properties`` part of the mapping is supported; other mapping
        parameters are not. The index must already exist (and any custom
        analyzer it references must already be defined), see
        https://blog.csdn.net/u013905744/article/details/80935846

        Example *set_mappings*::

            {
                "answer": {"type": "string", "index": "not_analyzed"},
                "answerAuthor": {"type": "string"},
                "answerDate": {
                    "type": "date",
                    # several accepted formats combined with "||"
                    "format": "strict_date_optional_time||epoch_millis",
                },
            }

        NOTE(review): ``"string"`` / ``"not_analyzed"`` are old-style mapping
        values; confirm they match the server version in use.
        """
        mapping = {
            self.doc_type: {
                "properties": set_mappings
            }
        }
        self.es_client.indices.put_mapping(index=self.index, doc_type=self.doc_type, body=mapping)

    def add_date(self, row_obj):
        """Index a single document *row_obj* (Elasticsearch assigns the id)."""
        self.es_client.index(index=self.index, doc_type=self.doc_type, body=row_obj)

    @staticmethod
    def _batches(items, size):
        """Yield successive lists of at most *size* elements taken from *items*."""
        if size < 1:
            raise ValueError("size must be a positive integer")
        batch = []
        for item in items:
            batch.append(item)
            if len(batch) == size:
                yield batch
                batch = []
        if batch:
            yield batch

    def add_date_bulk(self, row_obj_list, bulk_num=2000):
        """Bulk-insert documents, sending at most *bulk_num* per request.

        :param row_obj_list: iterable of document bodies (the same dicts that
            ``add_date`` accepts).
        :param bulk_num: maximum number of documents per bulk request.
        """
        # Lazily wrap every document as a bulk "index" action.
        actions = (
            {"_index": self.index, "_type": self.doc_type, "_source": row_obj}
            for row_obj in row_obj_list
        )
        for batch_no, batch in enumerate(self._batches(actions, bulk_num), start=1):
            print('插入', batch_no, '批数据')
            print(len(batch))
            success, failed = bulk(self.es_client, batch, index=self.index, raise_on_error=True)
            print(success, failed)

    def update_by_id(self, row_obj):
        """Partially update the document whose id is ``row_obj["_id"]``.

        All other keys of *row_obj* are sent as the partial document. The
        caller's dict is not modified.

        :raises KeyError: if *row_obj* has no ``"_id"`` key.
        """
        doc = dict(row_obj)
        try:
            _id = doc.pop("_id")
        except KeyError:
            raise KeyError("row_obj must contain an '_id' key") from None
        self.es_client.update(index=self.index, doc_type=self.doc_type, body={"doc": doc}, id=_id)

    def delete_by_id(self, _id):
        """Delete the document with id *_id*."""
        self.es_client.delete(index=self.index, doc_type=self.doc_type, id=_id)

    def search_by_query(self, body):
        """Run the search request *body* (query DSL) against the index and return the raw response."""
        search_result = self.es_client.search(index=self.index, doc_type=self.doc_type, body=body)
        return search_result
三、如何使用

1.创建索引时指定Mapping

我们在创建索引时,需要给索引指定 settings 和 Mapping,我把它们放在了一个 xxx.json 文件中。注意:下面示例中的 # 注释仅用于说明,JSON 格式本身不支持注释,保存为文件前必须删除这些注释,否则 json.load 读取时会报错。

{
  "settings": {
  #设置副本数
   "number_of_replicas": 1,
     #设置分片
   "number_of_shards": 4,
      #设置分析器 我们采用ik作为tokenizer pinyin作为filter
   "analysis": {
     "analyzer": {
       "my_analyzer":{
       "type":"custom",
       "tokenizer":"ik_max_word",
       "filter":["pinyin_first_letter_and_full_pinyin_filter"]
     }
     },
     "filter": {
       "pinyin_first_letter_and_full_pinyin_filter": {
                    "type" : "pinyin",
                    "keep_first_letter" : "true",
                    "keep_full_pinyin" : "false",
                    "keep_none_chinese" : "true",
                    "keep_original" : "false",
                    "limit_first_letter_length" : 16,
                    "lowercase" : "true",
                    "trim_whitespace" : "true",
                    "keep_none_chinese_in_first_letter" : "true"
                }
     }

   }
 },
 "mappings": {
   "dynamic_templates": [
     {
       "strings":{
           #设定读取到索引中是String类型就设置type为text字段采用我自己设置的分析器,并增加 keyword字段
         "match_mapping_type":"string", 
         "mapping":{
           "type":"text",
           "analyzer":"my_analyzer",
           "fields":{
             "raw":{
               "type":"keyword"
             }

           }
         }
       }
     }
     ]
 }
}

创建代码

indexname = "xxxx"  # placeholder: name of the index to create
mappath = "xxxx/xxxx.json"
# Read the settings/mapping file; `with` closes it once it has been parsed.
with open(mappath, 'r', encoding='utf-8') as f:
    index_map = json.load(f)
es = ElasticSearchClient()
# Create the index using the settings/mapping read above.
es.createindex_by_map(indexname, map=index_map)

2.查询

# Bind a helper to the index (it is created if it does not exist yet).
es_client = LoadElasticSearch(indexname)
# Replace the match_all query with your own query DSL.
search = {"query": {"match_all": {}}}
res = es_client.search_by_query(search)

标签: Elasticsearch, python

评论已关闭