(8) Secondary Development



Summary of the source code covered so far

1) The NameNode startup flow
2) The DataNode startup flow (initialization, registration, heartbeat)
3) Metadata management: the double-buffer mechanism; the standby NameNode syncing metadata from the JournalNodes; the standby NameNode merging metadata and replacing the active NameNode's fsImage file
4) File upload, which involves a lease ("contract") mechanism: creating the namespace entry, requesting a block, then writing the file (chunk (chunk + chunksum) -> packet -> block -> INodeFile -> INodeDirectory); see the sketch below
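As a quick reminder of the units in item 4, here is a minimal sketch using the common defaults (all of them configurable in a real cluster; the constant names are ours, not Hadoop identifiers):

public class WritePathUnits {
    static final int CHUNK_DATA_BYTES = 512;            // io.bytes.per.checksum
    static final int CHUNK_CHECKSUM_BYTES = 4;          // one CRC32 checksum per chunk
    static final int PACKET_BYTES = 64 * 1024;          // dfs.client-write-packet-size
    static final long BLOCK_BYTES = 128L * 1024 * 1024; // dfs.blocksize (Hadoop 2.x default)

    public static void main(String[] args) {
        // Roughly how many chunks fit in one packet (real packets also carry a header).
        System.out.println("chunks per packet ~ "
            + PACKET_BYTES / (CHUNK_DATA_BYTES + CHUNK_CHECKSUM_BYTES)); // ~127
        System.out.println("packets per block ~ "
            + BLOCK_BYTES / PACKET_BYTES); // 2048
    }
}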

A review of the double-buffer mechanism:

package com.kaikeba.hdfsdemo.lesson02;

import java.util.LinkedList;

/**
 * Manages metadata edits in memory before persisting them to disk.
 * Modeled closely on the Hadoop NameNode source (roughly 90% similar);
 * we will modify this code later, so make sure you understand it.
 */
public class FSEdit {

    public static void main(String[] args) {
        final FSEdit fs = new FSEdit();
        for (int i = 0; i < 1000; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 100; j++) {
                        fs.logEdit("log entry");
                    }
                }
            }).start();
        }
    }

    long taxid = 0L;
    DoubleBuffer doubleBuffer = new DoubleBuffer();
    // Each thread keeps its own copy of the last transaction ID it wrote.
    ThreadLocal<Long> threadLocal = new ThreadLocal<Long>();
    // Whether a background flush to disk is currently running.
    public boolean isSyncRunning = false;
    // The largest transaction ID in the buffer currently being synced to disk.
    long maxtaxid = 0L;
    boolean isWait = false;

    /**
     * Core method for writing a metadata log entry.
     * Performance is fine even though we take a lock: inside the lock we only
     * append to memory, so this supports very high concurrency.
     */
    private void logEdit(String log) {
        synchronized (this) {
            taxid++;
            threadLocal.set(taxid);
            EditLog editLog = new EditLog(taxid, log);
            // Append to the in-memory buffer.
            doubleBuffer.write(editLog);
        } // release the lock
        // Persist to disk. By the time we get here, currentBuffer may already
        // hold several entries. No lock is held across the boundary: this is
        // segmented locking.
        logFlush();
    }

    private void logFlush() {
        // Re-acquire the lock.
        synchronized (this) {
            if (isSyncRunning) {
                // Get the current thread's transaction ID.
                long localTaxid = threadLocal.get();
                // If our edit is already covered by the sync in progress, done.
                if (localTaxid <= maxtaxid) {
                    return;
                }
                // Only one thread needs to stay behind and wait.
                if (isWait) {
                    return;
                }
                isWait = true;
                while (isSyncRunning) {
                    try {
                        // wait() releases the lock while waiting.
                        this.wait(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                isWait = false;
            }
            // Swap the buffers: syncBuffer now holds the entries to flush.
            doubleBuffer.exchange();
            if (doubleBuffer.syncBuffer.size() > 0) {
                maxtaxid = doubleBuffer.getMaxTaxid();
            }
            isSyncRunning = true;
        } // release the lock
        // Persist to disk: the expensive part, done outside the lock.
        doubleBuffer.flush();
        // Segmented locking again: re-acquire the lock to reset the flag.
        synchronized (this) {
            isSyncRunning = false;
            // Wake up any thread blocked in wait().
            this.notifyAll();
        }
    }

    /**
     * One metadata log entry, designed as a simple value object.
     */
    public class EditLog {
        // Transaction ID.
        public long taxid;
        public String log;

        public EditLog(long taxid, String log) {
            this.taxid = taxid;
            this.log = log;
        }

        @Override
        public String toString() {
            return "EditLog [taxid=" + taxid + ", log=" + log + "]";
        }
    }

    public class DoubleBuffer {
        // Buffer that receives writes, in order.
        LinkedList<EditLog> currentBuffer = new LinkedList<EditLog>();
        // Buffer whose contents are being persisted to disk.
        LinkedList<EditLog> syncBuffer = new LinkedList<EditLog>();

        /** Write one metadata entry into memory. */
        public void write(EditLog editLog) {
            currentBuffer.add(editLog);
        }

        /** Flush to "disk" (printing stands in for a real disk write here). */
        public void flush() {
            for (EditLog editLog : syncBuffer) {
                System.out.println(editLog);
            }
            syncBuffer.clear();
        }

        /** Swap the two buffers. */
        public void exchange() {
            LinkedList<EditLog> tmp = currentBuffer;
            currentBuffer = syncBuffer;
            syncBuffer = tmp;
        }

        /** Largest transaction ID in the buffer currently being synced. */
        public long getMaxTaxid() {
            return syncBuffer.getLast().taxid;
        }
    }
}
The real Hadoop counterpart, FSEditLog.logEdit(), follows the same pattern:

void logEdit(final FSEditLogOp op) {
  synchronized (this) {
    assert isOpenForWrite() : "bad state: " + state;

    // Wait if an automatic sync is scheduled (not needed at first).
    waitIfAutoSyncScheduled();

    // The key step: generate a globally unique transaction ID for this edit.
    // Edit log files are named after transaction ID ranges, e.g.
    //   editlog_0000000000_0000000012.log
    //   editlog_0000000013_0000000022.log
    // TODO Step 1: obtain the current, unique transaction ID.
    long start = beginTransaction();
    op.setTransactionId(txid);

    try {
      /**
       * The edit is written to:
       * 1) the NameNode's edit-log file buffer
       * 2) the JournalNode's in-memory buffer
       */
      // TODO Step 2: write the edit into the in-memory buffer.
      editLogStream.write(op);
    } catch (IOException ex) {
      // All journals failed, it is handled in logSync.
    } finally {
      op.reset();
    }

    endTransaction(start);

    // Check if it is time to schedule an automatic sync:
    // buffer > 512 KB -> true, !true = false, so we fall through.
    if (!shouldForceSync()) {
      return;
    }
    // Reaching here means the buffer is full.
    isAutoSyncScheduled = true;
  }
  // TODO Persist the buffered edits to disk.
  logSync();
}

public boolean shouldForceSync() {
  // True when the in-memory edit buffer has reached 512 KB.
  return bufCurrent.size() >= initBufferSize;
}

Problems with this design:

The NameNode uses two in-memory buffers, but each buffer's size is hard-coded at 512 KB. If bufReady is still syncing its contents to disk when bufCurrent fills up, the NameNode sits in a wait state doing no work, which hurts performance.

  • The initBufferSize above is hard-coded and not configurable. Under very high concurrency the write buffer fills up instantly while the sync buffer is still flushing. Making initBufferSize configurable, read from core-site.xml, would mitigate these brief NameNode stalls; a sketch follows below.
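A minimal sketch of that change, assuming a hypothetical property name (dfs.namenode.editlog.buffer.size is our invention for illustration, not a standard Hadoop key):

import org.apache.hadoop.conf.Configuration;

// Hypothetical sketch: read the edit-log buffer size from core-site.xml
// instead of hard-coding 512 KB.
public class EditLogBufferConfig {
    public static final String BUFFER_SIZE_KEY = "dfs.namenode.editlog.buffer.size";
    public static final int BUFFER_SIZE_DEFAULT = 512 * 1024; // today's hard-coded value

    public static int getInitBufferSize(Configuration conf) {
        // Falls back to the original 512 KB when the property is absent.
        return conf.getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
    }
}

shouldForceSync() would then compare bufCurrent.size() against this configured value rather than the fixed 512 KB.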

The second problem


Bug analysis

With NameNode high availability, every metadata write must also be written to the JournalNode cluster, and a write succeeds once it reaches a majority of the JournalNodes. If the write times out, the NameNode catches the timeout exception and exits. When the timeout is caused by a real JournalNode failure, exiting to protect the data is the right call. But consider what happens if the NameNode hits a full GC right at that moment: a full GC can last tens of seconds, even minutes, so the NameNode hits the timeout and exits even though nothing is actually wrong. That is clearly a bug.

As shown in logEdit() above, once the buffer fills up the thread falls through to logSync(), which persists the buffered edits:
public void logSync() {
  long syncStart = 0;

  // Fetch the transactionId of this thread.
  long mytxid = myTransactionId.get().txid;

  boolean sync = false;
  try {
    EditLogOutputStream logStream = null;
    synchronized (this) {
      try {
        printStatistics(false);

        // TODO If another thread is already flushing to disk, this thread
        // does not need to flush as well: just wait.
        while (mytxid > synctxid && isSyncRunning) {
          try {
            // wait() releases the lock.
            wait(1000);
          } catch (InterruptedException ie) {
          }
        }

        //
        // If this transaction was already flushed, then nothing to do
        //
        if (mytxid <= synctxid) {
          numTransactionsBatchedInSync++;
          if (metrics != null) {
            // Metrics is non-null only when used inside name node
            metrics.incrTransactionsBatchedInSync();
          }
          return;
        }

        // now, this thread will do the sync
        syncStart = txid;
        isSyncRunning = true;
        sync = true;

        // swap buffers
        try {
          if (journalSet.isEmpty()) {
            throw new IOException("No journals available to flush");
          }
          // TODO Swap the in-memory buffers.
          editLogStream.setReadyToFlush();
        } catch (IOException e) {
          final String msg =
              "Could not sync enough journals to persistent storage " +
              "due to " + e.getMessage() + ". " +
              "Unsynced transactions: " + (txid - synctxid);
          LOG.fatal(msg, new Exception());
          synchronized(journalSetLock) {
            IOUtils.cleanup(LOG, journalSet);
          }
          terminate(1, msg);
        }
      } finally {
        // Prevent RuntimeException from blocking other log edit write
        // TODO Reset the flag and wake up waiting threads.
        doneWithAutoSyncScheduling();
      }
      // editLogStream may become null,
      // so store a local variable for flush.
      logStream = editLogStream;
    } // TODO lock released here

    // do the sync
    long start = monotonicNow();
    try {
      if (logStream != null) {
        // Flush to disk quietly in the background; this is the expensive
        // part and can take tens of milliseconds.
        /**
         * Buffer one serves the NameNode's local edit log;
         * buffer two serves the JournalNodes.
         */
        // TODO Write the data out to disk.
        logStream.flush();
      }
    } catch (IOException ex) {
      synchronized (this) {
        final String msg =
            "Could not sync enough journals to persistent storage. " +
            "Unsynced transactions: " + (txid - synctxid);
        // A fatal-level log entry in this code base means a
        // catastrophic error.
        LOG.fatal(msg, new Exception());
        synchronized(journalSetLock) {
          IOUtils.cleanup(LOG, journalSet);
        }
        // The NameNode exits here.
        terminate(1, msg);
      }
    }
    long elapsed = monotonicNow() - start;

    if (metrics != null) {
      // Metrics non-null only when used inside name node
      metrics.addSync(elapsed);
    }
  } finally {
    // Prevent RuntimeException from blocking other log edit sync
    synchronized (this) {
      if (sync) {
        synctxid = syncStart;
        // TODO Reset the flag.
        isSyncRunning = false;
      }
      // TODO Wake up waiting threads.
      this.notifyAll();
    }
  }
}
Inside logSync(), logStream.flush() eventually reaches flushAndSync(), which for the quorum journal is implemented by QuorumOutputStream:

protected void flushAndSync(boolean durable) throws IOException {
  int numReadyBytes = buf.countReadyBytes();
  if (numReadyBytes > 0) {
    int numReadyTxns = buf.countReadyTxns();
    long firstTxToFlush = buf.getFirstReadyTxId();

    assert numReadyTxns > 0;

    // Copy from our double-buffer into a new byte array. This is for
    // two reasons:
    // 1) The IPC code has no way of specifying to send only a slice of
    //    a larger array.
    // 2) because the calls to the underlying nodes are asynchronous, we
    //    need a defensive copy to avoid accidentally mutating the buffer
    //    before it is sent.
    DataOutputBuffer bufToSend = new DataOutputBuffer(numReadyBytes);
    buf.flushTo(bufToSend);
    assert bufToSend.getLength() == numReadyBytes;
    byte[] data = bufToSend.getData();
    assert data.length == bufToSend.getLength();

    // Send the edits to the JournalNodes.
    QuorumCall<AsyncLogger, Void> qcall = loggers.sendEdits(
        segmentTxId, firstTxToFlush,
        numReadyTxns, data);
    // TODO Block here waiting for the JournalNode cluster's result.
    loggers.waitForWriteQuorum(qcall, writeTimeoutMs, "sendEdits");

    // Since we successfully wrote this batch, let the loggers know. Any future
    // RPCs will thus let the loggers know of the most recent transaction, even
    // if a logger has fallen behind.
    loggers.setCommittedTxId(firstTxToFlush + numReadyTxns - 1);
  }
}
Map<AsyncLogger, Void> waitForWriteQuorum(QuorumCall<AsyncLogger, Void> q,
    int timeoutMs, String operationName) throws IOException {
  // With 5 JournalNodes, the majority is 3.
  int majority = getMajoritySize();
  try {
    q.waitFor(
        loggers.size(), // either all respond (5)
        majority,       // or we get a majority of successes (3)
        majority,       // or we get a majority of failures (3)
        timeoutMs, operationName); // timeoutMs: 20 s; beyond that, give up
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new IOException("Interrupted waiting " + timeoutMs + "ms for a " +
        "quorum of nodes to respond.");
  } catch (TimeoutException e) {
    throw new IOException("Timed out waiting " + timeoutMs + "ms for a " +
        "quorum of nodes to respond.");
  }

  // e.g. 2 successes < 3: not enough for a quorum.
  if (q.countSuccesses() < majority) {
    q.rethrowException("Got too many exceptions to achieve quorum size " +
        getMajorityString());
  }

  // Return the results.
  return q.getResults();
}
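The majority arithmetic assumed in the comments above is a plain majority of N; a minimal sketch (method name ours):

// With N JournalNodes, a write succeeds once floor(N/2) + 1 acknowledge it.
static int majoritySize(int numJournalNodes) {
    return numJournalNodes / 2 + 1;
}
// majoritySize(3) == 2, majoritySize(5) == 3: with 5 JournalNodes, 3 successful
// responses commit the batch, and 3 failures make the NameNode give up.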
The fix lives in QuorumCall.waitFor(). Below is the modified version: the asterisk-bordered parts, the StopWitch (the author's custom stopwatch helper), and the JOURNLANODE_FULLGC_TIME_THRESHOLD checks are the additions.

public synchronized void waitFor(
    int minResponses, int minSuccesses, int maxExceptions,
    int millis, String operationName)
    throws InterruptedException, TimeoutException {
  // Record the entry time, e.g. st = 18:00:00.
  long st = Time.monotonicNow();
  // Next time to print a progress log, e.g. nextLogTime = 18:00:06.
  long nextLogTime = st + (long)(millis * WAIT_PROGRESS_INFO_THRESHOLD);
  // The absolute deadline, e.g. et = 18:00:20.
  long et = st + millis;

  /***** added: custom stopwatch used to detect GC pauses *****/
  StopWitch stopWitch = new StopWitch();
  /*************************************************************/

  while (true) {
    // Each iteration is another attempt.
    stopWitch.restart();
    checkAssertionErrors();
    // All JournalNodes responded (success or failure both count).
    if (minResponses > 0 && countResponses() >= minResponses) return;
    // A majority of JournalNodes responded successfully.
    if (minSuccesses > 0 && countSuccesses() >= minSuccesses) return;
    // A majority of JournalNodes responded with an exception.
    if (maxExceptions >= 0 && countExceptions() > maxExceptions) return;

    // JournalNodes record the metadata; if they stop responding, the
    // NameNode must stop accepting writes and shut itself down for safety:
    //   edits -> NameNode -----> JournalNodes <----- standby NameNode
    // But a full GC of, say, 30 seconds would freeze us right here.
    long now = Time.monotonicNow();

    // Time to print a progress log? e.g. 18:00:30 > 18:00:06.
    if (now > nextLogTime) {
      long waited = now - st;
      String msg = String.format(
          "Waited %s ms (timeout=%s ms) for a response for %s",
          waited, millis, operationName);
      if (!successes.isEmpty()) {
        msg += ". Succeeded so far: [" + Joiner.on(",").join(successes.keySet()) + "]";
      }
      if (!exceptions.isEmpty()) {
        msg += ". Exceptions so far: [" + getExceptionMapString() + "]";
      }
      if (successes.isEmpty() && exceptions.isEmpty()) {
        msg += ". No responses yet.";
      }
      if (waited > millis * WAIT_PROGRESS_WARN_THRESHOLD) {
        QuorumJournalManager.LOG.warn(msg);
      } else {
        QuorumJournalManager.LOG.info(msg);
      }
      // Schedule the next progress log, e.g. nextLogTime = 18:00:31.
      nextLogTime = now + WAIT_PROGRESS_INTERVAL_MILLIS;
    }

    // Remaining time before the deadline, e.g. -10 s = 18:00:20 - 18:00:30.
    long rem = et - now;
    // rem <= 0 would normally mean a timeout...
    if (rem <= 0) {
      long elapse = stopWitch.getElapse();
      // ...but if this loop iteration took longer than the threshold
      // (say 6 seconds), a full GC must have occurred, because this code
      // never takes that long normally: extend the deadline instead.
      if (elapse > JOURNLANODE_FULLGC_TIME_THRESHOLD) {
        et = et + elapse;
      } else {
        throw new TimeoutException();
      }
    }

    stopWitch.restart();
    // Sleep until the next progress log or the deadline, whichever is sooner.
    rem = Math.min(rem, nextLogTime - now);
    rem = Math.max(rem, 1);
    wait(rem);
    // A long GC pause (e.g. 40 s) can also land inside wait() itself.
    // (This bug in the Hadoop 2.7.0 source was fixed by 2.7.4.)
    long elapse = stopWitch.getElapse();
    if ((elapse - rem) > JOURNLANODE_FULLGC_TIME_THRESHOLD) {
      et = et + elapse - rem;
    }
  }
}
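The core of this patch is one idiom: time each loop iteration, and if a timed wait overshoots its requested duration by far more than it should, attribute the overshoot to a JVM pause (full GC) and extend the deadline instead of throwing. A self-contained sketch of that idiom, with all names and the 6-second threshold chosen for illustration:

// Minimal, self-contained sketch of the GC-pause detection idiom above.
// The class, method names, and threshold are ours, not Hadoop identifiers.
public class GcAwareDeadline {
    static final long GC_PAUSE_THRESHOLD_MS = 6_000; // normal iterations never take this long

    private long deadline; // absolute time (ms) at which we time out

    GcAwareDeadline(long timeoutMs) {
        this.deadline = System.currentTimeMillis() + timeoutMs;
    }

    /** Call after a timed wait; returns true only if the deadline truly passed. */
    boolean timedOut(long requestedWaitMs, long actualWaitMs) {
        long overshoot = actualWaitMs - requestedWaitMs;
        if (overshoot > GC_PAUSE_THRESHOLD_MS) {
            // The thread was likely frozen by a GC pause, not by slow
            // JournalNodes: push the deadline out so the pause does not
            // count against the quorum timeout.
            deadline += overshoot;
        }
        return System.currentTimeMillis() >= deadline;
    }
}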
When the NameNode undergoes a full GC, its worker threads do not run at all for the duration of the pause, which is exactly why a fixed quorum timeout is unsafe.

Reposted from: https://blog.csdn.net/weixin_37850264/article/details/112424915
