基于ELK构建日志分析系统
所有文档

          Elasticsearch BES

          基于ELK构建日志分析系统

          介绍

          日志是一个系统不可缺少的组成部分,日志记录系统产生的各种行为,基于日志信息可以对系统进行问题排查,性能优化,安全分析等工作。 使用ELK(Elasticsearch、Logstash、Kibana) 结合Filebeat与Kafka构建日志分析系统,可以方便的对日志数据进行采集解析存储,以及近实时的查询分析。

          各组件在日志系统中的职责与关系如下所示:

          image.png

          1. Filebeat:采集原始日志数据,输出数据到Kafka;
          2. Kafka:作为消息队列,存储原始日志数据;
          3. Logstash:消费Kafka中数据,解析过滤后,将其存储到Elasticsearch中;
          4. Elasticsearch:存储用于分析的日志数据,并提供查询、聚合的底层能力;
          5. Kibana:提供日志分析的可视化界面;

          操作流程:

          1. 环境准备

            包括:创建百度Elasticsearch(简称BES)集群、Kibana服务,安装Filebeat、Logstash、Kafka。

          2. 配置Filebeat

            配置Filebeat的input为系统日志,output为Kafka,将日志数据采集到Kafka的指定topic中。

          3. 配置Logstash

            配置Logstash的input为Kafka,output为BES,使用Logstash消费topic中的数据并传输到BES中。

          4. 查看日志消费状态

            在消息队列Kafka中查看日志数据的消费的状态,验证日志数据是否采集成功。

          5. 通过Kibana查看日志数据

            在Kibana控制台的Discover页面,查询采集的日志数据。

          环境准备

          1. 创建BES集群

            参考集群创建中的内容,完成BES集群的创建。

            本文档中创建的BES集群选择7.4.2版本,Filebeat组件、Logstash组件使用的也是7.4.2版本。

          2. 创建Kibana服务

            在完成BES集群创建后,除了已购买的BES节点,默认会附赠一个单节点的Kibana服务(Kibana节点支持按需自定义配置),如图所示:

            image.png

            通过上图连接信息中所示的Kibana HTTP URL可以直接访问Kibana服务。

            Kibana用户名密码与BES服务的用户名密码相同,详见账号使用说明

          3. 安装Filebeat

            下载7.4.2的oss版本Filebeat;

            wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-oss-7.4.2-linux-x86_64.tar.gz

            解压安装Filebeat;

            tar -zxvf filebeat-oss-7.4.2-linux-x86_64.tar.gz
          4. 安装Logstash

            下载7.4.2的oss版本Logstash;

            wget https://artifacts.elastic.co/downloads/logstash/logstash-oss-7.4.2.tar.gz

            解压安装Logstash;

            tar -zxvf logstash-oss-7.4.2.tar.gz
          5. 安装、配置Kafka服务

            5.1 下载Zookeeper(Kafka服务依赖Zookeeper);

            wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz

            5.2 解压安装Zookeeper;

            tar -zxvf apache-zookeeper-3.6.2-bin.tar.gz

            5.3 配置Zookeeper;

            vi conf/zoo.cfg

            zoo.cfg内容如下所示:

            tickTime=2000
            dataDir=/your_path/apache-zookeeper-3.6.2-bin/data
            clientPort=2181
            initLimit=5
            syncLimit=2
            admin.serverPort=8888

            配置参数说明,如下所示:

            参数名称 参数说明
            tickTime 服务器之间或客户端与服务器之间维持心跳的时间间隔
            dataDir 数据文件目录
            clientPort 客户端连接端口
            initLimit Leader-Follower初始通信时限
            syncLimit Leader-Follower同步通信时限
            admin.serverPort 内置Jetty绑定的port

            5.4 启动Zookeeper;

            bin/zkServer.sh start

            5.5 下载Kafka;

            wget https://archive.apache.org/dist/kafka/2.0.1/kafka_2.12-2.0.1.tgz

            5.6 解压安装Kafka;

            tar -zxvf kafka_2.12-2.0.1.tgz

            5.7 配置Kafka;

            cd kafka_2.12-2.0.1
            vi config/server.properties

            server.properties内容如下所示:

            broker.id=0
            zookeeper.connect=localhost:2181

            配置参数说明,如下所示:

            参数名称 参数说明
            broker.id Kafka集群实例的唯一标识
            zookeeper.connect Zookeeper的连接地址

            5.8 启动Kafka;

            bin/kafka-server-start.sh config/server.properties

            5.9 创建名称为 logdata 的 topic,用于接收采集数据,命令示例如下所示:

            bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 2 --partitions 20 --topic logdata

            命令参数说明,如下所示:

            参数名称 参数说明
            zookeeper Zookeeper的连接地址
            topic topic的名称
            replication-factor topic的副本数
            partitions topic的分区数量

          配置Filebeat

          1.创建Filebeat采集配置文件;

           cd filebeat-oss-7.4.2-linux-x86_64
           vi filebeat.kafka.yml

          filebeat.kafka.yml内容如下:

           filebeat.inputs:
             - type: log
               enabled: true
               paths:
                   - /your_path/*.log
           
           output.kafka:
               hosts: ["**.**.**.**:9092"]
               topic: logdata
               version: 2.0.1

          配置参数说明,如下所示:

          参数名称 参数说明
          type 输入类型,设置为log,表示输入源为日志
          enabled input处理器的开关
          paths 需要监控的日志文件的路径
          hosts 消息队列Kafka的地址
          topic 日志输出到消息队列Kafka的topic
          version Kafka的版本

          2.启动Filebeat;

          ./filebeat -e -c filebeat.kafka.yml  

          配置Logstash

          1.创建配置文件logstash.conf;

          cd logstash-oss-7.4.2
          vi logstash.conf

          logstash.conf内容如下:

          input {
             kafka {
               bootstrap_servers => ["127.0.0.1:9092"]
               group_id => "log-data"
               topics => ["logdata"] 
               codec => json
            }
          }
          filter {
          }
          output {
            elasticsearch {
              hosts => "http://es_ip:es_http_port"
              user =>"your username"
              password =>"your password"
              index => "kafka‐%{+YYYY.MM.dd}"
            }
          }

          配置参数说明,如下所示:

          参数名称 参数说明
          bootstrap_servers Kafka的地址
          group_id Kafka消费者组id
          topics Kafka的topic名称
          codec 数据格式
          hosts BES服务地址
          user BES账号
          password BES账号的密码
          index 数据写入的索引

          2.启动Logstash;

          bin/logstash -f logstash.conf 

          更多Logstash的使用说明,详见Logstash使用指南

          查看日志消费状态

          使用如下命令查看消费状态;

          bin/kafka-consumer-groups.sh  --bootstrap-server 127.0.0.1:9092 --group log-data --describe

          命令参数说明,如下所示:

          参数名称 参数说明
          bootstrap_servers Kafka的地址
          group Kafka消费者组

          执行结果如下所示:

          image.png

          结果属性说明,如下所示:

          参数名称 参数说明
          topic Kafka的topic名称
          partition topic的分区id
          current-offset consumer消费的具体位移
          log-end-offset partition的最高位移
          lag 堆积量
          consumer-id 消费者id

          通过Kibana查看日志数据

          完成以下的配置后,即可通过Kibana查看日志数据。

          1.创建索引模式

          在左侧导航栏,点击Management(下图中红框位置)后,在Kibana区域点击Index Patterns(下图中蓝框位置),然后再点击Create index pattern按钮(下图绿框位置)

          image.png

          在Index pattern处(下图红框位置)填入kafka*,然后点击点击Next Step按钮(下图中蓝框位置)

          image.png

          为Time Filter field name选择@timestamp字段(下图中红框位置),然后点击Create index pattern按钮(下图中蓝框位置)

          image.png

          2.查看日志数据

          在左侧导航栏,点击Discover(下图中红框位置)后,即可以查询日志数据

          image.png

          更多Kibana的使用说明,详见Kibana使用指南中的内容。

          上一篇
          Logstash
          下一篇
          Java-SDK