(七)HDFS写数据之写入过程
发布日期:2021-11-18 17:47:41
浏览次数:9
分类:技术文章
本文共 43295 字,大约阅读时间需要 144 分钟。
文章目录
前言
Demo
public class FileTest { public static void main(String[] args) throws IOException { //如何创建一个目录 Configuration configuration=new Configuration(); //namenode FileSystem fileSystem=FileSystem.newInstance(configuration); //TODO 创建目录(分析的是元数据的管理流程) fileSystem.mkdirs(new Path("")); /** * TODO 接下来分析HDFS上传文件的流程 * TODO 做一些重要的初始化工作 */ FSDataOutputStream fsous=fileSystem.create(new Path("/user.txt")); //TODO 完成上传文件的流程 fsous.write("fdsafdsafdsafs".getBytes()); //MR /** * master: * 启动 * worker: * 启动 * *往上面提交任务: * wordcount */ //Spark }}
public void write(int b) throws IOException { //TODO out就是DFSOutputStream //这个地方就是看大家的面向对象知识是否充足了 out.write(b); position++; if (statistics != null) { statistics.incrementBytesWritten(1); } }
在之前生成这个流的方法中,我们可以看到 DFSOutputStream 才是真正的流;经过分析,这儿的代码确实调用的是 DFSOutputStream 的 write 方法。但是我们在 DFSOutputStream 里面并没有找到 write(int) 方法的定义(生成这个流的 create 方法如下):
public FSDataOutputStream create(final Path f, final FsPermission permission, final EnumSetcflags, final int bufferSize, final short replication, final long blockSize, final Progressable progress, final ChecksumOpt checksumOpt) throws IOException { statistics.incrementWriteOps(1); Path absF = fixRelativePart(f); return new FileSystemLinkResolver () { @Override public FSDataOutputStream doCall(final Path p) throws IOException, UnresolvedLinkException { //TODO 创建了一个DFSOutputStream,做了很多初始化操作 /** * * 1) 往文件目录树里面添加了INodeFile * * 2) 添加了契约管理 * * 3) 启动了DataStreamer(写数据流程的关键服务) */ final DFSOutputStream dfsos = dfs.create(getPathName(p), permission, cflags, replication, blockSize, progress, bufferSize, checksumOpt); //TODO FSDataOutputStream 是DFSOutputStream 进行了再一次的封装。【装饰模式】 return dfs.createWrappedOutputStream(dfsos, statistics); } @Override public FSDataOutputStream next(final FileSystem fs, final Path p) throws IOException { return fs.create(p, permission, cflags, bufferSize, replication, blockSize, progress, checksumOpt); } }.resolve(this, absF); }
所以实际调用的是其父类 FSOutputSummer 的 write 方法:
public synchronized void write(int b) throws IOException { buf[count++] = (byte)b; if(count == buf.length) { //TODO 写文件 flushBuffer(); } }
protected synchronized void flushBuffer() throws IOException { //TODO 重要 flushBuffer(false, true); }
protected synchronized int flushBuffer(boolean keep, boolean flushPartial) throws IOException { int bufLen = count; int partialLen = bufLen % sum.getBytesPerChecksum(); int lenToFlush = flushPartial ? bufLen : bufLen - partialLen; if (lenToFlush != 0) { //TODO 核心的代码 //TODO HDFS文件 -》 Block文件块(128M) -》 packet(64K) = 127chunk -> chunk 512 + shunksum 4 = 516 //目录树-》目录-》文件-》Block文件块(默认128M) -》packet(64K) 大约是127个chunk -》chunk 512字节+4字节 =chunksize 516字节 writeChecksumChunks(buf, 0, lenToFlush); if (!flushPartial || keep) { count = partialLen; System.arraycopy(buf, bufLen - count, buf, 0, count); } else { count = 0; } } // total bytes left minus unflushed bytes left return count - (bufLen - lenToFlush); }
private void writeChecksumChunks(byte b[], int off, int len) throws IOException { //TODO 计算出来chunk的校验和,crc32 sum.calculateChunkedSums(b, off, len, checksum, 0); //TODO 按照chunk的大小遍历数据 for (int i = 0; i < len; i += sum.getBytesPerChecksum()) { int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i); int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize(); //TODO 一个chunk一个chunk的写数据 writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize()); } }
protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum, int ckoff, int cklen) throws IOException { TraceScope scope = dfsClient.getPathTraceScope("DFSOutputStream#writeChunk", src); try { //TODO 写chunk writeChunkImpl(b, offset, len, checksum, ckoff, cklen); } finally { scope.close(); } }
/**
 * Appends one chunk (data plus checksum) to the current packet and queues the
 * packet for transmission once it is full or the block boundary is reached.
 *
 * @param b        buffer holding the chunk data
 * @param offset   offset of the chunk inside {@code b}
 * @param len      chunk length; must not exceed {@code bytesPerChecksum}
 * @param checksum buffer holding the checksums
 * @param ckoff    offset of this chunk's checksum inside {@code checksum}
 * @param cklen    checksum length; must be 0 or {@code getChecksumSize()}
 * @throws IOException if the arguments are inconsistent or a called helper fails
 */
private synchronized void writeChunkImpl(byte[] b, int offset, int len,
    byte[] checksum, int ckoff, int cklen) throws IOException {
  dfsClient.checkOpen();
  checkClosed();

  // Reject chunks larger than the configured chunk size.
  if (len > bytesPerChecksum) {
    throw new IOException("writeChunk() buffer size is " + len +
        " is larger than supported bytesPerChecksum " + bytesPerChecksum);
  }
  // A non-empty checksum must have exactly the expected size.
  if (cklen != 0 && cklen != getChecksumSize()) {
    throw new IOException(
        "writeChunk() checksum size is supposed to be " + getChecksumSize() +
        " but found to be " + cklen);
  }

  if (currentPacket == null) {
    // No packet in progress: create a new one and consume the next sequence number.
    currentPacket = createPacket(packetSize, chunksPerPacket, bytesCurBlock,
        currentSeqno++, false);
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
          currentPacket.getSeqno() + ", src=" + src + ", packetSize=" + packetSize +
          ", chunksPerPacket=" + chunksPerPacket + ", bytesCurBlock=" + bytesCurBlock);
    }
  }

  // Write the chunk's checksum into the packet (author's note: 4 bytes by default).
  currentPacket.writeChecksum(checksum, ckoff, cklen);
  // Write the chunk data into the packet (author's note: 512 bytes by default).
  currentPacket.writeData(b, offset, len);
  // Count the chunk; the packet is complete when the count reaches getMaxChunks()
  // (author's note: about 127 chunks with default settings).
  currentPacket.incNumChunks();
  // Track how many bytes of the current block have been written so far
  // (author's note: a block is 128 MB by default).
  bytesCurBlock += len;

  // If packet is full, enqueue it for transmission
  //
  // Two conditions trigger the enqueue:
  //   1) the packet holds its maximum number of chunks, or
  //   2) the current block is exactly full
  //      (author's note: 128 MB = 2048 packets with default sizes).
  if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
      bytesCurBlock == blockSize) {
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
          currentPacket.getSeqno() + ", src=" + src + ", bytesCurBlock=" + bytesCurBlock +
          ", blockSize=" + blockSize + ", appendChunk=" + appendChunk);
    }
    // The packet is complete: put it on the queue.
    waitAndQueueCurrentPacket();

    // If the reopened file did not end at chunk boundary and the above
    // write filled up its partial chunk. Tell the summer to generate full
    // crc chunks from now on.
    if (appendChunk && bytesCurBlock % bytesPerChecksum == 0) {
      appendChunk = false;
      resetChecksumBufSize();
    }

    if (!appendChunk) {
      // Size the next packet so that it does not run past the end of the block.
      int psize = Math.min((int) (blockSize - bytesCurBlock),
          dfsClient.getConf().writePacketSize);
      computePacketChunkSize(psize, bytesPerChecksum);
    }
    //
    // if encountering a block boundary, send an empty packet to
    // indicate the end of block and reset bytesCurBlock.
    // (The accumulated size equals the block size, i.e. one block is complete.)
    //
    if (bytesCurBlock == blockSize) {
      // The last packet of a block is an empty packet: it carries no data and
      // only serves as the marker that the block is finished.
      currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
      currentPacket.setSyncBlock(shouldSyncBlock);
      waitAndQueueCurrentPacket();
      bytesCurBlock = 0;
      lastFlushOffset = 0;
    }
  }
}
/**
 * Waits until the queues have room for another packet, then queues the current
 * packet via {@code queueCurrentPacket()}.
 *
 * <p>"Room" means {@code dataQueue.size() + ackQueue.size()} is not greater than
 * the configured {@code writeMaxPackets}. The wait also ends when the stream is
 * closed or the waiting thread is interrupted.
 *
 * @throws IOException propagated from the called helpers (a
 *         {@code ClosedChannelException} is swallowed, see below)
 */
private void waitAndQueueCurrentPacket() throws IOException {
  synchronized (dataQueue) {
    try {
      // If queue is full, then wait till we have enough space
      boolean firstWait = true;
      try {
        while (!isClosed() && dataQueue.size() + ackQueue.size() >
            dfsClient.getConf().writeMaxPackets) {
          if (firstWait) {
            // Annotate the trace span once, on the first wait only.
            Span span = Trace.currentSpan();
            if (span != null) {
              span.addTimelineAnnotation("dataQueue.wait");
            }
            firstWait = false;
          }
          try {
            // The queues are full: block until another thread notifies dataQueue.
            dataQueue.wait();
          } catch (InterruptedException e) {
            // If we get interrupted while waiting to queue data, we still need to get rid
            // of the current packet. This is because we have an invariant that if
            // currentPacket gets full, it will get queued before the next writeChunk.
            //
            // Rather than wait around for space in the queue, we should instead try to
            // return to the caller as soon as possible, even though we slightly overrun
            // the MAX_PACKETS length.
            Thread.currentThread().interrupt();
            break;
          }
        }
      } finally {
        // Mark the end of the wait in the trace, but only if we actually waited.
        Span span = Trace.currentSpan();
        if ((span != null) && (!firstWait)) {
          span.addTimelineAnnotation("end.wait");
        }
      }
      checkClosed();
      // Add the current packet to the queue.
      queueCurrentPacket();
    } catch (ClosedChannelException e) {
      // Deliberately ignored: the packet is simply not queued in this case.
      // NOTE(review): presumably raised by checkClosed() when the stream is
      // already closed - confirm against that method.
    }
  }
}
private void queueCurrentPacket() { synchronized (dataQueue) { if (currentPacket == null) return; currentPacket.addTraceParent(Trace.currentSpan()); //TODO 往dataQueue队列里面添加一个packet dataQueue.addLast(currentPacket); lastQueuedSeqno = currentPacket.getSeqno(); if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Queued packet " + currentPacket.getSeqno()); } //把当前的 currentPacket = null currentPacket = null; //TODO 这儿之所以有这个操作是因为之前一开始初始化的时候 //启动了一个DataStreamer,DataStream一直监听这这个dataQueue //如果里面没有数据就一直wait,所以现在往里面添加了packet以后 //就唤醒等待的线程,所以这儿唤醒了以后我们要知道 DataStreamer的代码就可以 //跑起来了 dataQueue.notifyAll(); } }
线程被唤醒
/**
 * Main loop of the streamer thread: takes packets from {@code dataQueue}, sets
 * up the pipeline when needed, writes each packet to {@code blockStream} and
 * moves it to {@code ackQueue} until it is acknowledged.
 *
 * <p>The loop runs while the streamer is not closed and the client is running.
 * Any {@code Throwable} raised inside an iteration is recorded via
 * {@code setLastException} and sets {@code hasError}; {@code closeInternal()}
 * is called once the loop ends.
 */
public void run() {
  long lastPacket = Time.monotonicNow();
  TraceScope scope = NullScope.INSTANCE;
  while (!streamerClosed && dfsClient.clientRunning) {
    // if the Responder encountered an error, shutdown Responder
    // (this branch is entered on the iteration after hasError was set to true)
    if (hasError && response != null) {
      try {
        response.close();
        response.join();
        response = null;
      } catch (InterruptedException e) {
        DFSClient.LOG.warn("Caught exception ", e);
      }
    }

    DFSPacket one;
    try {
      // process datanode IO errors if any
      boolean doSleep = false;
      if (hasError && (errorIndex >= 0 || restartingNodeIndex.get() >= 0)) {
        // Reached after an error that is attributed to a datanode.
        doSleep = processDatanodeError();
      }

      synchronized (dataQueue) {
        // wait for a packet to be sent.
        long now = Time.monotonicNow();
        // On the first pass there is no data yet (dataQueue.size() == 0),
        // so execution enters this wait loop.
        while ((!streamerClosed && !hasError && dfsClient.clientRunning
            && dataQueue.size() == 0 &&
            (stage != BlockConstructionStage.DATA_STREAMING ||
             stage == BlockConstructionStage.DATA_STREAMING &&
             now - lastPacket < dfsClient.getConf().socketTimeout / 2)) || doSleep) {
          long timeout = dfsClient.getConf().socketTimeout / 2 - (now - lastPacket);
          timeout = timeout <= 0 ? 1000 : timeout;
          timeout = (stage == BlockConstructionStage.DATA_STREAMING) ?
              timeout : 1000;
          try {
            // Nothing to send: block here until notified or the timeout expires.
            dataQueue.wait(timeout);
          } catch (InterruptedException e) {
            DFSClient.LOG.warn("Caught exception ", e);
          }
          doSleep = false;
          now = Time.monotonicNow();
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }
        // get packet to be sent.
        if (dataQueue.isEmpty()) {
          // The wait ended without data: send a heartbeat packet instead.
          one = createHeartbeatPacket();
          assert one != null;
        } else {
          // Peek at the head of the queue; it is removed further below.
          one = dataQueue.getFirst(); // regular data packet
          long parents[] = one.getTraceParents();
          if (parents.length > 0) {
            scope = Trace.startSpan("dataStreamer", new TraceInfo(0, parents[0]));
            // TODO: use setParents API once it's available from HTrace 3.2
            // scope = Trace.startSpan("dataStreamer", Sampler.ALWAYS);
            // scope.getSpan().setParents(parents);
          }
        }
      }

      // get new block from namenode.
      // (set up the data pipeline / request a block from the NameNode)
      if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Allocating new block");
        }
        // Step 1: nextBlockOutputStream() does two things:
        //   1) requests a block from the NameNode,
        //   2) establishes the data pipeline.
        setPipeline(nextBlockOutputStream());
        // Step 2 (important): per the author's walkthrough this starts the
        // ResponseProcessor, which watches whether each packet was sent successfully.
        initDataStreaming();
      } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Append to block " + block);
        }
        setupPipelineForAppendOrRecovery();
        initDataStreaming();
      }

      long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
      if (lastByteOffsetInBlock > blockSize) {
        throw new IOException("BlockSize " + blockSize +
            " is smaller than data size. " +
            " Offset of packet in block " +
            lastByteOffsetInBlock +
            " Aborting file " + src);
      }

      if (one.isLastPacketInBlock()) {
        // wait for all data packets have been successfully acked
        synchronized (dataQueue) {
          while (!streamerClosed && !hasError &&
              ackQueue.size() != 0 && dfsClient.clientRunning) {
            try {
              // wait for acks to arrive from datanodes
              dataQueue.wait(1000);
            } catch (InterruptedException e) {
              DFSClient.LOG.warn("Caught exception ", e);
            }
          }
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }
        stage = BlockConstructionStage.PIPELINE_CLOSE;
      }

      // send the packet
      Span span = null;
      synchronized (dataQueue) {
        // move packet from dataQueue to ackQueue
        if (!one.isHeartbeatPacket()) {
          span = scope.detach();
          one.setTraceSpan(span);
          // Step 3: remove the packet that is about to be sent from dataQueue.
          dataQueue.removeFirst();
          // Step 4: add the same packet to ackQueue.
          ackQueue.addLast(one);
          dataQueue.notifyAll();
        }
      }

      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DataStreamer block " + block +
            " sending packet " + one);
      }

      // write out data to remote datanode
      TraceScope writeScope = Trace.startSpan("writeTo", span);
      try {
        // This is where the packet is actually written out.
        one.writeTo(blockStream);
        blockStream.flush();
      } catch (IOException e) {
        // HDFS-3398 treat primary DN is down since client is unable to
        // write to primary DN. If a failed or restarting node has already
        // been recorded by the responder, the following call will have no
        // effect. Pipeline recovery can handle only one node error at a
        // time. If the primary node fails again during the recovery, it
        // will be taken out then.
        // (The primary datanode is the first datanode of the pipeline.)
        tryMarkPrimaryDatanodeFailed();
        // Rethrow so the outer catch records the error.
        throw e;
      } finally {
        writeScope.close();
      }
      lastPacket = Time.monotonicNow();

      // update bytesSent
      long tmpBytesSent = one.getLastByteOffsetBlock();
      if (bytesSent < tmpBytesSent) {
        bytesSent = tmpBytesSent;
      }

      if (streamerClosed || hasError || !dfsClient.clientRunning) {
        continue;
      }

      // Is this block full?
      if (one.isLastPacketInBlock()) {
        // wait for the close packet has been acked
        synchronized (dataQueue) {
          while (!streamerClosed && !hasError &&
              ackQueue.size() != 0 && dfsClient.clientRunning) {
            // An InterruptedException here is handled by the outer catch (Throwable).
            dataQueue.wait(1000);// wait for acks to arrive from datanodes
          }
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }

        endBlock();
      }
      if (progress != null) { progress.progress(); }

      // This is used by unit test to trigger race conditions.
      if (artificialSlowdown != 0 && dfsClient.clientRunning) {
        Thread.sleep(artificialSlowdown);
      }
    } catch (Throwable e) {
      // Log warning if there was a real error.
      if (restartingNodeIndex.get() == -1) {
        DFSClient.LOG.warn("DataStreamer Exception", e);
      }
      if (e instanceof IOException) {
        setLastException((IOException) e);
      } else {
        setLastException(new IOException("DataStreamer Exception: ", e));
      }
      // An exception was caught: raise the error flag so that the next
      // iteration runs the error-handling branches at the top of the loop.
      hasError = true;
      if (errorIndex == -1 && restartingNodeIndex.get() == -1) {
        // Not a datanode issue
        streamerClosed = true;
      }
    } finally {
      scope.close();
    }
  }
  closeInternal();
}
步骤一 建立管道
向namenode申请block
private LocatedBlock nextBlockOutputStream() throws IOException { LocatedBlock lb = null; DatanodeInfo[] nodes = null; StorageType[] storageTypes = null; int count = dfsClient.getConf().nBlockWriteRetry; boolean success = false; ExtendedBlock oldBlock = block; /** * * 因为申请block或者 建立数据管道,这些都是重要的操作 * 务必要执行成功,但是这些操作都涉及到网络的请求。网络的事说不准。 * 我们的代码里面不能说一次失败了就失败了,我们要进行多次尝试。 * 所以大家经常看到HDFS里面的很多地方的代码都是用的循环 * */ do { hasError = false; lastException.set(null); errorIndex = -1; success = false; DatanodeInfo[] excluded = excludedNodes.getAllPresent(excludedNodes.asMap().keySet()).keySet() .toArray(new DatanodeInfo[0]); block = oldBlock; // TODO 向NameNode申请一个block /** * 服务端那儿的操作: * 1) 创建了一个block,往文件目录树里面挂载了block的信息 * 2)在磁盘上面记录了元数据信息 * 3)在BLockMananger里面记录了block的元数据信息 * * hadoop3 */ lb = locateFollowingBlock(excluded.length > 0 ? excluded : null); block = lb.getBlock(); block.setNumBytes(0); bytesSent = 0; accessToken = lb.getBlockToken(); nodes = lb.getLocations(); storageTypes = lb.getStorageTypes(); // // Connect to first DataNode in the list. // TODO 其实HDFS管道的建立就是靠的这段代码完成的。 //hadoop1 hadoop2 【hadoop3】 //block 不要让我把副本再往hadoop3 success = createBlockOutputStream(nodes, storageTypes, 0L, false); if (!success) { DFSClient.LOG.info("Abandoning " + block); //TODO 如果管道建立不成功,那么就是放弃这个block dfsClient.namenode.abandonBlock(block, fileId, src, dfsClient.clientName); block = null; DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]); //hadoop3 excludedNodes.put(nodes[errorIndex], nodes[errorIndex]); } // TODO } while (!success && --count >= 0); if (!success) { throw new IOException("Unable to create new block."); } return lb; }
private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) throws IOException { int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry; long sleeptime = 400; while (true) { long localstart = Time.monotonicNow(); while (true) { try { //TODO 通过RPC 调用NameNode服务端的代码 return dfsClient.namenode.addBlock(src, dfsClient.clientName, block, excludedNodes, fileId, favoredNodes); } catch (RemoteException e) { IOException ue = e.unwrapRemoteException(FileNotFoundException.class, AccessControlException.class, NSQuotaExceededException.class, DSQuotaExceededException.class, UnresolvedPathException.class); if (ue != e) { throw ue; // no need to retry these exceptions } if (NotReplicatedYetException.class.getName().equals(e.getClassName())) { if (retries == 0) { throw e; } else { --retries; DFSClient.LOG.info("Exception while adding a block", e); long elapsed = Time.monotonicNow() - localstart; if (elapsed > 5000) { DFSClient.LOG.info("Waiting for replication for " + (elapsed / 1000) + " seconds"); } try { DFSClient.LOG.warn( "NotReplicatedYetException sleeping " + src + " retries left " + retries); Thread.sleep(sleeptime); sleeptime *= 2; } catch (InterruptedException ie) { DFSClient.LOG.warn("Caught exception ", ie); } } } else { throw e; } } } } }
调用的是namenode的rpc方法
public LocatedBlock addBlock(String src, String clientName, ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId, String[] favoredNodes) throws IOException { checkNNStartup(); if (stateChangeLog.isDebugEnabled()) { stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " + src + " fileId=" + fileId + " for " + clientName); } SetexcludedNodesSet = null; if (excludedNodes != null) { excludedNodesSet = new HashSet (excludedNodes.length); for (Node node : excludedNodes) { excludedNodesSet.add(node); } } List favoredNodesList = (favoredNodes == null) ? null : Arrays.asList(favoredNodes); //TODO 添加一个block /** * * 1) 选择三台DataNode副本机器 * 2) 修改了目录树 * 3) 存储元数据信息 * * */ LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId, clientName, previous, excludedNodesSet, favoredNodesList); if (locatedBlock != null) metrics.incrAddBlockOps(); return locatedBlock; }
getAdditionalBlock的关键代码:
。。。。。。。
// Excerpt of getAdditionalBlock; the surrounding code is elided in the article.
// choose targets for the new block to be allocated.
// Choose the datanode hosts that will store the block
// (author's note: this is where HDFS rack awareness comes in).
final DatanodeStorageInfo targets[] = getBlockManager().chooseTarget4NewBlock(
    src, replication, clientNode, excludedNodes, blockSize, favoredNodes,
    storagePolicyID);
。。。。。。
INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
// Update the in-memory directory tree (i.e. the in-memory metadata).
saveAllocatedBlock(src, inodesInPath, newBlock, targets);
// Persist the metadata change.
persistNewBlock(src, pendingFile);
offset = pendingFile.computeFileSize();
继续之前的方法,建立数据管道,关键代码如下
/**
 * Opens the connection to the first datanode of the pipeline and sends it the
 * write-block request.
 *
 * <p>NOTE: this listing is an excerpt; it ends after the {@code try} body, so
 * the error handling and the return value are not shown here.
 *
 * @param nodes            datanodes of the pipeline; {@code nodes[0]} is contacted directly
 * @param nodeStorageTypes storage types matching {@code nodes}
 * @param newGS            generation stamp passed on in the request
 * @param recoveryFlag     when {@code true}, the recovery variant of the current stage is sent
 */
private boolean createBlockOutputStream(DatanodeInfo[] nodes,
    StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
  while (true) {
    boolean result = false;
    DataOutputStream out = null;
    try {
      assert null == s : "Previous socket unclosed";
      assert null == blockReplyStream : "Previous blockReplyStream unclosed";
      // Create the socket to the first datanode
      // (author's note: this path uses a plain socket, not RPC or HTTP).
      s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
      long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);

      // Output stream of the socket.
      OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
      // Input stream of the socket.
      InputStream unbufIn = NetUtils.getInputStream(s);
      // Note that this goes over the socket as well; the returned stream pair
      // replaces the raw streams from here on.
      IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
          unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
      unbufOut = saslStreams.out;
      unbufIn = saslStreams.in;
      // The client writes its data to the datanode through this output stream.
      out = new DataOutputStream(new BufferedOutputStream(unbufOut,
          HdfsConstants.SMALL_BUFFER_SIZE));
      // The client reads the datanode's replies through this input stream.
      blockReplyStream = new DataInputStream(unbufIn);

      //
      // Xmit header info to datanode
      //

      BlockConstructionStage bcs = recoveryFlag ? stage.getRecoveryStage() : stage;

      // We cannot change the block length in 'block' as it counts the number
      // of bytes ack'ed.
      ExtendedBlock blockCopy = new ExtendedBlock(block);
      blockCopy.setNumBytes(blockSize);

      boolean[] targetPinnings = getPinnings(nodes, true);
      // send the request
      // This is a socket request; per the author's walkthrough the datanode runs a
      // DataXceiver service that accepts it.
      new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
          dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
          nodes.length, block.getNumBytes(), bytesSent, newGS,
          checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
          (targetPinnings == null ? false : targetPinnings[0]), targetPinnings);
    }
writeBlock的关键代码
//TODO 写数据 // 大家注意这儿写的操作就是write_block send(out, Op.WRITE_BLOCK, proto.build());
Hadoop 的网络通信其实很复杂:1)服务之间的方法调用、注册、心跳,用的是 Hadoop RPC;2)同步元数据的时候,用的是 HTTP;3)写数据的时候,用的是 socket。
在 DataNode 端通过 DataXceiverServer 接收请求并进行数据的处理
/**
 * Accept loop of the DataXceiverServer: accepts incoming connections and hands
 * each one to a new DataXceiver daemon thread.
 *
 * <p>NOTE: this listing is an excerpt; the code after the loop and the closing
 * brace of the method are not shown.
 */
public void run() {
  Peer peer = null;
  while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
    try {
      // Accept the next socket connection.
      peer = peerServer.accept();

      // Make sure the xceiver count is not exceeded
      int curXceiverCount = datanode.getXceiverCount();
      if (curXceiverCount > maxXceiverCount) {
        throw new IOException("Xceiver count " + curXceiverCount
            + " exceeds the limit of concurrent xcievers: "
            + maxXceiverCount);
      }

      // Start a DataXceiver daemon thread for every accepted connection
      // (author's note: i.e. one per block being sent).
      new Daemon(datanode.threadGroup,
          DataXceiver.create(peer, datanode, this))
          .start();
    } catch (SocketTimeoutException ignored) {
      // wake up to see if should continue to run
    } catch (AsynchronousCloseException ace) {
      // another thread closed our listener socket - that's expected during shutdown,
      // but not in other circumstances
      if (datanode.shouldRun && !datanode.shutdownForUpgrade) {
        LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);
      }
    } catch (IOException ie) {
      IOUtils.cleanup(null, peer);
      LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ie);
    } catch (OutOfMemoryError ie) {
      IOUtils.cleanup(null, peer);
      // DataNode can run out of memory if there is too many transfers.
      // Log the event, Sleep for 30 seconds, other transfers may complete by
      // then.
      LOG.error("DataNode is out of memory. Will retry in 30 seconds.", ie);
      try {
        Thread.sleep(30 * 1000);
      } catch (InterruptedException e) {
        // ignore
      }
    } catch (Throwable te) {
      // Any other failure stops the datanode's accept loop.
      LOG.error(datanode.getDisplayName()
          + ":DataXceiverServer: Exiting due to: ", te);
      datanode.shouldRun = false;
    }
  }
DataXceiver线程的启动
/**
 * Read/write data from/to the DataXceiverServer.
 *
 * <p>Performs the handshake on the peer's streams, then reads operations in a
 * loop and dispatches each one to {@code processOp}. The loop continues while
 * the peer is open and the keep-alive timeout is positive; the peer is closed
 * in the {@code finally} block.
 */
@Override
public void run() {
  int opsProcessed = 0;
  Op op = null;

  try {
    dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
    peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
    InputStream input = socketIn;
    try {
      // Handshake; the returned stream pair replaces the raw socket streams.
      IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
          socketIn, datanode.getXferAddress().getPort(),
          datanode.getDatanodeId());
      input = new BufferedInputStream(saslStreams.in,
          HdfsConstants.SMALL_BUFFER_SIZE);
      socketOut = saslStreams.out;
    } catch (InvalidMagicNumberException imne) {
      if (imne.isHandshake4Encryption()) {
        LOG.info("Failed to read expected encryption handshake from client " +
            "at " + peer.getRemoteAddressString() + ". Perhaps the client " +
            "is running an older version of Hadoop which does not support " +
            "encryption");
      } else {
        LOG.info("Failed to read expected SASL data transfer protection " +
            "handshake from client at " + peer.getRemoteAddressString() +
            ". Perhaps the client is running an older version of Hadoop " +
            "which does not support SASL data transfer protection");
      }
      // Handshake failed: give up on this connection (finally still runs).
      return;
    }

    super.initialize(new DataInputStream(input));

    // We process requests in a loop, and stay around for a short timeout.
    // This optimistic behaviour allows the other end to reuse connections.
    // Setting keepalive timeout to 0 disable this behavior.
    do {
      updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));

      try {
        if (opsProcessed != 0) {
          // Waiting for a follow-up operation: use the keep-alive timeout.
          assert dnConf.socketKeepaliveTimeout > 0;
          peer.setReadTimeout(dnConf.socketKeepaliveTimeout);
        } else {
          peer.setReadTimeout(dnConf.socketTimeout);
        }
        // Read the operation type of this request.
        op = readOp();
      } catch (InterruptedIOException ignored) {
        // Time out while we wait for client rpc
        break;
      } catch (IOException err) {
        // Since we optimistically expect the next op, it's quite normal to get EOF here.
        if (opsProcessed > 0 &&
            (err instanceof EOFException || err instanceof ClosedChannelException)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");
          }
        } else {
          incrDatanodeNetworkErrors();
          throw err;
        }
        break;
      }

      // restore normal timeout
      if (opsProcessed != 0) {
        peer.setReadTimeout(dnConf.socketTimeout);
      }

      opStartTime = monotonicNow();
      // Handle the request according to its operation type.
      processOp(op);
      ++opsProcessed;
    } while ((peer != null) &&
        (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
  } catch (Throwable t) {
    String s = datanode.getDisplayName() + ":DataXceiver error processing "
        + ((op == null) ? "unknown" : op.name()) + " operation "
        + " src: " + remoteAddress + " dst: " + localAddress;
    if (op == Op.WRITE_BLOCK && t instanceof ReplicaAlreadyExistsException) {
      // For WRITE_BLOCK, it is okay if the replica already exists since
      // client and replication may write the same block to the same datanode
      // at the same time.
      if (LOG.isTraceEnabled()) {
        LOG.trace(s, t);
      } else {
        LOG.info(s + "; " + t);
      }
    } else if (op == Op.READ_BLOCK && t instanceof SocketTimeoutException) {
      String s1 =
          "Likely the client has stopped reading, disconnecting it";
      s1 += " (" + s + ")";
      if (LOG.isTraceEnabled()) {
        LOG.trace(s1, t);
      } else {
        LOG.info(s1 + "; " + t);
      }
    } else {
      LOG.error(s, t);
    }
  } finally {
    if (LOG.isDebugEnabled()) {
      LOG.debug(datanode.getDisplayName() + ":Number of active connections is: "
          + datanode.getXceiverCount());
    }
    updateCurrentThreadName("Cleaning up");
    if (peer != null) {
      dataXceiverServer.closePeer(peer);
      IOUtils.closeStream(in);
    }
  }
}
processOp实际是调用了writeBlock(),关键代码
// Excerpt of DataXceiver#writeBlock: parameters and parts of the body are elided
// in the article (marked with 。。。), so the braces below do not balance.
public void writeBlock() throws IOException {
  。。。。。。。。
  if (isDatanode ||
      stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
    // open a block receiver
    // (the BlockReceiver is created here)
    blockReceiver = new BlockReceiver(block, storageType, in,
        peer.getRemoteAddressString(),
        peer.getLocalAddressString(),
        stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
        clientname, srcDataNode, datanode, requestedChecksum,
        cachingStrategy, allowLazyPersist, pinning);。。。。。。
    storageUuid = blockReceiver.getStorageUuid();
  } else {
    storageUuid = datanode.data.recoverClose(
        block, latestGenerationStamp, minBytesRcvd);
  }

  //
  // Connect to downstream machine, if appropriate
  // (continue by connecting to the next machine of the pipeline)
  //
  if (targets.length > 0) {
    InetSocketAddress mirrorTarget = null;
    // Connect to backup machine
    mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Connecting to datanode " + mirrorNode);
    }
    mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
    // "mirror" stands for the replica here, i.e. the next datanode; with several
    // replicas every datanode repeats this step for its own downstream node.
    mirrorSock = datanode.newSocket();
    try {
      // Do not propagate allowLazyPersist to downstream DataNodes.
      if (targetPinnings != null && targetPinnings.length > 0) {
        // Forward the write-block request downstream over the socket.
        new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
            blockToken, clientname, targets, targetStorageTypes, srcDataNode,
            stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
            latestGenerationStamp, requestedChecksum, cachingStrategy,
            false, targetPinnings[0], targetPinnings);
      } else {
        // Same request, with the pinning flag forced to false.
        new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
            blockToken, clientname, targets, targetStorageTypes, srcDataNode,
            stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
            latestGenerationStamp, requestedChecksum, cachingStrategy,
            false, false, targetPinnings);
      }
      // receive the block and mirror to the next target
      if (blockReceiver != null) {
        String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
        // Receive the block.
        blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
            mirrorAddr, null, targets, false);

        // send close-ack for transfer-RBW/Finalized
        if (isTransfer) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("TRANSFER: send close-ack");
          }
          // Send the response back.
          writeResponse(SUCCESS, null, replyOut);
        }
      }
    }
创建 BlockReceiver(下面是其初始化过程中用到的 createRbw() 方法)
@Override // FsDatasetSpi public synchronized ReplicaHandler createRbw( StorageType storageType, ExtendedBlock b, boolean allowLazyPersist) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { throw new ReplicaAlreadyExistsException("Block " + b + " already exists in state " + replicaInfo.getState() + " and thus cannot be created."); } // create a new block FsVolumeReference ref; while (true) { try { if (allowLazyPersist) { // First try to place the block on a transient volume. ref = volumes.getNextTransientVolume(b.getNumBytes()); datanode.getMetrics().incrRamDiskBlocksWrite(); } else { //datanode -> 配置多块磁盘 ref = volumes.getNextVolume(storageType, b.getNumBytes()); } } catch (DiskOutOfSpaceException de) { if (allowLazyPersist) { datanode.getMetrics().incrRamDiskBlocksWriteFallback(); allowLazyPersist = false; continue; } throw de; } break; } FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); // create an rbw file to hold block in the designated volume File f; try { //创建一个文件 f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock()); } catch (IOException e) { IOUtils.cleanup(null, ref); throw e; } ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes()); volumeMap.add(b.getBlockPoolId(), newReplicaInfo); return new ReplicaHandler(newReplicaInfo, ref); }
BlockReceiver 中 PacketResponder 的 run 方法
public void run() { boolean lastPacketInBlock = false; final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0; while (isRunning() && !lastPacketInBlock) { long totalAckTimeNanos = 0; boolean isInterrupted = false; try { Packet pkt = null; long expected = -2; PipelineAck ack = new PipelineAck(); long seqno = PipelineAck.UNKOWN_SEQNO; long ackRecvNanoTime = 0; try { //TODO 如果你不是数据管道里面的最后一个节点 if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) { // read an ack from downstream datanode // TODO 读取下游数据的处理结果 ack.readFields(downstreamIn); ackRecvNanoTime = System.nanoTime(); if (LOG.isDebugEnabled()) { LOG.debug(myString + " got " + ack); } // Process an OOB ACK. Status oobStatus = ack.getOOBStatus(); if (oobStatus != null) { LOG.info("Relaying an out of band ack of type " + oobStatus); sendAckUpstream(ack, PipelineAck.UNKOWN_SEQNO, 0L, 0L, PipelineAck.combineHeader(datanode.getECN(), Status.SUCCESS)); continue; } seqno = ack.getSeqno(); } //如果你管道里面的最后一个节点 //我们在这个里面是看不到 他要读取下游节点的处理结果的代码 if (seqno != PipelineAck.UNKOWN_SEQNO || type == PacketResponderType.LAST_IN_PIPELINE) { //TODO pkt = waitForAckHead(seqno); if (!isRunning()) { break; } expected = pkt.seqno; if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE && seqno != expected) { throw new IOException(myString + "seqno: expected=" + expected + ", received=" + seqno); } if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) { // The total ack time includes the ack times of downstream // nodes. // The value is 0 if this responder doesn't have a downstream // DN in the pipeline. totalAckTimeNanos = ackRecvNanoTime - pkt.ackEnqueueNanoTime; // Report the elapsed time from ack send to ack receive minus // the downstream ack time. 
long ackTimeNanos = totalAckTimeNanos - ack.getDownstreamAckTimeNanos(); if (ackTimeNanos < 0) { if (LOG.isDebugEnabled()) { LOG.debug("Calculated invalid ack time: " + ackTimeNanos + "ns."); } } else { datanode.metrics.addPacketAckRoundTripTimeNanos(ackTimeNanos); } } lastPacketInBlock = pkt.lastPacketInBlock; } } catch (InterruptedException ine) { isInterrupted = true; } catch (IOException ioe) { if (Thread.interrupted()) { isInterrupted = true; } else { // continue to run even if can not read from mirror // notify client of the error // and wait for the client to shut down the pipeline mirrorError = true; LOG.info(myString, ioe); } } if (Thread.interrupted() || isInterrupted) { /* * The receiver thread cancelled this thread. We could also check any other * status updates from the receiver thread (e.g. if it is ok to write to * replyOut). It is prudent to not send any more status back to the client * because this datanode has a problem. The upstream datanode will detect that * this datanode is bad, and rightly so. * * The receiver thread can also interrupt this thread for sending an out-of-band * response upstream. */ LOG.info(myString + ": Thread is interrupted."); running = false; continue; } if (lastPacketInBlock) { // Finalize the block and close the block file finalizeBlock(startTime); } Status myStatus = pkt != null ? pkt.ackStatus : Status.SUCCESS; //TODO 直接往上游节点发送处理结果 sendAckUpstream(ack, expected, totalAckTimeNanos, (pkt != null ? 
pkt.offsetInBlock : 0), PipelineAck.combineHeader(datanode.getECN(), myStatus)); if (pkt != null) { // remove the packet from the ack queue // TODO 如果下游数据处理成功,当前datanode就会从ackQueue里面移除packet removeAckHead(); } } catch (IOException e) { LOG.warn("IOException in BlockReceiver.run(): ", e); if (running) { datanode.checkDiskErrorAsync(); LOG.info(myString, e); running = false; if (!Thread.interrupted()) { // failure not caused by interruption receiverThread.interrupt(); } } } catch (Throwable e) { if (running) { LOG.info(myString, e); running = false; receiverThread.interrupt(); } } } LOG.info(myString + " terminating"); }
继续前面的方法:启动了 ResponseProcessor,用来监听每一个 packet 是否发送成功
private void initDataStreaming() { this.setName("DataStreamer for file " + src + " block " + block); //TODO response = new ResponseProcessor(nodes); response.start(); stage = BlockConstructionStage.DATA_STREAMING; }
/**
 * Reads pipeline acks from {@code blockReplyStream} until the responder is closed, the
 * client stops running, or the last packet of the block has been acknowledged.
 *
 * <p>For every ack the per-datanode reply statuses are checked (from the last reply down
 * to the first). A restart out-of-band status that should be waited for, or any
 * non-SUCCESS reply, aborts the iteration with an {@link IOException}. Heartbeat acks are
 * skipped. For a data-packet ack the head of {@code ackQueue} must carry the same seqno;
 * it is then removed from the queue and waiters on {@code dataQueue} are notified.
 *
 * <p>Any exception raised while the responder is still open is recorded, sets
 * {@code hasError}, wakes up waiters on {@code dataQueue} and closes the responder.
 */
public void run() {
  setName("ResponseProcessor for block " + block);
  PipelineAck ack = new PipelineAck();
  TraceScope scope = NullScope.INSTANCE;
  while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
    // process responses from datanodes.
    try {
      // read an ack from the pipeline
      long begin = Time.monotonicNow();
      // Read the downstream result for the packet.
      ack.readFields(blockReplyStream);
      long duration = Time.monotonicNow() - begin;
      if (duration > dfsclientSlowLogThresholdMs
          && ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
        DFSClient.LOG.warn("Slow ReadProcessor read fields took " + duration
            + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
            + ack + ", targets: " + Arrays.asList(targets));
      } else if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient " + ack);
      }

      long seqno = ack.getSeqno();
      // processes response status from datanodes.
      for (int i = ack.getNumOfReplies() - 1; i >= 0 && dfsClient.clientRunning; i--) {
        final Status reply = PipelineAck.getStatusFromHeader(ack.getHeaderFlag(i));
        // Restart will not be treated differently unless it is
        // the local node or the only one in the pipeline.
        if (PipelineAck.isRestartOOBStatus(reply) && shouldWaitForRestart(i)) {
          restartDeadline = dfsClient.getConf().datanodeRestartTimeout
              + Time.monotonicNow();
          setRestartingNodeIndex(i);
          String message = "A datanode is restarting: " + targets[i];
          DFSClient.LOG.info(message);
          throw new IOException(message);
        }
        // node error
        if (reply != SUCCESS) {
          setErrorIndex(i); // first bad datanode
          throw new IOException("Bad response " + reply + " for block " + block
              + " from datanode " + targets[i]);
        }
      }

      assert seqno != PipelineAck.UNKOWN_SEQNO
          : "Ack for unknown seqno should be a failed ack: " + ack;
      if (seqno == DFSPacket.HEART_BEAT_SEQNO) { // a heartbeat ack
        continue;
      }

      // a success ack for a data packet
      DFSPacket one;
      synchronized (dataQueue) {
        one = ackQueue.getFirst();
      }
      if (one.getSeqno() != seqno) {
        // Fix: the expected seqno used to be appended after the block, which left
        // "Expecting seqno" without a value and glued the number onto the block's string.
        throw new IOException("ResponseProcessor: Expecting seqno " + one.getSeqno()
            + " for block " + block + " but received " + seqno);
      }
      isLastPacketInBlock = one.isLastPacketInBlock();

      // Fail the packet write for testing in order to force a
      // pipeline recovery.
      if (DFSClientFaultInjector.get().failPacket() && isLastPacketInBlock) {
        failPacket = true;
        throw new IOException("Failing the last packet for testing.");
      }

      // update bytesAcked
      block.setNumBytes(one.getLastByteOffsetBlock());

      synchronized (dataQueue) {
        scope = Trace.continueSpan(one.getTraceSpan());
        one.setTraceSpan(null);
        lastAckedSeqno = seqno;
        // The packet was sent successfully, so take it out of ackQueue.
        ackQueue.removeFirst();
        dataQueue.notifyAll();
        one.releaseBuffer(byteArrayManager);
      }
    } catch (Exception e) {
      if (!responderClosed) {
        if (e instanceof IOException) {
          setLastException((IOException) e);
        }
        hasError = true;
        // If no explicit error report was received, mark the primary
        // node as failed.
        tryMarkPrimaryDatanodeFailed();
        synchronized (dataQueue) {
          dataQueue.notifyAll();
        }
        if (restartingNodeIndex.get() == -1) {
          DFSClient.LOG.warn(
              "DFSOutputStream ResponseProcessor exception " + " for block " + block, e);
        }
        responderClosed = true;
      }
    } finally {
      scope.close();
    }
  }
}

/**
 * Stops the responder: sets the {@code responderClosed} flag that the loop in
 * {@link #run()} checks, then interrupts this thread. The article calls this a classic
 * idiom.
 */
void close() {
  responderClosed = true;
  // Interrupt the thread; per the article the main purpose is to make it exit quickly.
  this.interrupt();
}
} // closes the enclosing class (presumably ResponseProcessor; its header is not part of this excerpt)
转载地址:https://blog.csdn.net/weixin_37850264/article/details/112424458 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
路过,博主的博客真漂亮。。
[***.116.15.85]2024年04月22日 23时59分51秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
JavaFX图表(六)之条形图
2019-04-28
JavaFX图表(七)之散点图
2019-04-28
JavaFX图表(八)之堆积条形图
2019-04-28
JFreeChart(二)之饼图
2019-04-28
JVM发生OOM的 8 种原因、及解决办法
2019-04-28
20个高级Java面试题汇总
2019-04-28
MySQL 中的重做日志,回滚日志以及二进制日志的简单总结
2019-04-28
你真的很熟分布式和事务吗?
2019-04-28
Http 持久连接与 HttpClient 连接池
2019-04-28
Spring Boot 自动配置的 “魔法” 是如何实现的?
2019-04-28
接口方法上的注解无法被@Aspect声明的切面拦截的原因分析
2019-04-28
Mybatis 使用的 9 种设计模式,真是太有用了
2019-04-28
TCP为什么是三次握手和四次挥手
2019-04-28
web.xml 组件加载顺序
2019-04-28
关于Spring底层原理面试的那些问题,你是不是真的懂Spring?
2019-04-28
LVS三种模式的区别及负载均衡算法
2019-04-28
BATJ面试必会|Jvm 虚拟机篇
2019-04-28
Java进阶学习路线
2019-04-28
springboot整合spring @Cache和Redis
2019-04-28