kafka集群部署及命令行操作
发布日期:2021-07-27 04:52:51
浏览次数:6
分类:技术文章
本文共 4226 字,大约阅读时间需要 14 分钟。
前期准备操作
关闭防火墙、普通的用户及文件权限
防火墙的相关操作关闭防火墙#第一步: 停止firewallsystemctl stop firewalld.service#第二步:禁止firewall开机启动systemctl disable firewalld.service#第三步:查看防火墙状态systemctl status firewalld.servic
普通的用户及文件权限
创建用户groupadd realtime && useradd realtime && usermod -a -G realtime realtime创建工作目录并赋权mkdir /exportmkdir /export/serverschmod 755 -R /export切换到realtime用户下su realtime
安装kafka
下载安装包
http://kafka.apache.org/downloads.html
在linux中使用wget命令下载安装包 wget http://mirrors.hust.edu.cn/apache/kafka/0.8.2.2/kafka_2.11-0.8.2.2.tgz 解压安装包[atguigu@hadoop102 software]$ tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/
修改解压后的文件名称
[atguigu@hadoop102 module]$ mv kafka_2.11-0.11.0.0/ kafka
在/opt/module/kafka目录下创建logs文件夹
[atguigu@hadoop102 kafka]$ mkdir logs
修改配置文件
[atguigu@hadoop102 kafka]$ cd config/[atguigu@hadoop102 config]$ vi server.properties
修改配置文件
cp /home/hadoop/apps/kafka/config/server.properties /home/hadoop/apps/kafka/config/server.properties.bak vi /home/hadoop/apps/kafka/config/server.properties 输入以下内容:输入以下内容:#broker的全局唯一编号,不能重复broker.id=0#删除topic功能使能delete.topic.enable=true#处理网络请求的线程数量num.network.threads=3#用来处理磁盘IO的现成数量num.io.threads=8#发送套接字的缓冲区大小socket.send.buffer.bytes=102400#接收套接字的缓冲区大小socket.receive.buffer.bytes=102400#请求套接字的缓冲区大小socket.request.max.bytes=104857600#kafka运行日志存放的路径 log.dirs=/opt/module/kafka/logs#topic在当前broker上的分区个数num.partitions=1#用来恢复和清理data下数据的线程数量num.recovery.threads.per.data.dir=1#segment文件保留的最长时间,超时将被删除log.retention.hours=168#配置连接Zookeeper集群地址zookeeper.connect=bqy01:2181,bqy02:2181,bqy03:2181
配置环境变量(可不用操作)
[atguigu@hadoop102 module]$ sudo vi /etc/profile#KAFKA_HOMEexport KAFKA_HOME=/opt/module/kafkaexport PATH=$PATH:$KAFKA_HOME/bin[atguigu@hadoop102 module]$ source /etc/profile
分发到其他节点
[root@bqy01 kafka]# scp -r kafka bqy02:/data/module/[root@bqy01 kafka]# scp -r kafka bqy03:/data/module/
注:修改配置文件中的broker.id,broker.id不得重复
启动集群
依次在bqy01 、bqy02 、bqy03 节点上启动kafka
0.11版本[root@bqy01 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties[root@bqy02 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties[root@bqy03 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
2.11版本
[root@bqy01 kafka]$./bin/kafka-server-start.sh ./config/server.properties
后台启动
后台启动://第一种./bin/kafka-server-start.sh ./config/server.properties &//第二种nohup ./bin/kafka-server-start.sh ./config/server.properties &
前台和后台进程有什么区别?
前台相当于一个会话:程序退出就没了关闭集群
0.11版本
[root@bqy01 kafka]$ bin/kafka-server-stop.sh stop[root@bqy02 kafka]$ bin/kafka-server-stop.sh stop[root@bqy03 kafka]$ bin/kafka-server-stop.sh stop
2.11版本
[root@bqy01 kafka]$ ./bin/kafka-server-stop.sh ./config/server.properties
命令行操作
进入zkClick:
查看当前服务器所有的topic 上面两种都可以 创建topic[root@bqy02 kafka]# bin/kafka-topics.sh --zookeeper bqy02:2181 --create --replication-factor 3 --partitions 1 --topic first
[root@bqy02 kafka]#./bin/kafka-topics.sh --create --zookeeper hadoop02:2181 --replication-factor 1 --partitions 1 --topic test01
两种都可以
选项说明:- topic 定义topic名
- replication-factor 定义副本数
- partitions 定义分区数
当副本数大于1时,记得要打开其他的节点,一个副本对应1个节点
删除topic
[root@bqy02 kafka]# bin/kafka-topics.sh --zookeeper bqy02:2181 \> --delete --topic firstTopic first is marked for deletion.Note: This will have no impact if delete.topic.enable is not set to true.
需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。
通过shell命令发送消息 开启 生产
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test01
通过shell消费消息 消费
./bin/kafka-console-consumer.sh --zookeeper bqy02:2181 --from-beginning --topic test01
Kafka 2.2.1版本启动消费者:
开三个节点./bin/kafka-console-consumer.sh --bootstrap-server bqy01:9092,bqy02:9092,bqy03:9092 --from-beginning --topic test01
开一个节点
./bin/kafka-console-consumer.sh --bootstrap-server bqy02:9092 --from-beginning --topic test01
查看某个Topic的详情
例如test01./bin/kafka-topics.sh --topic test01 --describe --zookeeper bqy02:2181
新增配置
./bin/kafka-topics.sh --zookeeper bqy02:2181 --create --topic test --partitions 1 --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
修改配置
./bin/kafka-topics.sh --zookeeper bqy02:2181 --alter --topic test --config max.message.bytes=128000
删除配置:
./bin/kafka-topics.sh --zookeeper bqy02:2181 --alter --topic test --delete-config max.message.bytes
转载地址:https://blog.csdn.net/qq_45292079/article/details/104482938 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
不错!
[***.144.177.141]2024年09月18日 05时12分58秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
reRender属性的使用
2019-05-27
href="javascript:void(0)"
2019-05-27
h:panelGrid、h:panelGroup标签学习
2019-05-27
f:facet标签 的用法
2019-05-27
<h:panelgroup>相当于span元素
2019-05-27
java中append()的方法
2019-05-27
必学高级SQL语句
2019-05-27
经典SQL语句大全
2019-05-27
Eclipse快捷键 10个最有用的快捷键
2019-05-27
log日志记录是什么
2019-05-27
<rich:modelPanel>标签的使用
2019-05-27
<h:commandLink>和<h:inputLink>的区别
2019-05-27
<a4j:keeyAlive>的英文介绍
2019-05-27
关于list对象的转化问题
2019-05-27
VOPO对象介绍
2019-05-27
suse创建的虚拟机,修改ip地址
2019-05-27
linux的挂载的问题,重启后就挂载就没有了
2019-05-27
docker原始镜像启动容器并创建Apache服务器实现反向代理
2019-05-27
docker容器秒死的解决办法
2019-05-27
管理网&业务网的一些笔记
2019-05-27