
本文共 19696 字,大约阅读时间需要 65 分钟。
目录
ELK日志搜集平台
一、组件
ELK由ElasticSearch、Logstash和Kiabana三个开源工具组成:
1. Elasticsearch
ElasticSearch是一个基于Lucene的开源分布式搜索服务器。只搜索和分析日志
特点:分布式,零配置,自动发现,索引自动分片,索引副本机制等。它提供了一个分布式多用户能力的全文搜索引擎。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。在elasticsearch中,所有节点的数据是均等的。
2. Logstash:
Logstash是一个完全开源工具,可以对你的日志进行收集、过滤、分析,并将其存储供以后使用只收集和过滤日志,和改格式
简单来说logstash就是一根具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可以让你根据自己的需求在中间加上滤网,Logstash提供里很多功能强大的滤网以满足你的各种应用场景。
Logstash的事件(logstash将数据流中等每一条数据称之为一个event)处理流水线有三个主要角色完成:inputs –> filters –> outputs:
logstash整个工作流分为三个阶段:输入、过滤、输出。每个阶段都有强大的插件提供支持:
Input 必须,负责产生事件(Inputs generate events),常用的插件有
- file 从文件系统收集数据
- syslog 从syslog日志收集数据
- redis 从redis收集日志
- beats 从beats family收集日志(如:Filebeats)
Filter常用的插件有, 可选,负责数据处理与转换(filters modify )
- grok是logstash中最常用的日志解释和结构化插件。:grok是一种采用组合多个预定义的正则表达式,用来匹配分割文本并映射到关键字的工具。
- mutate 支持事件的变换,例如重命名、移除、替换、修改等
- drop 完全丢弃事件
- clone 克隆事件
output 输出,必须,负责数据输出(outputs ship elsewhere),常用的插件有
- elasticsearch 把数据输出到elasticsearch
- file 把数据输出到普通的文件
3. Kibana:
Kibana 是一个基于浏览器页面的Elasticsearch前端展示工具,也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮你汇总、分析和搜索重要数据日志。
4. Kafka、zookeeper
Kafka
数据缓冲队列。同时提高了可扩展性。具有峰值处理能力,使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。
Kafka的特性:
-
高吞吐量:kafka每秒可以处理几十万条消息。
-
可扩展性:kafka集群支持热扩展- 持久性、
-
可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
-
容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
-
高并发:支持数千个客户端同时读写
它主要包括以下组件 :
话题(Topic):是特定类型的消息流。(每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。)生产者(Producer):是能够发布消息到话题的任何对象(发布消息到 kafka 集群的终端或服务).消费者(Consumer):可以订阅一个或多个话题,从而消费这些已发布的消息。服务代理(Broker):已发布的消息保存在一组服务器中,它们被称为代理(Broker)或Kafka集群。partition(区):每个 topic 包含一个或多个 partition。replication:partition 的副本,保障 partition 的高可用。leader:replica 中的一个角色, producer 和 consumer 只跟 leader 交互。follower:replica 中的一个角色,从 leader 中复制数据。zookeeper:kafka 通过 zookeeper 来存储集群的信息。
拓扑结构
ZooKeeper
ZooKeeper是一个分布式协调服务,它的主要作用是为分布式系统提供一致性服务,提供的功能包括:配置维护、分布式同步等。Kafka的运行依赖ZooKeeper。 也是java微服务里面使用的一个注册中心服务
ZooKeeper主要用来协调Kafka的各个broker,不仅可以实现broker的负载均衡,而且当增加了broker或者某个broker故障了,ZooKeeper将会通知生产者和消费者,这样可以保证整个系统正常运转。
在Kafka中,一个topic会被分成多个区并被分到多个broker上,分区的信息以及broker的分布情况与消费者当前消费的状态信息都会保存在ZooKeeper中。
5. Filebeat
隶属于Beats,轻量级数据收集引擎。基于原先 Logstash-fowarder 的源码改造出来。换句话说:Filebeat就是新版的 Logstash-fowarder,目前Beats包含四种工具:
- 1.Packetbeat(搜集网络流量数据)
- 2.Metricbeat(搜集系统、进程和文件系统级别的 CPU 和内存使用情况等数据。)
- 3.Filebeat(搜集文件数据)
- 4.Winlogbeat(搜集 Windows 日志数据)
搭建架构
filebeat安装在要收集日志的应用服务器中,filebeat收集到日志之后传输到kafka中,logstash通过kafka拿到日志,在由logstash传给后面的es,es将日志传给后面的kibana,最后通过kibana展示出来。
二、环境
安装软件 | 主机名 | IP地址 | 系统版本 |
---|---|---|---|
Elasticsearch/kafka/Logstash/filebeat/Kibana | ela1 | 192.168.195.128 | centos7.4–4G |
Elasticsearch/kafka/filebeat | ela2 | 192.168.195.129 | centos7.4–2G |
Elasticsearch/kafka//filebeat | ela3 | 192.168.195.130 | centos7.4—2G |
所有机器关闭防火墙,selinux.
三、版本
Elasticsearch: 6.5.4 #https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.10.0-linux-x86_64.tar.gz
Logstash: 6.5.4 #https://artifacts.elastic.co/downloads/logstash/logstash-7.10.0-linux-x86_64.tar.gz
Kibana: 6.5.4 #https://artifacts.elastic.co/downloads/kibana/kibana-7.10.0-linux-x86_64.tar.gz
Kafka: 2.11-2.1 #https://archive.apache.org/dist/kafka/2.1.0/kafka_2.11-2.1.0.tgz
Filebeat: 6.5.4
https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.10.0-linux-x86_64.tar.gz
相关地址:
官网地址:
官网搭建:https://www.elastic.co/guide/index.html
四、ELA部署
1.每个节点创建用户ela
useradd ela
2.每个节点解压ela包到指定目录
tar -xf elasticsearch-7.10.0-linux-x86_64.tar.gz -C /usr/local/
3.每个节点修改安装主目录的属主属组
chown -R ela.ela /usr/local/elasticsearch-7.10.0
4.在每个节点上设置系统内核参数
设置内存映射
sysctl -w vm.max_map_count=262144 > /etc/sysctl.confsysctl -p
还需要设置关于这个进程可以打开的文件描述符数量
root@prod ~]# tail /etc/security/limits.conf#@faculty soft nproc 20#@faculty hard nproc 50#ftp hard nproc 0#@student - maxlogins 4# End of file* soft nofile 65536* hard nofile 131072* soft nproc 2048* hard nproc 4096
5.在每个节点设置如下集群参数
默认情况下 Elasticsearch 会使用:
$ES_HOME/config/elasticsearch.yml 作为配置文件启动进程。
编译配置文件 /usr/local/elasticsearch-7.10.0/config/elasticsearch.yml
并添加如下内容:
ela1 节点 设置的内容如下:
cluster.name: elknode.name: ela1node.data: truenetwork.host: 0.0.0.0http.port: 9200discovery.seed_hosts: - 192.168.195.129 - 192.168.195.128 - 192.168.195.130cluster.initial_master_nodes: ["ela1", "ela2", "ela3"]
ela2 节点设置的内容如下:
cluster.name: elknode.name: ela2node.data: truenetwork.host: 0.0.0.0http.port: 9200discovery.seed_hosts: - 192.168.195.129 - 192.168.195.128 - 192.168.195.130cluster.initial_master_nodes: ["ela1", "ela2", "ela3"]
ela3 节点设置的内容如下:
cluster.name: elknode.name: ela3node.data: truenetwork.host: 0.0.0.0http.port: 9200discovery.seed_hosts: - 192.168.195.129 - 192.168.195.128 - 192.168.195.130cluster.initial_master_nodes: ["ela1", "ela2", "ela3"]
参数说明:
cluster.name 集群名称,各节点配成相同的集群名称。
node.name 节点名称,各节点配置不同。
node.data 指示节点是否为数据节点。数据节点包含并管理索引的一部分。
network.host 绑定节点IP。
http.port 监听端口。
path.data 数据存储目录。
path.logs 日志存储目录。
discovery.seed_hosts 指定集群成员,用于主动发现他们,所有成员都要写进来,包括自己,每个节点中应该写一样的信息。
cluster.initial_master_nodes 指定有资格成为 master 的节点
http.cors.enabled 用于允许head插件访问ES。
http.cors.allow-origin 允许的源地址。
注意:
当您为提供自定义设置时 network.host,Elasticsearch会假设您正在从开发模式过渡到生产模式,并将许多系统启动检查从警告升级到异常。
cluster.initial_master_nodes 中的节点名称需要和 node.name 的名称一致。
6.启动集群
在每个节点启动elasticsearch进程
切换到普通用户ela
su - ela
然后执行以下命令:
cd /usr/local/elasticsearch-7.10.0./bin/elasticsearch -d -p /tmp/elasticsearch.pid
-d 后台运行
-p 指定一个文件,用于存放进程的 pid
7.查看日志
ls logs/elk.log
8.查看集群状态
curl -X GET "localhost:9200/_cat/health?v"epoch timestamp cluster status node.total node.data shards pri relo init unassign pending_tasks max_task_wait_time active_shards_percent1609065251 10:34:11 elk green 3 3 0 0 0 0 0 0 - 100.0%
三种不同状态的含义
黄色:如果您仅运行单个Elasticsearch实例,则集群状态将保持黄色。单节点群集具有完整的功能,但是无法将数据复制到另一个节点以提供弹性。
绿色:副本分片必须可用,群集状态为绿色。
红色:如果群集状态为红色,则某些数据不可用。
9.查看集群节点信息
[elastic@ela1 elasticsearch-7.10.0]$ curl -X GET "localhost:9200/_cat/nodes?v"ip heap.percent ram.percent cpu load_1m load_5m load_15m node.role master name192.168.122.106 59 93 0 0.00 0.01 0.05 cdhilmrstw - ela2192.168.122.218 65 94 0 0.00 0.02 0.06 cdhilmrstw - ela3192.168.122.6 49 94 0 0.07 0.05 0.05 cdhilmrstw * ela1
五、Logstsh部署
1.安装
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.10.0-linux-x86_64.tar.gztar zxvf logstash-7.10.0-linux-x86_64.tar.gz -C /usr/local/
2.测试安装
首先,让我们通过运行最基本的Logstash管道来测试Logstash安装。
Logstash管道具有两个必需元素input和output,以及一个可选元素filter。输入插件使用来自源的数据,过滤器插件根据您的指定修改数据,输出插件将数据写入目标。
测试Logstash安装,请运行最基本的Logstash管道。
例如,可以执行如下命令使 Logstash 程序运行于前台
进入 Logstash 的安装主目录下执行:
bin/logstash -e ''
-e
选项用于设置 Logstash 处理数据的输入和输出
-e ''
这里使用了空字符串等同于使用 -e input { stdin { type => stdin } } output { stdout { codec => rubydebug } }
input { stdin { type => stdin } }
表示 Logstash 需要处理的数据来源来自于标准输入设备(键盘)
output { stdout { codec => rubydebug } }
表示 Logstash 把处理好的数据输出到标准输出设备(屏幕,也就是终端)
稍等片刻,当看到屏幕上输出如下字样,即可尝试使用键盘输入 hello 字样
[2020-11-29T11:47:57,606][INFO ][logstash.agent ] Successfully started Logstash API endpoint { :port=>9600}
输出 hello
立刻看到终端输出配格式化后的数据信息
message
字段对应的值是 Logstash 接收到的一行完整的数据
@version
是版本信息,可以用于建立索引使用(后面会讲)
@timestamp
处理此数据的时间戳,可以用于建立索引和搜索
type
就是之前 input
中设置的值,这个值可以任意修改,但是,type 是内置的变量,不能修改,用于建立索引和条件判断等
hosts
表示从那个主机过来的数据
3.配置输入和输出
input { beats { port => 5044 }}filter { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } remove_field => [ "message" ] } geoip { source => "clientip" }}output { stdout { codec => rubydebug } elasticsearch { # 这里是输出到 elasticsearch 集群中 hosts => ["192.168.195.128:9200","192.168195.129:9200","192.168.195.130:9200"] }}
4.运行
运行如下命令启动 Logstatsh
bin/logstash -f first-pipeline.conf --config.reload.automatic
--config.reload.automatic
会在你修改管道配置文件后自动加载,而不必重新启动 Logstash。
六、Kibana部署
1.部署
下载解压
curl -L -O https://artifacts.elastic.co/downloads/kibana/kibana-7.10.1-linux-x86_64.tar.gztar xzvf kibana-7.10.1-linux-x86_64.tar.gz -C /usr/local/
创建软链接(可选)
ln -s /usr/local/kibana-7.10.0-linux-x86_64 /usr/local/kibana
2.设置
配置主配置文件
/usr/local/kibana/config/kibana.yml
# 监听端口,默认端口是 5601server.port: 5601# 自己的监听地址server.host: "0.0.0.0"# 用于连接到 ES 集群的地址和端口elasticsearch.hosts: ["http://es01:9200"]# pid 文件路径(可选的配置)pid.file: /var/run/kibana/kibana.pid# 日志文件路径(可选的配置),默认是输出到标准输出(也就是终端屏幕)# logging.dest: stdout logging.dest: /var/log/kibana/kibana.log# 设置页面的字体为中文i18n.locale: "zh-CN"
创建用于运行 kibana 的普通用户
默认情况下,kibana 不允许使用 root 用户运行,所以这里创建一个普通用户
useradd ela
创建程序使用到的目录并赋予权限
mkdir /var/run/kibana /var/log/kibana/chown ela.ela /var/run/kibana /var/log/kibana/
给安装目录也赋予权限
chown ela.ela kibana/ -R
3.运行和关闭
使用普通用户运行
运行于前台
/usr/local/kibana/bin/kibana
运行于后台
nohup /usr/local/kibana/bin/kibana &
使用 root 用户运行
如果所使用 root 用户运行需要使用如下命令
/usr/local/kibana/bin/kibana --allow-root
注意这样启动程序,是运行于前台
日志报错
{ "type":"log","@timestamp":"2021-03-03T10:34:45Z","tags":["error","elasticsearch","data"],"pid":4151,"message":"[ConnectionError]: connect ECONNREFUSED 127.0.0.1:9200"}
说明 kibana 无法连接到 ES 集群,需要检查如下配置是否正确
配置文件 /usr/local/kibana/config/kibana.yml
elasticsearch.hosts: ["http://localhost:9200"]
检查 elasticsearch 主机的防火墙的状态,SELinux 的状态。
关闭
设置了 PID 文件路径的情况下
使用如下命令格式即可
pkill -F PID文件路径
本例中需要执行如下命令
pkill -F /var/run/kibana/kibana.pid
没有设置 PID 文件路径的情况下
先找到进程号
ps -ef | awk '/[k]ibana/ {print $2}'
之后用 kill
命令终止此进程
4.访问
使用浏览器访问 http://ip:5601
5.创建索引模式
Kibana需要使用索引模式来访问要浏览的Elasticsearch数据。索引模式选择要使用的数据,并允许您定义字段的属性。
索引模式可以指向特定索引,例如,您昨天的日志数据或包含您的数据的所有索引。它还可以指向 数据流或索引别名
完成后如下图
6.查询数据
七、Filebeat部署
1.组件介绍
可以使用 Filebeat 收集各种日志,之后发送到指定的目标系统上,但是同一时间只能配置一个输出目标。
2.安装
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.10.1-linux-x86_64.tar.gztar xzvf filebeat-7.10.1-linux-x86_64.tar.gz -C /usr/local/
3.自定义路径收集日志
准备需要收集日志的数据
123.127.39.50 - - [04/Mar/2021:10:50:28 +0800] "GET /logo.jpg HTTP/1.1" 200 14137 "http://81.68.233.173/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.192 Safari/537.36" "-"
将以上内容保存到任意的路径下,比如这里保存到 /tmp/access.log
配置Filebeat 的输入
这里就是告诉 Filebeat 应该从那个地方找的需要收集的日志
配置文件在安装的家目录下的 filebeat.yml
找如下相关内容并配置
filebeat.inputs:- type: log enabled: true paths: - /var/log/access.log - /var/log/error.log
其他配置项不用修改
配置 Filebeat 的输出
这里配置的是将收集的文件输出的终端屏幕上,以便我们调试观察
继续修改配置文件 filebeat.yml
在文件最后添加如下内容
output.console: pretty: true
再找到如下配置项进行注释,因为,Filebeat 不允许同时配置两个输出。
#output.elasticsearch: # Array of hosts to connect to. # hosts: ["localhost:9200"]
4.运行
在Filebeat 安装的家目录下,执行如下命令启动 filebeat
./filebeat
运行成功后,屏幕上有数据输出.
5.输出内容
{ "@timestamp": "2021-03-04T03:49:10.539Z", "@metadata": { }, "fileset": { "name": "access" }, # 重要,被收集日志文件中的一行内容, "message": "123.127.39.50 - - [04/Mar/2021:10:50:28 +0800] \"GET /logo.jpg HTTP/1.1\" 200 14137 \"http://81.68.233.173/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.192 Safari/537.36\" \"-\"", "event": { # 重要可以使用此值用于区别那个程序什么类型的日志 "dataset": "nginx.access", }, "agent": { }, "host": { "ip": [ # 本机所有有效 IP ], "mac": [ ], # 主机名 "hostname": "qq", }}
后面我们会把输出的内容输出到 Logstash 或者 kafka 中进行进一步处理
八、Kafka部署
1.安装配置jdk8
Kafka、Zookeeper(简称:ZK)运行依赖jdk8
tar zxvf /usr/local/package/jdk-8u121-linux-x64.tar.gz -C /usr/local/echo 'JAVA_HOME=/usr/local/jdk1.8.0_121PATH=$JAVA_HOME/bin:$PATHexport JAVA_HOME PATH' >>/etc/profilesource /etc/profile
2.安装配置ZK
Kafka运行依赖ZK,Kafka官网提供的tar包中,已经包含了ZK,这里不再额下载ZK程序。
配置相互解析——三台机器
[root@ela1 ~]# vim /etc/hosts192.168.195.128 ela1192.168.195.129 ela2192.168.195.130 ela3
(1)安装
[root@ela1 ~]# tar xzvf kafka_2.11-2.1.0.tgz -C /usr/local/
(2)配置
[root@ela1 ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka_2.11-2.1.0/config/zookeeper.properties[root@ela1 ~]# vim /usr/local/kafka_2.11-2.1.0/config/zookeeper.properties #添加如下配置dataDir=/opt/data/zookeeper/data # 需要创建,所有节点一致dataLogDir=/opt/data/zookeeper/logs # 需要创建,所有节点一致clientPort=2181 tickTime=2000 initLimit=20 syncLimit=10 # 以下 IP 信息根据自己服务器的 IP 进行修改server.1=192.168.195.128:2888:3888 //kafka集群IP:Portserver.2=192.168.195.129:2888:3888server.3=192.168.195.130:2888:3888#创建data、log目录[root@ela1 ~]# mkdir -p /opt/data/zookeeper/{data,logs}#创建myid文件[root@ela1 ~]# echo 1 > /opt/data/zookeeper/data/myid #myid号按顺序排
[root@ela2 ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka_2.11-2.1.0/config/zookeeper.properties[root@ela2 ~]# vim /usr/local/kafka_2.11-2.1.0/config/zookeeper.propertiesdataDir=/opt/data/zookeeper/data dataLogDir=/opt/data/zookeeper/logsclientPort=2181 tickTime=2000 initLimit=20 syncLimit=10 server.1=192.168.195.128:2888:3888server.2=192.168.195.129:2888:3888server.3=192.168.195.130:2888:3888#创建data、log目录[root@ela2 ~]# mkdir -p /opt/data/zookeeper/{data,logs}#创建myid文件[root@ela2 ~]# echo 2 > /opt/data/zookeeper/data/myid
[root@ela3 ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka_2.11-2.1.0/config/zookeeper.properties[root@ela3 ~]# vim /usr/local/kafka_2.11-2.1.0/config/zookeeper.propertiesdataDir=/opt/data/zookeeper/data dataLogDir=/opt/data/zookeeper/logsclientPort=2181 tickTime=2000 initLimit=20 syncLimit=10 server.1=192.168.195.128:2888:3888server.2=192.168.195.129:2888:3888server.3=192.168.195.130:2888:3888#创建data、log目录[root@ela3 ~]# mkdir -p /opt/data/zookeeper/{data,logs}#创建myid文件[root@ela3 ~]# echo 3 > /opt/data/zookeeper/data/myid
配置项含义:
dataDir ZK数据存放目录。dataLogDir ZK日志存放目录。clientPort 客户端连接ZK服务的端口。tickTime ZK服务器之间或客户端与服务器之间维持心跳的时间间隔。initLimit 允许follower连接并同步到Leader的初始化连接时间,当初始化连接时间超过该值,则表示连接失败。syncLimit Leader与Follower之间发送消息时如果follower在设置时间内不能与leader通信,那么此follower将会被丢弃。server.1=172.16.244.31:2888:3888 2888是follower与leader交换信息的端口,3888是当leader挂了时用来执行选举时服务器相互通信的端口。
3.配置Kafka
(1)配置
[root@ela1 ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka_2.11-2.1.0/config/server.properties[root@ela1 ~]# vim /usr/local/kafka_2.11-2.1.0/config/server.properties #在最后添加broker.id=1listeners=PLAINTEXT://192.168.195.128:9092num.network.threads=3num.io.threads=8socket.send.buffer.bytes=102400socket.receive.buffer.bytes=102400socket.request.max.bytes=104857600log.dirs=/opt/data/kafka/logsnum.partitions=6num.recovery.threads.per.data.dir=1offsets.topic.replication.factor=2transaction.state.log.replication.factor=1transaction.state.log.min.isr=1log.retention.hours=168log.segment.bytes=536870912log.retention.check.interval.ms=300000zookeeper.connect=192.168.195.128:2181,192.168.195.129:2181,192.168.195.130:2181zookeeper.connection.timeout.ms=6000group.initial.rebalance.delay.ms=0[root@ela1 ~]# mkdir -p /opt/data/kafka/logs
[root@ela2 ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka_2.11-2.1.0/config/server.properties[root@ela2 ~]# vim /usr/local/kafka_2.11-2.1.0/config/server.propertiesbroker.id=2listeners=PLAINTEXT://192.168.195.129:9092num.network.threads=3num.io.threads=8socket.send.buffer.bytes=102400socket.receive.buffer.bytes=102400socket.request.max.bytes=104857600log.dirs=/opt/data/kafka/logsnum.partitions=6num.recovery.threads.per.data.dir=1offsets.topic.replication.factor=2transaction.state.log.replication.factor=1transaction.state.log.min.isr=1log.retention.hours=168log.segment.bytes=536870912log.retention.check.interval.ms=300000zookeeper.connect=192.168.195.128:2181,192.168.195.129:2181,192.168.195.130:2181zookeeper.connection.timeout.ms=6000group.initial.rebalance.delay.ms=0[root@ela2 ~]# mkdir -p /opt/data/kafka/logs
[root@ela3 ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka_2.11-2.1.0/config/server.properties[root@ela3 ~]# vim /usr/local/kafka_2.11-2.1.0/config/server.propertiesbroker.id=3listeners=PLAINTEXT://192.168.195.130:9092num.network.threads=3num.io.threads=8socket.send.buffer.bytes=102400socket.receive.buffer.bytes=102400socket.request.max.bytes=104857600log.dirs=/opt/data/kafka/logsnum.partitions=6num.recovery.threads.per.data.dir=1offsets.topic.replication.factor=2transaction.state.log.replication.factor=1transaction.state.log.min.isr=1log.retention.hours=168log.segment.bytes=536870912log.retention.check.interval.ms=300000zookeeper.connect=192.168.195.128:2181,192.168.195.129:2181,192.168.195.130:2181zookeeper.connection.timeout.ms=6000group.initial.rebalance.delay.ms=0[root@ela3 ~]# mkdir -p /opt/data/kafka/logs
配置项含义:
broker.id 每个server需要单独配置broker id,如果不配置系统会自动配置。listeners 监听地址,格式PLAINTEXT://IP:端口。num.network.threadsnum.io.threadssocket.send.buffer.bytessocket.receive.buffer.bytessocket.request.max.byteslog.dirs 日志文件目录。num.partitionsnum.recovery.threads.per.data.dir offsets.topic.replication.factorlog.retention.hourslog.segment.byteslog.retention.check.interval.ms zookeeper.connect ZK主机地址,如果zookeeper是集群则以逗号隔开。zookeeper.connection.timeout.ms 连接到Zookeeper的超时时间。
4.其他节点配置
只需把配置好的安装包直接分发到其他节点,修改 Kafka的broker.id和 listeners就可以了。
5.启动、验证ZK集群
(1)启动
在三个节点依次执行:
[root@ela1 ~]# cd /usr/local/kafka_2.11-2.1.0/[root@ela1 kafka_2.11-2.1.0]# nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
(2)验证
查看端口
[root@ela1 ~]# netstat -lntp | grep 2181tcp6 0 0 :::2181 :::* LISTEN 1226/java
6.启动、验证Kafka
(1)启动
在三个节点依次执行:
[root@ela1 ~]# cd /usr/local/kafka_2.11-2.1.0/[root@ela1 kafka_2.11-2.1.0]# nohup bin/kafka-server-start.sh config/server.properties &
(2)验证
在192.168.195.128上创建topic
root@ela2 kafka_2.11-2.1.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testtopicCreated topic "testtopic".参数解释:–zookeeper指定zookeeper的地址和端口,–partitions指定partition的数量,–replication-factor指定数据副本的数量
在129.130上面查询192.168.195.128上的topic
[root@ela3 kafka_2.11-2.1.0]# bin/kafka-topics.sh --zookeeper 192.168.195.128:2181 --listtesttopic
模拟消息生产和消费
发送消息到192.168.195.128
[root@ela2 kafka_2.11-2.1.0]# bin/kafka-console-producer.sh --broker-list 192.168.195.128:9092 --topic testtopic>hello
从192.168.195.130接受消息
[root@ela3 kafka_2.11-2.1.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.195.128:9092 --topic testtopic --from-beginninghello
kafka没有问题之后,回到logstach服务器:#安装完kafka之后的操作:[root@ela2 ~]# cd /usr/local/logstash-6.5.4/etc/conf.d/[root@ela2 conf.d]# cp input.conf input.conf.bak[root@ela2 conf.d]# vim input.confinput { kafka { #指定kafka服务 type => "nginx_log" codec => "json" #通用选项,用于输入数据的编解码器 topics => "nginx" #这里定义的topic decorate_events => true bootstrap_servers => "192.168.195.128:9092, 192.168.195.129:9092, 192.168.195.130:9092" }} 启动 logstash[root@ela2 conf.d]# cd /usr/local/logstash-6.5.4/[root@ela2 logstash-6.5.4]# nohup bin/logstash -f etc/conf.d/ --config.reload.automatic &
decorate_events => true
默认是 false` 这将向logstash 事件添加一个名为kafka的字段 ,这包含以下属性。
`topic 主题:与此消息相关联的主题
consumer_group
使用者群组:此事件中用来读取的使用者群组
partition
分区:与此消息关联的分区
offset
偏移量:与此消息关联的分区的偏移量
key
:包含message key的ByteBuffer
九、配置 Filebeat 输出到 Kafka
1.配置
output.kafka: # initial brokers for reading cluster metadata hosts: ["ela1:9092", "ela2:9092", "ela3:9092"] topic: 'nginx' partition.round_robin: reachable_only: false required_acks: 1 compression: gzip max_message_bytes: 1000000
2.验证
验证kafka是否生成topic
[root@ela3 filebeat]# cd /usr/local/kafka_2.11-2.1.0/[root@ela3 kafka_2.11-2.1.0]# bin/kafka-topics.sh --zookeeper 192.168.195.128:2181 --list__consumer_offsetsnginx #已经生成topictesttopic
十、配置 Logstash 从 Kafka 中获取日志
input { kafka { bootstrap_servers => "myhost:9092" topics => ["nginx"] codec => json }}
发表评论
最新留言
关于作者
