Kafka从上手到实践 - Kafka CLI:Consumer CLI & Producer CLI | 凌云时刻
发布日期:2021-06-30 18:31:14 浏览次数:2 分类:技术文章

本文共 5138 字,大约阅读时间需要 17 分钟。

凌云时刻 · 技术

导读:这一节来看看使用命令行启动Consumer接收消息。

作者 | 计缘

来源 | 凌云时刻(微信号:linuxpk)

Consumer CLI

通过如下的命令启动Consumer:

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic
  • kafka-console-consumer.sh是启动Consumer的命令。

  • --bootstrap-server指定要连接的Broker地址。

  • --topic指定Topic名称,既要从哪个Topic里读数据。

如上图所示,左边启动的是Consumer,右边启动的是Producer。Producer发送的消息可以实时的被Consumer接收到。但是有一个问题,那就是在上一节中,我们已经给first_topic这个Topic发送了一些数据。但是现在Consumer启动后并没有收到。这是因为通过上面的命令启动的Consumer接收的是最新的消息,如果想接收所有的消息,还需要带一个参数:

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning

--from-beginning表示启动的Consumer要接收所有的消息。

前文中说过,Consumer一般都是以组的形式存在,所以可以再加一个参数来创建一个Consumer Group:

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --group consumer_group_1

--group可以指定Consumer Group的名称。

如上图所示,左边启动了三个Consumer,这三个Consumer都在同一个名为consumer_group_1的组里。因为first_topic这个Topic有三个Partitions,所以当一个Consumer Group中有三个Consumer时,他们的收到的信息不会重复。

又如上图所示,左边启动了三个Consumer,但是前两个在consumer_group_1的组里,最后一个在consumer_group_2的组里,所以前两个Consumer是以轮询的方式收到消息的,而最后一个Consumer可以收到全部的消息。

上面两个示例也充分证明了前文中所说的,不同的Consumer Group可以消费同一个Topic中相同的Partition的消息,但是Consumer Group内的Consumer不能消费同一个Topic中相同的Partition的消息

上面的命令是显示的创建Consumer Group。上文中说到过,Kafka中,Consumer都是以组的形式连接Broker消费数据的。那么如果只有一个Consumer的情况下,是否有Consumer Group呢?其实,当只有一个Consumer时,也会自动创建一个Consumer Group。我们可以通过另外一组Consumer Group CLI来看一下:

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --listconsole-consumer-40439console-consumer-81216consumer_group_1cconsole-consumer-14387consumer_group_2consumer_group_1console-consumer-40563

可以看到,已经存在的Consumer Group中,除了我们之前创建的,还有以console-consumer-xxxxx这种命名格式存在的Consumer Group。这就是当我们只启动一个Consumer时Kafka自动为这个Consumer创建的Consumer Group。这里可以做个实验,先启动一个Consumer:

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic

然后再来看看Consumer Group是否有增加:

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --listconsole-consumer-96752console-consumer-40439console-consumer-81216consumer_group_1cconsole-consumer-14387consumer_group_2consumer_group_1console-consumer-40563

我们看到增加了一个Consumer Groupconsole-consumer-96752

Consumer Group列表看完了,再来看看某一个Consumer Group的详细信息,比如查看consumer_group_1的详细信息。可以使用如下命令:

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group consumer_group_1Consumer group 'consumer_group_1' has no active members.TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-IDfirst_topic     0          17              17              0               -               -               -first_topic     2          17              17              0               -               -               -first_topic     1          18              18              0               -               -

首先会告诉我们该Consumer Group中是否有正在活跃的Consumer,目前没有启动任何Consumer,所以提示我们Consumer group 'consumer_group_1' has no active members.

然后会列出该Consumer Group消费的Topic、Partition情况、Offset情况、延迟(LAG)情况、处于活跃状态的Consumer信息。

可以看到consumer_group_1这个Consumer Group正在消费first_topic这个Topic中的Message,一共从三个Partition中消费了52条Messages,并且目前已经消费了全部的数据,因为每个Partition的延迟都是0,说明没有还未接收的Message。

现在我们再往first_topic中发送一条Message,再来看看情况如何:

kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic --producer-property acks=1>this is another message.kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group consumer_group_1Consumer group 'consumer_group_1' has no active members.TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-IDfirst_topic     0          17              17              0               -               -               -first_topic     2          17              17              0               -               -               -first_topic     1          18              19              1               -               -               -

可以看到Partition 1 的LOG-END-OFFSET是19,而CURRENT-OFFSET是18,并且Partition 1 的LAG是1,说明现在first-topic一共接收到了19条Message,而consumer-group-1只消费了18条,有1条延迟。

我们再启动consumer_group_1中的Consumer,然后再看看数据:

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning --group consumer_group_1this is another message.kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group consumer_group_1TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-IDfirst_topic     0          17              17              0               consumer-1-4ec288ac-e202-40b1-a2ec-d43abc49b38d /172.17.222.157 consumer-1first_topic     1          19              19              0               consumer-1-4ec288ac-e202-40b1-a2ec-d43abc49b38d /172.17.222.157 consumer-1first_topic     2          17              17              0               consumer-1-4ec288ac-e202-40b1-a2ec-d43abc49b38d /172.17.222.157 consumer-1

可以看到,目前有一个处于活跃的Consumer,并且Messages全部被消费。

总结

这一章节主要介绍了如何使用Kafka的Consumer CLI接收Producer生产的Message。同时能更直观的印证之前介绍概念时提到的内容,比如Consumer Group的机制、Offset机制等。下一章节进一步介绍Offset的操作以及Config CLI。希望能给小伙伴们带来帮助。

END

往期精彩文章回顾

长按扫描二维码关注凌云时刻

每日收获前沿技术与科技洞见

转载地址:https://lingyun.blog.csdn.net/article/details/107036665 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:Kafka从上手到实践 - Kafka CLI:Reseting Offset & Config CLI | 凌云时刻
下一篇:Kafka从上手到实践 - 实践真知:搭建单机Kafka | 凌云时刻

发表评论

最新留言

不错!
[***.144.177.141]2024年04月26日 10时19分48秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章