kafka入门+高级+案列+面试
发布日期:2021-06-28 20:55:16 浏览次数:3 分类:技术文章

本文共 33510 字,大约阅读时间需要 111 分钟。

Kafka升级版教程

参考课程链接:https://edu.csdn.net/course/play/25861/315725

第1 章 Kafka 概述


1.1 定义

Kafka 是一个分布式的基于发布/订阅模式消息队列(Message Queue),主要应用于

大数据实时处理领域

1.2 消息队列

1.2.1 传统消息队列的应用场景

  • MQ传统应用场景之异步处理
    在这里插入图片描述
    流量消峰->秒杀系统:
    在这里插入图片描述业务解耦
    不同服务之间通过生产消费的方式通信

使用消息队列(MQ)的优点

  • 1)解耦*(生产者往消息队中扔消息,消费者从生产队列中取消息,而不是A直接将消息扔给B)*
    允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
  • 2)可恢复性 (消费者宕机,重启后可以从消息队列中重新取数据)
    系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所
    以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
  • 3)缓冲 (一般是生产大于消费)
    有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
  • 4)灵活性& 峰值处理能力 (增减服务器&削锋的功能)
    在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
  • 5)异步通信
    很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
    以上优点kafka也具有,因为kafka本身就是消息队列

1.2.2 消息队列的两种模式

(1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)

消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。
消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
在这里插入图片描述
(2)发布/订阅模式(一对多,消费者消费数据之后不会清除消息)
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。
发布/订阅模式又分为两种:
1. 基于推送:队列主动将发布者放入到队列的消息主动推送给消费者(类似公众号)
2. 基于拉取:消费者决定何时取队列中取消息(kafka属于这一种)

在这里插入图片描述

1.3 Kafka 基础架构

  • Kafka 架构
    没有Broker可以有多个主题,每个主题可以存在多个分区,每个分区可以有多个副本
    在这里插入图片描述
  • **1)Producer :**消息生产者,就是向kafka broker 发消息的客户端;
  • 2)Consumer :消息消费者,向kafka broker 取消息的客户端;
  • 3)Consumer Group (CG):可以理解为一个大的消费者整体,提高消费的并发性消费者组,由多个consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
  • **4)Broker :**一台kafka 服务器就是一个broker。一个集群由多个broker 组成。一个broker 可以容纳多个topic。
  • **5)Topic :给数据分类,**可以理解为一个队列,生产者和消费者面向的都是一个topic;
  • 6)Partition:为了提高某一个Topic的负载均衡,提高kafkad的并发性为了实现扩展性,一个非常大的topic 可以分布到多个broker(即服务器)上,一个topic 可以分为多个partition,每个partition 是一个有序的队列;
  • 7)Replica:备份数据,提高数据的冗余性副本,为保证集群中的某个节点发生故障时,该节点上的partition 数据不丢失,
    且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower。
  • 8)leader:对于分区来讲,及分区的leader每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader。
  • **9)follower:**每个分区多个副本中的“从”,实时从leader中同步数据,保持和leader数据的同步。leader发生故障时,某个follower会成为新的follower。

注意: 1. 传输数据时,生产者/消费者,发布/消费消息只会去找分区的leader,follower只是备份的作用,当leader所在的服务器宕机时,才会去代替leader。

2. leader和follower不可能存在同一台服务器主机上
3. 某一个分区同时只能被同一个消费者组里面的一个消费者消费
在这里插入图片描述


第2章 Kafka安装

2.1 安装部署

2.1.1 集群规划

在这里插入图片描述

2.1.2 jar包下载

http://kafka.apache.org/downloads.html

在这里插入图片描述
kafka的服务使用scala写的,客户端是用java写的

2.1.3 集群部署

1)解压安装包

配置文件的内容

#broker的全局唯一编号,不能重复broker.id=0#删除topic功能使能delete.topic.enable=true#处理网络请求的线程数量num.network.threads=3#用来处理磁盘IO的现成数量num.io.threads=8#发送套接字的缓冲区大小socket.send.buffer.bytes=102400#接收套接字的缓冲区大小socket.receive.buffer.bytes=102400#请求套接字的缓冲区大小socket.request.max.bytes=104857600#kafka运行日志存放的路径log.dirs=/opt/module/kafka/logs#topic在当前broker上的分区个数num.partitions=1#用来恢复和清理data下数据的线程数量num.recovery.threads.per.data.dir=1#segment文件保留的最长时间,超时将被删除log.retention.hours=168#配置连接Zookeeper集群地址zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181

06_Kafka入门_安装&启动&关闭

10)kafka群起脚本 群

-daemon 以守护进程启动
在这里插入图片描述
在这里插入图片描述

for i in hadoop102hadoop103 hadoop104doecho "========== $i ==========" ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"done

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

07_Kafka入门_命令行操作Topic增删查

在这里插入图片描述
在这里插入图片描述

命令行控制台生产者消费者测试

发布的消息回在topic中存储七天,七天之内都可以消费。
消费者,0.9之前的版本 是连zookeeper集群,0.9版本之后连的是kafka集群
在这里插入图片描述
数据日志分离
在这里插入图片描述
在这里插入图片描述


第3 章 Kafka 架构深入

3.1 Kafka 工作流程及文件存储机制

Kafka 工作流程

在这里插入图片描述
在这里插入图片描述
kafka 只能保证数据在每个分区内有序,不能保证全局有序

Kafka 中消息是以**topic** 进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。topic 是逻辑上的概念,而partition 是物理上的概念,每个partition 对应于一个log 文件,该log 文件中存储的就是producer 生产的数据。Producer 生产的数据会被不断追加到该log 文件末端,且每条数据都有自己的offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。

Kafka文件存储机制

在这里插入图片描述
在这里插入图片描述
由于生产者生产的消息会不断追加到log 文件末尾,为防止log 文件过大导致数据定位效率低下,Kafka 采取了分片索引机制,将每个partition 分为多个segment。每个segment对应两个文件——“.index”文件和“.log”文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic 名称+分区序号。例如,first 这个topic 有三个分区,
则其对应的文件夹为first-0,first-1,first-2。

00000000000000000000.index00000000000000000000.log00000000000000170410.index00000000000000170410.log                   //170410为当今存储数据的最小的偏移量(offset)00000000000000239430.index00000000000000239430.log              //239430为当今存储数据的最大的偏移量(offset)
index 和log 文件以当前segment 的第一条消息的offset 命名。下图为index 文件和log

文件的结构示意图。

index文件和log文件详解
在这里插入图片描述
“.index”文件存储大量的索引信息,“.log”文件存储大量的数据,索引文件中的元数据指向对应数据文件中message的物理偏移地址。

3.2 Kafka生产者

3.2.1 分区策略

1)分区的原因

(1)方便在集群中扩展,提高负载能力,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;
(2)可以提高并发,因为可以以Partition为单位读写了。
2)分区的原则
我们需要将producer发送的数据封装成一个ProducerRecord对象。
在这里插入图片描述
(1)指明partition 的情况下,直接将指明的值直接作为partiton 值;
(2)没有指明partition 值但有key 的情况下,将key 的hash 值与topic 的partition 数进行取余得到partition 值;
(3)既没有partition 值又没有key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与topic 可用的partition 总数取余得到partition 值,也就是常说的round-robin 算法。

3.2.2 数据可靠性保证(ISR ack)

为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。

在这里插入图片描述
1)副本数据同步策略
在这里插入图片描述
Kafka 选择了第二种方案,原因如下:
1.同样为了容忍n 台节点的故障,第一种方案需要2n+1 个副本,而第二种方案只需要n+1个副本,而Kafka 的每个分区都有大量的数据,一种方案会造成大量数据的冗余。
2.虽然第二种方案的网络延迟会比较高,但网络延迟对Kafka 的影响较小。
2)ISR(同步副本) (对副本中,有一个Follower故障, 就无法实现全部同步情况的的优化) 为了数据的可靠性,不丢数据
采用第二种方案之后,设想以下情景:leader 收到数据,所有follower 都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader 进行同步,那leader 就要一直等下去,直到它完成同步,才能发送ack。这个问题怎么解决呢?
Leader 维护了一个动态的in-sync replica set (ISR),意为和leader 保持同步的follower 集合。
ISR :假设有10个副本及(1个leade+9个follower),从中选取出几个follower作为ISR(t同步副本),假设选4个follower作为ISR,当这4个follower完成数据同步时,leader就可一个发送ack。
及当ISR 中的follower 完成数据的同步之后,leader 就会给producer 发送ack。如果follower长时间未向leader 同步数据, 则该follower 将被踢出ISR , 该时间阈值由replica.lag.time.max.ms 参数设定。Leader 发生故障之后,就会从ISR 中的follower选举新的leader。
在这里插入图片描述
3)ack 应答机制 (数据丢不丢)
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR 中的follower 全部接收成功。所以Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。
acks 参数配置:
acks:
0:producer 不等待broker 的ack,这一操作提供了一个最低的延迟,broker 一接收到还没有写入磁盘就已经返回,当broker 故障时有可能丢失数据
acks = 1 数据丢失案例
在这里插入图片描述
1:producer 等待broker 的ack,partition 的leader 落盘成功后返回ack,如果在follower同步成功之前leader 故障,那么将会丢失数据

-1或者 all:producer 等待broker 的ack,partition 的leader 和follower(ISR中的follower) 全部落盘成功后才返回ack。但是如果在follower 同步完成后,broker 发送ack 之前,leader 发生故障,那么会造成数据重复

acks = -1 数据重复案例
在这里插入图片描述

在这里插入图片描述

4)故障处理细节

HW: 保证broker 存储一致性,提供消费数据一致性

Log文件中的HW和LEO

保证给消费者提供消费数据一致性:只给消费者暴露每个副本最小的offset
保证broker保存生产者保存数据的一致性:每次新选leadr后,除新选出的leader外,其他的大于,副本最小的offset部分的消息被截取,再同步leafer的数据
在这里插入图片描述在这里插入图片描述

LEO:指的是每个副本最大的offset;

HW:指的是消费者能见到的最大的offset,ISR 队列中最小的LEO。 HW是为了保障消费者消费数据的一致性和存储数据的一致性,不是为了数据丢不丢是,数据不丢失是ack的问题
(1)follower 故障
follower 发生故障后会被临时踢出ISR,待该follower 恢复后,follower 会读取本地磁盘
记录的上次的HW,并将log 文件高于HW 的部分截取掉,从HW 开始向leader 进行同步。
等该follower 的LEO 大于等于该Partition 的HW,即follower 追上leader 之后,就可以重
新加入ISR 了。
(2)leader 故障
leader 发生故障之后,会从ISR 中选出一个新的leader,之后,为保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
3.2.3 Exactly Once语义 (精准一次性)
将服务器的ACK级别设置为-1,可以保证Producer到Server之间不会丢失数据,即At Least Once(至少一次)语义。相对的,将服务器ACK级别设置为0,可以保证生产者每条消息只会被发送一次,即At Most Once(最多一次)语义
At Least Once可以保证数据不丢失,但是不能保证数据不重复;相对的,At Least Once可以保证数据不重复,但是不能保证数据不丢失。但是,对于一些非常重要的信息,比如说交易数据,下游数据消费者要求数据既不重复也不丢失,即Exactly Once语义。在0.11版本以前的Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。
0.11版本的Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指Producer不论向Server发送多少次重复数据,Server端都只会持久化一条。幂等性结合At Least Once语义,就构成了Kafka的Exactly Once语义。即:
At Least Once + 幂等性= Exactly Once
要启用幂等性,只需要将Producer的参数中enable.idempotence设置为true即可。Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一Partition的消息会附带Sequence Number。而Broker端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker只会持久化一条。
但是PID重启就会变化,同时不同的Partition也具有不同主键,所以幂等性无法保证跨分区跨会话的Exactly Once。

3.3 Kafka消费者

3.3.1 消费方式

consumer采用pull(拉)模式从broker中读取数据。

push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。

它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull 模式则可以根据consumer 的消费能力以适当的速率消费消息。

pull 模式不足之处是,如果kafka 没有数据,消费者可能会陷入循环中,一直返回空数据。 针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为timeout。

3.3.2 分区分配策略

(只有在消费者组中才有分区策略的概念,单个消费所有的分区都归他自己没有分区策略的概念)

一个consumer group 中有多个consumer,一个 topic 有多个partition,所以必然会涉及到partition 的分配问题,即确定那个partition 由哪个consumer 来消费。Kafka 有两种分配策略,一是RoundRobin,一是Range。

1)RoundRobin(轮询调度)

分区分配策略之RoundRobin
每个消费获取的分区数相对均匀,顶多也就是有一个消息者多一个分区。
在这里插入图片描述
对于消费者组同时消费两个主题时如下图,RoundRobin策略利用TopicAndPartition类对象提出Topic1和Topic2这两个主题每个分区的哈希值,进行排序,之后再轮询调度。
多个topic订阅不同主题的时候RoundRobin利用哈希重新排序后就可能回将消费者没有订阅的主题发动也发送给消费者,所有使用RoundRobin有前提条件就是保证当前消费组中的消费者订阅的主题都是一样的
多个topic分区分配策略之RoundRobin
在这里插入图片描述

在这里插入图片描述

2)Range (kafka默认使用)

zeng
分区分配策略之Range
针对消费组中的每个消费者按一定范围分配主题分区
缺点: 会造成消费者分配分区不均匀,会造成一个消费者比其他消费者多多个分区。
在这里插入图片描述
消费者1会一直比消费者2得到的分区数多
在这里插入图片描述
Range是根绝topic->消费者,有订阅了topic的消费者才会被分到分区。在这里插入图片描述

  • 根绝业务需求选取分区策略:
    RoundRobin: 消费组中的消费者订阅的主题为同一个主题

**Range:**消费组中的消费者订阅的主题为不同的主题

  • 当消费者组中消费者数量发生变化(增加或者减少)的时候会触发分区策略

3.3.3 offset 的维护

消费的offset是按消费组保存的。

消费组+topic+分区 确认一个唯一的offset。

由于consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以consumer 需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。

在这里插入图片描述

1)修改配置文件consumer.properties
如果要消费kafka用来管理消费者offset的topic:__consumer_offsets,需要修改以下配置为false

exclude.internal.topics=false

2)读取offset

0.11.0.0 之前版本:

bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper hadoop102:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

0.11.0.0之后版 之后版 本(含):

bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper hadoop102:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties--from-beginning

消费__consumer_offsets出来的数据

在这里插入图片描述

3.3.4 消费者组案例

1)需求

,如果生产者的属相参数linger.ms 设置为 0表示立即发送: 测试同一个消费者组中的消费者,同一时刻只能有一个消费者消费消费到同一个topic的数据。因为一个分区对应一个消费者,而生产着是一条数据就立刻发送落盘,同一时刻只有一条数据落盘。
不同组可以同时消费一个topic中的数据。
2)案例实操
(1)在hadoop102、hadoop103上修改/opt/module/kafka/config/consumer.properties配置文件中的group.id属性为任意组名。

[atguigu@hadoop103 config]$ vi consumer.propertiesgroup.id=atguigu
(2)在 hadoop102、hadoop103上分别启动消费者
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \--zookeeper hadoop102:2181 --topic first --consumer.config config/consumer.properties[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-serverhadoop102:9092--topic first --consumer.config config/consumer.properties
(3)在 hadoop104上启动 生产者
[atguigu@hadoop104 kafka]$ bin/kafka-console-producer.sh \--broker-list hadoop102:9092 --topic first>hello world
(4)查看hadoop102和hadoop103的接收者。			同一时刻只有一个消费者接收到消息。

3.4 Kafka 高效读写数据

kafka集群读写速度快的主要原因是:分布式(分区–>并发),

单台机器也很快的原因是:顺序写磁盘,和零拷贝技术。
1)顺序写磁盘
Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
2)零复制技术
传统的文件拷贝
在这里插入图片描述

传统的文件拷贝通常需要从用户态去转到核心态,经过read buffer,然后再返回到用户态的应用层buffer,然后再从用户态把数据拷贝到核心态的socket buffer,然后发送到网卡。如下图所示:

在这里插入图片描述
从上图你会发现,传统的数据传输需要多次的用户态和核心态之间的切换,而且还要把数据复制多次,最终才打到网卡。

如果减少了用户态与核心态之间的切换,是不是就会更快了呢?如下图:

在这里插入图片描述
此时我们会发现用户态“空空如也”。数据没有来到用户态,而是直接在核心态就进行了传输,但这样依然还是有多次复制。首先数据被读取到read buffer中,然后发到socket buffer,最后才发到网卡。虽然减少了用户态和核心态的切换,但依然存在多次数据复制。

如果可以进一步减少数据复制的次数,甚至没有数据复制是不是就会做到最快呢?

DMA

别急,这里我们先介绍一个新的武器:DMA。

DMA,全称叫Direct Memory Access,一种可让某些硬件子系统去直接访问系统主内存,而不用依赖CPU的计算机系统的功能。听着是不是很厉害,跳过CPU,直接访问主内存。传统的内存访问都需要通过CPU的调度来完成。如下图:

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

3.5 Zookeeper 在Kafka 中的作用

Kafka 集群中有一个broker 会被选举为Controller,负责管理集群broker 的上下线,所有topic 的分区副本分配和leader 选举等工作。Controller 的管理工作都是依赖于Zookeeper 的。**kafka节点谁先在zk中注册谁就是controller, 一般是谁先启动谁就是controller**我们创建topic是现在zk中记录,之后再通过kafka的controller节点告诉kafka集群中其他节点tioic的相关信息(分区,分区leader,副本)**以下为kafka topic partition 的leader 选举过程:**

Leader选举流程

在这里插入图片描述

3.6 Kafka 事务

Kafka 从0.11 版本开始引入了事务支持。事务可以保证Kafka 在Exactly Once 语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。

3.6.1 Producer事务

Producer事务:保证生产者生产的数据精准一次写入kafka集群(跨分区,跨会话级别的)

为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID,并将Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的Transaction ID获得原来的PID。

在这里插入图片描述

为了管理Transaction,Kafka引入了一个新的组件Transaction Coordinator。Producer就是通过和Transaction Coordinator交互获得Transaction ID对应的任务状态。Transaction Coordinator还负责将事务所有写入Kafka的一个内部Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。

3.6.2 Consumer事务

Consumer事务:保证消费者精准一次消费

上述事务机制主要是从Producer方面考虑,对于Consumer而言,事务的保证就会相对较弱,尤其时无法保证Commit的信息被精确消费。这是由于Consumer可以通过offset访问任意信息,而且不同的Segment File生命周期不同,同一事务的消息可能会出现重启后被删除的情况。


第4章 Kafka API

4.1 Producer API

4.1.1 消息发送流程

Kafka的Producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。

KafkaProducer 发送消息流程

在这里插入图片描述

相关参数:

batch.size:只有数据积累到batch.size 之后,sender 才会发送数据。

linger.ms:如果数据迟迟未达到batch.size,sender 等待linger.time 之后就会发送数据。

4.1.2 异步发送API

1)导入依赖

org.apache.kafka
kafka-clients
0.11.0.0

2)编写代码

需要用到的类:
KafkaProducer:需要创建一个生产者对象,用来发送数据
ProducerConfig:获取所需的一系列配置参数
ProducerRecord:每条数据都要封装成一个ProducerRecord 对象
1.不带回调函数的API

package com.atguigu.kafka;import org.apache.kafka.clients.producer.*;import java.util.Properties;import java.util.concurrent.ExecutionException;public class CustomProducer {
public static void main(String[] args) throws ExecutionException InterruptedException {
Properties props = new Properties();//kafka集群,broker-list props.put("bootstrap.servers", "hadoop102:9092"); props.put("acks", "all");//重试次数 props.put("retries", 1);//批次大小 props.put("batch.size", 16384);//等待时间 props.put("linger.ms", 1);//RecordAccumulator缓冲区大小 props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer
producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord
("first", Integer.toString(i), Integer.toString(i))); } producer.close(); }}

2.带回调函数的API

回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是RecordMetadata和Exception,如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

package com.atguigu.kafka;import org.apache.kafka.clients.producer.*;import java.util.Properties;import java.util.concurrent.ExecutionException;public class CustomProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties(); props.put("bootstrap.servers", "hadoop102:9092");//kafka集群,broker-list props.put("acks", "all"); props.put("retries", 1);//重试次数 props.put("batch.size", 16384);//批次大小 props.put("linger.ms", 1);//等待时间 props.put("buffer.memory", 33554432);//RecordAccumulator缓冲区大小 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer
producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord
("first", Integer.toString(i), Integer.toString(i)), new Callback() {
//回调函数,该方法会在Producer收到ack时调用,为异步调用 @Override public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("success->" + metadata.offset()); } else {
exception.printStackTrace(); } } }); } producer.close(); }}

4.1.3 同步发送API

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可。
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;import java.util.concurrent.ExecutionException;public class CustomProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties(); props.put("bootstrap.servers", "hadoop102:9092");//kafka集群,broker-list props.put("acks", "all"); props.put("retries", 1);//重试次数 props.put("batch.size", 16384);//批次大小 props.put("linger.ms", 1);//等待时间 props.put("buffer.memory", 33554432);//RecordAccumulator缓冲区大小 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer
producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord
("first", Integer.toString(i), Integer.toString(i))).get(); } producer.close(); }}

4.2 Consumer API

Consumer消费数据时的可靠性是很容易保证的,因为数据在Kafka中是持久化的,故不用担心数据丢失问题。由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。所以offset的维护是Consumer消费数据是必须考虑的问题。

对于消费offset kafka 消费客户端只是在刚启动的时候从持久化数据文件中读取一次消费位置的offset去拉取数据,之后会在内存中也维护一个消费offset列表,之后会直接从内从中获取offset。只有当消费者客户端挂掉之后,才会再次去获取之前提交保存的消费offset。

4.2.1 自动提交offset

1)导入依赖

org.apache.kafka
kafka-clients
0.11.0.0

2)编写代码

需要用到的类:
KafkaConsumer:需要创建一个消费者对象,用来消费数据
ConsumerConfig:获取所需的一系列配置参数
ConsuemrRecord:每条数据都要封装成一个ConsumerRecord对象
为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。
自动提交offset的相关参数:
**enable.auto.commit:**是否开启自动提交offset功能
**auto.commit.interval.ms:**自动提交offset的时间间隔
以下为自动提交offset的代码:

package com.atguigu.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;import java.util.Properties;public class CustomConsumer {
public static void main(String[] args) {
Properties props = new Properties();//连接的集群 props.put("bootstrap.servers", "hadoop102:9092");//消费组 props.put("group.id", "test");//开启自动提交 props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000");//反序列化key val props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//创建消费者 KafkaConsumer
consumer = new KafkaConsumer<>(props);//订阅主题 consumer.subscribe(Arrays.asList("first")); while (true) {
//获取取数 ConsumerRecords
records = consumer.poll(100);//解析打印ConsumerRecords for (ConsumerRecord
record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } }}

4.2.2 手动提交offset

虽然自动提交offset十分简介便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API。

手动提交offset的方法有两种:分别是commitSync(同步提交)commitAsync(异步提交)。两者的相同点是,都会将本次poll的一批数据最高的偏移量提交;不同点是,commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败。

1)同步提交offset
由于同步提交offset有失败重试机制,故更加可靠,以下为同步提交offset的示例。

package com.atguigu.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;import java.util.Properties;public class CustomComsumer {
public static void main(String[] args) Properties props = new Properties();//Kafka集群 props.put("bootstrap.servers", "hadoop102:9092");//消费者组,只要group.id相同,就属于同一个消费者组 props.put("group.id", "test"); props.put("enable.auto.commit", "false");//关闭自动提交offset props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer
consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("first"));//消费者订阅主题 while (true) {
//消费者拉取数据 ConsumerRecords
records = consumer.poll(100); for (ConsumerRecord
record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }//同步提交,当前线程会阻塞直到offset提交成功 consumer.commitSync(); } }}

2)异步提交offset

虽然同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交offset的方式。
以下为异步提交offset的示例:

package com.atguigu.kafka.consumer;import org.apache.kafka.clients.consumer.*;import org.apache.kafka.common.TopicPartition;import java.util.Arrays;import java.util.Map;import java.util.Properties;public class CustomConsumer {
public static void main(String[] args) {
Properties props = new Properties();//Kafka集群 props.put("bootstrap.servers", "hadoop102:9092");//消费者组,只要group.id相同,就属于同一个消费者组 props.put("group.id", "test");//关闭自动提交offset props.put("enable.auto.commit", "false"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer
consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("first"));//消费者订阅主题 while (true) {
ConsumerRecords
records = consumer.poll(100);//消费者拉取数据 for(ConsumerRecord
record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }//异步提交 consumer.commitAsync(new OffsetCommitCallback() {
@Override public void onComplete(Map
offsets, Exception exception) {
if (exception != null) {
System.err.println("Commit failed for" + offsets); } } }); } }}

重复消费:

如果在poll获取数据之后,处理完相应好的业务逻辑,提交offset之前,kafka消费者客户端挂掉,在次启动客户端会重新消费一次上拉取数据,造成重复消费。我们可以通过自定义存储offset ,例如使用数据库存储每次提交的offset, 在每次启动消费者客户端的时候指定分区开始消费的offset。

for (TopicPartition partition : partitions) {
consumer.seek(partition, getOffset(partition));//定位到最近提交的offset位置继续消费 }

数据漏消费(消息丢失)

先提交offset后消费,有可能造成数据的漏消费;

如果在poll获取数据之后,先提交offset之前 再处理相应业务逻辑,再处理业务逻辑时kafka消费者客户端挂掉,在次启动客户端拉取的数据从上次提交的offset位置开始,而上次拉取的消息对应的业务逻辑并没有处理完成,就造成消息丢失。

3) 数据漏消费和重复消费分析

无论是同步提交还是异步提交offset,都有可能会造成数据的漏消费或者重复消费。先提交offset后消费,有可能造成数据的漏消费;而先消费后提交offset,有可能会造成数据的重复消费。

4.2.3 自定义存储offset

Kafka 0.9版本之前,offset存储在zookeeper,0.9版本及之后,默认将offset存储在Kafka的一个内置的topic中。除此之外,Kafka还可以选择自定义存储offset。

offset的维护是相当繁琐的,因为需要考虑到消费者的Rebalace。

当有新的消费者加入消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做Rebalance。
消费者发生Rebalance之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的offset位置继续消费。
要实现自定义存储offset,需要借助ConsumerRebalanceListener,以下为示例代码,其中提交和获取offset的方法,需要根据所选的offset存储系统自行实现。

package com.atguigu.kafka.consumer;import org.apache.kafka.clients.consumer.*;import org.apache.kafka.common.TopicPartition;import java.util.*;public class CustomConsumer {
private static Map
currentOffset = new HashMap<>(); public static void main(String[] args) {
//创建配置信息 Properties props = new Properties();//Kafka集群 props.put("bootstrap.servers", "hadoop102:9092");//消费者组,只要group.id相同,就属于同一个消费者组 props.put("group.id", "test");//关闭自动提交offset props.put("enable.auto.commit", "false");//Key和Value的反序列化类 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//创建一个消费者 KafkaConsumer
consumer = new KafkaConsumer<>(props);//消费者订阅主题 consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {
//该方法会在Rebalance之前调用 @Override public void onPartitionsRevoked(Collection
partitions) {
commitOffset(currentOffset); } //该方法会在Rebalance之后调用 @Override public void onPartitionsAssigned(Collection
partitions) {
currentOffset.clear(); for (TopicPartition partition : partitions) {
consumer.seek(partition, getOffset(partition));//定位到最近提交的offset位置继续消费 } } }); while (true) {
ConsumerRecords
records = consumer.poll(100);//消费者拉取数据 for (ConsumerRecord
record : records) { System.out.printf("offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value()); currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset()); } commitOffset(currentOffset);//异步提交 } } //获取某分区的最新offset private static long getOffset(TopicPartition partition) { return 0; } //提交该消费者所有分区的offset private static void commitOffset(Map
currentOffset) { }}

4.3 自定义Interceptor

4.3.1 拦截器原理

Producer拦截器(interceptor)是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。

对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

(1)configure(configs)
获取配置信息和初始化数据时调用。
(2)onSend(ProducerRecord):
该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在

消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。

(3)onAcknowledgement(RecordMetadata, Exception):
该方法会在消息从RecordAccumulator成功发送到Kafka Broker之后,或者在发送过程中失败时调用。并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率。
(4)close:
关闭interceptor,主要用于执行一些资源清理工作
如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。

4.3.2 拦截器案例

1)需求:

实现一个简单的双 interceptor 组成的拦截链。第一个 interceptor 会在消息发送前将时间戳信息加到消息 value 的最前部;第二个 interceptor 会在消息发送后更新成功发送消息数或失败发送消息数。

拦截是在生产者端,不在消费者端

在这里插入图片描述

2)案例实操

(1)增加时间戳拦

实现 ProducerInterceptor<String, String>

package com.atguigu.kafka.interceptor;        import java.util.Map;        import org.apache.kafka.clients.producer.ProducerInterceptor;        import org.apache.kafka.clients.producer.ProducerRecord;        import org.apache.kafka.clients.producer.RecordMetadata;public class TimeInterceptor implements ProducerInterceptor
{
@Override public void configure(Map
configs) {
} @Override public ProducerRecord
onSend(ProducerRecord
record) {
// 创建一个新的record,把时间戳写入消息体的最前部 return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(), System.currentTimeMillis() + "," + record.value().toString()); } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
} @Override public void close() {
}}

(2)统计发送消息成功和发送失败消息数,并在 producer 关闭时打印这两个计

package com.atguigu.kafka.interceptor;        import java.util.Map;        import org.apache.kafka.clients.producer.ProducerInterceptor;        import org.apache.kafka.clients.producer.ProducerRecord;        import org.apache.kafka.clients.producer.RecordMetadata;public class CounterInterceptor implements ProducerInterceptor
{
private int errorCounter = 0; private int successCounter = 0; @Override public void configure(Map
configs) {
} @Override public ProducerRecord
onSend(ProducerRecord
record) {
return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
// 统计成功和失败的次数 if (exception == null) {
successCounter++; } else {
errorCounter++; } } @Override public void close() {
// 保存结果 System.out.println("Successful sent: " + successCounter); System.out.println("Failed sent: " + errorCounter); }}

(3)producer 主程

package com.atguigu.kafka.interceptor;        import java.util.ArrayList;        import java.util.List;        import java.util.Properties;        import org.apache.kafka.clients.producer.KafkaProducer;        import org.apache.kafka.clients.producer.Producer;        import org.apache.kafka.clients.producer.ProducerConfig;        import org.apache.kafka.clients.producer.ProducerRecord;public class InterceptorProducer {
public static void main(String[] args) throws Exception {
// 1 设置配置信息 Properties props = new Properties(); props.put("bootstrap.servers", "hadoop102:9092"); props.put("acks", "all"); props.put("retries", 3); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 2 构建拦截链 List
interceptors = new ArrayList<>(); /**添加拦截器的顺序根据业务调整*/ //添加加时间拦截器 interceptors.add("com.atguigu.kafka.interceptor.TimeInterceptor"); //添加计数拦截器 interceptors.add("com.atguigu.kafka.interceptor.CounterInte rceptor"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); String topic = "first"; Producer
producer = new KafkaProducer<>(props);// 3 发送消息 for (int i = 0; i < 10; i++) {
ProducerRecord
record = new ProducerRecord<>(topic, "message" + i); producer.send(record); }// 4 一定要关闭producer,这样才会调用interceptor的close方法 producer.close(); }}

3)测试 )测试

(1)在 kafka上启动消费者 上启动消费者 上启动消费者 上启动消费者 ,然后运行 然后运行 然后运行 客户端 java

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \--bootstrap-serverhadoop102:9092--from-beginning --topic first

1501904047034,message0

1501904047225,message1
1501904047230,message2
1501904047234,message3
1501904047236,message4
1501904047240,message5
1501904047243,message6
1501904047246,message7
1501904047249,message8
1501904047252,message9

监控Eagle

39_Kafka案例_监控Eagle的安装(14:36)

40_Kafka案例_监控Eagle的使用(17:59)

第6章 Flume对接Kafka

1)配置 flume(flume-kafka.conf)

# definea1.sources = r1a1.sinks = k1a1.channels = c1# sourcea1.sources.r1.type = execa1.sources.r1.command = tail -F -c +0 /opt/module/data/flume.loga1.sources.r1.shell = /bin/bash -c# sinka1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092a1.sinks.k1.kafka.topic = firsta1.sinks.k1.kafka.flumeBatchSize = 20a1.sinks.k1.kafka.producer.acks = 1a1.sinks.k1.kafka.producer.linger.ms = 1# channela1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# binda1.sources.r1.channels = c1a1.sinks.k1.channel = c1

2) 启动 kafkaIDEA消费者

3) 进入 flume根目录下,启动 根目录下,启动 flume

$ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf

4) 向 /opt/module/data/flume.log里追加数据,查看 里追加数据,查看 kafka

$ echohello >> /opt/module/data/flume.log

面试问题

kafka的主要作用:

异步处理
流量削峰
业务解耦

1.Kafka中的ISR(InSyncRepli)、OSR(OutSyncRepli)、AR(AllRepli)代表什么?

ISR(同步副本),OSR(除同步副本以为的副本)

ISR+OSR=AR

2.Kafka中的HW、LEO等分别代表什么?

LEO:指的是每个副本自己最大的offset;
HW:指的是消费者能见到的最大的offset,ISR 队列中最小的LEO。 HW是为了保障消费者消费数据的一致性和存储数据的一致性,不是为了数据丢不丢是,数据不丢失是ack的问题
在这里插入图片描述
3.Kafka中是怎么体现消息顺序性的?
:分区内有序

4.Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?

拦截器 -> 序列化器 -> 分区器(指定往哪个分区发送消息)

5.Kafka生产者客户端的整体结构是什么样子的?使用了几个线程来处理?分别是什么?

两个线程
Kafka的Producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。

KafkaProducer 发送消息流程

在这里插入图片描述
6.“消费组中的消费者个数如果超过topic的分区,那么就会有消费者消费不到数据”这句话是否正确?
正确 同一个消费者组中每个消费者不能消费同一个分区中的消息

(只有在消费者组中才有分区策略的概念,单个消费所有的分区都归他自己没有分区策略的概念)

一个consumer group 中有多个consumer,一个 topic 有多个partition,所以必然会涉及到partition 的分配问题,即确定那个partition 由哪个consumer 来消费。Kafka 有两种分配策略,一是RoundRobin,一是Range。
1)RoundRobin(轮询调度)
分区分配策略之RoundRobin
在这里插入图片描述
2)Range(kafka默认使用)
zeng
分区分配策略之Range
针对消费组中的每个消费者按一定范围分配主题
缺点: 会造成消费者分配分区不均匀
在这里插入图片描述
7.消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?
offset+1 :下一次要消费的消息

8.有哪些情形会造成重复消费?

先处理业务逻辑后提交offset

如果在poll获取数据之后,处理完相应好的业务逻辑,提交offset之前,kafka消费者客户端挂掉,在次启动客户端会重新消费一次上拉取数据,造成重复消费。我们可以通过自定义存储offset ,例如使用数据库存储每次提交的offset, 在每次启动消费者客户端的时候指定分区开始消费的offset。

消费者业务逻辑,消费了消息没有提交offset,如果消费失败,再此消费就会造成重复消费

9.那些情景会造成消息漏消费?

先提交offset后处理数据,如果业务逻辑处理异常失败就会造成本条消息漏消费

先提交offset后消费,有可能造成数据的漏消费;

如果在poll获取数据之后,先提交offset之前 再处理相应业务逻辑,再处理业务逻辑时kafka消费者客户端挂掉,在次启动客户端拉取的数据从上次提交的offset位置开始,而上次拉取的消息对应的业务逻辑并没有处理完成,就造成消息丢失。

10.分区分配的概念

Kafka 有两种分配策略,一是RoundRobin,一是Range。
在这里插入图片描述

11.kafka日志结构:

.log 实际存放数据的位置
.index 索引
在这里插入图片描述

由于生产者生产的消息会不断追加到log 文件末尾,为防止log 文件过大导致数据定位效率低下,Kafka 采取了分片索引机制,将每个partition 分为多个segment。每个segment对应两个文件——“.index”文件和“.log”文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic 名称+分区序号。例如,first 这个topic 有三个分区,

则其对应的文件夹为first-0,first-1,first-2。

00000000000000000000.index00000000000000000000.log00000000000000170410.index00000000000000170410.log                   //170410为当今存储数据的最小的偏移量(offset)00000000000000239430.index00000000000000239430.log              //239430为当今存储数据的最大的偏移量(offset)

12.如果指定一个offset,kafka Controller 怎么找到对应的消息

“.index”文件存储大量的索引信息,“.log”文件存储大量的数据,索引文件中的元数据指向对应数据文件中message的物理偏移地址。

第一步通过二分查找法根据offset找到 index文件,然后在index文件中找到log文件中消息具体的offset

在这里插入图片描述

  1. kafka Controller的作用?

    zk中kafka集群的数据发生变化,通过contoller同步给kafka集群中的每个节点。
    kafka集群临时选举出的老大来干活的,和zk打交道,将集群的元数据通知到其他节点

  2. kafka 中那些地方需要选举?选举策略是什么?

    选择Controller 增强资源,谁抢到了谁当
    选举Leader 通过ISR (0.9后前:同步的时间+信息条数)(0.9和之后:同步时间)

  3. 失效副本是指什么?有哪些对应措施

    kafka节点宕机

  4. kafka的哪些设计让他有如此高的性能?

    1.分布式, 2. 顺序写磁盘,3.零拷贝

  • kafka架构
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

在这里插入图片描述

转载地址:https://blog.csdn.net/yangshengwei230612/article/details/104369203 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:大数据分析框架
下一篇:ZooKeeper入门+安装

发表评论

最新留言

路过,博主的博客真漂亮。。
[***.116.15.85]2024年04月03日 05时49分50秒