ELK: Logstash Plugin Introduction

Input Plugin

input插件指定数据输入源, 一个pipeline可以有多个input插件: stdin, file, kafka

Input Plugin-stdin

最简单的输入, 从标准输入读取数据, 通用配置为:
1.codec类型为codec
2.type类型为string, 自定义该事件的类型, 可用于后续判断
3.tags类型为array, 自定义该事件的tag, 可用于后续判断
4.add_field类型为hash, 为该事件添加字段

Input Plugin-file

1.从文件读取数据, 如常见的日志文件. 文件读取通常要解决几个问题:  
<1>文件内容如何只被读取一次?即重启LS时, 从上次读取的位置继续: sincedb  
<2>如何即时读取到文件的新内容?:定时检查文件是否有更新  
<3>如何发现新文件并进行读取?:可以, 定时检查新文件  
<4>如果文件发生了归档(rotation)操作, 是否影响当前的内容读取?:不影响, 被归档的文件内容可以继续被读取
2.基于Filewatch的ruby库实现的

Input Plugin-file配置

1.path类型维数组, 指明读取的文件路径, 基于glob匹配语法  
<1> path => [“/var/log/**/*.log”, “/var/log/message”] 
2.exclue类型为数组排除不想监听的文件规则, 基于glob匹配语法 
<1>exclude => “*.gz”
3.sincedb_path类型为字符串, 记录sincedb文件路径
4.start_position类型为字符串, beginning or end, 是否从头读取文件5.stat_interval类型为数值, 单位秒, 定时检查文件是否有更新, 默认1秒6.discover_interval类型为数值, 单位秒, 定时检查是否有新文件待读取, 默认15秒
7.ignore_older类型为数值, 单位秒, 扫描文件列表时, 如果该文件上次更改时间超过设定的时长, 则不做处理, 但依然会监控是否有新内容, 默认关闭8.close_older类型为数值, 单位秒, 如果监听的文件在超过该设定时间内没有更新内容, 会被关闭文件句柄, 释放资源, 但依然会监控是否有新内容, 默认3600秒, 即1个小时

Input Plugin-glob匹配语法

1.主要包含如下几种匹配符: 
<1>*匹配任意字符, 但不匹配以.开头的隐藏文件, 匹配这类文件时要使用.*来匹配 
<2>**递归匹配子目录 
<3>?匹配单一字符 
<4>[]匹配多个字符, 比如[a-z]、[^a-z] 
<5>{}匹配多个单词, 比如{foo, bar, hello} 
<6>\转义符号

Input Plugin-kafka

kafka是最流行的消息队列, 也是Elastic Stack架构中常用的, 使用相对简单

Codec Plugin

Codec Plugin作用于input和output plugin, 负责将数据在原始与Logstash Event之间转换, 常见的codec有:
<1>plain 读取原始内容
<2>dots 将内容简化为点进行输出
<3>rubydebug将Logstash Events按照ruby格式输出
<4>line处理带有换行符的内容
<5>json处理json格式的内容
<6>multiline处理多行数据的内容

Codec Plugin-multiline

当一个Event的message由多行组成时, 需要使用该codec, 常见的情况是堆栈日志信息的处理, 如下:

1.主要设置参数:  
<1>pattern:设置行匹配的正则表达式, 可以使用grok <2>what:previous|next, 如果匹配成功, 那么匹配行是归属上一个事件还是下一个事件  
<3>negate:true or false是否对pattern的结果取反

Filter Plugin

Filter是Logstash功能强大的主要原因, 他可以对Logstash Event进行丰富的处理, 比如解析数据、删除字段、类型转换等等, 常见的有如下几个:
<1>date日期解析
<2>grok正则匹配解析
<3>dissect分隔符解析
<4>mutate对字段作处理, 比如重命名、删除、替换等
<5>json按照json解析字段内容到指定字段中
<6>geoip增加地理位置数据
<7>ruby利用ruby代码来动态修改Logstash Event

Filter Plugin-date

将日期字符串解析为日期类型, 然后替换@timestamp字段或者指定其他字段

1.match
<1>类型为数组, 用于指定日期匹配的格式, 可以一次指定多种日期格式 <2>match => match => [“logdate” => “MMM dd yyyy HH:mm:sss”, “MMM d yyyy HH:mm:ss”, “ISO8601”]
2.target
<1>类型为字符串, 用于指定赋值的字段名, 默认是@timestamp
3.timezone
<1>类型为字符串, 用于指定时区

Filter Plugin-grok

1.Grok语法如下:
<1>%{SYNTAX:SEMANTIC}
<2>SYNTAX为grok pattern的名称, SEMANTIC为赋值字段名称
<3>%{NUMBER:duration}可以匹配数值类型, 但grok匹配出的内容都是字符串类型, 可以通过在最后指定为int或者float来强制转换类型:%{NUMBER:duration:float}
2.常见的一些parrtern利于编写匹配规则
3.https://github.com/logstash-plugins/logstash-patterns-core/treemaster/patterns

自定义grok pattern

1.pattern_definitions参数, 以键值对的方式定义pattern名称和内容2.pattern_dir参数, 以文件的形式被读取

3.match匹配多种样式

4.overwrite

5.tag_on_failure 默认是_grokparsefailure, 可以基于此做判断

Grok调试建议

1.正则表达式
<1>https://www.debuggex.com
<2>http://www.regexr.com
2.grok调试
<1>http://grokdebug.herokuapp.com
<2>x-pack grok debugger

Filter Plugin-dissect

1.基于分隔符原理解析数据, 解决grok解析时消耗过多cpu资源的问题

2.dissect的应用场景有一定的局限性:
<1>主要用于每行格式相似且分隔符明确简单的场景
3.dissect语法较为简单, 由一些列字段(field)和分隔符(delimiter)组成
<1>%{}字段
<2>%{}之间是分隔符

4.dissect可以自动处理空的匹配值

5.dissect分割后的字段值都是字符串, 可以使用convert_datatype属性进行类型转换

Filter Plugin-Mutate

1.使用最频繁的插件, 可以对字段进行各种操作, 比如重命名、删除、替换、更新等, 主要操作有:
<1>convert类型转换
<2>gsub 字符串替换
<3>split/join/merge 字符串切割、数组合并为字符串、数组合并为数组 <4>rename 字段重命名
<5>update/replace 字段内容更新或替换
<6>remove_field 删除字段
2.实现字段类型的转换, 类型为hash, 仅支持转换为integer, float, string和boolean

3.对字段内容进行替换, 类型为数组, 每3项为一个替换配置

4.字符串切割为数组

5.字符串拼接

6.将两个数组合并为1个数组, 字符串会被转为1个元素的数组进行操作

7.字段重命名

8.update/replace
<1>更新字段内容, 区别在于update只在字段存在时生效, 而replace在字段不存在时会执行新增字段的操作

9.删除字段

Filter Plugin-json

将字段内容为json格式的数据进行解析

Filter Plugin-geoip

常用的插件, 根据ip地址提供对应的地域信息, 比如经纬度、城市名等, 方便进行地理数据分析

Filter Plugin-ruby

最灵活的插件, 可以以ruby语言来随心所欲的修改Logstash Event对象

Output Plugin

负责将Logstash Event输出, 常见的插件如下:
<1>stdout
<2>file
<3>elasticsearch


Output Plugin-stdout

输出到标准输出, 多用于调试

Output Plugin-file

输出到文件, 实现将分散在多地的文件统一到一处的需求, 比如将所有web机器的日志收集到一个文件中, 便于查询

Output Plugin-elasticsearch

输出到elasticsearch, 是最常用的插件, 基于http协议实现

注意查看官方文档!

发表评论

电子邮件地址不会被公开。 必填项已用*标注