filebeat+kafka+logstash 详细配置
发布日期:2022-04-22 13:47:02 浏览次数:7 分类:博客文章

本文共 4576 字,大约阅读时间需要 15 分钟。

环境 :centos 7

192.168.1.1 zookeeper+kafka+logstash+es+kiana
192.168.1.2 zookeeper+kafka+filebeat
192.168.1.3 zookeeper+kafka+filebeat

组件介绍:

1.Filebeat负责从web服务器上实时抓取数据,当log文件发生变化时,将文件内容吐给kafka。

2.Kafka是消息队列,主要作用是在filebeat和logstash之间做缓存,避免因写入logstash的数据量过大,导致数据丢失。

3.Zookeeper是kafka的分发系统,他负责维护整个kafka集群的负载均衡,在部署的时候,每个kafka节点上都要单独安装zookeeper,同时要保证zookeeper之间能够互相通信(2181端口)。

4.Logstash是日志处理器,也是整个elk系统的核心。负责来自kafka的日志处理,然后把处理过的日志吐给elasticsearch。需要注意的是,经logstash处理过的日志都是json格式的

架构图:

一、配置filebeat

1.安装filebeat

yum -y install filebeat......rpm

可直接安装rpm包使用,看清linux版本

2.配置filebeat

cat /etc/filebeat/filebeat.yml
filebeat.inputs:- type: system_log  enabled: true  paths:    - /var/log/messagefilebeat.config.modules:  path: ${path.config}/modules.d/*.yml  reload.enabled: falsesetup.template.settings:  index.number_of_shards: 1setup.kibana:output.kafka:  hosts: ["192.168.1.1:9092","192.168.1.2:9092","192.168.1.3:9092"]  enabled: true  topic: node01-system-message	#会自定在kafka创建该topic供logstash订阅读取时使用processors:  - add_host_metadata: ~  - add_cloud_metadata: ~  - add_docker_metadata: ~  - add_kubernetes_metadata: ~

官方地址wiki:

3.启动

filebeat -e -c /etc/filebeat/filebeat.yml

二、kafka集群搭建并测试

1.下载kafka

官网下载地址为:

2.将下载好的压缩包解压

3.配置zookeeper

kafka启动依赖于zookeeper集群

1).使用kafka包里自带的zookeeper

添加如下基本配置(三台上基本一致)

vim /opt/kafka.../config/zookeeper.propertie

tickTime=2000initLimit=10syncLimit=5dataDir = /data/zookeeperserver.1 = 192.168.1.1:2888:3888server.2 = 192.168.1.2:2888:3888server.3 = 192.168.1.3:2888:3888

2888 端口:表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;

3888 端口:表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader ,而这个端口就是用来执行选举时服务器相互通信的端口。

mkdir -p /data/zookeeper
echo (1,2,3) > /data/zookeeper/myid	#每天ip对应,不能有重复

2).使用另外的zookeeper

3).启动zookpeer集群

/opt/kafka_2.12-2.7.0/bin/zookeeper-server-start.sh ../config/zookeeper.propertie	#注意这是前台启动zookeeper-server-stop.sh	#停止脚本

刚开始会报错是由于zookeeper集群在启动的时候,每个结点都试图去连接集群中的其它结点,先启动的肯定连不上后面还没启动的,所以上面日志前面部分的异常是可以忽略的。通过后面部分可以看到,集群在选出一个Leader后,最后稳定了。

4).zookpeer集群检查

netstat -nlpt | grep -E "2181|2888|3888"

tcp 0 0 192.168.2.24 : 3888 0.0.0.0 : * LISTEN 1959 / java

tcp 0 0 0.0.0.0 : 2181 0.0.0.0 : * LISTEN 1959/ java

tcp 0 0 192.168.2.24 : 2888 0.0.0.0 : * LISTEN 950 / java

可以看出,如果哪台是Leader,那么它就拥有2888这个端口

4.配置kafka集群

1).配置kafka(三台大致配置一样,除却ip跟broker_id)

vim /opt/kafka_2.12-2.7.0/config/server.properties

要修改或添加的参数,除却以下参数,采用默认即可

broker.id=1	#对应zookeepr集群iddelete.topic.enable=true	#默认时false,为true时,删除topic时会被立即删除host.name=192.168.1.1listeners=PLAINTEXT://192.168.1.1:9092advertised.host.name=192.168.1.1advertised.port=9092log.dirs=/data/kafka/kafkalogs############################# Zookeeper ############################zookeeper.connect=192.168.1.1:2181.192.168.1.2:2181,192.168.1.3:2181zookeeper.connection.timeout.ms=18000######### Group Coordinator Settings #############################group.initial.rebalance.delay.ms=0delete.topic.enable=true  #允许删除topicauto.create.topics.enable = true #允许自动创建topic

2).启动测试

启动kafka

/opt/kafka_2.12-2.7.0/bin/kafka-server-start.sh -daemon(后台启动) ../config/server.properties

三台依次启动,启动侯无发现错误信息即可测试

测试

创建topic

./kafka-topics.sh --create --zookeeper 192.168.1.1:2181 --replication-factor 3 --partitions 1 --topic yanghaoyu

--replication-factor 3 #该参数不能大于集群总数

查看topic列表

./kafka-topics.sh --list --zookeeper 192.168.1.2:2181

三、配置logstash

1.安装logstash(这里使用rpm文件直接安装使用)

yum - y install logstash.......

2.编辑配置文件

vim /etc/logstash/conf.d/test.yml
#输入配置,一个input{}里可以配置多个输入源input {  #kafka输入源配置  kafka {    #kafka集群地址    bootstrap_servers => ["192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092"]    #从kafka中哪个topic读取数据,这里的topic名要与filebeat中使用的topic保持一致    topics => ["node01-system-message"]    #这是kafka中的消费组者ID,默认值是“logstash”。kafka将消息发到每个消费者组中,同一个组中的消费者收到的数据不重复。例如有两个消费者组G1、G2,G1中有成员A、B,G2中有成员C、D。kafka从输入中收到了10条消息,会将这10条消息同时发送给G1和G2,A和B各会收到这10条消息中的一部分,他们收到消息的并集就是这10条消息,C和D同理。    group_id => "filebeat-logstash"    #kafka消费者组中每个消费者的ID,默认值是“logstash”    client_id => "logstash-node01"    #logstash的消费线程,一般一个线程对应kafka中的一个partition(分区),同一组logstash的consumer_threads之和应该不大于一个topic的partition,超过了就是资源的浪费,一般的建议是相等。    consumer_threads => 1    #由于beat传输数据给kafka集群的时候,会附加很多tag,默认情况下,logstash就会将这串tag也认为是message的一部分。这样不利于后期的数据处理。所有需要添加codec处理。得到原本的message数据。    codec => json  }}#输出配置,这里表示输出到文件output {  file {    path => "/tmp/logstash.output"  }}

3.启动测试

logstash -f /etc/logstash/conf.d/test.yml

tail -f /tmp/logstash.output

能看到filebeat收集的message的相关信息;测试成功,接下来输出到es

output {        elasticsearch {                hosts => ["192.168.137.25:9200"]                index => "node01-system-message-%{+YYYY-MM}"                }        }

转载地址:https://www.cnblogs.com/yhy223/p/14465055.html 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:filecoin python 离线地址生成和离线签名实现
下一篇:Filebeat+kafka+logstash 多路径日志收集

发表评论

最新留言

逛到本站,mark一下
[***.202.152.39]2024年03月19日 16时30分01秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

c语言 实现sizeof功能,C语言简单实现sizeof功能代码 2019-04-21
c语言sin函数近似值,用泰勒公式求sin(x)的近似值 2019-04-21
c 语言登录系统源代码,c语言源代码---------------个人图书管理系统 2019-04-21
android线程通信方式,Android 主线程和子线程通信问题 2019-04-21
cps1 cps2 android,图文教程:CPS1和CPS2模拟器使用 2019-04-21
在线设计 html5 表单,html5注册表单制作-表单制作-小程序表单制作 2019-04-21
android小闹钟课程设计,《小闹钟》教学设计 2019-04-21
mysql文件系统_MySQL文件系统先睹为快(1) 2019-04-21
nums在python_程序找到一对(i,j),其中nums [i] + nums [j] +(i -j)在Python中最大化?... 2019-04-21
jquery后台内容管理_教育平台项目后台管理系统:课程内容模块 2019-04-21
grouping函数 mysql_sql聚合函数有哪些 2019-04-21
python os.walk如何不遍历隐藏文件_python 获取文件下所有文件或目录os.walk()的实例... 2019-04-21
python 股票估值_【中金固收·固收+】隐藏价值的角落:限售股AAP估值及Python实现方法(上)... 2019-04-21
java文档生成_Java文档自动生成 2019-04-21
java 共享目录_java 操作windows 共享目录方法介绍 2019-04-21
java 监控 宕机_JAVA监测tomcat是否宕机,控制重启 2019-04-21
catch that cow java_POJ3278——Catch That Cow 2019-04-21
java integer 不变模式_Java代码的变与不变 2019-04-21
java guava 使用_Java8-Guava实战示例 2019-04-21
python barrier option pricing_《Python金融数据分析》书内代码实战与讲解(二)金融衍生物定价... 2019-04-21