数据如何从HBase读到MR
发布日期:2021-05-06 23:30:59 浏览次数:24 分类:精选文章

本文共 2930 字,大约阅读时间需要 9 分钟。

TableMapReduceUtil.initTableMapperJob是用来对内输入的,传递的参数之一,就是输入格式化类TableInputFormat.class,且会进行set操作:

job.setInputFormatClass(inputFormatClass);

TableInputFormat的父类TableInputFormatBase会创建TableRecordReader:

if (trr == null) {     trr = new TableRecordReader();}Scan sc = new Scan(this.scan);sc.setStartRow(tSplit.getStartRow());sc.setStopRow(tSplit.getEndRow());trr.setScan(sc);trr.setHTable(table);return trr;

所以记录读取器用的就是TableRecordReader。它会不断往下读:

@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {     return this.recordReaderImpl.nextKeyValue();}

查看实现类,说明K-V就是rowkey-Result对象:

public boolean nextKeyValue() throws IOException, InterruptedException {     if (key == null) key = new ImmutableBytesWritable();  //这里说明value就是Result对象  if (value == null) value = new Result();  try {       try {         //读取下一行内容      value = this.scanner.next();      if (logScannerActivity) {           rowcount ++;        if (rowcount >= logPerRowCount) {             long now = System.currentTimeMillis();          LOG.info("Mapper took " + (now-timestamp)            + "ms to process " + rowcount + " rows");          timestamp = now;          rowcount = 0;        }      }    } catch (IOException e) {         // do not retry if the exception tells us not to do so      if (e instanceof DoNotRetryIOException) {           throw e;      }      // try to handle all other IOExceptions by restarting      // the scanner, if the second call fails, it will be rethrown      LOG.info("recovered from " + StringUtils.stringifyException(e));      if (lastSuccessfulRow == null) {           LOG.warn("We are restarting the first next() invocation," +            " if your mapper has restarted a few other times like this" +            " then you should consider killing this job and investigate" +            " why it's taking so long.");      }      if (lastSuccessfulRow == null) {           restart(scan.getStartRow());      } else {           restart(lastSuccessfulRow);        scanner.next();    // skip presumed already mapped row      }      value = scanner.next();      numRestarts++;    }    if (value != null && value.size() > 0) {         //这里getRow得到的就是RowKey      key.set(value.getRow());      lastSuccessfulRow = key.get();      return true;    }    updateCounters();    return false;  } catch (IOException ioe) {       if (logScannerActivity) {         long now = System.currentTimeMillis();      LOG.info("Mapper took " + (now-timestamp)        + "ms to process " + rowcount + " rows");      LOG.info(ioe);      String lastRow = lastSuccessfulRow == null ?        "null" : Bytes.toStringBinary(lastSuccessfulRow);      LOG.info("lastSuccessfulRow=" + lastRow);    }    throw ioe;  }}

nextKeyValue()方法用来不断往下读,value就是Result对象。往下读取,调用的是this.scanner.next()。这里的K set的就是得到的Row Key。说明数据会以Row Key作为K,以Result对象作为V。

当我们在Mapper中进行如下操作,就能拿到单元格中的信息:

Cell cell = value.getColumnLatestCell("cf".getBytes(), "line".getBytes());
上一篇:RDB和AOF的持久化配置
下一篇:数据如何写入到HBase

发表评论

最新留言

哈哈,博客排版真的漂亮呢~
[***.90.31.176]2025年04月06日 19时51分34秒