flume采集log4j日志到kafka
发布日期:2021-08-17 00:52:22 浏览次数:37 分类:技术文章

本文共 5213 字,大约阅读时间需要 17 分钟。

简单测试项目:

1、新建Java项目结构如下:

测试类FlumeTest代码如下:

package com.demo.flume;import org.apache.log4j.Logger;public class FlumeTest {        private static final Logger LOGGER = Logger.getLogger(FlumeTest.class);    public static void main(String[] args) throws InterruptedException {        for (int i = 20; i < 100; i++) {            LOGGER.info("Info [" + i + "]");            Thread.sleep(1000);        }    }}

监听kafka接收消息Consumer代码如下:

package com.demo.flume;/** * INFO: info * User: zhaokai * Date: 2017/3/17 * Version: 1.0 * History: 

如果有修改过程,请记录

*/import java.util.Arrays;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;public class Consumer { public static void main(String[] args) { System.out.println("begin consumer"); connectionKafka(); System.out.println("finish consumer"); } @SuppressWarnings("resource") public static void connectionKafka() { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.1.163:9092"); props.put("group.id", "testConsumer"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer
consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("flumeTest")); while (true) { ConsumerRecords
records = consumer.poll(100); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } for (ConsumerRecord
record : records) { System.out.printf("===================offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); } } }}

log4j配置文件配置如下:

log4j.rootLogger=INFO,console# for package com.demo.kafka, log would be sent to kafka appender.log4j.logger.com.demo.flume=info,flumelog4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppenderlog4j.appender.flume.Hostname = 192.168.1.163log4j.appender.flume.Port = 4141log4j.appender.flume.UnsafeMode = truelog4j.appender.flume.layout=org.apache.log4j.PatternLayoutlog4j.appender.flume.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p [%c:%L] - %m%n # appender consolelog4j.appender.console=org.apache.log4j.ConsoleAppenderlog4j.appender.console.target=System.outlog4j.appender.console.layout=org.apache.log4j.PatternLayoutlog4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n

备注:其中hostname为flume安装的服务器IP,port为端口与下面的flume的监听端口相对应

pom.xml引入如下jar:

org.slf4j
slf4j-log4j12
1.7.10
org.apache.flume
flume-ng-core
1.5.0
org.apache.flume.flume-ng-clients
flume-ng-log4jappender
1.5.0
junit
junit
4.12
org.apache.kafka
kafka-clients
0.10.2.0
org.apache.kafka
kafka_2.10
0.10.2.0
org.apache.kafka
kafka-log4j-appender
0.10.2.0
com.google.guava
guava
18.0

2、配置flume

flume/conf下:

新建avro.conf 文件内容如下:

当然skin可以用任何方式,这里我用的是kafka,具体的skin方式可以看官网

a1.sources=source1a1.channels=channel1a1.sinks=sink1a1.sources.source1.type=avroa1.sources.source1.bind=192.168.1.163a1.sources.source1.port=4141a1.sources.source1.channels = channel1a1.channels.channel1.type=memorya1.channels.channel1.capacity=10000a1.channels.channel1.transactionCapacity=1000a1.channels.channel1.keep-alive=30a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSinka1.sinks.sink1.topic = flumeTesta1.sinks.sink1.brokerList = 192.168.1.163:9092a1.sinks.sink1.requiredAcks = 0a1.sinks.sink1.sink.batchSize = 20a1.sinks.sink1.channel = channel1

如上配置,flume服务器运行在192.163.1.163上,并且监听的端口为4141,在log4j中只需要将日志发送到192.163.1.163的4141端口就能成功的发送到flume上。flume会监听并收集该端口上的数据信息,然后将它转化成kafka event,并发送到kafka集群flumeTest topic下。

3、启动flume并测试

  1. flume启动命令:bin/flume-ng agent --conf conf --conf-file conf/avro.conf --name a1 -Dflume.root.logger=INFO,console
  2. 运行FlumeTest类的main方法打印日志
  3. 允许Consumer的main方法打印kafka接收到的数据

转载于:https://www.cnblogs.com/dreammyle/p/6595693.html

转载地址:https://blog.csdn.net/weixin_30825199/article/details/99222891 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:定时器参考
下一篇:【bzoj2006】[NOI2010]超级钢琴 倍增RMQ+STL-堆

发表评论

最新留言

留言是一种美德,欢迎回访!
[***.207.175.100]2024年04月07日 02时54分55秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

C++面经总结之《Effective C++》(一) 2019-04-27
C++面经总结之《Effective C++》(二) 2019-04-27
打开我的收藏夹 -- Python爬虫篇(2) 2019-04-27
这是什么“虎狼之词”啊!!!程序员的健康问题,看一线老中医怎么说!!! 2019-04-27
打开我的收藏夹 -- Python数据分析杂谈 2019-04-27
上手Pandas,带你玩转数据(1)-- 实例详解pandas数据结构 2019-04-27
上手Pandas,带你玩转数据(2)-- 使用pandas从多种文件中读取数据 2019-04-27
上手Pandas,带你玩转数据(3)-- pandas数据存入文件 2019-04-27
爬虫遇上不让右击、不让F12的网站,该怎么办? 2019-04-27
上手Pandas,带你玩转数据(4)-- 数据清洗 2019-04-27
上手Pandas,带你玩转数据(5)-- 数据转换与数据定位 2019-04-27
上手Pandas,带你玩转数据(6)-- 摆脱对pandas可视化丑图的刻板印象吧 2019-04-27
从零开始,学会Python爬虫不再难!!! -- (1)开篇:初识爬虫,基础铺垫 丨蓄力计划 2019-04-27
从零开始,学会Python爬虫不再难!!! -- (2)承接:解析网页,抓取标签 丨蓄力计划 2019-04-27
AttributeError: module ‘urllib‘ has no attribute ‘quote‘的解决办法 2019-04-27
linux shell — 6.初识 EXT2 文件系统 2019-04-27
Java — String(字符串) 2019-04-27
linux shell — 7.linux 磁盘与文件系统管理 2019-04-27
linux shell — 8.linux 磁盘与文件系统管理(2) 2019-04-27
Java — 事件监听、事件处理 初体验 2019-04-27