Logstash使用指南
简介
Logstash是一款开源的实时数据采集工具。通过配置多种类型的input、fliter、output插件,Logstash支持多类数据源的数据采集、转换,并存储到目的端。
安装
- 检查JDK版本是不是
JAVA 8
以上,检查方式:
java -version
- 登录Elasticsearch官网,选择版本并下载Logstash。如果用户集群是百度智能云Elasticsearch 7.4.2版本,使用以下链接可直接下载
logstash-oss-7.4.2.zip
。
wget https://artifacts.elastic.co/downloads/logstash/logstash-oss-7.4.2.zip
快速使用
下边以用户在BCC上解析日志数据并写入百度智能云Elasticsearch为例,描述Logstash的使用方式。
安装Logstash
- 登录BCC并配置JAVA环境
- 下载并解压Logstash
- 在
config
目录下新建一个配置文件,命名为logstash-es.conf
,格式如下:
input {
...
}
filter {
...
}
output {
...
}
配置input
日志目录路径为/{LOG_PATH}/logs/*.log
,日志格式为:
[2021-01-22T10:21:02,837][INFO ][c.b.e.i.s.TestClass] [test_node1]test log.
[2021-01-22T10:21:02,838][INFO ][c.b.e.i.s.TestClass] [test_node1]test log.
[2021-01-22T10:21:02,838][INFO ][c.b.e.i.s.TestClass] [test_node1]test log.
[2021-01-22T10:26:02,839][ERROR][c.b.e.i.s.TestClass] [test_node1]throws a ERROR:
c.b.e.i.s.TestException: this a ERROR
at c.b.e.i.s.TestClass.test(TestClass.java:387) ~[baidu-test-logstash-7.4.2.jar:?]
at c.b.e.i.s.TestClass.test(TestClass.java:385) ~[baidu-test-logstash-7.4.2.jar:?]
则input配置为:
input {
file {
path => "/{LOG_PATH}/logs/*.log" #日志数据文件路径
start_position => "beginning" #读取文件位置,默认为 end
codec => multiline { #multiline插件。匹配不是以“[”开头的日志数据,并入上一行数据。
pattern => "^\["
negate => true
what => previous
}
}
}
配置filter
针对采集的日志,数据需要解析出日志时间time
、日志级别level
、日志所在类class
、节点名称node
、日志详细信息log_data
,则可以使用grok插件配置filter,使用方式如下:
filter {
grok {
match => { "message" => "\[%{TIMESTAMP_ISO8601:time}\]\[%{LOGLEVEL:level}( )*\]\[%{JAVACLASS:class}( )*\] \[%{DATA:node}\]%{GREEDYDATA:log_data}" }
remove_field => ["tags","path","@version","host"]
}
}
Grok
简介
grok是一种采用组合多个预定义的正则表达式,用来匹配分割文本并映射到关键字的工具。通常用来对日志数据进行预处理。
Logstash内置grok的120个预定义匹配字段,可参考:grok默认预定义匹配字段。grok支持正则表达式和自定义匹配字段规则,可以灵活满足扩展的需求。grok同时支持多条匹配,因此如果有不同格式的日志数据,可以使用多个正则表达式语句同时匹配。
示例
下文是一条http请求日志:
55.3.244.1 GET /index.html 15824 0.043
可以使用如下grok pattern来匹配这种记录:
%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
以下是结果:
client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043
配置output
输出目的端为百度智能云Elasticsearch,索引名称格式为test_logstash_index_{YYYY.MM.dd}
,用户可以创建模板文件template.json
,格式如下:
{
"template": "test_logstash_index_*",
"settings": {
"index.number_of_shards": 5,
"number_of_replicas": 1,
"index.refresh_interval": "60s"
},
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"class": {
"type": "keyword"
},
"log_data": {
"type": "keyword"
},
"level": {
"type": "keyword"
},
"message": {
"type": "keyword"
},
"node": {
"type": "keyword"
},
"time": {
"type": "date"
}
}
}
}
output配置如下:
output {
elasticsearch {
hosts => ["http://{ES_URL}"]
index => "test_logstash_index_%{+YYYY.MM.dd}"
template => "/{PATH}/template.json"
template_name => "my_template_es"
user => "{USER}"
password => "{PASSWORD}"
}
}
启动Logstash
bin/logstash -f config/logstash-es.conf
启动后,通过以下方式查询ES验证日志写入情况:
POST /test_logstash_index*/_search
{
"query": {
"match_all": {}
}
}
查询结果如下:
[
{
"_index": "test_logstash_index_2021.01.01",
"_type": "_doc",
"_id": "3Sf3KHcBRfCVcyxEVdqW",
"_score": 1,
"_source": {
"message": "[2021-01-22T10:21:02,837][INFO ][c.b.e.i.s.TestClass] [test_node1]test log.",
"node": "test_node1",
"level": "INFO",
"@timestamp": "2021-01-22T07:32:24.927Z",
"time": "2021-01-22T10:21:02,837",
"class": "c.b.e.i.s.TestClass",
"log_data": "test log."
}
},
{
"_index": "test_logstash_index_2021.01.01",
"_type": "_doc",
"_id": "4ScCKXcBRfCVcyxEL9rz",
"_score": 1,
"_source": {
"message": "[2021-01-22T10:21:02,838][INFO ][c.b.e.i.s.TestClass] [test_node1]test log.",
"node": "test_node1",
"level": "INFO",
"@timestamp": "2021-01-22T07:32:24.927Z",
"time": "2021-01-22T10:21:02,838",
"class": "c.b.e.i.s.TestClass",
"log_data": "test log."
}
},
{
"_index": "test_logstash_index_2021.01.01",
"_type": "_doc",
"_id": "3yf3KHcBRfCVcyxEdNqY",
"_score": 1,
"_source": {
"message": "[2021-01-22T10:21:02,838][INFO ][c.b.e.i.s.TestClass] [test_node1]test log.",
"node": "test_node1",
"level": "INFO",
"@timestamp": "2021-01-22T07:32:24.927Z",
"time": "2021-01-22T10:21:02,838",
"class": "c.b.e.i.s.TestClass",
"log_data": "test log."
}
},
{
"_index": "test_logstash_index_2021.01.01",
"_type": "_doc",
"_id": "4Cf4KHcBRfCVcyxEDtpl",
"_score": 1,
"_source": {
"@timestamp": "2021-01-22T07:21:21.141Z",
"message": "[2021-01-22T10:26:02,839][ERROR][c.b.e.i.s.TestClass] [test_node1]throws a ERROR:\nc.b.e.i.s.TestException: this a ERROR\n at c.b.e.i.s.TestClass.test(TestClass.java:387) ~[baidu-test-logstash-7.4.2.jar:?]\n at c.b.e.i.s.TestClass.test(TestClass.java:385) ~[baidu-test-logstash-7.4.2.jar:?]",
"time": "2021-01-22T10:26:02,839",
"level": "ERROR",
"node": "test_node1",
"log_data": "throws a ERROR:\nc.b.e.i.s.TestException: this a ERROR\n at c.b.e.i.s.TestClass.test(TestClass.java:387) ~[baidu-test-logstash-7.4.2.jar:?]\n at c.b.e.i.s.TestClass.test(TestClass.java:385) ~[baidu-test-logstash-7.4.2.jar:?]",
"class": "c.b.e.i.s.TestClass"
}
}
]
Logstash常用插件
输入插件input
插件名称 | 说明 | 详细信息 |
---|---|---|
beats | 从Elastic Beats框架中接收事件。 | Beats input plugin |
elasticsearch | 从Elasticsearch集群中读取数据。 | Elasticsearch input plugin |
generator | 生成随机测试数据。 | Generator input plugin |
file | 从文件中读取数据 | file input plugin |
http | 通过HTTP或HTTPS接收单行或多行事件。 | Http input plugin |
jdbc | 通过JDBC,将任一数据库数据读取到Logstash中。 | Jdbc input plugin |
kafka | 从Kafka主题读取事件。 | Kafka input plugin |
rabbitmq | 从RabbitMQ队列中读取事件。 | Rabbitmq input plugin |
redis | 从Redis实例中读取事件。 | Redis input plugin |
stdin | 从标准输入读取事件。 | Stdin input plugin |
syslog | 在网络上将Syslog消息作为事件读取。 | Syslog input plugin |
tcp | 通过TCP套接字读取事件。 | Tcp input plugin |
udp | 在网络上通过UDP,将消息作为事件读取。 | Udp input plugin |
更多input插件可参考Logstash官网。
编码插件codec
常用编码插件:
插件名称 | 说明 | 详细信息 |
---|---|---|
json | 解码(通过输入)和编码(通过输出)完整的JSON消息。 | Json codec plugin |
json_lines | 解码以新行分隔的JSON流。 | Json_lines codec plugin |
line | 读取面向行的文本数据。 | Line codec plugin |
multiline | 将多行消息合并到单个事件中。 | Multiline codec plugin |
更多codec插件可参考Logstash官网。
过滤器插件filter
插件名称 | 说明 | 详细信息 |
---|---|---|
date | 解析字段中的日期,然后使用该日期或时间戳作为事件的logstash时间戳。 | Date filter plugin |
geoip | 根据GeoIP库为数据中的IP地址提供对应的地理位置信息。 | Geoip filter plugin |
grok | 通过grok正则匹配将数据结构化。 | Grok filter plugin |
json | JSON解析过滤器,将包含JSON的已有字段扩展为Logstash事件中的实际数据结构 | JSON filter plugin |
kv | 处理各种形如key-value格式的数据 | Kv filter plugin |
metrics | 聚合指标 | Metrics filter plugin |
mutate | mutate提供了丰富的基础类型数据处理能力,包括类型转换,字符串处理和字段处理等 | Mutate filter plugin |
ruby | 执行Ruby代码 | Ruby filter plugin |
split | 通过分解事件的一个字段,并将产生的每个值嵌入原始事件的克隆版,从而克隆事件。 被分解的字段可以是字符串或字符串数组。 | Split filter plugin |
xml | XML过滤器。将包含XML的字段,扩展为实际数据结构 | Xml filter plugin |
更多input插件可参考Logstash官网。
输出插件output
插件名称 | 说明 | 详细信息 |
---|---|---|
stdout | 将数据向Linux控制台标准输出。 | Stdout output plugin |
tcp | 通过TCP套接字写入数据。 | Tcp output plugin |
csv | 以逗号分隔(CSV)或其他分隔的格式,将数据写入磁盘。 | Csv output plugin |
elasticsearch | 向Elasticsearch写入数据。 | Elasticsearch output plugin |
收到输出后发送电子邮件。 | Email output plugin | |
file | 向磁盘文件写入数据。 | File output plugin |
更多output插件可参考Logstash官网。
Logstash命令行使用
Logstash 提供了可执行脚本 bin/logstash
,可以方便快速运行,支持以下主要参数:
- -e
快速执行Logstash,一般用于测试使用。
- --config 或 -f
通常会把配置写到配置文件里,然后通过 bin/logstash -f agent.conf
的形式运行Logstash。
此外,Logstash还可以直接用 bin/logstash -f /etc/logstash.d/
来运行,Logstash 会自动读取 /etc/logstash.d/
目录下所有的文本文件,并执行。
- --log 或 -l
Logstash 默认输出日志到标准错误。生产环境下可以通过 bin/logstash -l logs/logstash.log
命令来统一存储日志。
- --filterworkers 或 -w
Logstash会为过滤插件(filter)运行多个线程。例如: bin/logstash -w 5
为Logstash的filter会运行 5 个线程同时工作。
注意:Logstash目前不支持输入插件的多线程,而输出插件的多线程需要在配置内部设置。