(9) Performance Improvements
Published: 2021-11-18 17:47:43
Preface
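Improvements:
- Metadata logging: a small change that improves performance;
- Heavy read/write lock contention: one place in the code has this problem, so we adjust it;
- The design patterns above also come up in real development work.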
Demo
That's right, it's this core code again:
```java
import java.util.LinkedList;

/**
 * Manages metadata (the edit log) and persists it to disk.
 * @author Administrator
 *
 * I modeled this code on the Hadoop source; make sure you master it,
 * because we will modify it later. It is about 90% similar to the Hadoop source.
 */
public class FSEdit {

    public static void main(String[] args) {
        FSEdit fs = new FSEdit();
        for (int i = 0; i < 1000; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 100; j++) {
                        fs.logEdit("log");
                    }
                }
            }).start();
        }
    }

    long taxid = 0L; // global transaction ID
    DoubleBuffer doubleBuffer = new DoubleBuffer();
    // each thread's own copy of its transaction ID
    ThreadLocal<Long> threadLocal = new ThreadLocal<Long>();
    // whether a flush to disk is currently running in the background
    public boolean isSyncRunning = false;
    // the largest transaction ID in the buffer currently being synced to disk
    long maxtaxid = 0L;
    boolean isWait = false;

    /**
     * Core method for writing a metadata log entry.
     * @param log
     */
    private void logEdit(String log) {
        // "Can performance be good? You're taking a lock!"
        // Yes: inside this lock we only write to memory, which supports
        // very high concurrency. Threads 1, 2, 3 get txids 1, 2, 3 in turn.
        synchronized (this) {
            taxid++; // inside the lock so the ID is globally unique; this part runs serially
            threadLocal.set(taxid);
            EditLog editLog = new EditLog(taxid, log);
            // write to memory (the buffer size could be made configurable)
            doubleBuffer.write(editLog);
        } // release the lock
        // Persist to disk. By now currentBuffer may already hold several entries
        // (e.g. 3). The flush takes the lock again on its own: segmented locking.
        logFlush();
    }

    private void logFlush() {
        // re-acquire the lock (e.g. thread 2)
        synchronized (this) {
            if (isSyncRunning) {
                // this thread's transaction ID, e.g. 2
                long localTaxid = threadLocal.get();
                // e.g. 2 <= 3: our entry is in the batch already being flushed
                // (5 > 3 would mean it has to go out with the next flush)
                if (localTaxid <= maxtaxid) {
                    return;
                }
                // another thread is already waiting to run the next flush
                if (isWait) {
                    return;
                }
                isWait = true;
                while (isSyncRunning) {
                    try {
                        // wait() releases the lock while waiting
                        this.wait(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                isWait = false;
            }
            // swap buffers: e.g. syncBuffer now holds (1, 2, 3)
            doubleBuffer.exchange();
            // e.g. maxtaxid = 3
            if (doubleBuffer.syncBuffer.size() > 0) {
                maxtaxid = doubleBuffer.getMaxTaxid();
            }
            isSyncRunning = true;
        } // release the lock
        // Persisting to disk is expensive, so it runs outside the lock
        // (e.g. write 1, 2, 3 to disk).
        doubleBuffer.flush();
        // segmented locking
        synchronized (this) {
            // reset the flag
            isSyncRunning = false;
            // wake up waiting threads
            this.notifyAll();
        }
    }

    /**
     * Object-oriented design: one object represents one metadata entry.
     * @author Administrator
     */
    public class EditLog {
        // transaction ID
        public long taxid;
        public String log;

        public EditLog(long taxid, String log) {
            this.taxid = taxid;
            this.log = log;
        }

        @Override
        public String toString() {
            return "EditLog [taxid=" + taxid + ", log=" + log + "]";
        }
    }

    public class DoubleBuffer {
        // receives writes (ordered queue)
        LinkedList<EditLog> currentBuffer = new LinkedList<EditLog>();
        // the buffer being persisted to disk
        LinkedList<EditLog> syncBuffer = new LinkedList<EditLog>();

        /** Write a metadata entry. */
        public void write(EditLog editLog) {
            currentBuffer.add(editLog);
        }

        /** Write the data to disk. */
        public void flush() {
            for (EditLog editLog : syncBuffer) {
                // printing stands in for writing to disk
                System.out.println(editLog);
            }
            syncBuffer.clear();
        }

        /** Swap the two buffers. */
        public void exchange() {
            LinkedList<EditLog> tmp = currentBuffer;
            currentBuffer = syncBuffer;
            syncBuffer = tmp;
        }

        /** The largest transaction ID in the buffer being synced. */
        public long getMaxTaxid() {
            return syncBuffer.getLast().taxid;
        }
    }
}
```
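To check the double-buffer and segmented-locking protocol without reading console output, here is a compact variant of the Demo. It is a sketch: the class name GroupCommitSketch, the run() helper and the flushed list are hypothetical, not from the original or from Hadoop. It keeps the same steps (a short lock to assign the txid and buffer the entry, a buffer swap under the lock, the slow flush outside the lock) but records the flushed txids so we can assert that every edit is flushed exactly once and in txid order.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical, testable variant of FSEdit: flushed txids are recorded instead of printed.
public class GroupCommitSketch {
    private long txid = 0L;                                   // guarded by this
    private LinkedList<Long> currentBuffer = new LinkedList<>();
    private LinkedList<Long> syncBuffer = new LinkedList<>();
    private final ThreadLocal<Long> myTxid = new ThreadLocal<>();
    private boolean isSyncRunning = false;
    private boolean isWait = false;
    private long maxTxid = 0L;
    // Stands in for the on-disk log; written only by the single active flusher.
    private final List<Long> flushed = new ArrayList<>();

    public void logEdit() {
        synchronized (this) {              // short critical section: memory only
            txid++;
            myTxid.set(txid);
            currentBuffer.add(txid);
        }
        logFlush();                        // the flush path locks separately
    }

    private void logFlush() {
        LinkedList<Long> toFlush;
        synchronized (this) {
            if (isSyncRunning) {
                if (myTxid.get() <= maxTxid) return;  // our entry is in the batch being flushed
                if (isWait) return;                   // a waiting thread will flush it next
                isWait = true;
                boolean interrupted = false;
                while (isSyncRunning) {
                    try {
                        wait(1000);                   // releases the monitor while waiting
                    } catch (InterruptedException e) {
                        interrupted = true;           // keep waiting, restore the flag below
                    }
                }
                isWait = false;
                if (interrupted) Thread.currentThread().interrupt();
            }
            LinkedList<Long> tmp = currentBuffer;     // swap the two buffers
            currentBuffer = syncBuffer;
            syncBuffer = tmp;
            if (!syncBuffer.isEmpty()) maxTxid = syncBuffer.getLast();
            isSyncRunning = true;
            toFlush = syncBuffer;
        }
        flushed.addAll(toFlush);                      // the slow "disk write", outside the lock
        toFlush.clear();
        synchronized (this) {
            isSyncRunning = false;
            notifyAll();                              // wake the waiting thread
        }
    }

    // Runs threads x editsPerThread edits and returns the txids in flush order.
    public static List<Long> run(int threads, int editsPerThread) {
        GroupCommitSketch log = new GroupCommitSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < editsPerThread; j++) log.logEdit();
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();                             // join() makes the flushers' writes visible
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return log.flushed;
    }
}
```

Only one thread at a time can be between the buffer swap and the final synchronized block (isSyncRunning guards that window), so flushes never overlap, which is why the recorded txids come out in order.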
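To let taxid++ run in parallel instead of inside the lock, use an Atomic class:

```java
AtomicLong taxid = new AtomicLong(0);
```

This has a problem: under high concurrency, CAS spins (keeps retrying). Atomic classes also have two other issues: the ABA problem, and the fact that one CAS covers only a single variable (several values have to be combined into one object via AtomicReference).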
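A minimal sketch of the ABA problem mentioned above. The class and method names are hypothetical; AtomicStampedReference is the JDK class that attaches a version stamp to a reference, and it is shown here as one common mitigation rather than something the original text prescribes. A thread reads A, other updates change A to B and back to A, and a plain CAS cannot tell that anything happened.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;

public class AbaSketch {
    // Plain CAS: the value goes A -> B -> A, and a stale compareAndSet(A, C) still succeeds.
    public static boolean plainCasAfterAba() {
        String a = "A", b = "B", c = "C";
        AtomicReference<String> ref = new AtomicReference<>(a);
        String seen = ref.get();            // this thread reads A ...
        ref.compareAndSet(a, b);            // ... meanwhile others change A -> B
        ref.compareAndSet(b, a);            //     and back to A
        return ref.compareAndSet(seen, c);  // succeeds: the intermediate change is invisible
    }

    // Stamped CAS: each update bumps the stamp, so the stale update is rejected.
    public static boolean stampedCasAfterAba() {
        String a = "A", b = "B", c = "C";
        AtomicStampedReference<String> ref = new AtomicStampedReference<>(a, 0);
        int[] stampHolder = new int[1];
        String seen = ref.get(stampHolder); // reads A together with stamp 0
        int seenStamp = stampHolder[0];
        ref.compareAndSet(a, b, 0, 1);      // A -> B, stamp 0 -> 1
        ref.compareAndSet(b, a, 1, 2);      // B -> A, stamp 1 -> 2
        return ref.compareAndSet(seen, c, seenStamp, seenStamp + 1); // fails: stamp is now 2
    }
}
```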
To address the spinning, use LongAdder:

```java
LongAdder taxid = new LongAdder();
```
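One caveat worth checking before swapping LongAdder in for taxid: LongAdder.increment() returns nothing, and the JDK documents sum() as not an atomic snapshot under concurrent updates, so LongAdder cannot hand out a unique per-call ID the way taxid++ or AtomicLong.incrementAndGet() can. It fits high-contention counters (statistics). The sketch below contrasts the two; the class and method names are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

public class TxidCounters {
    // AtomicLong: every incrementAndGet() returns a distinct value, so it can assign txids.
    public static int distinctIdsWithAtomicLong(int threads, int perThread) {
        AtomicLong txid = new AtomicLong(0);
        Set<Long> seen = ConcurrentHashMap.newKeySet();
        runAll(threads, () -> {
            for (int i = 0; i < perThread; i++) {
                seen.add(txid.incrementAndGet());
            }
        });
        return seen.size();
    }

    // LongAdder: spreads updates across cells to reduce contention; only the total is meaningful.
    public static long totalWithLongAdder(int threads, int perThread) {
        LongAdder counter = new LongAdder();
        runAll(threads, () -> {
            for (int i = 0; i < perThread; i++) {
                counter.increment();        // returns void: no per-call ID available
            }
        });
        return counter.sum();               // exact here because all writers have been joined
    }

    private static void runAll(int threads, Runnable body) {
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(body);
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
    }
}
```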
HDFS Locks
Locks in HDFS are generally heavyweight, so we want to reduce their granularity.
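Below we mainly deal with BPOfferService:

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class BPOfferService {
    // read/write lock
    private final ReentrantReadWriteLock mReadWriteLock = new ReentrantReadWriteLock();
    // read lock
    private final Lock mReadLock = mReadWriteLock.readLock();
    // write lock
    private final Lock mWriteLock = mReadWriteLock.writeLock();
}
```

Reads and writes exclude each other, writes exclude other writes, and reads share the lock. I believe the read/write lock in BPOfferService sees frequent contention, because many places in the class take the read lock or the write lock.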
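The three rules above (read/read shared, read/write exclusive, write/write exclusive) can be checked directly with ReentrantReadWriteLock. This is a small sketch with a hypothetical class name: one thread holds a lock, and a second thread probes with the non-blocking tryLock().

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class RwLockDemo {
    // Tries, without blocking, to take the read or write lock from a different thread.
    static boolean probeFromOtherThread(ReentrantReadWriteLock rw, boolean write) {
        ExecutorService other = Executors.newSingleThreadExecutor();
        try {
            return other.submit(() -> {
                Lock l = write ? rw.writeLock() : rw.readLock();
                boolean acquired = l.tryLock();
                if (acquired) {
                    l.unlock();
                }
                return acquired;
            }).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            other.shutdown();
        }
    }

    // Returns {read+read allowed, read+write allowed, write+write allowed}.
    public static boolean[] run() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        boolean[] result = new boolean[3];
        rw.readLock().lock();                              // a reader holds the lock
        try {
            result[0] = probeFromOtherThread(rw, false);   // another reader gets in
            result[1] = probeFromOtherThread(rw, true);    // a writer is shut out
        } finally {
            rw.readLock().unlock();
        }
        rw.writeLock().lock();                             // a writer holds the lock
        try {
            result[2] = probeFromOtherThread(rw, true);    // a second writer is shut out
        } finally {
            rw.writeLock().unlock();
        }
        return result;
    }
}
```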
Evidence:
The write lock is used in many places:
(1) When a BPServiceActor obtains the NamespaceInfo, it sets this data.
(2) After a BPServiceActor registers successfully, it sets the DatanodeRegistration.
(3) When preparing to register with the NameNode, it creates the DatanodeRegistration.
(4) When the DataNode shuts down, the corresponding BPServiceActor is shut down and some data is cleared.
(5) A heartbeat goes to the NameNode every 3 seconds; if the NameNode's active/standby state has changed, some data is updated.
(6) Each heartbeat sent to the NameNode may bring back commands, for example telling the DataNode to copy a block replica to another DataNode. Note that this method processes the commands each DataNode receives: with many commands it is called very frequently, but with few commands it has little impact.

```java
void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
    writeLock();
    try {
        if (this.bpNSInfo == null) {
            this.bpNSInfo = nsInfo;
            boolean success = false;

            // Now that we know the namespace ID, etc, we can pass this to the DN.
            // The DN can now initialize its local storage if we are the
            // first BP to handshake, etc.
            try {
                dn.initBlockPool(this);
                success = true;
            } finally {
                if (!success) {
                    // The datanode failed to initialize the BP. We need to reset
                    // the namespace info so that other BPService actors still have
                    // a chance to set it, and re-initialize the datanode.
                    this.bpNSInfo = null;
                }
            }
        } else {
            checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
                "Blockpool ID");
            checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
                "Namespace ID");
            checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
                "Cluster ID");
        }
    } finally {
        writeUnlock();
    }
}
```
```java
void registrationSucceeded(BPServiceActor bpServiceActor,
    DatanodeRegistration reg) throws IOException {
    writeLock();
    try {
        if (bpRegistration != null) {
            checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(),
                reg.getStorageInfo().getNamespaceID(), "namespace ID");
            checkNSEquality(bpRegistration.getStorageInfo().getClusterID(),
                reg.getStorageInfo().getClusterID(), "cluster ID");
        } else {
            bpRegistration = reg;
        }

        dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
        // Add the initial block token secret keys to the DN's secret manager.
        if (dn.isBlockTokenEnabled) {
            dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(),
                reg.getExportedKeys());
        }
    } finally {
        writeUnlock();
    }
}
```
```java
DatanodeRegistration createRegistration() {
    writeLock();
    try {
        Preconditions.checkState(bpNSInfo != null,
            "getRegistration() can only be called after initial handshake");
        return dn.createBPRegistration(bpNSInfo);
    } finally {
        writeUnlock();
    }
}
```
```java
void shutdownActor(BPServiceActor actor) {
    writeLock();
    try {
        if (bpServiceToActive == actor) {
            bpServiceToActive = null;
        }

        bpServices.remove(actor);

        if (bpServices.isEmpty()) {
            dn.shutdownBlockPool(this);
        }
    } finally {
        writeUnlock();
    }
}
```
```java
void updateActorStatesFromHeartbeat(
    BPServiceActor actor,
    NNHAStatusHeartbeat nnHaState) {
    writeLock();
    try {
        final long txid = nnHaState.getTxId();

        final boolean nnClaimsActive =
            nnHaState.getState() == HAServiceState.ACTIVE;
        final boolean bposThinksActive = bpServiceToActive == actor;
        final boolean isMoreRecentClaim = txid > lastActiveClaimTxId;

        if (nnClaimsActive && !bposThinksActive) {
            LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " +
                "txid=" + txid);
            if (!isMoreRecentClaim) {
                // Split-brain scenario - an NN is trying to claim active
                // state when a different NN has already claimed it with a higher
                // txid.
                LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" +
                    txid + " but there was already a more recent claim at txid=" +
                    lastActiveClaimTxId);
                return;
            } else {
                if (bpServiceToActive == null) {
                    LOG.info("Acknowledging ACTIVE Namenode " + actor);
                } else {
                    LOG.info("Namenode " + actor + " taking over ACTIVE state from " +
                        bpServiceToActive + " at higher txid=" + txid);
                }
                bpServiceToActive = actor;
            }
        } else if (!nnClaimsActive && bposThinksActive) {
            LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " +
                "txid=" + nnHaState.getTxId());
            bpServiceToActive = null;
        }

        if (bpServiceToActive == actor) {
            assert txid >= lastActiveClaimTxId;
            lastActiveClaimTxId = txid;
        }
    } finally {
        writeUnlock();
    }
}
```
```java
boolean processCommandFromActor(DatanodeCommand cmd,
    BPServiceActor actor) throws IOException {
    assert bpServices.contains(actor);
    if (cmd == null) {
        return true;
    }
    /*
     * Datanode Registration can be done asynchronously here. No need to hold
     * the lock. for more info refer HDFS-5014
     */
    if (DatanodeProtocol.DNA_REGISTER == cmd.getAction()) {
        // namenode requested a registration - at start or if NN lost contact
        // Just logging the claiming state is OK here instead of checking the
        // actor state by obtaining the lock
        LOG.info("DatanodeCommand action : DNA_REGISTER from " + actor.nnAddr
            + " with " + actor.state + " state");
        actor.reRegister();
        return false;
    }
    writeLock();
    try {
        if (actor == bpServiceToActive) {
            // e.g. if the command is to copy a block, this can take several seconds
            return processCommandFromActive(cmd, actor);
        } else {
            return processCommandFromStandby(cmd, actor);
        }
    } finally {
        writeUnlock();
    }
}
```
The read lock is also used in many places:
(1) The read lock is taken when getting the block pool ID.
(2) When getting the NamespaceInfo.
(3) When getting the active NameNode's information.
The last two methods are not called very often, but the first one is called quite frequently: sendHeartBeat() calls it every time a heartbeat is sent (every 3 seconds).

```java
String getBlockPoolId() {
    readLock();
    try {
        if (bpNSInfo != null) {
            return bpNSInfo.getBlockPoolID();
        } else {
            LOG.warn("Block pool ID needed, but service not yet registered with NN",
                new Exception("trace"));
            return null;
        }
    } finally {
        readUnlock();
    }
}
```
```java
NamespaceInfo getNamespaceInfo() {
    readLock();
    try {
        return bpNSInfo;
    } finally {
        readUnlock();
    }
}
```
```java
DatanodeProtocolClientSideTranslatorPB getActiveNN() {
    readLock();
    try {
        if (bpServiceToActive != null) {
            return bpServiceToActive.bpNamenode;
        } else {
            return null;
        }
    } finally {
        readUnlock();
    }
}
```
Read/Write Lock Mutual Exclusion
Under normal conditions this is not much of a problem, but there is one special case.
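Suppose your company's cluster decommissions some nodes. This produces a large number of replication tasks, which the NameNode sends to the DataNodes as commands. Another case is rebalancing: say the cluster has 100 servers and the company has just received 10 new ones. The first thing you do is run the balancer, and once balancing starts the cluster again produces a large number of replication tasks. In these abnormal situations lock contention appears: processCommandFromActor() holds the write lock while each command runs (copying a block can take seconds), and the read lock that getBlockPoolId() takes on every heartbeat has to wait for it. If we keep the read lock and the write lock from contending, performance goes up.

Fixing the read lock:

```java
String getBlockPoolId() {
    readLock();
    try {
        if (bpNSInfo != null) {
            return bpNSInfo.getBlockPoolID();
        } else {
            LOG.warn("Block pool ID needed, but service not yet registered with NN",
                new Exception("trace"));
            return null;
        }
    } finally {
        readUnlock();
    }
}
```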
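The stall described above can be reproduced in isolation. This is a simplified model, not HDFS code: the class name, the "BP-demo" value and the latches are hypothetical. One thread holds the write lock as a long command would; meanwhile a heartbeat-style getter asks for the read lock. The getter uses tryLock with a timeout only so the demo reports the stall (null) instead of hanging, whereas the real getBlockPoolId() simply blocks.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class HeartbeatStallSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final String blockPoolId = "BP-demo";   // hypothetical value

    // Read-locked getter in the style of getBlockPoolId(); null means "the heartbeat stalled".
    String tryGetBlockPoolId(long timeoutMs) throws InterruptedException {
        if (!lock.readLock().tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
            return null;
        }
        try {
            return blockPoolId;
        } finally {
            lock.readLock().unlock();
        }
    }

    // Returns {result while a command holds the write lock, result after it finishes}.
    public static String[] run() {
        HeartbeatStallSketch bpos = new HeartbeatStallSketch();
        CountDownLatch writeLocked = new CountDownLatch(1);
        CountDownLatch commandMayFinish = new CountDownLatch(1);
        Thread command = new Thread(() -> {
            bpos.lock.writeLock().lock();            // like processCommandFromActor()
            try {
                writeLocked.countDown();
                commandMayFinish.await();            // stands in for a slow block copy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                bpos.lock.writeLock().unlock();
            }
        });
        try {
            command.start();
            writeLocked.await();
            String during = bpos.tryGetBlockPoolId(100); // heartbeat while the command runs
            commandMayFinish.countDown();
            command.join();
            String after = bpos.tryGetBlockPoolId(100);  // heartbeat after it finishes
            return new String[] {during, after};
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```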
Approach:
Add a new field:

```java
class BPOfferService {
    // ...
    // the main purpose of volatile here is to guarantee visibility across threads
    volatile String blockPoolID;
    // ...
}
```
When a BPServiceActor obtains the NamespaceInfo and sets it, fill in the new field as well:
```java
void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
    writeLock();
    try {
        if (this.bpNSInfo == null) {
            this.bpNSInfo = nsInfo;
            boolean success = false;

            // Now that we know the namespace ID, etc, we can pass this to the DN.
            // The DN can now initialize its local storage if we are the
            // first BP to handshake, etc.
            try {
                dn.initBlockPool(this);
                success = true;
            } finally {
                if (!success) {
                    // The datanode failed to initialize the BP. We need to reset
                    // the namespace info so that other BPService actors still have
                    // a chance to set it, and re-initialize the datanode.
                    this.bpNSInfo = null;
                }
            }
        } else {
            // If the code gets here, the block pool ID has definitely been obtained already
            checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
                "Blockpool ID");
            checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
                "Namespace ID");
            checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
                "Cluster ID");
            // If execution gets here, both NameNodes have returned their info,
            // and the info from the two NameNodes matches.
            // Note: this branch runs only when bpNSInfo was already set (e.g. for the
            // second NameNode's actor in an HA pair); until then blockPoolID stays
            // null and getBlockPoolId() falls back to the read lock.
            //*************************
            if (blockPoolID == null) {
                this.blockPoolID = bpNSInfo.getBlockPoolID();
            }
            //*************************
        }
    } finally {
        writeUnlock();
    }
}
```
```java
String getBlockPoolId() {
    // Fast path: read the volatile field once; no lock needed.
    String id = this.blockPoolID;
    if (id != null) {
        return id;
    }
    // Once the ID has been cached (after registration), the code below no longer runs
    readLock();
    try {
        if (bpNSInfo != null) {
            return bpNSInfo.getBlockPoolID();
        } else {
            LOG.warn("Block pool ID needed, but service not yet registered with NN",
                new Exception("trace"));
            return null;
        }
    } finally {
        readUnlock();
    }
}
```
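A simplified, self-contained model of this change (not HDFS code; the class name, setNamespaceInfo() and callDuringWriteLock() are hypothetical). One simplification to note: here the volatile copy is published as soon as the namespace info is set, whereas the modified verifyAndSetNamespaceInfo() fills it in the else branch. The test holds the write lock, as a long processCommandFromActor() would, and calls getBlockPoolId() from a "heartbeat" thread: with the cached ID the call returns immediately, and without it the call has to wait for the read lock.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class BlockPoolIdFastPath {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private String nsInfoBlockPoolId;       // guarded by lock; stands in for bpNSInfo
    private volatile String blockPoolID;    // cached copy, volatile for cross-thread visibility

    public void setNamespaceInfo(String id) {
        lock.writeLock().lock();
        try {
            nsInfoBlockPoolId = id;
            blockPoolID = id;               // publish the cached copy once it is known
        } finally {
            lock.writeLock().unlock();
        }
    }

    public String getBlockPoolId() {
        String cached = blockPoolID;        // single volatile read
        if (cached != null) {
            return cached;                  // fast path: no lock at all
        }
        lock.readLock().lock();             // slow path: only before the ID is known
        try {
            return nsInfoBlockPoolId;
        } finally {
            lock.readLock().unlock();
        }
    }

    // Calls getBlockPoolId() on a "heartbeat" thread while the caller holds the write lock.
    public static String callDuringWriteLock(BlockPoolIdFastPath bpos) {
        ExecutorService heartbeat = Executors.newSingleThreadExecutor();
        Callable<String> task = bpos::getBlockPoolId;
        bpos.lock.writeLock().lock();       // like a long processCommandFromActor()
        try {
            return heartbeat.submit(task).get(1, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            return "blocked";               // the read-locked slow path had to wait
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            bpos.lock.writeLock().unlock(); // lets a blocked slow path finish
            heartbeat.shutdown();
        }
    }
}
```

Reading the volatile field into a local first means the null check and the return see the same value, which is why the sketch avoids calling a getter twice.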
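A heartbeat is sent every three seconds, and each one calls getBlockPoolId(). Staying out of the read lock on this path as much as possible avoids read/write lock contention: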
```java
HeartbeatResponse sendHeartBeat() throws IOException {
    // called on every heartbeat: this is where the read lock used to be taken
    StorageReport[] reports =
        dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
    if (LOG.isDebugEnabled()) {
        LOG.debug("Sending heartbeat with " + reports.length +
            " storage reports from service actor: " + this);
    }

    VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
        .getVolumeFailureSummary();
    int numFailedVolumes = volumeFailureSummary != null ?
        volumeFailureSummary.getFailedStorageLocations().length : 0;
    // TODO send the heartbeat
    // get the NameNode proxy and send the heartbeat
    return bpNamenode.sendHeartbeat(bpRegistration,
        reports,
        dn.getFSDataset().getCacheCapacity(),
        dn.getFSDataset().getCacheUsed(),
        dn.getXmitsInProgress(),
        dn.getXceiverCount(),
        numFailedVolumes,
        volumeFailureSummary);
}
```
Further Topics
1. JUC (java.util.concurrent)
2. Multithreading, concurrency, and locks (wait, sleep, synchronized, notify, notifyAll, etc.)
3. The JVM
4. Design patterns
5. Disk I/O: NIO
6. Networking: socket, HadoopRPC, HTTP

Reposted from: https://blog.csdn.net/weixin_37850264/article/details/112427823