分类 大数据组件 下的文章

1.B站关注的up主

flink官方runtime视频

2.老白

https://www.cnblogs.com/bethunebtj/p/9168274.html

https://cloud.tencent.com/developer/article/1172583

3.公众号 微云

https://www.jianshu.com/p/e5387fa6c6a0

https://blog.csdn.net/qq_21653785/article/details/79499127

https://zhuanlan.zhihu.com/p/22736103

算子

https://www.jianshu.com/p/2e379f242272

4.源码阅读笔记

https://blog.jrwang.me/2019/flink-source-code-streamgraph/

http://wuchong.me/blog/2016/05/04/flink-internal-how-to-build-streamgraph/

5.源码阅读总结

http://litianmin.com:9001/blog/post/litm7@qq.com/Flink%E6%BA%90%E4%BB%A3%E7%A0%81%E9%98%85%E8%AF%BB%E6%80%BB%E7%BB%93

6.github

https://github.com/wangzzu/awesome/issues/28 十分推荐

http://chenyuzhao.me/2017/02/06/flink%E7%89%A9%E7%90%86%E8%AE%A1%E5%88%92%E7%94%9F%E6%88%90/

简述:从log 日志中读取,然后设置不同的type,传输到kafka

Filebeat.yml

filebeat.config.inputs:
  enabled: true
  path: configs/*.yml  #input配置文件的路径
output.kafka:
  hosts: ["xxx.xx.xx.xx:9100"]
  topic: '%{[fields.type]}'  #根据设置的不同的fields.type 输出到不同的kafka的topic。

input.yml

- type: log           #指定输入类型
  backoff: "1s"                #到底之后 多久在检测
  #多行过滤
  multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
  multiline.negate: true
  multiline.match: after
  paths:           #日志路径
    - /home/logs/systemlog/access/*
  fields:       #给日志设置type
    type: systemlog_access

多行过滤文档

filebeat可以配置processors,来统一对多种日志进行过滤或增强 processor文档

Filebeat可以将运行状况发送到ES监控集群,然后在kibana上监控 文档

Search API

指定查询的索引

2020-03-25 15-50-32.png

一、URI Search(用的不多)

在URI中使用查询参数

2020-03-25 16-01-52.png

2020-03-25 16-09-16.png

2020-03-25 16-10-31.png

2020-03-25 16-12-11.png

2020-03-25 16-13-30.png

二、Request Body Search

官方文档

使用Elasticsearch提供的,基于JSON格式的更加完备的Query Domain Specific Language(DSL)

2020-03-25 15-58-06.png

#ignore_unavailable=true,可以忽略尝试访问不存在的索引“404_idx”导致的报错
POST /movies,404_idx/_search?ignore_unavailable=true
{
  "profile": true,
    "query": {
        "match_all": {}
    }
}
#查询movies分页
POST /kibana_sample_data_ecommerce/_search
{
  "from":10,
  "size":20,
  "query":{
    "match_all": {}
  }
}


#对日期排序
POST kibana_sample_data_ecommerce/_search
{
  "sort":[{"order_date":"desc"}],
  "query":{
    "match_all": {}
  }

}

#source filtering
POST kibana_sample_data_ecommerce/_search
{
  "_source":["order_date"],
  "query":{
    "match_all": {}
  }
}


#脚本字段
GET kibana_sample_data_ecommerce/_search
{
  "script_fields": {
    "new_field": {
      "script": {
        "lang": "painless",
        "source": "doc['order_date'].value+'hello'"
      }
    }
  },
  "query": {
    "match_all": {}
  }
}

#match查询
POST movies/_search
{
  "query": {
    "match": {
      "title": "last christmas"
    }
  }
}

POST movies/_search
{
  "query": {
    "match": {
      "title": {
        "query": "last christmas",
        "operator": "and" #and表示query的两个词必须同时出现
      }
    }
  }
}
#短语查询 Match Phrase
POST movies/_search
{
  "query": {
    "match_phrase": {
      "title":{
        "query": "one love"
      }
    }
  }
}

POST movies/_search
{
  "query": {
    "match_phrase": {
      "title":{
        "query": "one love",
        "slop": 1  #one 与love之间可以存在其他词
      }
    }
  }
}
POST /products/_bulk
{ "index": { "_id": 1 }}
{ "price" : 10,"avaliable":true,"date":"2018-01-01", "productID" : "XHDK-A-1293-#fJ3" }
{ "index": { "_id": 2 }}
{ "price" : 20,"avaliable":true,"date":"2019-01-01", "productID" : "KDKE-B-9947-#kL5" }
{ "index": { "_id": 3 }}
{ "price" : 30,"avaliable":true, "productID" : "JODL-X-1937-#pV7" }
{ "index": { "_id": 4 }}
{ "price" : 30,"avaliable":false, "productID" : "QQPX-R-3956-#aD8" }



#基本语法
POST /products/_search
{
  "query": {
    "bool" : {
      "must" : {
        "term" : { "price" : "30" }
      },
      "filter": {
        "term" : { "avaliable" : "true" }
      },
      "must_not" : {
        "range" : {
          "price" : { "lte" : 10 }
        }
      },
      "should" : [
        { "term" : { "productID.keyword" : "JODL-X-1937-#pV7" } },
        { "term" : { "productID.keyword" : "XHDK-A-1293-#fJ3" } }
      ],
      "minimum_should_match" :1
    }
  }
}

#改变数据模型,增加字段。解决数组包含而不是精确匹配的问题
POST /newmovies/_bulk
{ "index": { "_id": 1 }}
{ "title" : "Father of the Bridge Part II","year":1995, "genre":"Comedy","genre_count":1 }
{ "index": { "_id": 2 }}
{ "title" : "Dave","year":1993,"genre":["Comedy","Romance"],"genre_count":2 }

#must,有算分
POST /newmovies/_search
{
  "query": {
    "bool": {
      "must": [
        {"term": {"genre.keyword": {"value": "Comedy"}}},
        {"term": {"genre_count": {"value": 1}}}

      ]
    }
  }
}

#Filter。不参与算分,结果的score是0
POST /newmovies/_search
{
  "query": {
    "bool": {
      "filter": [
        {"term": {"genre.keyword": {"value": "Comedy"}}},
        {"term": {"genre_count": {"value": 1}}}
        ]

    }
  }
}


#Filtering Context
POST _search
{
  "query": {
    "bool" : {

      "filter": {
        "term" : { "avaliable" : "true" }
      },
      "must_not" : {
        "range" : {
          "price" : { "lte" : 10 }
        }
      }
    }
  }
}


#Query Context
POST /products/_bulk
{ "index": { "_id": 1 }}
{ "price" : 10,"avaliable":true,"date":"2018-01-01", "productID" : "XHDK-A-1293-#fJ3" }
{ "index": { "_id": 2 }}
{ "price" : 20,"avaliable":true,"date":"2019-01-01", "productID" : "KDKE-B-9947-#kL5" }
{ "index": { "_id": 3 }}
{ "price" : 30,"avaliable":true, "productID" : "JODL-X-1937-#pV7" }
{ "index": { "_id": 4 }}
{ "price" : 30,"avaliable":false, "productID" : "QQPX-R-3956-#aD8" }


POST /products/_search
{
  "query": {
    "bool": {
      "should": [
        {
          "term": {
            "productID.keyword": {
              "value": "JODL-X-1937-#pV7"}}
        },
        {"term": {"avaliable": {"value": true}}
        }
      ]
    }
  }
}


#嵌套,实现了 should not 逻辑
POST /products/_search
{
  "query": {
    "bool": {
      "must": {
        "term": {
          "price": "30"
        }
      },
      "should": [
        {
          "bool": {
            "must_not": {
              "term": {
                "avaliable": "false"
              }
            }
          }
        }
      ],
      "minimum_should_match": 1
    }
  }
}


#Controll the Precision
#range范围查询
POST _search
{
  "query": {
    "bool" : {
      "must" : {
        "term" : { "price" : "30" }
      },
      "filter": {
        "term" : { "avaliable" : "true" }
      },
      "must_not" : {
        "range" : {
          "price" : { "lte" : 10 }
        }
      },
      "should" : [
        { "term" : { "productID.keyword" : "JODL-X-1937-#pV7" } },
        { "term" : { "productID.keyword" : "XHDK-A-1293-#fJ3" } }
      ],
      "minimum_should_match" :2
    }
  }
}



POST /animals/_search
{
  "query": {
    "bool": {
      "should": [
        { "term": { "text": "brown" }},
        { "term": { "text": "red" }},
        { "term": { "text": "quick"   }},
        { "term": { "text": "dog"   }}
      ]
    }
  }
}

POST /animals/_search
{
  "query": {
    "bool": {
      "should": [
        { "term": { "text": "quick" }},
        { "term": { "text": "dog"   }},
        {
          "bool":{
            "should":[
               { "term": { "text": "brown" }},
                 { "term": { "text": "brown" }},
            ]
          }

        }
      ]
    }
  }
}


DELETE blogs
POST /blogs/_bulk
{ "index": { "_id": 1 }}
{"title":"Apple iPad", "content":"Apple iPad,Apple iPad" }
{ "index": { "_id": 2 }}
{"title":"Apple iPad,Apple iPad", "content":"Apple iPad" }

#boost
# boost >1 打分相关度相对性提升
#0<boost<1 打分相关度相对性降低
#boost<0 贡献负分
POST blogs/_search
{
  "query": {
    "bool": {
      "should": [
        {"match": {
          "title": {
            "query": "apple,ipad",
            "boost": 1.1
          }
        }},

        {"match": {
          "content": {
            "query": "apple,ipad"
          }
        }}
      ]
    }
  }
}

查询结果

2020-03-25 15-59-0.png

参考极客时间Elasticsearch课程

一、安装
pip install elasticsearch 
二、一个小封装类
#索引类
class ElasticSearchClient(object):
    # TODO:实例和事务化单个node,若需要多个node,需要重构代码
    def __init__(self, filepath="app/conf/conf.ini"):
        #读取es配置
        conf=configparser.ConfigParser()
        conf.read(filepath,encoding='utf-8')
        # TODO:传参

        self.es_servers = [{
            "host": conf.get('Elasticsearch','url'),
            "port": conf.get('Elasticsearch','port')
        }]
    # http_auth是对设置了安全机制的es库需要写入 账号与密码,如果没有设置则不用写这个参数
        self.es_client = elasticsearch.Elasticsearch(hosts=self.es_servers,http_auth=("xxx", "xxxxx")) 

    # TODO:进行创建一个数据库,即index
    def create_index(self, index_name):
        self.es_client.indices.create(index=index_name)
    # TODO:指定map创建一个数据库
    def createindex_by_map(self,index_name,map):
        self.es_client.indices.create(index=index_name,body=map)
    # TODO:进行删除一个数据库,即index
    def delete_es_index(self, index_name):
        self.es_client.indices.delete(index=index_name)

    # 数据库不用进入,也不用退出。


class LoadElasticSearch(object):
    # TODO:对单个index进行增删改查
    def __init__(self, index, doc_type='docx'):
        # TODO:输入单个index的名称
        self.index = index
        self.doc_type = doc_type
        try:
            self.es_client = ElasticSearchClient().es_client
        except Exception as e:
            print(e)
            print('连接es失败,请查看是否连接。')

        if not self.es_client.indices.exists(index=index):
            # 创建Index
            self.es_client.indices.create(index=self.index)

    def set_index_mapping(self, set_mappings):
        # TODO:设置mapping结构
        """
        设置index的mapping,类似于表结构。
        注意!!!!现在仅仅对mapping中的properties参数,其他的参数还很多
        前提为:已有index,并且已自定义分词器,详情见https://blog.csdn.net/u013905744/article/details/80935846
        输入参数举例说明:
            set_mappings = {"answer": {
                        "type": "string",
                        "index": "not_analyzed"
                    },
                    "answerAuthor": {
                        "type": "string"
                    },
                    "answerDate": {
                        "type": "date",
                        "format": "strict_date_optional_time||epoch_millis"//这里出现了复合类型
                    },
                    ...
                    {...
                    }
                }
        """
        mapping = {
            self.doc_type: {
                "properties": set_mappings
            }
        }
        self.es_client.indices.put_mapping(index=self.index, doc_type=self.doc_type, body=mapping)

    def add_date(self, row_obj):
        """
        单条插入ES
        """
        self.es_client.index(index=self.index, doc_type=self.doc_type, body=row_obj)

    def add_date_bulk(self, row_obj_list):
        """
        批量插入ES,输入文本格式为单条插入的list格式
        """
        load_data = []
        i = 1
        bulk_num = 2000  # 2000条为一批
        for row_obj in row_obj_list:
            action = {
                "_index": self.index,
                "_type": self.doc_type,
                "_source": row_obj
            }
            load_data.append(action)
            i += 1
            # 批量处理
            if len(load_data) == bulk_num:
                print('插入', i / bulk_num, '批数据')
                print(len(load_data))
                success, failed = bulk(self.es_client, load_data, index=self.index, raise_on_error=True)
                del load_data[0:len(load_data)]
                print(success, failed)

        if len(load_data) > 0:
            success, failed = bulk(self.es_client, load_data, index=self.index, raise_on_error=True)
            del load_data[0:len(load_data)]
            print(success, failed)

    def update_by_id(self, row_obj):
        """
        根据给定的_id,更新ES文档
        :return:
        """

        _id = row_obj.get("_id", 1)
        row_obj.pop("_id")
        self.es_client.update(index=self.index, doc_type=self.doc_type, body={"doc": row_obj}, id=_id)

    def delete_by_id(self, _id):
        """
        根据给定的id,删除文档
        :return:
        """
        self.es_client.delete(index=self.index, doc_type=self.doc_type, id=_id)

    def search_by_query(self, body):
        '''
        根据查询的query语句,来搜索查询内容
        '''
        search_result = self.es_client.search(index=self.index, doc_type=self.doc_type, body=body)
        return search_result
三、如何使用

1.创建索引时指定Mapping

我们在创建索引时,需要给创建的索引指定Mapping,我将Mapping文件放入了一个xxx.json文件中

{
  "settings": {
  #设置副本数
   "number_of_replicas": 1,
     #设置分片
   "number_of_shards": 4,
      #设置分析器 我们采用ik作为tokenizer pinyin作为filter
   "analysis": {
     "analyzer": {
       "my_analyzer":{
       "type":"custom",
       "tokenizer":"ik_max_word",
       "filter":["pinyin_first_letter_and_full_pinyin_filter"]
     }
     },
     "filter": {
       "pinyin_first_letter_and_full_pinyin_filter": {
                    "type" : "pinyin",
                    "keep_first_letter" : "true",
                    "keep_full_pinyin" : "false",
                    "keep_none_chinese" : "true",
                    "keep_original" : "false",
                    "limit_first_letter_length" : 16,
                    "lowercase" : "true",
                    "trim_whitespace" : "true",
                    "keep_none_chinese_in_first_letter" : "true"
                }
     }

   }
 },
 "mappings": {
   "dynamic_templates": [
     {
       "strings":{
           #设定读取到索引中是String类型就设置type为text字段采用我自己设置的分析器,并增加 keyword字段
         "match_mapping_type":"string", 
         "mapping":{
           "type":"text",
           "analyzer":"my_analyzer",
           "fields":{
             "raw":{
               "type":"keyword"
             }

           }
         }
       }
     }
     ]
 }
}

创建代码

mappath="xxxx/xxxx.json"
f=open(mappath,'r',encoding='utf-8')
#读取map
map=json.load(f)
es=ElasticSearchClient()
#创建索引
es.createindex_by_map(indexname,map=map)

2.查询

es_client = LoadElasticSearch(indexname)
search={"query":xxxx}
res = es_client.search_by_query(one_body)

一、什么是Index Templates

帮助设定Mappings和Setting,并按照一定的规则,自动匹配到新创建的索引之上

  • 模板仅在一个索引被新创建时,才会产生作用。修改模板不会影响已创建的索引
  • 你可以设定多个索引模板,这些设置会被“merge”在一起
  • 你可以指定“oder”的数值,控制“merging”的过程

文档

二、Index Template的工作方式

当一个索引被创建时

  • 应用Elasticsearch默认的setting和mapping
  • 应用order数值低的Index Template中的设定
  • 应用order高的Index Template中的设定,之前的设定会被覆盖
  • 应用创建索引时,用户所指定的Setting和Mapping,并覆盖之前模板中的设定
PUT /_template/template_test
{
    "index_patterns" : ["test*"],
    "order" : 1,
    "settings" : {
        "number_of_shards": 1,
        "number_of_replicas" : 2
    },
    "mappings" : {
        "date_detection": false,
        "numeric_detection": true
    }
}
三、什么是Dynamic Template

根据Elasticsearch识别的数据类型,结合字段名称,来动态设定字段类型

  • 所有的字符串类型都设定成Keyword,或者关闭Keyword字段
  • is开头的字段都设置成boolean
  • long_开头的都设置成long类型

文档

四、Dynamic Template设定
  • Dynamic Template是定义在某个索引的Mapping中
  • Template有一个名称
  • 匹配规则是一个数组
  • 为匹配到字段设置Mapping
PUT my_index
{
  "mappings": {
    "dynamic_templates": [
            {
        "strings_as_boolean": {
          "match_mapping_type":   "string",
          "match":"is*",
          "mapping": {
            "type": "boolean"
          }
        }
      },
      {
        "strings_as_keywords": {
          "match_mapping_type":   "string",
          "mapping": {
            "type": "keyword"
          }
        }
      }
    ]
  }
}