(8) Secondary Development
Published: 2021-11-18 17:47:42
Category: Technical articles
Summary of the source code covered so far
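1) The NameNode startup flow
2) The DataNode startup flow (initialization, registration, heartbeat)
3) Metadata management
   - the double-buffer mechanism
   - the standby NameNode syncs metadata from the JournalNodes
   - the standby NameNode merges the metadata and replaces the active NameNode's fsImage file
4) Uploading a file
   - a lease mechanism is involved
   - create the space
   - request a block
   - write the file (chunk (chunk + checksum) -> packet -> block -> INodeFile -> INodeDirectory)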
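To give a feel for the sizes behind the chunk -> packet -> block chain in item 4, here is a small arithmetic sketch. The sizes are assumptions (commonly cited defaults: 512-byte chunks, 4-byte checksums, 64 KB packets, 128 MB blocks), and the class name is made up for illustration. The packet header is ignored, so the per-packet figure is only an upper bound.

```java
// Sketch only: sizes are assumed common defaults, not values read from a cluster.
class WriteUnitMath {
    static final long CHUNK_DATA_BYTES = 512;           // assumed data bytes per chunk
    static final long CHECKSUM_BYTES = 4;               // assumed checksum bytes per chunk
    static final long PACKET_BYTES = 64 * 1024;         // assumed packet size
    static final long BLOCK_BYTES = 128L * 1024 * 1024; // assumed block size

    /** Number of data chunks needed to fill one block. */
    static long chunksPerBlock() {
        return BLOCK_BYTES / CHUNK_DATA_BYTES;
    }

    /** Total checksum bytes that accompany one full block. */
    static long checksumBytesPerBlock() {
        return chunksPerBlock() * CHECKSUM_BYTES;
    }

    /** Upper bound on (chunk + checksum) pairs per packet, ignoring the packet header. */
    static long maxChunksPerPacket() {
        return PACKET_BYTES / (CHUNK_DATA_BYTES + CHECKSUM_BYTES);
    }

    public static void main(String[] args) {
        System.out.println("chunks per block: " + chunksPerBlock());
        System.out.println("checksum bytes per block: " + checksumBytesPerBlock());
        System.out.println("max chunks per packet: " + maxChunksPerPacket());
    }
}
```

Under these assumptions the checksums add about 1 MB per 128 MB block, which is why they travel with the data rather than in a separate pass.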
A review of the double-buffer mechanism:
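```java
package com.kaikeba.hdfsdemo.lesson02;

import java.util.LinkedList;

/**
 * Manages metadata edits and persists them to disk.
 *
 * I wrote this code by imitating the Hadoop source, so make sure you
 * understand it; we will modify it later. It is roughly 90% similar to
 * the Hadoop implementation.
 *
 * @author Administrator
 */
public class FSEdit {

    public static void main(String[] args) {
        final FSEdit fs = new FSEdit();
        // 1000 threads, each writing 100 log entries
        for (int i = 0; i < 1000; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 100; j++) {
                        fs.logEdit("log entry");
                    }
                }
            }).start();
        }
    }

    // Monotonically increasing transaction ID
    long taxid = 0L;
    DoubleBuffer doubleBuffer = new DoubleBuffer();
    // Each thread keeps its own copy of the transaction ID it last wrote
    ThreadLocal<Long> threadLocal = new ThreadLocal<Long>();
    // Whether a background sync to disk is currently running
    public boolean isSyncRunning = false;
    // Largest transaction ID in the buffer that is currently being synced
    long maxtaxid = 0L;
    boolean isWait = false;

    /**
     * Core method for writing a metadata log entry.
     *
     * @param log the log content
     */
    private void logEdit(String log) {
        // Q: Can performance be good when you take a lock here?
        // A: Yes. Inside this lock we only write to memory,
        //    so it can sustain very high concurrency.
        // thread 1 -> 1, thread 2 -> 2, thread 3 -> 3, thread 4 -> 5
        synchronized (this) {
            taxid++;
            threadLocal.set(taxid);
            EditLog editLog = new EditLog(taxid, log);
            // Write into the in-memory buffer
            doubleBuffer.write(editLog);
        } // lock released

        // Persist the data to disk.
        // By the time we get here, currentBuffer may already hold 3 entries.
        // No lock is held at this point; this is segmented locking.
        // logFlush() takes the lock again.
        logFlush();
    }

    private void logFlush() {
        // Take the lock again (say we are thread 2)
        synchronized (this) {
            if (isSyncRunning) { // false the first time, true while a sync is running
                // Transaction ID of the current thread, e.g. 2
                long localTaxid = threadLocal.get();
                // 2 <= 3: already covered by the running sync
                // 5 <= 3: false, so carry on
                if (localTaxid <= maxtaxid) {
                    return;
                }
                // Another thread is already waiting to do the next sync
                if (isWait) {
                    return;
                }
                isWait = true;
                while (isSyncRunning) {
                    try {
                        // Keep waiting; wait() releases the lock
                        this.wait(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                isWait = false;
            }
            // Execution continues here.
            // After the swap, syncBuffer holds (1, 2, 3)
            doubleBuffer.exchange();
            // maxtaxid = 3
            if (doubleBuffer.syncBuffer.size() > 0) {
                maxtaxid = doubleBuffer.getMaxTaxid();
            }
            isSyncRunning = true;
        } // lock released

        // Persist the data to disk; this is the expensive part.
        // Entries 1, 2, 3 are written to disk.
        doubleBuffer.flush();

        // Segmented locking: take the lock again
        synchronized (this) {
            // Reset the flag
            isSyncRunning = false;
            // Wake up the waiting threads
            this.notifyAll();
        }
    }

    /**
     * An object-oriented representation of one metadata record.
     *
     * @author Administrator
     */
    public class EditLog {
        // Transaction ID
        public long taxid;
        public String log;

        public EditLog(long taxid, String log) {
            this.taxid = taxid;
            this.log = log;
        }

        @Override
        public String toString() {
            return "EditLog [taxid=" + taxid + ", log=" + log + "]";
        }
    }

    public class DoubleBuffer {
        // Buffer that receives writes; an ordered queue
        LinkedList<EditLog> currentBuffer = new LinkedList<EditLog>();
        // Buffer whose contents are being persisted to disk
        LinkedList<EditLog> syncBuffer = new LinkedList<EditLog>();

        /**
         * Write one metadata record.
         *
         * @param editLog the record to write
         */
        public void write(EditLog editLog) {
            currentBuffer.add(editLog);
        }

        /**
         * Write the data to disk.
         */
        public void flush() {
            for (EditLog editLog : syncBuffer) {
                // Printing stands in for writing to disk
                System.out.println(editLog);
            }
            syncBuffer.clear();
        }

        /**
         * Swap the two buffers.
         */
        public void exchange() {
            LinkedList<EditLog> tmp = currentBuffer;
            currentBuffer = syncBuffer;
            syncBuffer = tmp;
        }

        /**
         * Get the largest transaction ID in the buffer being synced.
         */
        public long getMaxTaxid() {
            return syncBuffer.getLast().taxid;
        }
    }
}
```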
```java
void logEdit(final FSEditLogOp op) {
  synchronized (this) {
    assert isOpenForWrite() :
      "bad state: " + state;

    // wait if an automatic sync is scheduled
    // (no need to wait the first time through)
    waitIfAutoSyncScheduled();

    // Most importantly, this generates a globally unique transaction ID.
    // Edit log file names look like:
    //   editlog_0000000000_0000000012.log
    //   editlog_0000000013_0000000022.log
    // TODO Step 1: obtain the current unique transaction ID
    long start = beginTransaction();
    op.setTransactionId(txid);

    try {
      /**
       * 1) the NameNode's edit log file buffer
       * 2) the in-memory buffer for the JournalNodes
       */
      // TODO Step 2: write the metadata into the in-memory buffer
      editLogStream.write(op);
    } catch (IOException ex) {
      // All journals failed, it is handled in logSync.
    } finally {
      op.reset();
    }

    endTransaction(start);

    // check if it is time to schedule an automatic sync
    // buffer >= 512 KB -> shouldForceSync() is true -> !true == false,
    // so we fall through and go on to sync
    if (!shouldForceSync()) {
      return;
    }
    // TODO Reaching this point means the buffer is full
    isAutoSyncScheduled = true;
  }

  // TODO Persist the data to disk
  logSync();
}
```
```java
public boolean shouldForceSync() {
  // true once the buffer currently receiving edits reaches 512 KB
  return bufCurrent.size() >= initBufferSize;
}
```
Problems with this design:
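The NameNode uses two in-memory buffers, but the size of each is hard-coded to 512 KB. If bufReady is still being synced to disk when bufCurrent fills up, the NameNode has to wait and does no work, which hurts performance.

- The initBufferSize used above is hard-coded and cannot be configured. Under very high concurrency the write buffer fills almost immediately while the sync buffer has not finished flushing. The fix is to make initBufferSize configurable by reading it from the configuration file core-site.xml, which can resolve these brief NameNode stalls.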
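A minimal sketch of what "make initBufferSize configurable" could look like. To stay self-contained it uses java.util.Properties as a stand-in for Hadoop's configuration object, and the property key is hypothetical, not an existing Hadoop setting. In the real change the value would be read from core-site.xml where the double buffer is created.

```java
import java.util.Properties;

// Sketch only: Properties stands in for Hadoop's configuration,
// and BUFFER_SIZE_KEY is a hypothetical key, not a real Hadoop property.
class EditLogBufferConfig {
    static final String BUFFER_SIZE_KEY = "dfs.namenode.editlog.buffer.size"; // hypothetical
    static final int DEFAULT_BUFFER_SIZE = 512 * 1024; // the hard-coded 512 KB from the text

    private final int initBufferSize;

    EditLogBufferConfig(Properties conf) {
        this.initBufferSize = parseSize(conf.getProperty(BUFFER_SIZE_KEY));
    }

    /** Fall back to the default when the value is missing, malformed, or not positive. */
    static int parseSize(String raw) {
        if (raw == null) {
            return DEFAULT_BUFFER_SIZE;
        }
        try {
            int value = Integer.parseInt(raw.trim());
            return value > 0 ? value : DEFAULT_BUFFER_SIZE;
        } catch (NumberFormatException e) {
            return DEFAULT_BUFFER_SIZE;
        }
    }

    /** Same check as shouldForceSync(), but against the configured size. */
    boolean shouldForceSync(int currentBufferBytes) {
        return currentBufferBytes >= initBufferSize;
    }

    int getInitBufferSize() {
        return initBufferSize;
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty(BUFFER_SIZE_KEY, "2097152"); // 2 MB
        EditLogBufferConfig cfg = new EditLogBufferConfig(conf);
        System.out.println("force sync at 1 MB? " + cfg.shouldForceSync(1024 * 1024));
    }
}
```

Falling back to 512 KB keeps the existing behavior for clusters that never set the value, so the change is only felt where an operator opts in.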
The second problem
Bug analysis
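Under the NameNode high-availability mechanism, every metadata write must also be written to the JournalNode cluster. A write succeeds once a majority of the JournalNodes have accepted it. If the write times out, however, the NameNode catches the timeout exception and exits. If the timeout was caused by a JournalNode service failure, exiting is the right thing to do for data safety. But consider what happens if the NameNode itself enters a Full GC during the write. A Full GC can last tens of seconds or even several minutes, so the NameNode exits with a timeout exception even though the JournalNodes are healthy. That is clearly a problem.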
```java
public void logSync() {
  long syncStart = 0;

  // Fetch the transactionId of this thread.
  long mytxid = myTransactionId.get().txid;

  boolean sync = false;
  try {
    EditLogOutputStream logStream = null;
    synchronized (this) {
      try {
        printStatistics(false);

        // TODO If another thread is already flushing to disk,
        // the current thread does not need to flush
        while (mytxid > synctxid && isSyncRunning) {
          try {
            // wait() releases the lock
            wait(1000);
          } catch (InterruptedException ie) {
          }
        }

        //
        // If this transaction was already flushed, then nothing to do
        //
        if (mytxid <= synctxid) {
          numTransactionsBatchedInSync++;
          if (metrics != null) {
            // Metrics is non-null only when used inside name node
            metrics.incrTransactionsBatchedInSync();
          }
          return;
        }

        // now, this thread will do the sync
        syncStart = txid;
        isSyncRunning = true;
        sync = true;

        // swap buffers
        try {
          if (journalSet.isEmpty()) {
            throw new IOException("No journals available to flush");
          }
          // TODO Swap the in-memory buffers
          editLogStream.setReadyToFlush();
        } catch (IOException e) {
          final String msg =
              "Could not sync enough journals to persistent storage " +
              "due to " + e.getMessage() + ". " +
              "Unsynced transactions: " + (txid - synctxid);
          LOG.fatal(msg, new Exception());
          synchronized(journalSetLock) {
            IOUtils.cleanup(LOG, journalSet);
          }
          terminate(1, msg);
        }
      } finally {
        // Prevent RuntimeException from blocking other log edit write
        // TODO Reset the flag and wake up the waiting threads
        doneWithAutoSyncScheduling();
      }
      //editLogStream may become null,
      //so store a local variable for flush.
      logStream = editLogStream;
    } // TODO lock released

    // do the sync
    long start = monotonicNow();
    try {
      if (logStream != null) {
        // Write the data to disk, quietly, outside the lock.
        // This is a relatively slow operation; it may take tens of milliseconds.
        /**
         * Buffer 1: serves the NameNode's own edit log
         * Buffer 2: serves the JournalNodes
         */
        // TODO Write the data to disk
        logStream.flush();
      }
    } catch (IOException ex) {
      synchronized (this) {
        // Log the failure
        final String msg =
            "Could not sync enough journals to persistent storage. "
            + "Unsynced transactions: " + (txid - synctxid);
        // A fatal-level log entry means the error is catastrophic.
        LOG.fatal(msg, new Exception());
        synchronized(journalSetLock) {
          IOUtils.cleanup(LOG, journalSet);
        }
        // This is the line that terminates the NameNode
        terminate(1, msg);
      }
    }
    long elapsed = monotonicNow() - start;

    if (metrics != null) { // Metrics non-null only when used inside name node
      metrics.addSync(elapsed);
    }

  } finally {
    // Prevent RuntimeException from blocking other log edit sync
    synchronized (this) {
      if (sync) {
        synctxid = syncStart;
        // TODO Reset the flag
        isSyncRunning = false;
      }
      // TODO Wake up the waiting threads
      this.notifyAll();
    }
  }
}
```
```java
protected void flushAndSync(boolean durable) throws IOException {
  int numReadyBytes = buf.countReadyBytes();
  if (numReadyBytes > 0) {
    int numReadyTxns = buf.countReadyTxns();
    long firstTxToFlush = buf.getFirstReadyTxId();

    assert numReadyTxns > 0;

    // Copy from our double-buffer into a new byte array. This is for
    // two reasons:
    // 1) The IPC code has no way of specifying to send only a slice of
    //    a larger array.
    // 2) because the calls to the underlying nodes are asynchronous, we
    //    need a defensive copy to avoid accidentally mutating the buffer
    //    before it is sent.
    DataOutputBuffer bufToSend = new DataOutputBuffer(numReadyBytes);
    buf.flushTo(bufToSend);
    assert bufToSend.getLength() == numReadyBytes;
    byte[] data = bufToSend.getData();
    assert data.length == bufToSend.getLength();

    // Send the data to the JournalNodes
    QuorumCall<AsyncLogger, Void> qcall = loggers.sendEdits(
        segmentTxId, firstTxToFlush,
        numReadyTxns, data);
    // TODO This call blocks until the JournalNode cluster has processed the write
    loggers.waitForWriteQuorum(qcall, writeTimeoutMs, "sendEdits");

    // Since we successfully wrote this batch, let the loggers know. Any future
    // RPCs will thus let the loggers know of the most recent transaction, even
    // if a logger has fallen behind.
    loggers.setCommittedTxId(firstTxToFlush + numReadyTxns - 1);
  }
}
```
The flushAndSync method shown above is the implementation used when edits are written to the JournalNode cluster (in the Hadoop source it belongs to QuorumOutputStream). Its blocking step is the call to loggers.waitForWriteQuorum:
```java
<V> Map<AsyncLogger, V> waitForWriteQuorum(QuorumCall<AsyncLogger, V> q,
    int timeoutMs, String operationName) throws IOException {
  // Suppose there are 5 JournalNodes; then this value is 3
  int majority = getMajoritySize();
  try {
    q.waitFor(
        loggers.size(), // either all respond: 5
        majority,       // or we get a majority successes: 3
        majority,       // or we get a majority failures: 3
        timeoutMs, operationName); // timeoutMs is 20 s; give up after 20 s
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new IOException("Interrupted waiting " + timeoutMs + "ms for a " +
        "quorum of nodes to respond.");
  } catch (TimeoutException e) {
    throw new IOException("Timed out waiting " + timeoutMs + "ms for a " +
        "quorum of nodes to respond.");
  }

  // e.g. 2 < 3: not enough successes
  if (q.countSuccesses() < majority) {
    q.rethrowException("Got too many exceptions to achieve quorum size " +
        getMajorityString());
  }

  // Return the results
  return q.getResults();
}
```
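The "5 JournalNodes, majority 3" arithmetic in the comments can be sketched as follows. The sketch assumes the usual majority formula n / 2 + 1; the class and method names are illustrative only and are not taken from Hadoop.

```java
// Sketch only: illustrative names, assuming the usual majority formula n / 2 + 1.
class QuorumMath {
    /** Smallest number of nodes that forms a majority of n. */
    static int majoritySize(int n) {
        return n / 2 + 1;
    }

    /** True once enough nodes have acknowledged the write. */
    static boolean hasQuorum(int successes, int n) {
        return successes >= majoritySize(n);
    }

    public static void main(String[] args) {
        System.out.println("majority of 5: " + majoritySize(5));
        System.out.println("2 of 5 is a quorum? " + hasQuorum(2, 5));
        System.out.println("3 of 5 is a quorum? " + hasQuorum(3, 5));
    }
}
```

With 5 JournalNodes the write tolerates 2 failed or slow nodes. With 3 JournalNodes it tolerates 1.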
```java
// Modified version of waitFor. StopWitch and JOURNLANODE_FULLGC_TIME_THRESHOLD
// are additions made for this fix; their definitions are not shown in this article.
public synchronized void waitFor(
    int minResponses, int minSuccesses, int maxExceptions,
    int millis, String operationName)
    throws InterruptedException, TimeoutException {
  // Current time, first scenario:  st = 17:00:00
  // Current time, second scenario: st = 18:00:00
  long st = Time.monotonicNow();
  // nextLogTime = 17:00:06, the next time a progress message is logged
  // nextLogTime = 18:00:06
  long nextLogTime = st + (long)(millis * WAIT_PROGRESS_INFO_THRESHOLD);
  // et = 17:00:20, the moment after which we count as timed out
  // et = 18:00:20
  long et = st + millis;
  /******************************************/
  StopWitch stopWitch = new StopWitch();
  /******************************************/
  while (true) { // i.e. retry repeatedly
    /******************************************/
    stopWitch.restart();
    /******************************************/
    checkAssertionErrors();
    // 5 > 0: number of JournalNode responses (success or failure both count) >= 5
    if (minResponses > 0 && countResponses() >= minResponses) return;
    // 3 > 0: number of successful JournalNode responses >= 3
    if (minSuccesses > 0 && countSuccesses() >= minSuccesses) return;
    // 3 >= 0: number of failed JournalNode responses > 3
    if (maxExceptions >= 0 && countExceptions() > maxExceptions) return;
    // The JournalNodes record the metadata. If they do not respond,
    // they are presumed down, and the NameNode must not let users carry on,
    // so the NameNode shuts itself down as self-protection.
    // metadata -> NameNode -----> JournalNode <---- standby NameNode

    // A 30-second Full GC happens here.
    // Current time: 17:00:02
    // Current time: 17:00:07
    // Current time: 18:00:30
    long now = Time.monotonicNow();

    // 17:00:02 > 17:00:06 ? no
    // 17:00:07 > 17:00:06 ? yes
    // 18:00:30 > 18:00:06 ? yes
    if (now > nextLogTime) {
      long waited = now - st;
      String msg = String.format(
          "Waited %s ms (timeout=%s ms) for a response for %s",
          waited, millis, operationName);
      if (!successes.isEmpty()) {
        msg += ". Succeeded so far: [" + Joiner.on(",").join(successes.keySet()) + "]";
      }
      if (!exceptions.isEmpty()) {
        msg += ". Exceptions so far: [" + getExceptionMapString() + "]";
      }
      if (successes.isEmpty() && exceptions.isEmpty()) {
        msg += ". No responses yet.";
      }
      if (waited > millis * WAIT_PROGRESS_WARN_THRESHOLD) {
        QuorumJournalManager.LOG.warn(msg);
      } else {
        QuorumJournalManager.LOG.info(msg);
      }
      // Update the next time a progress message is logged: 17:00:08
      // nextLogTime = 18:00:31
      nextLogTime = now + WAIT_PROGRESS_INTERVAL_MILLIS;
    }
    //  18 = 17:00:20 - 17:00:02
    //  13 = 17:00:20 - 17:00:07
    // -10 = 18:00:20 - 18:00:30
    long rem = et - now;
    // A value <= 0 here means we have timed out, which normally
    // raises a TimeoutException.
    //  18 <= 0: false
    // -10 <= 0: true
    if (rem <= 0) {
      long elapse = stopWitch.getElapse();
      // More than 6 seconds means a Full GC happened, because under
      // normal conditions this code never takes longer than 6 seconds.
      if (elapse > JOURNLANODE_FULLGC_TIME_THRESHOLD) {
        // Extend the deadline by the time lost to the pause
        et = et + elapse;
      } else {
        throw new TimeoutException();
      }
    }
    stopWitch.restart();
    // rem = 4: min(rem = 18, 4)
    // rem = 1: min(13, 1)
    rem = Math.min(rem, nextLogTime - now);
    // rem = 4: max(4, 1)
    // rem = 1: max(1, 1)
    rem = Math.max(rem, 1);
    // wait(4)
    // wait(1)
    // ... but a pause can make this take 40 seconds
    wait(rem);
    // Hadoop 2.7.0 source -> someone fixed this in Hadoop 2.7.4
    long elapse = stopWitch.getElapse();
    if ((elapse - rem) > JOURNLANODE_FULLGC_TIME_THRESHOLD) {
      // The wait overshot by more than the threshold: extend the deadline
      et = et + elapse - rem;
    }
  }
}
```
When the NameNode goes through a Full GC, its worker threads do not run for the duration of the pause.
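Because the worker threads are frozen during the pause, the time lost to it should not count against the JournalNode write timeout. The sketch below isolates that idea as a small deadline helper, replaying the 18:00:00 scenario from the comments in waitFor (20 s timeout, 6 s pause threshold, a 4 s wait that actually takes 30 s). The class and method names are hypothetical and times are passed in explicitly so the arithmetic is easy to follow. This is not the Hadoop implementation.

```java
// Sketch only: hypothetical helper that extends a deadline by the time lost to a pause.
class PauseAwareDeadline {
    private final long pauseThresholdMs; // overshoot beyond this is treated as a pause
    private long deadlineMs;             // absolute deadline on a monotonic clock

    PauseAwareDeadline(long startMs, long timeoutMs, long pauseThresholdMs) {
        this.deadlineMs = startMs + timeoutMs;
        this.pauseThresholdMs = pauseThresholdMs;
    }

    /**
     * Record one wait. requestedMs is what was passed to wait();
     * elapsedMs is how long the wait really took.
     */
    void afterWait(long requestedMs, long elapsedMs) {
        long overshoot = elapsedMs - requestedMs;
        if (overshoot > pauseThresholdMs) {
            // The thread was frozen (for example by a Full GC): give the time back
            deadlineMs += overshoot;
        }
    }

    long remainingMs(long nowMs) {
        return deadlineMs - nowMs;
    }

    boolean isExpired(long nowMs) {
        return remainingMs(nowMs) <= 0;
    }

    public static void main(String[] args) {
        // Start at t = 0 with a 20 s timeout and a 6 s pause threshold
        PauseAwareDeadline deadline = new PauseAwareDeadline(0, 20_000, 6_000);
        // A wait of 4 s that actually took 30 s because of a pause
        deadline.afterWait(4_000, 30_000);
        System.out.println("remaining ms: " + deadline.remainingMs(30_000));
        System.out.println("expired: " + deadline.isExpired(30_000));
    }
}
```

Without the compensation the remaining time at t = 30 s would be -10 s and the call would time out. With it, the deadline moves by the 26 s overshoot and 16 s remain. A small overshoot, below the threshold, leaves the deadline untouched, so a JournalNode that is really slow still triggers the timeout.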
Reposted from: https://blog.csdn.net/weixin_37850264/article/details/112424915