
本文共 9018 字,大约阅读时间需要 30 分钟。
深入理解Kafka系列(七)--管理Kafka
系列文章目录
前言
本系列是我通读《Kafka权威指南》这本书做的笔录和思考。
正文
本篇文章主要从Kafka的一些脚本工具来介绍。
主题操作
创建主题
在集群当中创建一个主题需要用到3个参数(必须提供)。
- 主题名字:可以包含字母、数字、下划线、英文状态下的破折号和句号。
- 复制系数:主题的副本数量。
- 分区:主题的分区数量。
不建议在单个集群里面使用英文状态下的句号或者下划线来命名,因为主题的名字会被用在度量指标上,句号会被替换成下划线,如topic.1会变成topic_1。
标准的创建主题的格式:
./bin/kafka-topics.sh --zookeeper <zookeeper地址> --create --topic <主题名称> --replication-factor <复制系数> --partitions <分区数量>
如案例:
./bin/kafka-topics.sh --zookeeper 192.168.237.130:2181 --create --topic test19 --replication-factor 1 --partitions 3
如果出现Created topic xxx则代表创建主题成功。
忽略重复创建主题的错误:
首先我们知道,如果topic已经存在,会报错,如下图:
这里可以使用参数 –if-not-exists来忽略错误,即使主题存在,也不会抛出异常。
增加分区
有时候,我们创建完一个topic之后,我们可能需要为这个主题增加分区数量。而增加分区数量的目的主要是为了拓展主题的容量以及降低单个分区的吞吐量。并且如果要在单个消费者群组里面运行更多的消费者,那么主题数量也需要增加,因为一个分区只能由群组里的一个消费者读取。
示例:把topic的主题增加到16个
./bin/kafka-topics.sh --zookeeper 192.168.237.130:2181 --alter --topic test19 --partitions 16
输入后会出现以下提示:
对于主题分区的操作需要注意这么几个点:
1.调整基于键的主题
从消费者角度来看,为基于键的主题添加分区是很困难的,因为如果改变了分区的数量,键到分区之间的映射也会发生改变,所以,如果是基于键的主题来说,建议一开始就设置好分区数量,不要后期进行更改。
2.减少分区数量
我们无法减少分区的数量,注意,如果删除了分区,分区里的数据也会被一并删除,导致数据不一致。我们也无法将数据分配给其他分区,会导致消息乱序。所以如果要减少分区数量,只能删除整个主题,然后重新创建他。
其他操作
1.删除主题
如果说一个主题不再被使用,只要他还存在集群当中,就会占用一定数量的磁盘空间和文件句柄。为了能够删除主题,broker的delete.topic.enable参数必须设置为true。否则该参数被设置为false的话,删除主题的请求会被忽略。
具体命令:–delete
./bin/kafka-topics.sh --zookeeper 192.168.237.130:2181 --topic test19 --delete
其实Kafka并不是马上把这个主题删除,而是对这个主题进行一个标记。那么zookeeper在下一次扫描的时候,会把带有删除标记的主题从zookeeper上删除,这时候才是真正意义上的删除主题。
2.列出集群当中的所有主题:–list
./bin/kafka-topics.sh --zookeeper 192.168.237.130:2181 --list
3.列出主题的详细信息:–describe
./bin/kafka-topics.sh --zookeeper 192.168.237.130:2181 --describe
一般来说这个命令是用来排查问题的,比如:
- 结合使用 –under-replicated-partitions 参数可以列出所有包含不同步副本的分区。
- 结合使用 –unavailable-partitions 参数可以列出所有没有首领的分区,这些分区已经处于离线状态。
消费者群组
在Kafka里面,有两个地方保存着消费者群组的信息。
- 新版本的消费者:信息保存在zookeeper上。
- 旧版本的消费者:信息保存在broker上。
而kafka-consumer-group.sh工具可以用于列出上述两种消费者群组。
对于新版本的命令格式:
./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.237.130:9092 --list --new-consumer
对于旧版本的命令格式:
./bin/kafka-consumer-groups.sh --zookeeper 192.168.237.130:2181 --list
这里可以看出有个消费者组:console-consumer-79432
,在这基础上,我们可以查看旧版本消费者群组的详细信息,使用 –describe --group <消费者组名称>
./bin/kafka-consumer-groups.sh --zookeeper 192.168.237.130:2181 --describe --group console-consumer-79432
结果:
参数说明:
字段 | 描述 |
---|---|
TOPIC | 正在被读取的主题名称 |
PARTITION | 正在被读取的分区ID |
CURRENT-OFFSET | 消费者群组最近提交的偏移量,也就是消费者在分区里读取的当前位置 |
LOG-END-OFFSET | 当前高水位偏移量,也就是最近一个被读取消息的偏移量 |
LAG | 消费者的CURRENT-OFFSET 和broker的LOG-END-OFFSET之间的差距 |
OWNER | 消费者群组正在读取该分区的消费者,消费者ID |
GROUP | 消费者群组的名称 |
偏移量管理
以上命令还可以获取偏移量,并保存批次的最新偏移量,从而实现偏移量的重置。
Kafka使用kafka-run-class.sh脚本来调用底层的Java实现类来实现导出。在导出偏移量的时候,会生成一个文件,文件里包含了分区和偏移量的信息。偏移量信息以一种导入工具能够识别的格式保存在文件里,每个分区在文件里占用一行。
1.导出偏移量
示例:将群组中的偏移量导出到offsets文件里
./bin/kafka-run-class.sh kafka.tools.ExportZkOffsets --zkconnect 192.168.237.130:2181 --group console-consumer-79432 --output-file offsets
查看内容:
cat offsets ## 格式为:/consumers/[消费者组名称]/offsets/topic/[主题名称]/[分区Id]:[偏移量] /consumers/console-consumer-79432/offsets/test2/0:8
2.导入偏移量:
示例:
./bin/kafka-run-class.sh kafka.tools.ImportZkOffsets --zkconnect 192.168.237.130:2181 --input-file offsets
注意:
在导入偏移量之前,必须先关闭所有的消费者,如果消费者群组处于活跃状态,他们不会读取新的偏移量,反而有可能将新导入的偏移量覆盖掉。
动态配置变更
啥叫动态配置变更,意思是,我们可以在集群处于运行状态的时候,覆盖主题、客户端的配置参数。 一旦设置完毕,他们就成为集群的永久配置,被保存在zookeeper上,broker启动的时候会读取他们。
覆盖主题的配置项
更改主题配置的命令格式如下:
kafka-configs.sh --zookeeper <zk地址:2181> --alter --entity-type topics --entity-name <主题名称> --add-config <key>=<value>,<key>=<value>……
可用的主题配置参数表如下图(内容太多了,直接截图过来):
示例:将主题test19的消息保留时间设置为1个小时
./bin/kafka-configs.sh --zookeeper 192.168.237.130:2181 --alter --entity-type topics --entity-name test19 --add-config retention.ms=3600000
覆盖客户端的配置
对于kafka客户端来说,只能覆盖生产者or消费者的配置参数,配额都以字节每秒为单位,表示客户端在每个broker上的生产or消费速率。
更改客户端配置的命令格式如下:
kafka-configs.sh --zookeeper <zk地址:2181> --alter --entity-type clients --entity-name <client ID> --add-config <key>=<value>,<key>=<value>……
可用的客户端配置参数表:
其他相关操作
1.查找被覆盖的配置
在更改了某一个默认配置后,可以使用命令行工具列出所有被覆盖过的配置,从而用于检查主题或者客户端的配置。通过–describe命令实现。
示例:
./bin/kafka-configs.sh --zookeeper 192.168.237.130:2181 --describe --entity-type topics --entity-name test19
2.删除被覆盖的配置
动态的配置完全可以被移除,从而恢复到集群的默认配置。
示例:
./bin/kafka-configs.sh --zookeeper 192.168.237.130:2181 --alter --entity-type topics --entity-name test19 --delete-config retention.ms
验证:空的则代表删除成功。
分区管理
首选的首领选举
之前提到过,使用多个分区副本可以提升可靠性。但是只有其中的一个副本可以作为分区首领,而且只有首领所在的broker可以进行生产和消费活动。Kafka将副本清单里的第一个同步副本选为首领,但是在关闭并重启broker的时候,并不会自动恢复原先首领的身份。
那么通过触发首选的副本选举,可以让broker重新获得首领。可以使用kafka-preferred-replica-election.sh工具手动触发选举。
./bin/kafka-preferred-replica-election.sh --zookeeper 192.168.237.130:2181
修改分区副本
在某些时候,可能需要修改分区的副本。
- 主题分区再整个集群里的不均匀分布造成了集群的不均衡。
- broker离线造成分区不同步。
- 新加入的broker需要从集群里面获得负载。
可以使用kafka-reassign-partitions.sh工具来修改分区。一般修改的步骤有两个:
- 根据broker清单和主题清单生成一组迁移步骤。
- 执行这些迁移步骤。
操作:
1.创建一个包含主题清单的json文件,文件格式如下:
{ "topics":[ { "topic":"test19" }, { "topic":"test" } ], "version":1}
2.执行迁移步骤:
./bin/kafka-reassign-partitions.sh --zookeeper 192.168.237.130:2181 --generate --topics-to-move-json-file topics.json --broker-list 0# 结果,输出2个json对象。# 当前的分区分配情况Current partition replica assignment{ "version":1,"partitions":[{ "topic":"test19","partition":0,"replicas":[0]},{ "topic":"test19","partition":2,"replicas":[0]},{ "topic":"test","partition":0,"replicas":[0]},{ "topic":"test19","partition":1,"replicas":[0]}]}# 建议的分区情况# 建议把第一个json对象存起来,以便在必要的时候进行回滚,第二个json对象应该被保存到另外一个文件当中# 作为kafka-reassign-partitions.sh工具的输入来执行第二个步骤Proposed partition reassignment configuration{ "version":1,"partitions":[{ "topic":"test19","partition":0,"replicas":[0]},{ "topic":"test19","partition":2,"replicas":[0]},{ "topic":"test","partition":0,"replicas":[0]},{ "topic":"test19","partition":1,"replicas":[0]}]}
将上述第二段json保存到一个文件当中,名叫reassign.json,然后执行以下命令来执行建议的分区分配方案:
./bin/kafka-reassign-partitions.sh --zookeeper 192.168.237.130:2181 --execute --reassignment-json-file reassign.json
结果:
该命令会将指定分区的副本重新分配到新的broker上。(我都是一台机器操作的,所以实际上没有改变,但是操作步骤大概就是这样)
转储日志片段
如果需要查看某个特定消息的内容,可以使用工具来解码分区的日志片段。
示例1:解码日志片段,显示消息的概要信息。
./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files logs/mysql.login-0/00000000000000000000.log
示例2:解码日志片段,显示消息的数据内容。(多个参数–print-data-log)
./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files logs/mysql.login-0/00000000000000000000.log --print-data-log
消费和生产
控制台生产者
kafka-console-producer.sh工具用于向Kafka主题中写入消息,默认情况下,该工具将命令行输入的每一行视为一个消息。
使用控制台生产者时,有两个参数必须指定:
- –broker-list:指定broker
- –topic:指定目标主题
./bin/kafka-console-producer.sh --broker-list 192.168.237.130:9092 --topic test2
同时,控制台生产者也可以接受配置参数,接收配置参数也有两种方式:
- 通过–producer.config <文件路径>来指定配置文件。
- 通过命令行以–producer-property KEY=VALUE来实现。
如:
./bin/kafka-console-producer.sh --broker-list 192.168.237.130:9092 --topic test2 --producer-property linger.ms=360000
还有其他的命令行参数用于调整行为:
- –key-serializer ClassName 指定消息键的编码器类名,默认是kafka.serializer.DefaultEncoder
- –value-serializer ClassName指定消息值的编码器类名,默认是kafka.serializer.DefaultEncoder
- –compression-codec String 指定生成消息所使用的压缩类型,可以是none、gzip、snappy、lz4
- –sync 指定已同步的方式生成消息,也就是说在发送下一条消息之前会等待当前消息得到确认。
控制台消费者
kafka-console-consumer.sh工具提供了一种从一个或者多个主题上读取消息的方式。该消息默认下,会打印没有经过格式化的原始消息字节。
一般有两个参数是必选的:
- zookeeper(旧版本):zookeeper的地址,ip:端口
- topic:需要连接的主题名称
如果使用的是新版本的kafka,必须使用**–new-consumer和–broker-list**
./bin/kafka-console-consumer.sh --zookeeper 192.168.237.130:2181 --topic test2
同时,控制台消费者也可以接受配置参数,接收配置参数也有两种方式:
- 通过–consumer.config <文件路径>来指定配置文件。
- 通过命令行以–consumer-property KEY=VALUE来实现。
有这么几个常用的控制台消费者的配置:
- –formatter ClassName 指定消息格式化器的类名,用于解码消息,默认值是kafka.tools.DefaultFormatter
- –from-beginning 指定从最旧的偏移量开始读取数据,否则从最新的偏移量处开始读取
- –max-messages Num 指定在退出前最多读取num个消息
- –partition Num 指定只读取Id为num的分区(需要新版本消费者)
不安全的操作
在这里,列举一些常见的操作,一般在紧急情况下可以使用,但是不建议执行,因为这些操作是不安全的。
移动集群控制器
每个Kafka集群都有一个控制器,他是运行在集群上某个broker上的线程。负责管理集群,但是有时候我们可能需要将控制器从一个controller迁移到另外一个broker上。例如:由于某些异常,控制器虽然还在运行,但是无法提供正常的功能。
操作:
- 当前控制器一般会将自己注册到zookeeper上的一个节点,这个节点处于集群路径的最顶层,名字叫做/controller。
- 我们需要手动删除这个节点,那么会释放当前控制器,集群将会进行新的控制器选举。
取消分区重分配
一般分区重分配的流程为:
- 发起重分配请求(创建Zookeeper节点)
- 集群控制器将分区添加到broker上
- 新的broker开始复制分区,直到副本达到同步状态
- 控制器从分区副本清单里面移出旧的broker
因为分区重分配是并行进行的,所以一般情况下没有理由取消一个正在进行中的重分配任务。 不过有个例外:重分配进行到一半的时候, broker发生了故障并且无法立即重启,这会导致重分配过程无法结束,进而妨碍其他重分配任务的进行, 这种情况下可以让集群取消重分配任务。
操作:
- 从zookeeper上删除/admin/reassign_partitions节点
- 重新选举控制器(删除/controllerj节点)
移出待删除的主题
一般使用命令行工具删除主题的时候,会在zookeeper上创建一个节点叫做删除主题的请求。正常情况下,集群会立即执行这个请求, 但是命令行工具不知道集群是否启用了主题删除功能,因此如果集群的主题删除功能是禁用的,那么命令行工具发起的请求会被一直挂起。 当然这种请求是可以被删除的。
操作:
- 主题的删除是通过在/admin/delete_topic节点下创建一个待删除主题为名的子节点来实现的。
- 删除对应目录下的节点即可移出被挂起的请求(待删除的主题)
手动删除主题
如果集群禁用了主题删除功能,那么可以进行手动删除,但是这个要求线下关闭及群里的所有broker。
为什么要关闭broker?
答:在集群还在运行的时候修改zookeeper里面的元数据是非常危险的,会造成集群的不稳定,所以要把broker关闭。
手动删除主题的操作:
- 关闭集群中所有的broekr
- 删除zookeeper路径中/brokers/topics/[topicName](先删除节点下的子节点)
- 删除每个broker的分区目录,这些目录的名字可能是[topicName]-Num(分区Id)
- 重启所有broker
总结
本文大概讲了这么几个点:
- kafka-topics.sh的主题操作。
- kafka-consumer-groups.sh的消费者组操作。
- kafka-run-class.sh的偏移量导入导出操作以及日志数据查看操作。
- kafka-configs.sh的主题覆盖操作。
- kafka-preferred-replica-election.sh的首领选举操作。
- kafka-reassign-partitions.sh的分区管理操作。
- kafka-console-consumer.sh和kafka-console-producer.sh的控制台消费者生产者操作。
- 一些常见的不安全操作。
基本上总结了kafka自带工具中几个常见的工具和用法。
下一篇文章准备从Kafka的流式处理来讲。
发表评论
最新留言
关于作者
