
项目实战 从 0 到 1 学习之Flink (27)FlinkSql教程(一)
发布日期:2021-05-14 00:18:19
浏览次数:15
分类:博客文章
本文共 8093 字,大约阅读时间需要 26 分钟。
环境准备
安装Docker及相关镜像
由于
安装过程省略,毕竟一路下一步的东西,下面开始安装并启动kafka、mysql等服务。穷
买不起mac,所以挂的windows的链接,可自行去下载其他版本。安装zookeeper服务
- 因为kafka需要将许多信息固化存储在zk上,所以我们首先得安装zookeeper服务
- 执行
docker run -d --name zookeeper --publish 2181:2181 wurstmeister/zookeeper
,这里将2181端口绑定到本地,之后kafka才能连上zk。这样,我们的zk就安装完毕,接下来我们开始验证是否正确启动。 - 本地执行
docker ps -a
,能看到如下所示:
PS C:\Users\tzmaj> docker ps -aCONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES2b8cd369aa3e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 3 seconds ago Up 2 seconds 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181>2181/tcp zookeeper
第一个值是容器id,现在我们把这个id拷出来,执行
docker exec -it 1bf952a747ef '/bin/bash'
,这样我们就进入到了容器里面。接下来进入/opt/zookeeper-3.4.13/bin
目录,执行./zkCli.sh
,进入zk的客户端root@2b8cd369aa3e:/opt/zookeeper-3.4.13/bin# ./zkCli.shConnecting to localhost:21812020-04-28 08:13:40,238 [myid:] - INFO [main:Environment@100] - Client environment:zookeeper.version=3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 04:05 GMT2020-04-28 08:13:40,241 [myid:] - INFO [main:Environment@100] - Client environment:host.name=2b8cd369aa3e2020-04-28 08:13:40,241 [myid:] - INFO [main:Environment@100] - Client environment:java.version=1.7.0_652020-04-28 08:13:40,243 [myid:] - INFO [main:Environment@100] - Client environment:java.vendor=Oracle Corporation2020-04-28 08:13:40,243 [myid:] - INFO [main:Environment@100] - Client environment:java.home=/usr/lib/jvm/java-7-openjdk-amd64/jre2020-04-28 08:13:40,243 [myid:] - INFO [main:Environment@100] - Client environment:java.class.path=/opt/zookeeper-3.4.13/bin/../build/classes:/opt/zookeeper-3.4.13/bin/../build/lib/*.jar:/opt/zookeeper-3.4.13/bin/../lib/slf4j-log4j12-1.7.25.jar:/opt/zookeeper-3.4.13/bin/../lib/slf4j-api-1.7.25.jar:/opt/zookeeper-3.4.13/bin/../lib/netty-3.10.6.Final.jar:/opt/zookeeper-3.4.13/bin/../lib/log4j-1.2.17.jar:/opt/zookeeper-3.4.13/bin/../lib/jline-0.9.94.jar:/opt/zookeeper-3.4.13/bin/../lib/audience-annotations-0.5.0.jar:/opt/zookeeper-3.4.13/bin/../zookeeper-3.4.13.jar:/opt/zookeeper-3.4.13/bin/../src/java/lib/*.jar:/opt/zookeeper-3.4.13/bin/../conf:2020-04-28 08:13:40,243 [myid:] - INFO [main:Environment@100] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib2020-04-28 08:13:40,244 [myid:] - INFO [main:Environment@100] - Client environment:java.io.tmpdir=/tmp2020-04-28 08:13:40,244 [myid:] - INFO [main:Environment@100] - Client environment:java.compiler=
2020-04-28 08:13:40,244 [myid:] - INFO [main:Environment@100] - Client environment:os.name=Linux2020-04-28 08:13:40,244 [myid:] - INFO [main:Environment@100] - Client environment:os.arch=amd642020-04-28 08:13:40,244 [myid:] - INFO [main:Environment@100] - Client environment:os.version=4.19.76-linuxkit2020-04-28 08:13:40,244 [myid:] - INFO [main:Environment@100] - Client environment:user.name=root2020-04-28 08:13:40,245 [myid:] - INFO [main:Environment@100] - Client environment:user.home=/root2020-04-28 08:13:40,245 [myid:] - INFO [main:Environment@100] - Client environment:user.dir=/opt/zookeeper-3.4.13/bin2020-04-28 08:13:40,246 [myid:] - INFO [main:ZooKeeper@442] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@674e5e21Welcome to ZooKeeper!2020-04-28 08:13:40,268 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1029] - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)2020-04-28 08:13:40,278 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@879] - Socket connection established to localhost/127.0.0.1:2181, initiating sessionJLine support is enabled2020-04-28 08:13:40,300 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1303] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x100005e85070000, negotiated timeout = 30000WATCHER::WatchedEvent state:SyncConnected type:None path:null[zk: localhost:2181(CONNECTED) 0] 接下来再执行
ls /
[zk: localhost:2181(CONNECTED) 0] ls /[zookeeper]
看来我们的zk应该装的没什么问题,那么一路
ctrl+d
退到最外面- 如果在执行
docker ps -a
发现STATUS那一列显示EXIT,那说明有问题,使用docker logs CONTAINER ID
查看具体日志 - 也可以使用类似于
ZooInspector
或者JAVA客户端等其他方式去连接zk,来验证zk服务是否正确启动
- 如果在执行
安装kafka服务
- 执行
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=172.17.47.44:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.47.44:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka:2.11-0.11.0.3
注意:此处的172.17.47.44
是我本地的ip,大家在抄作业的时候,记得写上自己的名字
。接下来我们开始验证kafka能否正常使用。 - 执行
docker ps -a
找到kafka的容器id,然后执行docker exec -it e51c12e9a077 '/bin/bash'
,这样我们进入了kafka的容器里。接下来进入kafka客户端的目录/opt/kafka/bin
,执行kafka-console-producer.sh --broker-list 172.17.47.44:9092 --topic mykafka
,进入kafka生产者客户端。 - 再启动一个命令行,同样进入kafka客户端目录,然后执行
kafka-console-consumer.sh --bootstrap-server 172.17.47.44:9092 --topic mykafka --from-beginning
,这样进入kafka消费者客户端。 - 切回kafka生产者客户端窗口,输入我们准备好的JSON数据
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
,此时,观察kafka消费者客户端窗口,能看到一条数据被打印出来。如下所示:
PS C:\Users\tzmaj> docker exec -it 7a58196af291 '/bin/bash'bash-4.4# cd /opt/kafka/binbash-4.4# kafka-console-producer.sh --broker-list 172.17.47.44:9092 --topic mykafka>{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}>{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}>{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}>{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
PS C:\Users\tzmaj> docker exec -it 7a58196af291 '/bin/bash'bash-4.4# cd /opt/kafka/bin/bash-4.4# kafka-console-consumer.sh --bootstrap-server 172.17.47.44:9092 --topic mykafka --from-beginning{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}^CProcessed a total of 4 messages
- 执行
-
- 第一段是生产者,第二段是消费者,看样子我们能够正确的消费和生产数据。NICE!下面用JAVA测试一下能否正常消费
Properties properties = new Properties(); properties.put("bootstrap.servers", "172.17.47.44.:9092"); properties.put("group.id", "test1"); properties.put("auto.offset.reset", "earliest"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer
kafkaConsumer = new KafkaConsumer<>(properties); kafkaConsumer.subscribe(Arrays.asList("mykafka")); while (true) { ConsumerRecords records = kafkaConsumer.poll(100); for (ConsumerRecord record : records) { System.out.printf(" value is =>>> %s", record.value()); } } 启动,然后观察控制台
value is =>>> {"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
能够正常消费,我们的kafka客户端也没问题
安装mysql服务
执行
docker run --name mysql -e MYSQL_ROOT_PASSWORD=123456 -d -p 3306:3306 mysql:5.6
命令,此处的
MYSQL_ROOT_PASSWORD=123456
是将root密码设置为123456,大家可以根据个人喜爱自己修改使用Navicat连接我们的mysql
主机和端口分别就是我们的ip和设置的端口3306,密码就是上一步提到的
MYSQL_ROOT_PASSWORD
,测试连接连接成功!接下来连入我们的mysql,并执行
select CURRENT_TIMESTAMP() from dual
,返回2020-04-28 07:38:48
,看样子我们的mysql也没什么问题。
最后,数据下载地址
好了,我们Flink Sql教程系列第一课环境准备完成了,接下来开始正式进入Flink Sql的学习之旅。
发表评论
最新留言
初次前来,多多关照!
[***.217.46.12]2025年05月04日 13时26分44秒
关于作者

喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
【专题2:电子工程师 之 上位机】 之 【36.事件重载】
2019-03-09
【专题3:电子工程师 之 上位机】 之 【46.QT音频接口】
2019-03-09
一文学会JVM常见参数设置+调优经验(JDK1.8)
2019-03-09
一文理解设计模式--命令模式(Command)
2019-03-09
VTK:可视化之RandomProbe
2019-03-09
block多队列分析 - 2. block多队列的初始化
2019-03-09
Java时间
2019-03-09
不编译只打包system或者vendor image命令
2019-03-09
MySQL
2019-03-09
The wxWindows Library Licence (WXwindows)
2019-03-09
leetcode——第203题——虚拟头结点
2019-03-09
【编程】C语言入门:1到 100 的所有整数中出现多少个数字9
2019-03-09
MySQL----基础及常用命令
2019-03-09
模拟集成:MOS管的工作区小误区(简单版)
2019-03-09
flink启动(二)
2019-03-09
前端开发进阶手册.pdf
2019-03-09
软件架构设计和MESH经验之谈
2019-03-09
redis持久化分析
2019-03-09
复杂指针解析
2019-03-09
打开word时424错误
2019-03-09