
项目实战 从 0 到 1 学习之Flink (23)Flink 读取hive并写入hive
发布日期:2021-05-14 00:16:43
浏览次数:19
分类:博客文章
本文共 8848 字,大约阅读时间需要 29 分钟。
1���������������������������������������������������������������
HCatInputFormat
HCatInputFormatBase��������������������������������� ��������������� RichInputFormat���
public abstract class HCatInputFormatBaseextends RichInputFormat implements ResultTypeQueryabl
������������������jar������������������������
������������������������������
org.apache.flink flink-hadoop-fs 1.6.2 com.jolbox bonecp 0.8.0.RELEASE com.twitter parquet-hive-bundle 1.6.0 org.apache.hive hive-exec 2.1.0 org.apache.hive hive-metastore 2.1.0 org.apache.hive hive-cli 2.1.0 org.apache.hive hive-common 2.1.0 org.apache.hive hive-service 2.1.0 org.apache.hive hive-shims 2.1.0 org.apache.hive.hcatalog hive-hcatalog-core 2.1.0 org.apache.thrift libfb303 0.9.3 pom org.apache.flink flink-hadoop-compatibility_2.11 1.6.2 org.apache.flink flink-shaded-hadoop2 1.6.2
������hive���������
package com.coder.flink.core.FlinkHive import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.hadoop.conf.Configurationimport org.apache.flink.api.scala._ //������hive���������object ReadHive { def main(args: Array[String]): Unit = { val conf = new Configuration() conf.set("hive.metastore.local", "false") conf.set("hive.metastore.uris", "thrift://172.10.4.141:9083") //������������������ ������������nameserver// conf.set("hive.metastore.uris", "thrift://172.10.4.142:9083") val env = ExecutionEnvironment.getExecutionEnvironment //todo ������������ val dataset: DataSet[TamAlert] = env.createInput(new HCatInputFormat[TamAlert]("aijiami", "test", conf)) dataset.first(10).print()// env.execute("flink hive test") } }
������������ Flink 1.9���������Hive���������������������������������Hive Jdbc������������������hive������������������������������������
org.apache.hive hive-jdbc 2.1.0
package com.coder.flink.core.FlinkHive; import java.sql.Connection;import java.sql.DriverManager;import java.sql.ResultSet;import java.sql.Statement; import java.sql.*; public class FlinkReadHive { public static void main(String[] args) throws ClassNotFoundException, SQLException { Class.forName("org.apache.hive.jdbc.HiveDriver"); Connection con = DriverManager.getConnection("jdbc:hive2://172.10.4.143:10000/aijiami","hive","hive"); Statement st = con.createStatement(); ResultSet rs = st.executeQuery("SELECT * from ods_scenes_detail_new limit 10"); while (rs.next()){ System.out.println(rs.getString(1) + "," + rs.getString(2)); } rs.close(); st.close(); con.close(); }}
public class HiveApp { private static String driver = "org.apache.hive.jdbc.HiveDriver"; private static String url = "jdbc:hive2://Master:10000/default"; private static String user = "root"; //���������������������������������������������������������������root���������������Hive������������������������������root private static String password = ""; public static void main(String[] args) { ResultSet res = null; try { /** * ���������������JDBC��������������������������������������� */ Class.forName(driver); /** * ������������������JDBC���������Hive������������������������������10000������������������������������������ */ Connection conn = DriverManager.getConnection(url, user, password); /** * ������������������Statement������������������������������SQL������������������ */ Statement stmt = conn.createStatement(); /** * ���������������SQL������������������ * ���4.1���������������Table,��������������������������������������������� */ String tableName = "testHiveDriverTable"; stmt.execute("drop table if exists " + tableName ); stmt.execute("create table " + tableName + " (id int, name string)" + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'"); /** * ���4.2������������������������Table��� */ String sql = "show tables '" + tableName + "'"; System.out.println("Running: " + sql); res = stmt.executeQuery(sql); if (res.next()) { System.out.println(res.getString(1)); } /** * ���4.3������������������������Table���schema��� */ sql = "describe " + tableName; System.out.println("Running: " + sql); res = stmt.executeQuery(sql); while (res.next()) { System.out.println(res.getString(1) + "\t" + res.getString(2)); } /** * ���4.4���������������������������Hive������Table��� */ String filepath = "/root/Documents/data/sql/testHiveDriver.txt"; sql = "load data local inpath '" + filepath + "' into table " + tableName; System.out.println("Running: " + sql); stmt.execute(sql); /** * ���4.5���������������������Hive������Table������������ */ sql = "select * from " + tableName; System.out.println("Running: " + sql); res = stmt.executeQuery(sql); while (res.next()) { System.out.println(String.valueOf(res.getInt(1)) + "\t" + res.getString(2)); } /** * ���4.6���������Hive���������Table��������������������� */ sql = "select count(1) from " + tableName; //���������select count(*) ���������������mapreduce ������ ������������������������������������ yarn ��� start-yarn.sh System.out.println("Running: " + sql); res = stmt.executeQuery(sql); while (res.next()) { System.out.println("Total lines :" + res.getString(1)); } } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
������HDFS������������������
package com.coder.flink.core.test_demo import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}import org.apache.flink.core.fs.FileSystem.WriteMode object WriteToHDFS { def main(args: Array[String]): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment //2.������������ stu(age,name,height) val stu: DataSet[(Int, String, String)] = env.fromElements( (19, "zhangsan","aaaa"), (1449, "zhangsan","aaaa"), (33, "zhangsan","aaaa"), (22, "zhangsan","aaaa") ) //todo ��������������� stu.setParallelism(1).writeAsText("file:///C:/Users/Administrator/Desktop/Flink������/������������/test001.txt", WriteMode.OVERWRITE) env.execute() //todo ���������hdfs���������������,��������������������������������������� stu.setParallelism(1).writeAsText("hdfs:///output/flink/datasink/test001.txt", WriteMode.OVERWRITE) env.execute() //todo ���������hdfs���CSV������ //3.1������csv������ val inPath = "hdfs:///input/flink/sales.csv" case class Sales(transactionId: String, customerId: Int, itemId: Int, amountPaid: Double) val ds2 = env.readCsvFile[Sales]( filePath = inPath, lineDelimiter = "\n", fieldDelimiter = ",", lenient = false, ignoreFirstLine = true, includedFields = Array(0, 1, 2, 3), pojoFields = Array("transactionId", "customerId", "itemId", "amountPaid") ) //3.2���CSV���������������hdfs val outPath = "hdfs:///output/flink/datasink/sales.csv" ds2.setParallelism(1).writeAsCsv(filePath = outPath, rowDelimiter = "\n",fieldDelimiter = "|", WriteMode.OVERWRITE) env.execute() }}
发表评论
最新留言
逛到本站,mark一下
[***.202.152.39]2025年04月15日 09时54分57秒
关于作者

喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
基于 HTML5 WebGL 的污水处理厂泵站自控系统
2021-05-09
[系列] Go gRPC 调试工具
2021-05-09
django-表单之模型表单渲染(六)
2021-05-09
c++之程序流程控制
2021-05-09
spring-boot-2.0.3之redis缓存实现,不是你想的那样哦!
2021-05-09
httprunner学习23-加解密
2021-05-09
有道云笔记 同步到我的博客园
2021-05-09
李笑来必读书籍整理
2021-05-09
http头部 Expect
2021-05-09
Hadoop(十六)之使用Combiner优化MapReduce
2021-05-09
《机器学习Python实现_10_06_集成学习_boosting_gbdt分类实现》
2021-05-09
CoreCLR源码探索(八) JIT的工作原理(详解篇)
2021-05-09
IOS开发Swift笔记16-错误处理
2021-05-10
flume使用中的一些常见错误解决办法 (地址已经使用)
2021-05-10
andriod 开发错误记录
2021-05-10
C语言编译错误列表
2021-05-10
看明白这两种情况,才敢说自己懂跨链! | 喵懂区块链24期
2021-05-10
张一鸣:创业7年,我经历的5件事
2021-05-10
git拉取远程指定分支代码
2021-05-10