
How data is read from HBase into MapReduce
Published: 2021-05-06 23:30:59
TableMapReduceUtil.initTableMapperJob sets up the input side of the job. One of the arguments passed to it is the input format class, TableInputFormat.class, which it then sets on the job:
job.setInputFormatClass(inputFormatClass);
TableInputFormatBase, the parent class of TableInputFormat, creates a TableRecordReader and restricts its scan to the row range of the current split:
if (trr == null) {
  trr = new TableRecordReader();
}
Scan sc = new Scan(this.scan);
sc.setStartRow(tSplit.getStartRow());
sc.setStopRow(tSplit.getEndRow());
trr.setScan(sc);
trr.setHTable(table);
return trr;
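To make the effect of setStartRow/setStopRow concrete, here is a minimal sketch in plain Java. It does not use HBase: a TreeMap stands in for the table, and the class and method names (SplitScanSketch, rowsForSplit, sampleTable) and the split boundaries are made up for illustration. It assumes the usual scan semantics, where the start row is inclusive, the stop row is exclusive, and an empty boundary means "unbounded".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for a table and its splits; not HBase code.
class SplitScanSketch {

    // Six rows kept in sorted row-key order, like rows in an HBase table.
    static NavigableMap<String, String> sampleTable() {
        NavigableMap<String, String> table = new TreeMap<>();
        for (int i = 1; i <= 6; i++) {
            table.put("row" + i, "value" + i);
        }
        return table;
    }

    // Row keys that a reader bounded to [startRow, stopRow) would see.
    // An empty string means "no bound on this side".
    static List<String> rowsForSplit(NavigableMap<String, String> table,
                                     String startRow, String stopRow) {
        NavigableMap<String, String> range = table;
        if (!startRow.isEmpty()) {
            range = range.tailMap(startRow, true);   // start row is inclusive
        }
        if (!stopRow.isEmpty()) {
            range = range.headMap(stopRow, false);   // stop row is exclusive
        }
        return new ArrayList<>(range.keySet());
    }

    // Reads three adjacent splits one after another and concatenates the rows.
    static List<String> readAllSplits() {
        NavigableMap<String, String> table = sampleTable();
        String[][] splits = { {"", "row3"}, {"row3", "row5"}, {"row5", ""} };
        List<String> all = new ArrayList<>();
        for (String[] split : splits) {
            all.addAll(rowsForSplit(table, split[0], split[1]));
        }
        return all;
    }

    public static void main(String[] args) {
        System.out.println(rowsForSplit(sampleTable(), "row3", "row5"));
        System.out.println(readAllSplits());
    }
}
```

Because each range includes its start and excludes its stop, adjacent splits cover every row exactly once, which is why each record reader can work on its own split independently.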
So the record reader in use is TableRecordReader. It keeps reading forward, one row at a time:
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
  return this.recordReaderImpl.nextKeyValue();
}
Looking at the implementation class behind recordReaderImpl shows that each key-value pair is a row key and a Result object:
public boolean nextKeyValue() throws IOException, InterruptedException {
  if (key == null) key = new ImmutableBytesWritable();
  // this shows that the value is a Result object
  if (value == null) value = new Result();
  try {
    try {
      // read the next row
      value = this.scanner.next();
      if (logScannerActivity) {
        rowcount ++;
        if (rowcount >= logPerRowCount) {
          long now = System.currentTimeMillis();
          LOG.info("Mapper took " + (now-timestamp)
            + "ms to process " + rowcount + " rows");
          timestamp = now;
          rowcount = 0;
        }
      }
    } catch (IOException e) {
      // do not retry if the exception tells us not to do so
      if (e instanceof DoNotRetryIOException) {
        throw e;
      }
      // try to handle all other IOExceptions by restarting
      // the scanner, if the second call fails, it will be rethrown
      LOG.info("recovered from " + StringUtils.stringifyException(e));
      if (lastSuccessfulRow == null) {
        LOG.warn("We are restarting the first next() invocation," +
            " if your mapper has restarted a few other times like this" +
            " then you should consider killing this job and investigate" +
            " why it's taking so long.");
      }
      if (lastSuccessfulRow == null) {
        restart(scan.getStartRow());
      } else {
        restart(lastSuccessfulRow);
        scanner.next();    // skip presumed already mapped row
      }
      value = scanner.next();
      numRestarts++;
    }
    if (value != null && value.size() > 0) {
      // getRow() returns the row key
      key.set(value.getRow());
      lastSuccessfulRow = key.get();
      return true;
    }
    updateCounters();
    return false;
  } catch (IOException ioe) {
    if (logScannerActivity) {
      long now = System.currentTimeMillis();
      LOG.info("Mapper took " + (now-timestamp)
        + "ms to process " + rowcount + " rows");
      LOG.info(ioe);
      String lastRow = lastSuccessfulRow == null ?
        "null" : Bytes.toStringBinary(lastSuccessfulRow);
      LOG.info("lastSuccessfulRow=" + lastRow);
    }
    throw ioe;
  }
}
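The nextKeyValue() method advances through the table one row at a time by calling this.scanner.next(), and the value it produces is a Result object. The key is set to the row key returned by getRow(). In other words, the data reaches the Mapper with the row key as K and the Result object as V.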
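The recovery branch in the method above is easy to miss: on a retryable IOException the scanner is reopened at lastSuccessfulRow, and one row is skipped because it was already handed to the mapper. The following is a minimal plain-Java sketch of that idea, not HBase code. A TreeMap stands in for the table, an iterator stands in for the scanner, and the names RestartingReaderSketch, failOnCall, and readAll are hypothetical. The DoNotRetryIOException check, logging, and counters are left out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of the restart-and-skip logic; not the HBase implementation.
class RestartingReaderSketch {
    private final NavigableMap<String, String> table; // stand-in for the HBase table
    private final int failOnCall;  // which scanner call throws once; <= 0 disables
    private int calls = 0;
    private Iterator<Map.Entry<String, String>> scanner;
    private String lastSuccessfulRow;
    String key;
    String value;
    int numRestarts = 0;

    RestartingReaderSketch(NavigableMap<String, String> table, int failOnCall) {
        this.table = table;
        this.failOnCall = failOnCall;
        restart(null);
    }

    // Reopen the "scanner" at firstRow (inclusive), or at the table start if null.
    private void restart(String firstRow) {
        NavigableMap<String, String> view =
            (firstRow == null) ? table : table.tailMap(firstRow, true);
        scanner = view.entrySet().iterator();
    }

    // Returns the next row, or null at the end; fails once on the configured call.
    private Map.Entry<String, String> scannerNext() throws IOException {
        calls++;
        if (calls == failOnCall) {
            throw new IOException("simulated scanner failure");
        }
        return scanner.hasNext() ? scanner.next() : null;
    }

    boolean nextKeyValue() throws IOException {
        Map.Entry<String, String> row;
        try {
            row = scannerNext();
        } catch (IOException e) {
            if (lastSuccessfulRow == null) {
                restart(null);               // nothing was read yet: start over
            } else {
                restart(lastSuccessfulRow);  // reopen at the last row delivered
                scannerNext();               // skip it, it was already mapped
            }
            row = scannerNext();             // a second failure propagates
            numRestarts++;
        }
        if (row != null) {
            key = row.getKey();              // the row key becomes K
            value = row.getValue();          // the row contents become V
            lastSuccessfulRow = key;
            return true;
        }
        return false;
    }

    // Reads a four-row table to the end, failing once on the given scanner call.
    static List<String> readAll(int failOnCall) {
        NavigableMap<String, String> table = new TreeMap<>();
        table.put("r1", "a");
        table.put("r2", "b");
        table.put("r3", "c");
        table.put("r4", "d");
        RestartingReaderSketch reader = new RestartingReaderSketch(table, failOnCall);
        List<String> seen = new ArrayList<>();
        try {
            while (reader.nextKeyValue()) {
                seen.add(reader.key + "=" + reader.value);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(readAll(3)); // the third scanner call fails once
    }
}
```

In this model the mapper sees each row exactly once whether or not a failure occurs, because the row the restarted scanner returns first is the one that was already delivered.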
Inside the Mapper, an operation like the following then retrieves the contents of a cell:
Cell cell = value.getColumnLatestCell(Bytes.toBytes("cf"), Bytes.toBytes("line"));