0%

基于 Elasticsearch 与 Kibana 搭建流量可视化平台

前言

前期工作:基于 Bind9 搭建内网 DNS 服务器,使得在 /etc/resolv.conf 中配置了该服务器 IP 的节点或终端的 DNS 流量都会流经该服务器,为集群 DNS 流量采集提供了先决基础。同时,初步使用日志记录 DNS 查询请求(配置在 /etc/named.conf 中),后续可采用 Packetbeat/TShark 等工具主动捕获。DNS 查询日志内容格式如下:

1
2
30-Nov-2021 08:57:25.192 client @0x7f74f800b650 192.168.101.145#61323 (apisix.apache.org): query: apisix.apache.org IN A + (168.168.168.47)
30-Nov-2021 08:57:25.192 client @0x7f7514148d20 192.168.101.145#61325 (github.com): query: github.com IN A + (168.168.168.47)

为了推进后续的 DNS 流量分类研究工作,本文介绍了如何基于现有的 DNS 流量日志,使用 Elasticsearch 和 Kibana 搭建流量监测可视化平台。本文包含以下内容:

  • 使用 Filebeat 将流量日志实时同步到 Elasticsearch;
  • 使用 Grok 正则过滤器对采集到的日志消息二次处理;
  • 使用网络数据包分析工具 Packetbeat

将日志文件转储到 Elasticsearch,一是作为历史归档考量,Bind9 日志存在存储时限,如果文件大小达到设置的上限,旧的日志文件会被循环覆盖。二是 JSON 格式易于拓展,支持嵌套和数组,且易于特征提取阶段 Python 进行处理。三是可视化支持好,在 Elasticsearch 数据集上,使用 Kibana 可以快速建立指标分析、图表分析以及时间序列分析等可视化。下图是基于 DNS 查询日志配置的 Kibana Dashboard 可视化界面。

93787acedd01f3ccf8ad7f2e4c62627d.png

实验环境

  • 操作系统 CentOS 7.9;
  • Docker,版本 19.03.13;
  • Elastic Stack 工具集,版本 7.15.2。包括 Elasticsearch、Kibana、Filebeat 和 Packetbeat。

部署 Elasticsearch + Kibana

使用 Docker 快速部署 Elasticsearch 与 Kibana 环境。首先,拉取 Docker 镜像:

1
2
$ docker pull docker.elastic.co/elasticsearch/elasticsearch:7.15.2
$ docker pull docker.elastic.co/kibana/kibana:7.15.2

需先以单点模式运行 Elasticsearch 容器,然后再启动 Kibana 容器。

1
2
$ docker run --name es01-test --net elastic -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.15.2
$ docker run -d --name kib01-test --net elastic -p 5601:5601 -e "ELASTICSEARCH_HOSTS=http://es01-test:9200" docker.elastic.co/kibana/kibana:7.15.2

成功启动容器后,可在 5601 端口访问 Kibana 首页。

d15ed7bec92f990449315ed49fa7bfeb.png

使用 Filebeat 监听日志文件变更

Filebeat 是 Elastic/Beats 数据采集器工具集中的一员,负责日志采集,相比于运行在 JVM 上的 Logstash,它更加轻量级。Filebeat 能够实时捕获文件的新增内容(类似 tail -f 命令),按行将新增内容转发到 Elasticsearch 或 Logstash 中存储,并记录文件当前偏移量,以便中断后下次启动从中断点继续开始。

下载、解压 Filebeat 安装包并重命名。

1
2
3
$ wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.15.2-linux-x86_64.tar.gz
$ tar -xzvf filebeat-7.15.2-linux-x86_64.tar.gz
$ mv filebeat-7.15.2-linux-x86_64 filebeat

Filebeat 安装目录中,包含启动脚本以及默认配置文件 filebeat.yml。

1
2
3
$ cd filebeat
$ ./filebeat version
filebeat version 7.15.2 (amd64), libbeat 7.15.2 [fd322dad6ceafec40c84df4d2a0694ea357d16cc built 2021-11-04 14:22:49 +0000 UTC]

按照 filebeat.yml 文件格式,编写自定义配置文件 named.yml。在配置文件中,设置日志输入源为 Bind9 生成的 query.log;去除了 Filebeat 生成的冗余字段(可选);设置了写入的 Elasticsearch 的 URL 以及索引分片数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/named/query.log

processors:
- drop_fields:
fields: ["host","agent","ecs","input"]
ignore_missing: false

setup.template.settings:
index.number_of_shards: 3

output.elasticsearch:
hosts: ["http://localhost:9200"]

默认情况下,Filebeat 写入 Elasticsearch 的索引名形如 filebeat-%{[beat.version]}-%{+yyyy.MM.dd}。此项配置在 output.elasticsearch.index 中,Filebeat 会自动创建索引别名,如 filebeat-7.15.2。

运行 Filebeat,参数 -c 指定配置文件 named.yml,-e 将错误日志输出到控制台。

1
$ ./filebeat -e -c named.yml

注:使用 nohup& 命令可以后台运行,并且输入 exit 退出命令行终端后依然不会停止(直接关闭命令行终端会停止)。

从 Filebeat 发往 Elasticsearch 的日志消息,包装在 message 字段中,除此之外,还有一些 Filebeat 生成的记录字段。一条日志信息转化后的格式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{
"_index": "filebeat-7.15.2-2021.11.26-000001",
"_type": "_doc",
"_id": "5qjyan0BAsnp-ar_z6EP",
"_version": 1,
"_score": 1,
"_source": {
"@timestamp": "2021-11-29T09:07:16.390Z",
"log": {
"offset": 2892475,
"file": {
"path": "/var/log/named/query.log"
}
},
"message": "29-Nov-2021 17:07:13.335 client @0x7fd6a0010f10 192.168.101.60#58832 (shcn2-06b.iplc188.com): query: shcn2-06b.iplc188.com IN A + (168.168.168.47)"
},
"fields": {
//...
}
}

对于原始的 message 字段,能够提供的信息有限,因此需要对消息字段加以处理。Logstash 的插件 Grok 正则过滤器提供对字符串的解析功能。如今,Elastic 提供了 Ingest Node 功能,允许在文档写入 Elasticsearch 索引之前经过预处理,并提供了对 Grok 的集成。

使用 Grok 正则处理日志消息

在 Kibana 的 “Stack Management | Ingest Node Pipelines” 界面,可以指定一系列的处理器 (Processors)对消息进行流水线处理,包括定义异常处理分支。

dc741da57eca3e2b9bdc4bf70357cc3c.png

对于 query.log 文件中的日志记录,由于日志格式固定,因此可以使用 Grok 对一条记录进行匹配。正则表达式格式如 %{IP:source.ip},冒号左侧为 Grok 默认 Pattern,冒号右侧为存储的 JSON 字段名,支持嵌套结构。解析日志的 Grok 正则表达式如下:

1
2
3
23-Nov-2021 13:58:03.222 client @0x7f7034054680 168.168.168.47#56504 (baidu.com): query: baidu.com IN A + (168.168.168.47)

%{GREEDYDATA:timestamp} %{WORD} %{DATA:source.client} %{IP:source.ip}#%{NUMBER:source.port} \(%{HOSTNAME:domain.fullname}\): query: %{HOSTNAME} %{WORD:class} %{WORD:type}

此外,还可以自定义 Grok Pattern。例如,对于域名字段来说,其“二级域名 + 顶级域名”构成的域名后缀(即用户可以在域名注册网站购买的域名)可能具有重要分析意义。为此,定义了名为 SLD 的 Grok Pattern,将其解析成 domain.eTLDplusOne 字段(有效顶级域名 + 1,名称借鉴自 Packetbeat)。

fb23cc0fd30fb9e1e9c8d89ee52a22a6.png

注:正则表达式中的反义字符由于要表示在 JSON 文档中,所以需要额外一个下划线,如 \\.

可以在 “Stack Management | Ingest Node Pipelines” 界面添加测试文档测试流水线是否正常工作,这对于验证正则表达式正确性有巨大帮助。此外,还能够在 Dev Tools 的 Grok Debugger 控制台中调试正则表达式。

实际上,在 Kibana 界面添加 Pipeline 相当于在 Elasticsearch 的 /_ingest/pipeline 索引中添加了如下的一条文档:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
{
"dns_query_log_pipeline" : {
"description" : "Extract fields from message.",
"processors" : [
{
"grok" : {
"field" : "message",
"patterns" : [
"""%{GREEDYDATA:timestamp} %{WORD} %{DATA:source.client} %{IP:source.ip}#%{NUMBER:source.port} \(%{HOSTNAME:domain.fullname}\): query: %{HOSTNAME} %{WORD:class} %{WORD:type} """
]
}
},
{
"date" : {
"field" : "timestamp",
"formats" : [
"dd-MMM-yyyy HH:mm:ss.SSS"
],
"target_field" : "timestamp"
}
},
{
"grok" : {
"field" : "domain.fullname",
"patterns" : [
"""(%{GREEDYDATA:domain.prefix}\.)*(%{SLD:domain.eTLDplusOne}){1}"""
],
"pattern_definitions" : {
"SLD" : """((xn--)?[a-z0-9-]+\.[a-z|\-]{2,63})"""
},
"on_failure" : [
{
"set" : {
"field" : "domain.hostname",
"copy_from" : "domain.fullname"
}
}
]
}
}
]
}
}

为了使该 Pipeline 生效,还需要在 Filebeat 的配置文件中添加此项配置,并重启 Filebeat 程序。

1
2
3
output.elasticsearch:
hosts: ["http://localhost:9200"]
pipeline: "dns_query_log_pipeline"

Kibana 可视化

要在 Kibana 查看 Elasticsearch 中存储的数据,首先需要在 “Stack Management | Index patterns” 中配置索引模式(Index pattern)。

c1ddec742b59b52562067be4e9ed6231.png

创建完成后,可以在 “Discover” 界面查看索引中存放的数据。

6625242b24495aaf1c3a084b4c17bb71.png

还可以在 “Dashboard” 界面自定义可视化看板,效果图如前言中所示。

使用 Packetbeat 监听数据包

除了使用 Filebeat + Grok 手动解析日志文件之外,Beats 工具集还提供了专用于网络数据包分析的轻量型工具 Packetbeat,它能够捕获由 ping 工具发起的 ICMP 报文(网络层),或者通过端口捕获应用层报文,如 HTTP 80 端口、MySQL 3306 端口等。捕获的报文可被解析成 JSON 格式存储到 Elasticsearch。

下载、解压 Packetbeat 安装包,并重命名。

1
2
3
$ curl -L -O https://artifacts.elastic.co/downloads/beats/packetbeat/packetbeat-7.15.2-linux-x86_64.tar.gz
$ tar xzvf packetbeat-7.15.2-linux-x86_64.tar.gz
$ mv packetbeat-7.15.2-linux-x86_64 packetbeat

Packetbeat 的配置文件模版位于 packetbeat.yml 中,我们需要自定义 DNS 配置文件 dns.yml。除了以 packetbeat. 开头的特定配置外,其他配置与 Filebeat 共用一套模版。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
packetbeat.interfaces.device: eth0

packetbeat.protocols:
- type: dns
ports: [53]

processors:
- drop_fields:
fields: ["host","agent","ecs"]
ignore_missing: false

setup.template.settings:
index.number_of_shards: 3

output.elasticsearch:
hosts: ["http://localhost:9200"]

运行 Packetbeat,参数 -c 指定配置文件 named.yml,-e 将错误日志输出到控制台。

1
./packetbeat -e -c dns.yml

启动 Packetbeat 后,捕获的数据被发送至 Elasticsearch,在 “Stack Management | Index patterns” 中配置索引模式后,就可以访问以 packetbeat-7.15.2 别名命名的索引中的数据。

此外,Packetbeat 还提供了一些可视化的 Kibana Dashboard 模版,通过如下命令初始化:

1
./packetbeat setup --dashboards

Dashboard 效果图如下。

7a2a13ab1327c49fa654607b52f662e5.png

总结

值得注意的是,Packetbeat 监听的是目的端口为 53 的数据包。不管是下面哪种 DNS 报文,Packetbeat 都能进行捕获。

  • 内网中访问内网 DNS 服务器的其他终端,如 192.168.101.215 -> 168.168.168.47(内网 DNS 服务器);
  • 内网 DNS 服务器转发的 DNS 查询,如 168.168.168.47 -> 8.8.8.8(谷歌 DNS 服务器,可在 Bind9 配置文件中定义转发规则);
  • 甚至是手动指定 DNS 服务器的查询报文,如 dig @114.114.114.114 baidu.com

08d3eb68410326d6b2798b8aa397eacc.png

相比之下,使用日志记录 DNS 查询请求的方式就显得十分局限了。即使内网中的用户在 /etc/resolv.conf 中配置了内网 DNS 服务器,也不能保证恶意软件乖乖的使用该 DNS 服务器。因此理论上,只有在内网所有终端上部署类似 Packetbeat 的数据包探测器(类似的还有 Wireshark 的底层库工具 tshark),才能真正全量的监听 DNS 流量。此外,Packetbeat 解析出的 JSON 结构的数据所包含的信息要远大于通过正则分析的日志信息。

因此对于端口固定的数据包探测,Packetbeat 的实用性要优于 Filebeat + Grok 的方案。但 Filebeat 适用范围更广,任何有生成日志的程序,甚至是批/流处理产生的中间数据文件,理论上都能通过 Filebeat 采集特征存入 Elasticsearch 进行后续分析。

参考