(7) HDFS Data Write: The Write Process




Preface


Demo

public class FileTest {
  public static void main(String[] args) throws IOException {
    Configuration configuration = new Configuration();
    // Get a client for the NameNode
    FileSystem fileSystem = FileSystem.newInstance(configuration);
    // TODO Create a directory (this path exercises the metadata-management flow)
    fileSystem.mkdirs(new Path(""));
    /*
     * TODO Next we analyze the HDFS file-upload flow.
     * TODO create() does some important initialization work.
     */
    FSDataOutputStream fsous = fileSystem.create(new Path("/user.txt"));
    // TODO Complete the file-upload flow
    fsous.write("fdsafdsafdsafs".getBytes());
    // MR: start the master, start the workers, then submit a job (e.g. wordcount); Spark is similar.
  }
}
public void write(int b) throws IOException {
  // TODO 'out' here is the DFSOutputStream -- this is plain object-oriented delegation
  out.write(b);
  position++;
  if (statistics != null) {
    statistics.incrementBytesWritten(1);
  }
}

In the method that created this stream earlier, we saw that DFSOutputStream is the real underlying stream, and tracing the call shows that this code does invoke write on the DFSOutputStream instance. However, DFSOutputStream itself does not define a write(int) method.

public FSDataOutputStream create(final Path f, final FsPermission permission,
    final EnumSet<CreateFlag> cflags, final int bufferSize, final short replication,
    final long blockSize, final Progressable progress, final ChecksumOpt checksumOpt)
    throws IOException {
  statistics.incrementWriteOps(1);
  Path absF = fixRelativePart(f);
  return new FileSystemLinkResolver<FSDataOutputStream>() {
    @Override
    public FSDataOutputStream doCall(final Path p) throws IOException, UnresolvedLinkException {
      // TODO Creates a DFSOutputStream and does a lot of initialization:
      //   1) adds an INodeFile to the file directory tree
      //   2) registers the lease (contract) management
      //   3) starts the DataStreamer (the key service of the write flow)
      final DFSOutputStream dfsos = dfs.create(getPathName(p), permission, cflags,
          replication, blockSize, progress, bufferSize, checksumOpt);
      // TODO FSDataOutputStream wraps DFSOutputStream one more time [decorator pattern]
      return dfs.createWrappedOutputStream(dfsos, statistics);
    }
    @Override
    public FSDataOutputStream next(final FileSystem fs, final Path p) throws IOException {
      return fs.create(p, permission, cflags, bufferSize, replication, blockSize,
          progress, checksumOpt);
    }
  }.resolve(this, absF);
}
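The "wrapping" mentioned in the comment is just the decorator pattern: FSDataOutputStream holds a reference to the real stream and adds bookkeeping (position, statistics) on top, while the heavy lifting lives in DFSOutputStream, which extends FSOutputSummer. A bare-bones sketch of that shape is shown below; the class names are kept, but everything else is simplified for orientation and is not the real Hadoop code.

// Simplified shape of the wrapping, for orientation only.
abstract class FSOutputSummer extends java.io.OutputStream {
    @Override
    public synchronized void write(int b) throws java.io.IOException {
        // buffers bytes and eventually calls writeChunk(...) in the subclass
    }
    protected abstract void writeChunk(byte[] b, int off, int len,
                                       byte[] checksum, int ckoff, int cklen)
        throws java.io.IOException;
}

class DFSOutputStream extends FSOutputSummer {
    @Override
    protected void writeChunk(byte[] b, int off, int len,
                              byte[] checksum, int ckoff, int cklen) {
        // packs the chunk into the current packet and enqueues full packets
    }
}

class FSDataOutputStream extends java.io.OutputStream {
    private final java.io.OutputStream out;   // the wrapped DFSOutputStream
    FSDataOutputStream(java.io.OutputStream out) { this.out = out; }

    @Override
    public void write(int b) throws java.io.IOException {
        out.write(b);   // delegate to the real stream, then update position/statistics
    }
}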

So the call actually lands on the write method of the parent class, FSOutputSummer:

public synchronized void write(int b) throws IOException {
  buf[count++] = (byte) b;
  if (count == buf.length) {
    // TODO The buffer is full -- flush it to the file
    flushBuffer();
  }
}
protected synchronized void flushBuffer() throws IOException {
  // TODO The important call
  flushBuffer(false, true);
}
protected synchronized int flushBuffer(boolean keep, boolean flushPartial) throws IOException {
  int bufLen = count;
  int partialLen = bufLen % sum.getBytesPerChecksum();
  int lenToFlush = flushPartial ? bufLen : bufLen - partialLen;
  if (lenToFlush != 0) {
    // TODO The core code.
    // TODO HDFS file -> block (128 MB by default) -> packet (64 KB, roughly 127 chunks)
    //      -> chunk: 512 bytes of data + 4 bytes of checksum = 516 bytes on the wire
    writeChecksumChunks(buf, 0, lenToFlush);
    if (!flushPartial || keep) {
      count = partialLen;
      System.arraycopy(buf, bufLen - count, buf, 0, count);
    } else {
      count = 0;
    }
  }
  // total bytes left minus unflushed bytes left
  return count - (bufLen - lenToFlush);
}
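As a side note, the sizes in the comment above follow from a bit of arithmetic. The sketch below simply recomputes them from the commonly cited default configuration values (512-byte chunks, 4-byte CRC32 checksums, 64 KB packets, 128 MB blocks); the constants are assumptions about the defaults, not values read from a live cluster, and the packet header is ignored.

public class PacketMath {
    public static void main(String[] args) {
        int bytesPerChecksum = 512;           // assumed default dfs.bytes-per-checksum
        int checksumSize = 4;                 // a CRC32 checksum is 4 bytes
        int chunkOnWire = bytesPerChecksum + checksumSize;    // 516 bytes per chunk
        int writePacketSize = 64 * 1024;      // assumed default write packet size
        long blockSize = 128L * 1024 * 1024;  // assumed default dfs.blocksize

        // roughly how many chunks fit into one packet (ignoring the packet header)
        int chunksPerPacket = writePacketSize / chunkOnWire;        // ~127
        // how many data bytes one packet actually carries
        int dataPerPacket = chunksPerPacket * bytesPerChecksum;     // ~64 KB of payload
        // roughly how many packets make up one full block
        long packetsPerBlock = blockSize / dataPerPacket;           // on the order of 2000

        System.out.println(chunksPerPacket + " chunks/packet, "
            + dataPerPacket + " data bytes/packet, ~" + packetsPerBlock + " packets/block");
    }
}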
private void writeChecksumChunks(byte b[], int off, int len) throws IOException {
  // TODO Compute the checksum for each chunk (CRC32)
  sum.calculateChunkedSums(b, off, len, checksum, 0);
  // TODO Walk through the data one chunk at a time
  for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
    int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
    int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
    // TODO Write the data chunk by chunk
    writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());
  }
}
protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum,
    int ckoff, int cklen) throws IOException {
  TraceScope scope = dfsClient.getPathTraceScope("DFSOutputStream#writeChunk", src);
  try {
    // TODO Write one chunk
    writeChunkImpl(b, offset, len, checksum, ckoff, cklen);
  } finally {
    scope.close();
  }
}
private synchronized void writeChunkImpl(byte[] b, int offset, int len, byte[] checksum,
    int ckoff, int cklen) throws IOException {
  dfsClient.checkOpen();
  checkClosed();
  if (len > bytesPerChecksum) {
    throw new IOException("writeChunk() buffer size is " + len
        + " is larger than supported bytesPerChecksum " + bytesPerChecksum);
  }
  if (cklen != 0 && cklen != getChecksumSize()) {
    throw new IOException("writeChunk() checksum size is supposed to be "
        + getChecksumSize() + " but found to be " + cklen);
  }
  if (currentPacket == null) {
    // TODO Create a new packet
    currentPacket = createPacket(packetSize, chunksPerPacket, bytesCurBlock,
        currentSeqno++, false);
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno="
          + currentPacket.getSeqno() + ", src=" + src + ", packetSize=" + packetSize
          + ", chunksPerPacket=" + chunksPerPacket + ", bytesCurBlock=" + bytesCurBlock);
    }
  }
  // TODO Write the 4-byte chunk checksum into the packet
  currentPacket.writeChecksum(checksum, ckoff, cklen);
  // TODO Write the 512-byte chunk of data into the packet
  currentPacket.writeData(b, offset, len);
  // TODO Count chunks: roughly 127 chunks make up one full packet
  currentPacket.incNumChunks();
  // TODO Track how many bytes of the current block have been written (full at 128 MB)
  bytesCurBlock += len;

  // If packet is full, enqueue it for transmission
  //
  // TODO Two conditions trigger the enqueue:
  //   1) the packet is full (127 chunks), or
  //   2) the current block is full (128 MB)
  if (currentPacket.getNumChunks() == currentPacket.getMaxChunks()
      || bytesCurBlock == blockSize) {
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" + currentPacket.getSeqno()
          + ", src=" + src + ", bytesCurBlock=" + bytesCurBlock
          + ", blockSize=" + blockSize + ", appendChunk=" + appendChunk);
    }
    // TODO The packet is full -- put it on the queue
    waitAndQueueCurrentPacket();

    // If the reopened file did not end at chunk boundary and the above
    // write filled up its partial chunk. Tell the summer to generate full
    // crc chunks from now on.
    if (appendChunk && bytesCurBlock % bytesPerChecksum == 0) {
      appendChunk = false;
      resetChecksumBufSize();
    }
    if (!appendChunk) {
      int psize = Math.min((int) (blockSize - bytesCurBlock),
          dfsClient.getConf().writePacketSize);
      computePacketChunkSize(psize, bytesPerChecksum);
    }
    //
    // if encountering a block boundary, send an empty packet to
    // indicate the end of block and reset bytesCurBlock.
    // If the bytes accumulated so far equal the block size, the block is complete.
    //
    if (bytesCurBlock == blockSize) {
      // When a block is finished, the last packet sent is an empty packet.
      // It carries no data; it simply marks the end of the block.
      currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
      currentPacket.setSyncBlock(shouldSyncBlock);
      waitAndQueueCurrentPacket();
      bytesCurBlock = 0;
      lastFlushOffset = 0;
    }
  }
}
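To make the packet structure concrete, here is a rough sketch of how a packet buffer could be laid out: a header, then the checksum section, then the data section. The field names and fixed layout are simplifications for illustration; the real DFSPacket in the Hadoop source also tracks the sequence number, the offset in the block and trace information.

/** A simplified, illustrative packet buffer: [ header | checksums | data ]. */
class SimplePacket {
    private final byte[] buf;
    private final int checksumStart;   // where the checksum section begins
    private int checksumPos;           // next free byte in the checksum section
    private final int dataStart;       // where the data section begins
    private int dataPos;               // next free byte in the data section
    private int numChunks;
    private final int maxChunks;

    SimplePacket(int headerLen, int chunksPerPacket, int bytesPerChecksum, int checksumSize) {
        this.checksumStart = headerLen;
        this.checksumPos = checksumStart;
        this.dataStart = checksumStart + chunksPerPacket * checksumSize;
        this.dataPos = dataStart;
        this.maxChunks = chunksPerPacket;
        this.buf = new byte[dataStart + chunksPerPacket * bytesPerChecksum];
    }

    void writeChecksum(byte[] src, int off, int len) {
        System.arraycopy(src, off, buf, checksumPos, len);
        checksumPos += len;
    }

    void writeData(byte[] src, int off, int len) {
        System.arraycopy(src, off, buf, dataPos, len);
        dataPos += len;
    }

    void incNumChunks() { numChunks++; }
    boolean isFull()    { return numChunks == maxChunks; }
}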
private void waitAndQueueCurrentPacket() throws IOException {
  synchronized (dataQueue) {
    try {
      // If queue is full, then wait till we have enough space
      boolean firstWait = true;
      try {
        while (!isClosed()
            && dataQueue.size() + ackQueue.size() > dfsClient.getConf().writeMaxPackets) {
          if (firstWait) {
            Span span = Trace.currentSpan();
            if (span != null) {
              span.addTimelineAnnotation("dataQueue.wait");
            }
            firstWait = false;
          }
          try {
            // TODO If the queue is full, wait
            dataQueue.wait();
          } catch (InterruptedException e) {
            // If we get interrupted while waiting to queue data, we still need to get rid
            // of the current packet. This is because we have an invariant that if
            // currentPacket gets full, it will get queued before the next writeChunk.
            //
            // Rather than wait around for space in the queue, we should instead try to
            // return to the caller as soon as possible, even though we slightly overrun
            // the MAX_PACKETS length.
            Thread.currentThread().interrupt();
            break;
          }
        }
      } finally {
        Span span = Trace.currentSpan();
        if ((span != null) && (!firstWait)) {
          span.addTimelineAnnotation("end.wait");
        }
      }
      checkClosed();
      // TODO Add the current packet to the queue
      queueCurrentPacket();
    } catch (ClosedChannelException e) {
    }
  }
}
private void queueCurrentPacket() {
  synchronized (dataQueue) {
    if (currentPacket == null) return;
    currentPacket.addTraceParent(Trace.currentSpan());
    // TODO Append the packet to the dataQueue
    dataQueue.addLast(currentPacket);
    lastQueuedSeqno = currentPacket.getSeqno();
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("Queued packet " + currentPacket.getSeqno());
    }
    // Reset currentPacket so the next chunk starts a fresh packet
    currentPacket = null;
    // TODO notifyAll() is needed because a DataStreamer was started during initialization
    // and it keeps watching this dataQueue. While the queue is empty it waits; now that a
    // packet has been added we wake it up, and the DataStreamer code shown below can run.
    dataQueue.notifyAll();
  }
}

The DataStreamer thread is woken up:

public void run() {
  long lastPacket = Time.monotonicNow();
  TraceScope scope = NullScope.INSTANCE;
  while (!streamerClosed && dfsClient.clientRunning) {
    // if the Responder encountered an error, shutdown Responder
    // On a later pass, once hasError == true, this branch is taken
    if (hasError && response != null) {
      try {
        response.close();
        response.join();
        response = null;
      } catch (InterruptedException e) {
        DFSClient.LOG.warn("Caught exception ", e);
      }
    }
    DFSPacket one;
    try {
      // process datanode IO errors if any
      boolean doSleep = false;
      if (hasError && (errorIndex >= 0 || restartingNodeIndex.get() >= 0)) {
        // After an error this is where recovery starts
        doSleep = processDatanodeError();
      }
      synchronized (dataQueue) {
        // wait for a packet to be sent.
        long now = Time.monotonicNow();
        // TODO On the first pass there is no data yet (dataQueue.size() == 0),
        //      so the loop below is entered.
        while ((!streamerClosed && !hasError && dfsClient.clientRunning
            && dataQueue.size() == 0
            && (stage != BlockConstructionStage.DATA_STREAMING
                || stage == BlockConstructionStage.DATA_STREAMING
                    && now - lastPacket < dfsClient.getConf().socketTimeout / 2))
            || doSleep) {
          long timeout = dfsClient.getConf().socketTimeout / 2 - (now - lastPacket);
          timeout = timeout <= 0 ? 1000 : timeout;
          timeout = (stage == BlockConstructionStage.DATA_STREAMING) ? timeout : 1000;
          try {
            // TODO If the dataQueue is empty, the thread blocks here.
            dataQueue.wait(timeout);
          } catch (InterruptedException e) {
            DFSClient.LOG.warn("Caught exception ", e);
          }
          doSleep = false;
          now = Time.monotonicNow();
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }
        // get packet to be sent.
        if (dataQueue.isEmpty()) {
          one = createHeartbeatPacket();
          assert one != null;
        } else {
          // TODO Take a packet from the queue
          one = dataQueue.getFirst(); // regular data packet
          long parents[] = one.getTraceParents();
          if (parents.length > 0) {
            scope = Trace.startSpan("dataStreamer", new TraceInfo(0, parents[0]));
            // TODO: use setParents API once it's available from HTrace 3.2
            // scope = Trace.startSpan("dataStreamer", Sampler.ALWAYS);
            // scope.getSpan().setParents(parents);
          }
        }
      }

      // get new block from namenode.
      // Set up the data pipeline and ask the NameNode for a block.
      if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Allocating new block");
        }
        // TODO Step 1: set up the data pipeline.
        // nextBlockOutputStream() does two things:
        //   1) asks the NameNode for a block
        //   2) sets up the data pipeline
        setPipeline(nextBlockOutputStream()); // important
        // TODO Step 2: start the ResponseProcessor, which watches whether each packet
        //      was sent successfully.
        initDataStreaming();
      } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Append to block " + block);
        }
        setupPipelineForAppendOrRecovery();
        initDataStreaming();
      }

      long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
      if (lastByteOffsetInBlock > blockSize) {
        throw new IOException("BlockSize " + blockSize + " is smaller than data size. "
            + " Offset of packet in block " + lastByteOffsetInBlock + " Aborting file " + src);
      }

      if (one.isLastPacketInBlock()) {
        // wait for all data packets have been successfully acked
        synchronized (dataQueue) {
          while (!streamerClosed && !hasError && ackQueue.size() != 0 && dfsClient.clientRunning) {
            try {
              // wait for acks to arrive from datanodes
              dataQueue.wait(1000);
            } catch (InterruptedException e) {
              DFSClient.LOG.warn("Caught exception ", e);
            }
          }
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }
        stage = BlockConstructionStage.PIPELINE_CLOSE;
      }

      // send the packet
      Span span = null;
      synchronized (dataQueue) {
        // move packet from dataQueue to ackQueue
        if (!one.isHeartbeatPacket()) {
          span = scope.detach();
          one.setTraceSpan(span);
          // TODO Step 3: remove the packet that is about to be sent from the dataQueue
          dataQueue.removeFirst();
          // TODO Step 4: add the same packet to the ackQueue
          ackQueue.addLast(one);
          dataQueue.notifyAll();
        }
      }

      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DataStreamer block " + block + " sending packet " + one);
      }

      // write out data to remote datanode
      TraceScope writeScope = Trace.startSpan("writeTo", span);
      try {
        // This is where the data is actually written out
        one.writeTo(blockStream);
        blockStream.flush();
      } catch (IOException e) {
        // HDFS-3398 treat primary DN is down since client is unable to
        // write to primary DN. If a failed or restarting node has already
        // been recorded by the responder, the following call will have no
        // effect. Pipeline recovery can handle only one node error at a
        // time. If the primary node fails again during the recovery, it
        // will be taken out then.
        // "Primary DataNode" means the first DataNode in the pipeline.
        tryMarkPrimaryDatanodeFailed();
        throw e;
      } finally {
        writeScope.close();
      }
      lastPacket = Time.monotonicNow();

      // update bytesSent
      long tmpBytesSent = one.getLastByteOffsetBlock();
      if (bytesSent < tmpBytesSent) {
        bytesSent = tmpBytesSent;
      }

      if (streamerClosed || hasError || !dfsClient.clientRunning) {
        continue;
      }

      // Is this block full?
      if (one.isLastPacketInBlock()) {
        // wait for the close packet has been acked
        synchronized (dataQueue) {
          while (!streamerClosed && !hasError && ackQueue.size() != 0 && dfsClient.clientRunning) {
            dataQueue.wait(1000); // wait for acks to arrive from datanodes
          }
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }
        endBlock();
      }
      if (progress != null) {
        progress.progress();
      }

      // This is used by unit test to trigger race conditions.
      if (artificialSlowdown != 0 && dfsClient.clientRunning) {
        Thread.sleep(artificialSlowdown);
      }
    } catch (Throwable e) {
      // Log warning if there was a real error.
      if (restartingNodeIndex.get() == -1) {
        DFSClient.LOG.warn("DataStreamer Exception", e);
      }
      if (e instanceof IOException) {
        setLastException((IOException) e);
      } else {
        setLastException(new IOException("DataStreamer Exception: ", e));
      }
      // An exception was caught -- flip the error flag to true
      hasError = true;
      if (errorIndex == -1 && restartingNodeIndex.get() == -1) {
        // Not a datanode issue
        streamerClosed = true;
      }
    } finally {
      scope.close();
    }
  }
  closeInternal();
}

Step 1: setting up the pipeline

First, ask the NameNode for a block:

private LocatedBlock nextBlockOutputStream() throws IOException {
  LocatedBlock lb = null;
  DatanodeInfo[] nodes = null;
  StorageType[] storageTypes = null;
  int count = dfsClient.getConf().nBlockWriteRetry;
  boolean success = false;
  ExtendedBlock oldBlock = block;
  /*
   * Requesting a block and setting up the data pipeline are critical operations that
   * must succeed, yet both involve network calls, and the network is unreliable.
   * The code cannot give up after a single failure, so it retries several times --
   * which is why so many places in HDFS are written as retry loops.
   */
  do {
    hasError = false;
    lastException.set(null);
    errorIndex = -1;
    success = false;
    DatanodeInfo[] excluded = excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
        .keySet().toArray(new DatanodeInfo[0]);
    block = oldBlock;
    // TODO Ask the NameNode for a block. On the server side this:
    //   1) creates a block and attaches its info to the file directory tree
    //   2) records the metadata on disk
    //   3) records the block metadata in the BlockManager
    lb = locateFollowingBlock(excluded.length > 0 ? excluded : null);
    block = lb.getBlock();
    block.setNumBytes(0);
    bytesSent = 0;
    accessToken = lb.getBlockToken();
    nodes = lb.getLocations();
    storageTypes = lb.getStorageTypes();

    //
    // Connect to first DataNode in the list.
    // TODO This call is what actually sets up the HDFS pipeline,
    //      e.g. hadoop1 -> hadoop2 -> hadoop3.
    //
    success = createBlockOutputStream(nodes, storageTypes, 0L, false);
    if (!success) {
      DFSClient.LOG.info("Abandoning " + block);
      // TODO If the pipeline cannot be set up, abandon this block
      dfsClient.namenode.abandonBlock(block, fileId, src, dfsClient.clientName);
      block = null;
      DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]);
      // Exclude the failed node (e.g. hadoop3) from the next attempt, so the NameNode
      // does not place a replica on it again
      excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
    }
  } while (!success && --count >= 0); // TODO retry loop

  if (!success) {
    throw new IOException("Unable to create new block.");
  }
  return lb;
}
private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) throws IOException {
  int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
  long sleeptime = 400;
  while (true) {
    long localstart = Time.monotonicNow();
    while (true) {
      try {
        // TODO Call the NameNode server-side code via RPC
        return dfsClient.namenode.addBlock(src, dfsClient.clientName, block,
            excludedNodes, fileId, favoredNodes);
      } catch (RemoteException e) {
        IOException ue = e.unwrapRemoteException(FileNotFoundException.class,
            AccessControlException.class,
            NSQuotaExceededException.class,
            DSQuotaExceededException.class,
            UnresolvedPathException.class);
        if (ue != e) {
          throw ue; // no need to retry these exceptions
        }
        if (NotReplicatedYetException.class.getName().equals(e.getClassName())) {
          if (retries == 0) {
            throw e;
          } else {
            --retries;
            DFSClient.LOG.info("Exception while adding a block", e);
            long elapsed = Time.monotonicNow() - localstart;
            if (elapsed > 5000) {
              DFSClient.LOG.info("Waiting for replication for " + (elapsed / 1000) + " seconds");
            }
            try {
              DFSClient.LOG.warn("NotReplicatedYetException sleeping " + src
                  + " retries left " + retries);
              Thread.sleep(sleeptime);
              sleeptime *= 2;
            } catch (InterruptedException ie) {
              DFSClient.LOG.warn("Caught exception ", ie);
            }
          }
        } else {
          throw e;
        }
      }
    }
  }
}
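The `sleeptime *= 2` above is a plain exponential backoff. The general pattern -- retry a flaky remote call, doubling the pause between attempts -- can be sketched as below. This is generic illustration code, not part of HDFS; the RemoteCall interface and the parameter names are made up for the example.

import java.io.IOException;

interface RemoteCall<T> {
    T invoke() throws IOException;
}

final class RetryWithBackoff {
    static <T> T call(RemoteCall<T> rpc, int maxRetries, long initialSleepMs)
            throws IOException, InterruptedException {
        long sleepMs = initialSleepMs;
        IOException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return rpc.invoke();          // e.g. an addBlock-style RPC
            } catch (IOException e) {
                last = e;                     // assume a transient failure: back off and retry
                Thread.sleep(sleepMs);
                sleepMs *= 2;                 // exponential backoff
            }
        }
        throw last;                           // retries exhausted
    }
}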

This invokes the NameNode's RPC method:

public LocatedBlock addBlock(String src, String clientName, ExtendedBlock previous,
    DatanodeInfo[] excludedNodes, long fileId, String[] favoredNodes) throws IOException {
  checkNNStartup();
  if (stateChangeLog.isDebugEnabled()) {
    stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " + src
        + " fileId=" + fileId + " for " + clientName);
  }
  Set<Node> excludedNodesSet = null;
  if (excludedNodes != null) {
    excludedNodesSet = new HashSet<Node>(excludedNodes.length);
    for (Node node : excludedNodes) {
      excludedNodesSet.add(node);
    }
  }
  List<String> favoredNodesList = (favoredNodes == null) ? null : Arrays.asList(favoredNodes);
  // TODO Add a block:
  //   1) choose the DataNodes for the replicas (three by default)
  //   2) update the directory tree
  //   3) persist the metadata
  LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId, clientName,
      previous, excludedNodesSet, favoredNodesList);
  if (locatedBlock != null) metrics.incrAddBlockOps();
  return locatedBlock;
}

The key code of getAdditionalBlock:

......
// choose targets for the new block to be allocated.
// TODO Choose the DataNodes that will store the block
//      (this is where HDFS rack awareness comes in)
final DatanodeStorageInfo targets[] = getBlockManager().chooseTarget4NewBlock(
    src, replication, clientNode, excludedNodes, blockSize, favoredNodes,
    storagePolicyID);
......
INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
// TODO Update the in-memory directory tree (the in-memory metadata)
saveAllocatedBlock(src, inodesInPath, newBlock, targets);
// TODO Write the metadata to disk
persistNewBlock(src, pendingFile);
offset = pendingFile.computeFileSize();
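chooseTarget4NewBlock is where the rack-aware placement policy runs. For the common case of three replicas, the default policy roughly chooses: the first replica on the writer's own DataNode (if the client runs on one), the second on a node in a different rack, and the third on a different node in the same rack as the second. The sketch below only illustrates that decision order; the Topology interface and helper methods are invented for the example and are not the real BlockPlacementPolicyDefault API.

import java.util.ArrayList;
import java.util.List;

class RackAwarePlacementSketch {
    interface Topology {
        String rackOf(String node);
        String pickNodeOnRack(String rack, List<String> exclude);   // some node on that rack
        String pickNodeOffRack(String rack, List<String> exclude);  // some node on another rack
    }

    /** Roughly mimics the default 3-replica placement order. */
    static List<String> chooseTargets(String writerNode, Topology topo) {
        List<String> chosen = new ArrayList<>();
        // 1st replica: local to the writer (assuming the client runs on a DataNode)
        chosen.add(writerNode);
        // 2nd replica: a node on a different rack, for rack-level fault tolerance
        String remote = topo.pickNodeOffRack(topo.rackOf(writerNode), chosen);
        chosen.add(remote);
        // 3rd replica: another node on the same rack as the 2nd replica
        chosen.add(topo.pickNodeOnRack(topo.rackOf(remote), chosen));
        return chosen;
    }
}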

Back in the earlier method, the data pipeline is then set up; the key code is as follows:

private boolean createBlockOutputStream(DatanodeInfo[] nodes, StorageType[] nodeStorageTypes,
    long newGS, boolean recoveryFlag) {
  while (true) {
    boolean result = false;
    DataOutputStream out = null;
    try {
      assert null == s : "Previous socket unclosed";
      assert null == blockReplyStream : "Previous blockReplyStream unclosed";
      // Create a socket to the first DataNode in the pipeline
      // (Hadoop variously uses RPC, HTTP and raw sockets)
      s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
      long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
      // Output stream towards the DataNode
      OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
      // Input stream from the DataNode
      InputStream unbufIn = NetUtils.getInputStream(s);
      // TODO Note that this is a plain socket connection
      IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s, unbufOut, unbufIn,
          dfsClient, accessToken, nodes[0]);
      unbufOut = saslStreams.out;
      unbufIn = saslStreams.in;
      // TODO This output stream writes the client's data to the DataNode
      out = new DataOutputStream(new BufferedOutputStream(unbufOut,
          HdfsConstants.SMALL_BUFFER_SIZE));
      // TODO The client reads the DataNode's responses through this input stream
      blockReplyStream = new DataInputStream(unbufIn);

      //
      // Xmit header info to datanode
      //
      BlockConstructionStage bcs = recoveryFlag ? stage.getRecoveryStage() : stage;

      // We cannot change the block length in 'block' as it counts the number
      // of bytes ack'ed.
      ExtendedBlock blockCopy = new ExtendedBlock(block);
      blockCopy.setNumBytes(blockSize);

      boolean[] targetPinnings = getPinnings(nodes, true);
      // send the request
      // TODO Send the write request. This is a socket request; on the DataNode side a
      //      DataXceiver service accepts it.
      new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
          dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, nodes.length,
          block.getNumBytes(), bytesSent, newGS, checksum4WriteBlock,
          cachingStrategy.get(), isLazyPersistFile,
          (targetPinnings == null ? false : targetPinnings[0]), targetPinnings);
      ......
    }

The key code of writeBlock:

// TODO Send the data -- note that the opcode used here is WRITE_BLOCK
send(out, Op.WRITE_BLOCK, proto.build());
Hadoop's networking is actually quite involved:
1) Method calls between services, registration and heartbeats use Hadoop RPC.
2) Synchronizing metadata uses HTTP.
3) Writing data uses raw sockets.

On the DataNode side, the DataXceiverServer handles the incoming data:

public void run() {
  Peer peer = null;
  while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
    try {
      // TODO Accept an incoming socket request
      peer = peerServer.accept();

      // Make sure the xceiver count is not exceeded
      int curXceiverCount = datanode.getXceiverCount();
      if (curXceiverCount > maxXceiverCount) {
        throw new IOException("Xceiver count " + curXceiverCount
            + " exceeds the limit of concurrent xcievers: " + maxXceiverCount);
      }

      // TODO For every incoming block, start a DataXceiver thread to handle it
      new Daemon(datanode.threadGroup,
          DataXceiver.create(peer, datanode, this)).start();
    } catch (SocketTimeoutException ignored) {
      // wake up to see if should continue to run
    } catch (AsynchronousCloseException ace) {
      // another thread closed our listener socket - that's expected during shutdown,
      // but not in other circumstances
      if (datanode.shouldRun && !datanode.shutdownForUpgrade) {
        LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);
      }
    } catch (IOException ie) {
      IOUtils.cleanup(null, peer);
      LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ie);
    } catch (OutOfMemoryError ie) {
      IOUtils.cleanup(null, peer);
      // DataNode can run out of memory if there is too many transfers.
      // Log the event, Sleep for 30 seconds, other transfers may complete by then.
      LOG.error("DataNode is out of memory. Will retry in 30 seconds.", ie);
      try {
        Thread.sleep(30 * 1000);
      } catch (InterruptedException e) {
        // ignore
      }
    } catch (Throwable te) {
      LOG.error(datanode.getDisplayName() + ":DataXceiverServer: Exiting due to: ", te);
      datanode.shouldRun = false;
    }
  }

Starting the DataXceiver thread:

/**
 * Read/write data from/to the DataXceiverServer.
 *
 * TODO Reads and writes data for connections accepted by the DataXceiverServer
 */
@Override
public void run() {
  int opsProcessed = 0;
  Op op = null;
  try {
    dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
    peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
    InputStream input = socketIn;
    try {
      IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
          socketIn, datanode.getXferAddress().getPort(), datanode.getDatanodeId());
      input = new BufferedInputStream(saslStreams.in, HdfsConstants.SMALL_BUFFER_SIZE);
      socketOut = saslStreams.out;
    } catch (InvalidMagicNumberException imne) {
      if (imne.isHandshake4Encryption()) {
        LOG.info("Failed to read expected encryption handshake from client "
            + "at " + peer.getRemoteAddressString() + ". Perhaps the client "
            + "is running an older version of Hadoop which does not support "
            + "encryption");
      } else {
        LOG.info("Failed to read expected SASL data transfer protection "
            + "handshake from client at " + peer.getRemoteAddressString()
            + ". Perhaps the client is running an older version of Hadoop "
            + "which does not support SASL data transfer protection");
      }
      return;
    }

    super.initialize(new DataInputStream(input));

    // We process requests in a loop, and stay around for a short timeout.
    // This optimistic behaviour allows the other end to reuse connections.
    // Setting keepalive timeout to 0 disable this behavior.
    do {
      updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));
      try {
        if (opsProcessed != 0) {
          assert dnConf.socketKeepaliveTimeout > 0;
          peer.setReadTimeout(dnConf.socketKeepaliveTimeout);
        } else {
          peer.setReadTimeout(dnConf.socketTimeout);
        }
        // TODO Read the operation type of this request
        op = readOp();
      } catch (InterruptedIOException ignored) {
        // Time out while we wait for client rpc
        break;
      } catch (IOException err) {
        // Since we optimistically expect the next op, it's quite normal to get EOF here.
        if (opsProcessed > 0
            && (err instanceof EOFException || err instanceof ClosedChannelException)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");
          }
        } else {
          incrDatanodeNetworkErrors();
          throw err;
        }
        break;
      }

      // restore normal timeout
      if (opsProcessed != 0) {
        peer.setReadTimeout(dnConf.socketTimeout);
      }

      opStartTime = monotonicNow();
      // TODO Dispatch on the operation type and process the data
      processOp(op);
      ++opsProcessed;
    } while ((peer != null) && (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
  } catch (Throwable t) {
    String s = datanode.getDisplayName() + ":DataXceiver error processing "
        + ((op == null) ? "unknown" : op.name()) + " operation "
        + " src: " + remoteAddress + " dst: " + localAddress;
    if (op == Op.WRITE_BLOCK && t instanceof ReplicaAlreadyExistsException) {
      // For WRITE_BLOCK, it is okay if the replica already exists since
      // client and replication may write the same block to the same datanode
      // at the same time.
      if (LOG.isTraceEnabled()) {
        LOG.trace(s, t);
      } else {
        LOG.info(s + "; " + t);
      }
    } else if (op == Op.READ_BLOCK && t instanceof SocketTimeoutException) {
      String s1 = "Likely the client has stopped reading, disconnecting it";
      s1 += " (" + s + ")";
      if (LOG.isTraceEnabled()) {
        LOG.trace(s1, t);
      } else {
        LOG.info(s1 + "; " + t);
      }
    } else {
      LOG.error(s, t);
    }
  } finally {
    if (LOG.isDebugEnabled()) {
      LOG.debug(datanode.getDisplayName() + ":Number of active connections is: "
          + datanode.getXceiverCount());
    }
    updateCurrentThreadName("Cleaning up");
    if (peer != null) {
      dataXceiverServer.closePeer(peer);
      IOUtils.closeStream(in);
    }
  }
}

processOp ends up calling writeBlock(); the key code:

public void writeBlock() throws IOException {
  ......
  if (isDatanode || stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
    // open a block receiver
    // TODO Create the BlockReceiver
    blockReceiver = new BlockReceiver(block, storageType, in,
        peer.getRemoteAddressString(), peer.getLocalAddressString(),
        stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
        clientname, srcDataNode, datanode, requestedChecksum,
        cachingStrategy, allowLazyPersist, pinning);
    ......
    storageUuid = blockReceiver.getStorageUuid();
  } else {
    storageUuid = datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
  }

  //
  // Connect to downstream machine, if appropriate
  // TODO Continue connecting to the next (downstream) machine in the pipeline
  //
  if (targets.length > 0) {
    InetSocketAddress mirrorTarget = null;
    // Connect to backup machine
    mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Connecting to datanode " + mirrorNode);
    }
    mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
    // TODO "mirror" here means the downstream DataNode holding the next replica;
    //      with multiple replicas this forwarding effectively happens recursively.
    mirrorSock = datanode.newSocket();
    try {
      // Do not propagate allowLazyPersist to downstream DataNodes.
      if (targetPinnings != null && targetPinnings.length > 0) {
        // TODO Send the write request on to the downstream DataNode over a socket
        new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0], blockToken,
            clientname, targets, targetStorageTypes, srcDataNode, stage, pipelineSize,
            minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum,
            cachingStrategy, false, targetPinnings[0], targetPinnings);
      } else {
        new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0], blockToken,
            clientname, targets, targetStorageTypes, srcDataNode, stage, pipelineSize,
            minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum,
            cachingStrategy, false, false, targetPinnings);
      }

  // receive the block and mirror to the next target
  if (blockReceiver != null) {
    String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
    // TODO Receive the block
    blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, mirrorAddr, null, targets, false);

    // send close-ack for transfer-RBW/Finalized
    if (isTransfer) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("TRANSFER: send close-ack");
      }
      // TODO Send the response back
      writeResponse(SUCCESS, null, replyOut);
    }
  }
}

Creating the BlockReceiver, which allocates the replica file via createRbw():

@Override // FsDatasetSpi
public synchronized ReplicaHandler createRbw(StorageType storageType, ExtendedBlock b,
    boolean allowLazyPersist) throws IOException {
  ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId());
  if (replicaInfo != null) {
    throw new ReplicaAlreadyExistsException("Block " + b
        + " already exists in state " + replicaInfo.getState()
        + " and thus cannot be created.");
  }
  // create a new block
  FsVolumeReference ref;
  while (true) {
    try {
      if (allowLazyPersist) {
        // First try to place the block on a transient volume.
        ref = volumes.getNextTransientVolume(b.getNumBytes());
        datanode.getMetrics().incrRamDiskBlocksWrite();
      } else {
        // A DataNode can be configured with multiple disks; pick the next volume
        ref = volumes.getNextVolume(storageType, b.getNumBytes());
      }
    } catch (DiskOutOfSpaceException de) {
      if (allowLazyPersist) {
        datanode.getMetrics().incrRamDiskBlocksWriteFallback();
        allowLazyPersist = false;
        continue;
      }
      throw de;
    }
    break;
  }
  FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
  // create an rbw file to hold block in the designated volume
  File f;
  try {
    // Create the file on disk
    f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
  } catch (IOException e) {
    IOUtils.cleanup(null, ref);
    throw e;
  }
  ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
      b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
  volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
  return new ReplicaHandler(newReplicaInfo, ref);
}

The run method of the BlockReceiver's PacketResponder:

public void run() {
  boolean lastPacketInBlock = false;
  final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
  while (isRunning() && !lastPacketInBlock) {
    long totalAckTimeNanos = 0;
    boolean isInterrupted = false;
    try {
      Packet pkt = null;
      long expected = -2;
      PipelineAck ack = new PipelineAck();
      long seqno = PipelineAck.UNKOWN_SEQNO;
      long ackRecvNanoTime = 0;
      try {
        // TODO If this node is NOT the last one in the data pipeline...
        if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
          // read an ack from downstream datanode
          // TODO ...first read the downstream node's result
          ack.readFields(downstreamIn);
          ackRecvNanoTime = System.nanoTime();
          if (LOG.isDebugEnabled()) {
            LOG.debug(myString + " got " + ack);
          }
          // Process an OOB ACK.
          Status oobStatus = ack.getOOBStatus();
          if (oobStatus != null) {
            LOG.info("Relaying an out of band ack of type " + oobStatus);
            sendAckUpstream(ack, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
                PipelineAck.combineHeader(datanode.getECN(), Status.SUCCESS));
            continue;
          }
          seqno = ack.getSeqno();
        }
        // If this node IS the last one in the pipeline, there is no downstream result
        // to read, which is why that code path does not appear here.
        if (seqno != PipelineAck.UNKOWN_SEQNO || type == PacketResponderType.LAST_IN_PIPELINE) {
          // TODO Wait for the matching packet at the head of the ack queue
          pkt = waitForAckHead(seqno);
          if (!isRunning()) {
            break;
          }
          expected = pkt.seqno;
          if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE && seqno != expected) {
            throw new IOException(myString + "seqno: expected=" + expected
                + ", received=" + seqno);
          }
          if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
            // The total ack time includes the ack times of downstream nodes.
            // The value is 0 if this responder doesn't have a downstream DN in the pipeline.
            totalAckTimeNanos = ackRecvNanoTime - pkt.ackEnqueueNanoTime;
            // Report the elapsed time from ack send to ack receive minus
            // the downstream ack time.
            long ackTimeNanos = totalAckTimeNanos - ack.getDownstreamAckTimeNanos();
            if (ackTimeNanos < 0) {
              if (LOG.isDebugEnabled()) {
                LOG.debug("Calculated invalid ack time: " + ackTimeNanos + "ns.");
              }
            } else {
              datanode.metrics.addPacketAckRoundTripTimeNanos(ackTimeNanos);
            }
          }
          lastPacketInBlock = pkt.lastPacketInBlock;
        }
      } catch (InterruptedException ine) {
        isInterrupted = true;
      } catch (IOException ioe) {
        if (Thread.interrupted()) {
          isInterrupted = true;
        } else {
          // continue to run even if can not read from mirror
          // notify client of the error
          // and wait for the client to shut down the pipeline
          mirrorError = true;
          LOG.info(myString, ioe);
        }
      }

      if (Thread.interrupted() || isInterrupted) {
        /*
         * The receiver thread cancelled this thread. We could also check any other
         * status updates from the receiver thread (e.g. if it is ok to write to
         * replyOut). It is prudent to not send any more status back to the client
         * because this datanode has a problem. The upstream datanode will detect that
         * this datanode is bad, and rightly so.
         *
         * The receiver thread can also interrupt this thread for sending an out-of-band
         * response upstream.
         */
        LOG.info(myString + ": Thread is interrupted.");
        running = false;
        continue;
      }

      if (lastPacketInBlock) {
        // Finalize the block and close the block file
        finalizeBlock(startTime);
      }

      Status myStatus = pkt != null ? pkt.ackStatus : Status.SUCCESS;
      // TODO Send the processing result straight up to the upstream node
      sendAckUpstream(ack, expected, totalAckTimeNanos,
          (pkt != null ? pkt.offsetInBlock : 0),
          PipelineAck.combineHeader(datanode.getECN(), myStatus));
      if (pkt != null) {
        // remove the packet from the ack queue
        // TODO If downstream processing succeeded, this DataNode removes the packet
        //      from its ackQueue
        removeAckHead();
      }
    } catch (IOException e) {
      LOG.warn("IOException in BlockReceiver.run(): ", e);
      if (running) {
        datanode.checkDiskErrorAsync();
        LOG.info(myString, e);
        running = false;
        if (!Thread.interrupted()) {
          // failure not caused by interruption
          receiverThread.interrupt();
        }
      }
    } catch (Throwable e) {
      if (running) {
        LOG.info(myString, e);
        running = false;
        receiverThread.interrupt();
      }
    }
  }
  LOG.info(myString + " terminating");
}

Back to the earlier client-side method: initDataStreaming() starts a ResponseProcessor that watches whether each packet was sent successfully.

private void initDataStreaming() {
  this.setName("DataStreamer for file " + src + " block " + block);
  // TODO Start the ResponseProcessor
  response = new ResponseProcessor(nodes);
  response.start();
  stage = BlockConstructionStage.DATA_STREAMING;
}
public void run() {
  setName("ResponseProcessor for block " + block);
  PipelineAck ack = new PipelineAck();
  TraceScope scope = NullScope.INSTANCE;
  while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
    // process responses from datanodes.
    try {
      // read an ack from the pipeline
      long begin = Time.monotonicNow();
      // TODO Read the pipeline's processing result
      ack.readFields(blockReplyStream);
      long duration = Time.monotonicNow() - begin;
      if (duration > dfsclientSlowLogThresholdMs
          && ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
        DFSClient.LOG.warn("Slow ReadProcessor read fields took " + duration
            + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: " + ack
            + ", targets: " + Arrays.asList(targets));
      } else if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient " + ack);
      }

      long seqno = ack.getSeqno();
      // processes response status from datanodes.
      for (int i = ack.getNumOfReplies() - 1; i >= 0 && dfsClient.clientRunning; i--) {
        final Status reply = PipelineAck.getStatusFromHeader(ack.getHeaderFlag(i));
        // Restart will not be treated differently unless it is
        // the local node or the only one in the pipeline.
        if (PipelineAck.isRestartOOBStatus(reply) && shouldWaitForRestart(i)) {
          restartDeadline = dfsClient.getConf().datanodeRestartTimeout + Time.monotonicNow();
          setRestartingNodeIndex(i);
          String message = "A datanode is restarting: " + targets[i];
          DFSClient.LOG.info(message);
          throw new IOException(message);
        }
        // node error
        if (reply != SUCCESS) {
          setErrorIndex(i); // first bad datanode
          throw new IOException("Bad response " + reply + " for block " + block
              + " from datanode " + targets[i]);
        }
      }

      assert seqno != PipelineAck.UNKOWN_SEQNO
          : "Ack for unknown seqno should be a failed ack: " + ack;
      if (seqno == DFSPacket.HEART_BEAT_SEQNO) { // a heartbeat ack
        continue;
      }

      // a success ack for a data packet
      DFSPacket one;
      synchronized (dataQueue) {
        one = ackQueue.getFirst();
      }
      if (one.getSeqno() != seqno) {
        throw new IOException("ResponseProcessor: Expecting seqno " + " for block " + block
            + one.getSeqno() + " but received " + seqno);
      }
      isLastPacketInBlock = one.isLastPacketInBlock();

      // Fail the packet write for testing in order to force a pipeline recovery.
      if (DFSClientFaultInjector.get().failPacket() && isLastPacketInBlock) {
        failPacket = true;
        throw new IOException("Failing the last packet for testing.");
      }

      // update bytesAcked
      block.setNumBytes(one.getLastByteOffsetBlock());

      synchronized (dataQueue) {
        scope = Trace.continueSpan(one.getTraceSpan());
        one.setTraceSpan(null);
        lastAckedSeqno = seqno;
        // TODO The packet was sent successfully, so remove it from the ackQueue
        ackQueue.removeFirst();
        dataQueue.notifyAll();
        one.releaseBuffer(byteArrayManager);
      }
    } catch (Exception e) {
      if (!responderClosed) {
        if (e instanceof IOException) {
          setLastException((IOException) e);
        }
        hasError = true;
        // If no explicit error report was received, mark the primary node as failed.
        tryMarkPrimaryDatanodeFailed();
        synchronized (dataQueue) {
          dataQueue.notifyAll();
        }
        if (restartingNodeIndex.get() == -1) {
          DFSClient.LOG.warn("DFSOutputStream ResponseProcessor exception for block " + block, e);
        }
        responderClosed = true;
      }
    } finally {
      scope.close();
    }
  }
}

// This is a classic idiom for shutting down a worker thread.
void close() {
  responderClosed = true;
  // Interrupt the thread so it exits quickly.
  this.interrupt();
}

Reposted from: https://blog.csdn.net/weixin_37850264/article/details/112424458

