本文共 3474 字,大约阅读时间需要 11 分钟。
凌云时刻 · 技术
导读:这一节来看看使用命令行启动Consumer接收消息。
作者 | 计缘
来源 | 凌云时刻(微信号:linuxpk)
Reseting Offset
在实际的业务场景中,经常需要重复消费Topic中的Message,所以来看看如何重置Offset。
首先重置Offset可以通过如下的命令:
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group consumer_group_1 --reset-offsets [options] --execute --topic xxxx |
Kafka为我们提供了6种重置Offset的方式,也就是命令中的options
:
--to-earliest
:重置到最早的Offset。--to-latest
:重置到最后的Offset。--to-offset <Long: offset>
:重置到指定的Offset。--to-current
:重置到当前的Offset。--to-datetime <String: datetime>
:重置到指定时间的Offset,时间格式为YYYY-MM-DDTHH:mm:SS.sss
。--shift-by <Long: number-of-offsets>
:左移或右移Offset。
举个例子来看看:
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group consumer_group_1 --topic first_topic --reset-offsets --shift-by -2 --executeTOPIC PARTITION NEW-OFFSETfirst_topic 2 15first_topic 1 17first_topic 0 15 |
上面的命令将consumer_group_1
消费first_topic
的三个Partitions的Offset向左移了2位。如此之后,相当于consumer_group_1
还有6条Message没有消费。我们启动Consumer看一下:
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --group consumer_group_1 --topic first_topicCFEthis is another message.AD |
可以看到启动Consumer后,消费了6条Message。其他的Reset Options用法是一样的。这使得我们可以非常灵活的控制Consumer消费Message。
Config CLI
我们再来看看如何通过命令进行Kafka的配置。用到的命令是kafka-config.sh
,该命令可以对Topic、Broker、Client进行配置。关键的属性有以下三个:
--entity-type
:这个属性设置要对什么进行配置,可选值为topics
、brokers
、clients
、users
。--entity-name
:这个属性设置对应Type的名称,比如Topic名称、Broker Id、Client Id、User name。--alter
:确认修改。
首先我们创建一个Topic:
kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic configured-topic --partitions 3 --replication-factor 1 |
看看新创建的Topic的信息:
kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic configured-topic --describeTopic:configured-topic PartitionCount:3 ReplicationFactor:1 Configs:Topic: configured-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0Topic: configured-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0Topic: configured-topic Partition: 2 Leader: 0 Replicas: 0 Isr: 0 |
可以看到打印信息中的Configs是空的,说明这个Topic没有做额外的配置。或者也可以使用如下命令查看Topic的配置:
kafka-configs.sh --zookeeper 127.0.0.1:2181 --entity-type topics --entity-name configured-topic --describeConfigs for topic 'configured-topic' are |
看到打印信息只有Configs for topic 'configured-topic' are
,同样说明该Topic还没有额外配置信息。
接下来该这个Topic设置min.insync.replicas
属性:
kafka-configs.sh --zookeeper 127.0.0.1:2181 --entity-type topics --entity-name configured-topic --add-config min.insync.replicas=2 --alterCompleted Updating config for entity: topic 'configured-topic'. |
再来查看一下:
kafka-configs.sh --zookeeper 127.0.0.1:2181 --entity-type topics --entity-name configured-topic --describeConfigs for topic 'configured-topic' are min.insync.replicas=2kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic configured-topic --describeTopic:configured-topic PartitionCount:3 ReplicationFactor:1 Configs:min.insync.replicas=2Topic: configured-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0Topic: configured-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0Topic: configured-topic Partition: 2 Leader: 0 Replicas: 0 Isr: 0 |
两种方式都可以看到刚才更新的配置信息。
将--add-config换成--delete-config
就可以删除配置项:
kafka-configs.sh --zookeeper 127.0.0.1:2181 --entity-type topics --entity-name configured-topic --add-config min.insync.replicas=2 --alterCompleted Updating config for entity: topic 'configured-topic'. |
总结
这一章节进一步介绍了如何通过Kafka CLI操作Consumer Offset,以及如何使用Config CLI对Kafka进行配置。下一章节会介绍如何使用Kafka API编写Kafka Java Client。希望能给小伙伴们带来帮助。
END
往期精彩文章回顾
长按扫描二维码关注凌云时刻
每日收获前沿技术与科技洞见
转载地址:https://lingyun.blog.csdn.net/article/details/107036668 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!