基于ELK构建日志分析系统
所有文档
menu
没有找到结果,请重新输入

Elasticsearch BES

基于ELK构建日志分析系统

介绍

日志是一个系统不可缺少的组成部分,日志记录系统产生的各种行为,基于日志信息可以对系统进行问题排查,性能优化,安全分析等工作。 使用ELK(Elasticsearch、Logstash、Kibana) 结合Filebeat与Kafka构建日志分析系统,可以方便的对日志数据进行采集解析存储,以及近实时的查询分析。

各组件在日志系统中的职责与关系如下所示:

image.png

  1. Filebeat:采集原始日志数据,输出数据到Kafka;
  2. Kafka:作为消息队列,存储原始日志数据;
  3. Logstash:消费Kafka中数据,解析过滤后,将其存储到Elasticsearch中;
  4. Elasticsearch:存储用于分析的日志数据,并提供查询、聚合的底层能力;
  5. Kibana:提供日志分析的可视化界面;

操作流程:

  1. 环境准备

    包括:创建百度Elasticsearch(简称BES)集群、Kibana服务,安装Filebeat、Logstash、Kafka。

  2. 配置Filebeat

    配置Filebeat的input为系统日志,output为Kafka,将日志数据采集到Kafka的指定topic中。

  3. 配置Logstash

    配置Logstash的input为Kafka,output为BES,使用Logstash消费topic中的数据并传输到BES中。

  4. 查看日志消费状态

    在消息队列Kafka中查看日志数据的消费的状态,验证日志数据是否采集成功。

  5. 通过Kibana查看日志数据

    在Kibana控制台的Discover页面,查询采集的日志数据。

环境准备

  1. 创建BES集群

    参考集群创建中的内容,完成BES集群的创建。

    本文档中创建的BES集群选择7.4.2版本,Filebeat组件、Logstash组件使用的也是7.4.2版本。

  2. 创建Kibana服务

    在完成BES集群创建后,除了已购买的BES节点,默认会附赠一个单节点的Kibana服务(Kibana节点支持按需自定义配置),如图所示:

    image.png

    通过上图连接信息中所示的Kibana HTTP URL可以直接访问Kibana服务。

    Kibana用户名密码与BES服务的用户名密码相同,详见账号使用说明

  3. 安装Filebeat

    下载7.4.2的oss版本Filebeat;

    wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-oss-7.4.2-linux-x86_64.tar.gz

    解压安装Filebeat;

    tar -zxvf filebeat-oss-7.4.2-linux-x86_64.tar.gz
  4. 安装Logstash

    下载7.4.2的oss版本Logstash;

    wget https://artifacts.elastic.co/downloads/logstash/logstash-oss-7.4.2.tar.gz

    解压安装Logstash;

    tar -zxvf logstash-oss-7.4.2.tar.gz
  5. 安装、配置Kafka服务

    5.1 下载Zookeeper(Kafka服务依赖Zookeeper);

    wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.6.4/apache-zookeeper-3.6.4-bin.tar.gz

    5.2 解压安装Zookeeper;

    tar -zxvf apache-zookeeper-3.6.4-bin.tar.gz

    5.3 配置Zookeeper;

    vi conf/zoo.cfg

    zoo.cfg内容如下所示:

    tickTime=2000
    dataDir=/your_path/apache-zookeeper-3.6.4-bin/data
    clientPort=2181
    initLimit=5
    syncLimit=2
    admin.serverPort=8888

    配置参数说明,如下所示:

    参数名称 参数说明
    tickTime 服务器之间或客户端与服务器之间维持心跳的时间间隔
    dataDir 数据文件目录
    clientPort 客户端连接端口
    initLimit Leader-Follower初始通信时限
    syncLimit Leader-Follower同步通信时限
    admin.serverPort 内置Jetty绑定的port

    5.4 启动Zookeeper;

    bin/zkServer.sh start

    5.5 下载Kafka;

    wget https://archive.apache.org/dist/kafka/2.0.1/kafka_2.12-2.0.1.tgz

    5.6 解压安装Kafka;

    tar -zxvf kafka_2.12-2.0.1.tgz

    5.7 配置Kafka;

    cd kafka_2.12-2.0.1
    vi config/server.properties

    server.properties内容如下所示:

    broker.id=0
    zookeeper.connect=localhost:2181

    配置参数说明,如下所示:

    参数名称 参数说明
    broker.id Kafka集群实例的唯一标识
    zookeeper.connect Zookeeper的连接地址

    5.8 启动Kafka;

    bin/kafka-server-start.sh config/server.properties

    5.9 创建名称为 logdata 的 topic,用于接收采集数据,命令示例如下所示:

    bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 2 --partitions 20 --topic logdata

    命令参数说明,如下所示:

    参数名称 参数说明
    zookeeper Zookeeper的连接地址
    topic topic的名称
    replication-factor topic的副本数
    partitions topic的分区数量

配置Filebeat

1.创建Filebeat采集配置文件;

 cd filebeat-oss-7.4.2-linux-x86_64
 vi filebeat.kafka.yml

filebeat.kafka.yml内容如下:

 filebeat.inputs:
   - type: log
     enabled: true
     paths:
         - /your_path/*.log
 
 output.kafka:
     hosts: ["**.**.**.**:9092"]
     topic: logdata
     version: 2.0.1

配置参数说明,如下所示:

参数名称 参数说明
type 输入类型,设置为log,表示输入源为日志
enabled input处理器的开关
paths 需要监控的日志文件的路径
hosts 消息队列Kafka的地址
topic 日志输出到消息队列Kafka的topic
version Kafka的版本

2.启动Filebeat;

./filebeat -e -c filebeat.kafka.yml  

配置Logstash

1.创建配置文件logstash.conf;

cd logstash-oss-7.4.2
vi logstash.conf

logstash.conf内容如下:

input {
   kafka {
     bootstrap_servers => "127.0.0.1:9092"
     group_id => "log-data"
     topics => ["logdata"] 
     codec => json
  }
}
filter {
}
output {
  elasticsearch {
    hosts => "http://es_ip:es_http_port"
    user =>"your username"
    password =>"your password"
    index => "kafka‐%{+YYYY.MM.dd}"
  }
}

配置参数说明,如下所示:

参数名称 参数说明
bootstrap_servers Kafka的地址
group_id Kafka消费者组id
topics Kafka的topic名称
codec 数据格式
hosts BES服务地址
user BES账号
password BES账号的密码
index 数据写入的索引

2.启动Logstash;

bin/logstash -f logstash.conf 

更多Logstash的使用说明,详见Logstash使用指南

查看日志消费状态

使用如下命令查看消费状态;

bin/kafka-consumer-groups.sh  --bootstrap-server 127.0.0.1:9092 --group log-data --describe

命令参数说明,如下所示:

参数名称 参数说明
bootstrap_servers Kafka的地址
group Kafka消费者组

执行结果如下所示:

image.png

结果属性说明,如下所示:

参数名称 参数说明
topic Kafka的topic名称
partition topic的分区id
current-offset consumer消费的具体位移
log-end-offset partition的最高位移
lag 堆积量
consumer-id 消费者id

通过Kibana查看日志数据

完成以下的配置后,即可通过Kibana查看日志数据。

1.创建索引模式

在左侧导航栏,点击Management(下图中红框位置)后,在Kibana区域点击Index Patterns(下图中蓝框位置),然后再点击Create index pattern按钮(下图绿框位置)

image.png

在Index pattern处(下图红框位置)填入kafka*,然后点击点击Next Step按钮(下图中蓝框位置)

image.png

为Time Filter field name选择@timestamp字段(下图中红框位置),然后点击Create index pattern按钮(下图中蓝框位置)

image.png

2.查看日志数据

在左侧导航栏,点击Discover(下图中红框位置)后,即可以查询日志数据

image.png

更多Kibana的使用说明,详见Kibana使用指南中的内容。

上一篇
开发指南
下一篇
典型实践