ELK: Metricbeat

Metricbeat简介

1.定期收集操作系统、软件或服务的指标数据
2.存储在Elasticsearch中进行实时的分析

Logs VS Metrics

1.Logs:用于记录离散的事件, 具有随机性. 例如, 应用程序的调试信息或错误信息等
2.Metrics:用于记录度量或可聚合的数据, 具有计划性. 例如, 服务的相应时长等.

Metricbeat组成

1.Module:Metribeat收集指标的对象, 比如linux、windows、mysql等 2.Metricset:Metribeat收集指标集合, 该集合以减少收集指标调用次数为划分依据. 1个Module可以有多个Metricset

Metribeat Event

1.Event的通用结构如下

Metricbeat Module

1.Metricbeat有大量的module:
<1>System
<2>Redis
<3>MySQL
<4>Kafak
<5>Zookeeper
<6>Docker
<7>Kubernetes …
2.每个Module都有自己的Metricsets, 比如System Module:
<1>core
<2>cpu
<3>diskio
<4>filesystem
<5>fsstat
<6>load
<7>memory …

Metricbeat配置

Metric Module实践

1.安装
2.配置metricbeat.yml
3.配置模板-index template
4.配置Kibana Dashboard
5.运行

ELK: Beats Introduction

Filebeat运行

Filebeat简介

1.读取日志文件, 但不做数据的解析处理
2.保证数据”At Least Once” 至少被读取一次, 即数据不会丢失
3.其他能力
<1>处理多行数据
<2>解析JSON格式数据
<3>简单的过滤功能

Filebeat使用流程:

1.安装
2.配置filebeat.yml
3.配置模板-index template
4.配置Kibana Dashboards
5.运行

Filebeats配置文件:

Filebeat配置模板-index template

1.建议通过ES API来创建template, 如果不使用filebeat module功能
2.使用filebeat module功能的话就要从filebeat自带的template来设置

Filebeat配置Kibana Dashboard

Filebeat集成Kibana Dashboard, 用于快速展示数据
1.结合Modules使用
2.一次性全部导入

Filebeat运行

Filebeat常见架构

1.通过filebeat采集日志, 将日志发送给logstash进行日志处理(grok解析等), 接下来将数据发送给es, es将数据进行存储查询, 最后通过kibana进行可是展示
2.通过filebeat采集日志, 通过elasticsearch进行数据解析、存储、查询(Ingest Node), 最后给kibana进行可视化分析

Elasticsearch Ingest Node

1.是5.x新增的一个节点类型
<1>在数据写入es前(bulk/index操作)对数据进行处理
<2>可设置为独立的ingest node专门进行数据转换处理(node.ingest: true) <3>api endpoint为pipeline

Ingest Node-Pipeline

1.Pipeline由一些列processor组成, 类似logstash的filer plugin

2.Pipeline的API比较简单, 主要有如下4个:
<1>创建PUT
<2>获取GET
<3>删除DELETE
<4>模拟调试SIMULATE

3.Processor对应Logstash的Filter Plugin, 基本都有相应的关键字
<1>Convert
<2>Grok
<3>Date
<4>Gsub
<5>Join
<6>Json
<7>Remove
<8>Script对应Ruby code
4.注意处理解析错误的情况

5.日志处理中常用到的es的插件(可以直接通过ingest进行数据处理):

6.pipeline的使用比较简单, 在索引相关的api中都有pipeline的参数可以指定

7.filebeat可以在output.elasticsearch中指定pipeline

Filebeat Modules

1.提供很多开箱即用的module

2.配置位于module文件夹中, 如nginx module(module/nginx)的相关配置如下
<1>Prospector配置:位于access/config/nginx-access.yml
<2>Index Template配置:fields.yml
<3>Ingest Pipeline配置:access/ingest/default.json
<4>Kibana Dashboards配置:kibana

ELK: Logstash Recommendation

调试的配置建议

1.http做input, 方便输入测试数据, 并且可以结合reload特性(stdin无法reload)
2.stdout做output, codec使用rubydebug, 即时查看解析结果
3.测试错误输入情况的输出, 以便对错误情况进行处理

4.@metadata特殊字段, 其内容不会输出在output中
<1>适合用来存储做条件判断、临时存储的字段
<2>相比remove_field有一定的性能提升

apache日志解析配置文件

CSV导入

监控运维api

1.logstash提供了丰富的api来查询logstash当前状态 <1>http://127.0.0.1:9600
<2>http://127.0.0.1:9600/_node
<3>http://127.0.0.1:9600/_node/stats <4>http://127.0.0.1:9600/_node/hot_threads

监控运维x-pack

通过x-pack监控logstash

ELK: Logstash Plugin Introduction

Input Plugin

input插件指定数据输入源, 一个pipeline可以有多个input插件: stdin, file, kafka

Input Plugin-stdin

最简单的输入, 从标准输入读取数据, 通用配置为:
1.codec类型为codec
2.type类型为string, 自定义该事件的类型, 可用于后续判断
3.tags类型为array, 自定义该事件的tag, 可用于后续判断
4.add_field类型为hash, 为该事件添加字段

Input Plugin-file

1.从文件读取数据, 如常见的日志文件. 文件读取通常要解决几个问题:  
<1>文件内容如何只被读取一次?即重启LS时, 从上次读取的位置继续: sincedb  
<2>如何即时读取到文件的新内容?:定时检查文件是否有更新  
<3>如何发现新文件并进行读取?:可以, 定时检查新文件  
<4>如果文件发生了归档(rotation)操作, 是否影响当前的内容读取?:不影响, 被归档的文件内容可以继续被读取
2.基于Filewatch的ruby库实现的

Input Plugin-file配置

1.path类型维数组, 指明读取的文件路径, 基于glob匹配语法  
<1> path => [“/var/log/**/*.log”, “/var/log/message”] 
2.exclue类型为数组排除不想监听的文件规则, 基于glob匹配语法 
<1>exclude => “*.gz”
3.sincedb_path类型为字符串, 记录sincedb文件路径
4.start_position类型为字符串, beginning or end, 是否从头读取文件5.stat_interval类型为数值, 单位秒, 定时检查文件是否有更新, 默认1秒6.discover_interval类型为数值, 单位秒, 定时检查是否有新文件待读取, 默认15秒
7.ignore_older类型为数值, 单位秒, 扫描文件列表时, 如果该文件上次更改时间超过设定的时长, 则不做处理, 但依然会监控是否有新内容, 默认关闭8.close_older类型为数值, 单位秒, 如果监听的文件在超过该设定时间内没有更新内容, 会被关闭文件句柄, 释放资源, 但依然会监控是否有新内容, 默认3600秒, 即1个小时

Input Plugin-glob匹配语法

1.主要包含如下几种匹配符: 
<1>*匹配任意字符, 但不匹配以.开头的隐藏文件, 匹配这类文件时要使用.*来匹配 
<2>**递归匹配子目录 
<3>?匹配单一字符 
<4>[]匹配多个字符, 比如[a-z]、[^a-z] 
<5>{}匹配多个单词, 比如{foo, bar, hello} 
<6>\转义符号

Input Plugin-kafka

kafka是最流行的消息队列, 也是Elastic Stack架构中常用的, 使用相对简单

Codec Plugin

Codec Plugin作用于input和output plugin, 负责将数据在原始与Logstash Event之间转换, 常见的codec有:
<1>plain 读取原始内容
<2>dots 将内容简化为点进行输出
<3>rubydebug将Logstash Events按照ruby格式输出
<4>line处理带有换行符的内容
<5>json处理json格式的内容
<6>multiline处理多行数据的内容

Codec Plugin-multiline

当一个Event的message由多行组成时, 需要使用该codec, 常见的情况是堆栈日志信息的处理, 如下:

1.主要设置参数:  
<1>pattern:设置行匹配的正则表达式, 可以使用grok <2>what:previous|next, 如果匹配成功, 那么匹配行是归属上一个事件还是下一个事件  
<3>negate:true or false是否对pattern的结果取反

Filter Plugin

Filter是Logstash功能强大的主要原因, 他可以对Logstash Event进行丰富的处理, 比如解析数据、删除字段、类型转换等等, 常见的有如下几个:
<1>date日期解析
<2>grok正则匹配解析
<3>dissect分隔符解析
<4>mutate对字段作处理, 比如重命名、删除、替换等
<5>json按照json解析字段内容到指定字段中
<6>geoip增加地理位置数据
<7>ruby利用ruby代码来动态修改Logstash Event

Filter Plugin-date

将日期字符串解析为日期类型, 然后替换@timestamp字段或者指定其他字段

1.match
<1>类型为数组, 用于指定日期匹配的格式, 可以一次指定多种日期格式 <2>match => match => [“logdate” => “MMM dd yyyy HH:mm:sss”, “MMM d yyyy HH:mm:ss”, “ISO8601”]
2.target
<1>类型为字符串, 用于指定赋值的字段名, 默认是@timestamp
3.timezone
<1>类型为字符串, 用于指定时区

Filter Plugin-grok

1.Grok语法如下:
<1>%{SYNTAX:SEMANTIC}
<2>SYNTAX为grok pattern的名称, SEMANTIC为赋值字段名称
<3>%{NUMBER:duration}可以匹配数值类型, 但grok匹配出的内容都是字符串类型, 可以通过在最后指定为int或者float来强制转换类型:%{NUMBER:duration:float}
2.常见的一些parrtern利于编写匹配规则
3.https://github.com/logstash-plugins/logstash-patterns-core/treemaster/patterns

自定义grok pattern

1.pattern_definitions参数, 以键值对的方式定义pattern名称和内容2.pattern_dir参数, 以文件的形式被读取

3.match匹配多种样式

4.overwrite

5.tag_on_failure 默认是_grokparsefailure, 可以基于此做判断

Grok调试建议

1.正则表达式
<1>https://www.debuggex.com
<2>http://www.regexr.com
2.grok调试
<1>http://grokdebug.herokuapp.com
<2>x-pack grok debugger

Filter Plugin-dissect

1.基于分隔符原理解析数据, 解决grok解析时消耗过多cpu资源的问题

2.dissect的应用场景有一定的局限性:
<1>主要用于每行格式相似且分隔符明确简单的场景
3.dissect语法较为简单, 由一些列字段(field)和分隔符(delimiter)组成
<1>%{}字段
<2>%{}之间是分隔符

4.dissect可以自动处理空的匹配值

5.dissect分割后的字段值都是字符串, 可以使用convert_datatype属性进行类型转换

Filter Plugin-Mutate

1.使用最频繁的插件, 可以对字段进行各种操作, 比如重命名、删除、替换、更新等, 主要操作有:
<1>convert类型转换
<2>gsub 字符串替换
<3>split/join/merge 字符串切割、数组合并为字符串、数组合并为数组 <4>rename 字段重命名
<5>update/replace 字段内容更新或替换
<6>remove_field 删除字段
2.实现字段类型的转换, 类型为hash, 仅支持转换为integer, float, string和boolean

3.对字段内容进行替换, 类型为数组, 每3项为一个替换配置

4.字符串切割为数组

5.字符串拼接

6.将两个数组合并为1个数组, 字符串会被转为1个元素的数组进行操作

7.字段重命名

8.update/replace
<1>更新字段内容, 区别在于update只在字段存在时生效, 而replace在字段不存在时会执行新增字段的操作

9.删除字段

Filter Plugin-json

将字段内容为json格式的数据进行解析

Filter Plugin-geoip

常用的插件, 根据ip地址提供对应的地域信息, 比如经纬度、城市名等, 方便进行地理数据分析

Filter Plugin-ruby

最灵活的插件, 可以以ruby语言来随心所欲的修改Logstash Event对象

Output Plugin

负责将Logstash Event输出, 常见的插件如下:
<1>stdout
<2>file
<3>elasticsearch


Output Plugin-stdout

输出到标准输出, 多用于调试

Output Plugin-file

输出到文件, 实现将分散在多地的文件统一到一处的需求, 比如将所有web机器的日志收集到一个文件中, 便于查询

Output Plugin-elasticsearch

输出到elasticsearch, 是最常用的插件, 基于http协议实现

注意查看官方文档!

ELK: Getting Start In Logstash

架构简介

Input(数据采集)->Filter(数据解析/转换)->Output(数据输出)

Pipeline

1.input-filter-output的3阶段处理流程
2.队列管理
3.插件生命周期管理

Logstash Event

1.内部流转的数据表现形式
2.原始数据在input被装换为Event, 在output event被转换为目标格式数据
3.在配置文件中可以对Event中的属性进行增删改查

Life Of An Event

1.Input从日志文件读取数据
2.被数据的数据经过JSON Codec将数据变成Logstash Event 3.Logstash Event通过Queue流入到某个Worker Threads中
4.数据在某个Worker Threads中的Batcher, 当Batcher达到某个条件后, 将数据发送给Filter
5.Filter将数据发送给Output, Output进行输出
6.最后发送ACK发送给Queue, 告知处理了哪些数据

Queue简介

1.In Memory  
<1>无法处理进程Crash、机器宕机等情况, 会导致数据丢失 2.Persistent Queue In Disk  
<1>可处理进程Crash等情况, 保证数据不丢失  
<2>保证数据至少消费一次  
<3>充当缓冲区, 可以代替Kafka等消息队列的作用

Persistent Queue简单流程

1.数据从Input进入到Persistent Queue
2.Persistent Queue在磁盘上备份数据, Persistent Queue返回结果给Input(需Input Plugins支持Event Response)
3.数据从Persistent Queue发送给Filter/Output
4.Filter/Output收到数据处理完后返回ACK给Persistent Queue 5.Persistent Queue收到ACK后, 把磁盘上的数据清理掉, 保证数据不丢失

Persistent Queue的基本配置

1.queue.type:persisted:默认是memory
2.queue.max_bytes:4gb:队列存储最大数据量

线程

1.Input Thread 数据读取线程
2.Pipeline Worker Thread 数据处理线程

线程相关配置

1.pipeline.workers | -w  
<1>pipeline线程数, 即filter_output的处理线程数, 默认是CPU核数 2.pipeline.batch.size | -b  
<1>Batcher一次批量获取的待处理文档数, 默认125, 可以根据输出进行调整, 越大会占用越多的heap空间, 可以通过jvm.options进行调整 3.pipeline.batch.delay | -u  
<1>Batcher等待的时长, 单位为ms

Logstash配置文件

1.logstash设置相关的配置文件(在conf文件夹中, setting file)  <1>logstash.yml logstash相关配置, 比如node.name、path.data、pipeline.workers、queue.type等, 这其中的配置可以被命令行参数中的相关参数覆盖  
<2>jvm.options修改jvm的相关参数, 比如修改heap size等
2.pipeline配置文件  
<1>定义数据处理流程的文件, 以.conf结尾

logstash.yml

支持如下两种形式:

logstash.yml配置项

1.node.name:节点名, 便于识别
2.path.data: 持久化存储数据的文件夹, 默认是logstash home目录下的data
3.path.config:设定pipeline配置文件的目录
4.path.log:设定pipeline日志文件的目录
5.pipeline.workers:设定pipeline的线程数(filter + output), 优化的常用项
6.pipeline.batch.size/delay:设定批量处理数据的数目和延迟 7.queue.type:设定队列类型, 默认是memory
8.queue.max_bytes:队列总容量, 默认是1g

logstash命令行配置项

1.–node.name
2.-f –path.config pipeline路径, 可以是文件或者文件夹
3.-path.settings logstash配置文件夹路径, 其中包含logstash.yml
4.-e –config.string 指明pipeline内容, 多用于测试使用
5.-w –pipeline.batch.size
6.–path.data
7.–debug
8.-t –config.test_and_exit 测试

logstash配置方式建议

1.线上环境推荐采用配置文件的方式来设定logstash的相关配置, 这样可以减少犯错的机会, 而且文件便于进行版本化管理
2.命令行形式多采用来进行快速的配置测试、验证、检查等

logstash多实例运行方式

1.bin/logstash –path.settings instance1
2.bin/logstash –path.settings instance2
3.不同instance中修改logstash.yml, 自定义path.data, 确保其不同即可

pipeline配置

用于配置input、filter和output插件, 结构如下:

pipeline配置语法

1.主要有如下的数值类型:
<1>布尔类型Boolean: isFailed=>true
<2>数值类型Number: port=>33
<3>字符串类型String: name=>”Hello World”
<4>数组Array/List
users => [{id=>1, name=>bob}, {id=>2, name=>tim}]
path =>  [“/var/log/messages”, “/var/log/*.log”]
<5>哈希类型Hash
match => {   
“field1” => “value1”,
    “field2” => “value2”
}
<6>注释: #
<7>在配置中可以引用Logstash Event的属性(字段), 主要有如下两种方式:
(1)直接引用字段值;
(2)在字符串中以springf方式引用

直接引用字段值Field Reference

使用[]即可, 嵌套字段写多层[]即可

字符串中引用字段值sprintf

使用%{}来实现

pipeline判断语句

1.支持条件判断语法, 主要格式如下

2.表达式主要包含如下操作符:
<1>比较: ==、 !=、 <、>、<=、>=
<2>正则是否匹配: =~、!~
<3>包含(字符串或者数组): in、not in
<4>布尔操作符: and、or、nand、xor、!
<5>分组操作符:()