Project in Practice, Learning Flink from 0 to 1 (21): Reading from HBase and Writing back to HBase with Flink

Flink can read data from HBase in two ways: by extending RichSourceFunction to build a custom source, or by overriding TableInputFormat. Writing back to HBase is done by implementing the OutputFormat interface. All three approaches are shown below.

Method 1: Extend RichSourceFunction

package com.my.flink.utils.streaming.hbase;

import com.my.flink.utils.config.ConfigKeys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Iterator;

/**
 * @Description hbase reader
 * @Author jiangxiaozhi
 * @Date 2018/10/17 10:05
 **/
public class HBaseReader extends RichSourceFunction<Tuple2<String, String>> {

    private static final Logger logger = LoggerFactory.getLogger(HBaseReader.class);

    private Connection conn = null;
    private Table table = null;
    private Scan scan = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        conn = HBaseConnection.getHBaseConn();
        table = conn.getTable(TableName.valueOf(ConfigKeys.HBASE_SOURCE_TABLE()));
        // Scan the row key range [1001, 1004) of the configured column family.
        scan = new Scan();
        scan.setStartRow(Bytes.toBytes("1001"));
        scan.setStopRow(Bytes.toBytes("1004"));
        scan.addFamily(Bytes.toBytes(ConfigKeys.HBASE_SOURCE_CF()));
    }

    @Override
    public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
        ResultScanner rs = table.getScanner(scan);
        Iterator<Result> iterator = rs.iterator();
        while (iterator.hasNext()) {
            Result result = iterator.next();
            String rowkey = Bytes.toString(result.getRow());
            // Concatenate all cell values of the row into one comma-separated string.
            StringBuffer sb = new StringBuffer();
            for (Cell cell : result.listCells()) {
                String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                sb.append(value).append(",");
            }
            String valueString = sb.replace(sb.length() - 1, sb.length(), "").toString();
            Tuple2<String, String> tuple2 = new Tuple2<>();
            tuple2.setFields(rowkey, valueString);
            ctx.collect(tuple2);
        }
    }

    @Override
    public void cancel() {
        try {
            if (table != null) {
                table.close();
            }
            if (conn != null) {
                conn.close();
            }
        } catch (IOException e) {
            logger.error("Close HBase Exception:", e);
        }
    }
}
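The reader above obtains its connection from an HBaseConnection helper that the article does not show. The following is only a minimal sketch of what such a helper might look like; the method name getHBaseConn matches the call in the code above, but the singleton/caching behavior and the ZooKeeper addresses are assumptions, not the author's actual implementation.

package com.my.flink.utils.streaming.hbase;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import java.io.IOException;

// Hypothetical helper: creates and caches a single HBase Connection.
public class HBaseConnection {

    private static Connection conn = null;

    public static synchronized Connection getHBaseConn() throws IOException {
        if (conn == null || conn.isClosed()) {
            org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
            // Placeholder ZooKeeper quorum and port; replace with your cluster's values.
            conf.set(HConstants.ZOOKEEPER_QUORUM, "zk-host1,zk-host2,zk-host3");
            conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");
            conn = ConnectionFactory.createConnection(conf);
        }
        return conn;
    }
}

An HBase Connection is heavyweight and thread-safe, so sharing one cached instance while creating Table objects per use is a common pattern; the reader and the writer below both rely on it.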

Method 2: Override TableInputFormat

 

// Note: conn and the configuration constants (HBASE_SOURCE_TABLE, HBASE_SOURCE_CF,
// ZOOKEEPER_QUORUM, ZOOKEEPER_CLIENT_PORT) come from the enclosing scope;
// table and scan are fields provided by Flink's TableInputFormat.
env.createInput(new TableInputFormat[org.apache.flink.api.java.tuple.Tuple2[String, String]] {

  override def mapResultToTuple(r: Result): org.apache.flink.api.java.tuple.Tuple2[String, String] = {
    val rowkey = Bytes.toString(r.getRow)
    val sb = new StringBuffer()
    for (cell: Cell <- r.rawCells()) {
      val value = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
      sb.append(value).append(",")
    }
    val valueString = sb.replace(sb.length() - 1, sb.length(), "").toString
    val tuple2 = new org.apache.flink.api.java.tuple.Tuple2[String, String]
    tuple2.setField(rowkey, 0)
    tuple2.setField(valueString, 1)
    tuple2
  }

  override def getTableName: String = HBASE_SOURCE_TABLE

  override def getScanner: Scan = {
    scan
  }

  override def configure(parameters: Configuration): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM, ZOOKEEPER_QUORUM)
    conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, ZOOKEEPER_CLIENT_PORT)
    conn = ConnectionFactory.createConnection(conf)
    table = classOf[HTable].cast(conn.getTable(TableName.valueOf(HBASE_SOURCE_TABLE)))
    scan = new Scan() {
      setStartRow(Bytes.toBytes("1001"))
      setStopRow(Bytes.toBytes("1004"))
      addFamily(Bytes.toBytes(HBASE_SOURCE_CF))
    }
  }

  override def close(): Unit = {
    if (table != null) {
      table.close()
    }
    if (conn != null) {
      conn.close()
    }
  }
})

Here, env is a StreamExecutionEnvironment.

 

Writing to HBase is much like writing to MySQL: implement the OutputFormat interface. The code is as follows:

package com.my.flink.utils.streaming.hbase;

import com.my.flink.utils.config.ConfigKeys;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * @Description HBaseOutputFormat
 * @Author jiangxiaozhi
 * @Date 2018/10/16 14:06
 **/
public class HBaseOutputFormat implements OutputFormat<Tuple2<String, String>> {

    private static final Logger logger = LoggerFactory.getLogger(HBaseOutputFormat.class);

    private org.apache.hadoop.conf.Configuration conf = null;
    private Connection conn = null;
    private Table table = null;

    @Override
    public void configure(Configuration parameters) {
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        conn = HBaseConnection.getHBaseConn();
        table = conn.getTable(TableName.valueOf(ConfigKeys.HBASE_SINK_TABLE()));
    }

    @Override
    public void writeRecord(Tuple2<String, String> record) throws IOException {
        // record.f0 is the row key, record.f1 is written to column "test1" of the sink column family.
        Put put = new Put(Bytes.toBytes(record.f0));
        put.addColumn(Bytes.toBytes(ConfigKeys.HBASE_SINK_CF()), Bytes.toBytes("test1"), Bytes.toBytes(record.f1));
        table.put(put);
    }

    @Override
    public void close() throws IOException {
        if (table != null) {
            table.close();
        }
        if (conn != null) {
            conn.close();
        }
    }
}

Summary of usage:

Reading from HBase:
1. With HBaseReader: env.addSource(new HBaseReader()) yields a DataStream of (rowkey, value) tuples.
2. With TableInputFormat: env.createInput(...) as shown in Method 2 above, then process the resulting DataStream.

Writing to HBase:
1. With a custom sink function such as an HBaseWriter (written along the lines of the JdbcWriter from the MySQL article in this series): dataStream.addSink(new HBaseWriter()).
2. With the OutputFormat above: dataStream.writeUsingOutputFormat(new HBaseOutputFormat()).

A minimal end-to-end job wiring the reader and the OutputFormat together is sketched below.
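The sketch below simply copies rows from the source table to the sink table using the classes defined in this article. The job class name HBaseReadWriteJob and the job name string are made up for illustration; everything else follows the usage summarized above.

package com.my.flink.utils.streaming.hbase;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical driver class: read (rowkey, value) pairs from the source HBase
// table with HBaseReader and write them back out through HBaseOutputFormat.
public class HBaseReadWriteJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Custom source: scans the configured row key range and column family.
        DataStream<Tuple2<String, String>> source = env.addSource(new HBaseReader());

        // Sink side: each tuple becomes one Put against the sink table.
        source.writeUsingOutputFormat(new HBaseOutputFormat());

        env.execute("flink-read-write-hbase");
    }
}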
 