(9) Performance Improvements


Preface

Improvements covered here:

  1. Metadata recording: a small change in one place that yields a performance gain;
  2. High-frequency read-write lock contention: one spot has this problem, so the code gets adjusted;
  3. The design pattern involved is one we have also run into during real development.

Demo

Yes, it is this core piece of code again.

import java.util.LinkedList;

/**
 * Manages metadata (the edit log) in memory before it is persisted to disk.
 * @author Administrator
 *
 * This code is modeled on Hadoop's source; make sure you master it, because we
 * will modify it later. As written it is roughly 90% similar to the Hadoop
 * source, with only minor differences.
 */
public class FSEdit {

    public static void main(String[] args) {
        final FSEdit fs = new FSEdit();
        for (int i = 0; i < 1000; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 100; j++) {
                        fs.logEdit("log entry");
                    }
                }
            }).start();
        }
    }

    long taxid = 0L;
    DoubleBuffer doubleBuffer = new DoubleBuffer();
    // Each thread keeps its own copy of the transaction id it last wrote.
    ThreadLocal<Long> threadLocal = new ThreadLocal<Long>();
    // Whether a background sync to disk is currently running.
    public boolean isSyncRunning = false;
    // Largest transaction id in the buffer that is currently being synced to disk.
    long maxtaxid = 0L;
    boolean isWait = false;

    /**
     * Core method for writing one metadata log entry.
     */
    private void logEdit(String log) {
        // "Can this perform well? You took a lock!"
        // Yes: inside this lock we only write to memory, which supports very
        // high concurrency (thread 1 gets txid 1, thread 2 gets 2, and so on).
        synchronized (this) {
            taxid++; // incremented inside the lock, so it stays globally unique
            threadLocal.set(taxid);
            EditLog editLog = new EditLog(taxid, log);
            // Write into memory (the buffer size could be made configurable).
            doubleBuffer.write(editLog);
        } // lock released

        // By the time execution reaches here, currentBuffer may already hold
        // several entries. No lock is held at this point: this is segmented
        // (split) locking. logFlush() re-acquires the lock as needed.
        logFlush();
    }

    private void logFlush() {
        // Re-acquire the lock.
        synchronized (this) {
            if (isSyncRunning) {
                // Transaction id written by the current thread.
                long localTaxid = threadLocal.get();
                // Already covered by the sync in progress (e.g. 2 <= 3)?
                if (localTaxid <= maxtaxid) {
                    return;
                }
                if (isWait) {
                    return;
                }
                isWait = true;
                while (isSyncRunning) {
                    try {
                        // Keep waiting; wait() releases the lock while blocked.
                        this.wait(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                isWait = false;
            }
            // Swap the buffers so new writes go into a fresh buffer.
            doubleBuffer.exchange();
            if (doubleBuffer.syncBuffer.size() > 0) {
                maxtaxid = doubleBuffer.getMaxTaxid();
            }
            isSyncRunning = true;
        } // lock released

        // Persist the data to disk -- the expensive part -- outside the lock.
        doubleBuffer.flush();

        // Segmented locking again: re-acquire only to flip the flag.
        synchronized (this) {
            isSyncRunning = false;
            // Wake up any threads blocked in wait().
            this.notifyAll();
        }
    }

    /**
     * One metadata record, designed as a small object.
     */
    public class EditLog {
        // Transaction id.
        public long taxid;
        public String log;

        public EditLog(long taxid, String log) {
            this.taxid = taxid;
            this.log = log;
        }

        @Override
        public String toString() {
            return "EditLog [taxid=" + taxid + ", log=" + log + "]";
        }
    }

    public class DoubleBuffer {
        // Buffer currently receiving writes (an ordered list).
        LinkedList<EditLog> currentBuffer = new LinkedList<EditLog>();
        // Buffer whose contents are being persisted to disk.
        LinkedList<EditLog> syncBuffer = new LinkedList<EditLog>();

        /**
         * Write one metadata record.
         */
        public void write(EditLog editLog) {
            currentBuffer.add(editLog);
        }

        /**
         * Write the sync buffer to disk.
         */
        public void flush() {
            for (EditLog editLog : syncBuffer) {
                // Printing stands in for the actual disk write.
                System.out.println(editLog);
            }
            syncBuffer.clear();
        }

        /**
         * Swap the two buffers.
         */
        public void exchange() {
            LinkedList<EditLog> tmp = currentBuffer;
            currentBuffer = syncBuffer;
            syncBuffer = tmp;
        }

        /**
         * Largest transaction id in the buffer that is currently being synced.
         */
        public long getMaxTaxid() {
            return syncBuffer.getLast().taxid;
        }
    }
}
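One comment in logEdit() mentions that the in-memory buffer could be made configurable. A minimal, illustrative sketch of that idea (ConfigurableDoubleBuffer and flushThreshold are assumptions for this sketch, not part of the demo above): the caller would only trigger a flush once the write buffer holds a configurable number of records.

import java.util.LinkedList;

// Illustrative variant of the demo's DoubleBuffer with a configurable
// flush threshold; the class name and threshold are assumptions.
public class ConfigurableDoubleBuffer {
    private LinkedList<String> currentBuffer = new LinkedList<String>();
    private LinkedList<String> syncBuffer = new LinkedList<String>();
    private final int flushThreshold; // records to accumulate before flushing

    public ConfigurableDoubleBuffer(int flushThreshold) {
        this.flushThreshold = flushThreshold;
    }

    public void write(String record) {
        currentBuffer.add(record);
    }

    // logEdit() would call logFlush() only when this returns true.
    public boolean shouldFlush() {
        return currentBuffer.size() >= flushThreshold;
    }

    public void exchange() {
        LinkedList<String> tmp = currentBuffer;
        currentBuffer = syncBuffer;
        syncBuffer = tmp;
    }

    public void flush() {
        for (String record : syncBuffer) {
            System.out.println(record); // printing stands in for a disk write
        }
        syncBuffer.clear();
    }
}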

To make the taxid++ update itself parallelizable, an atomic class can be used:

AtomicLong taxid = new AtomicLong(0);

Remaining problems: CAS spinning under high concurrency, plus the well-known ABA issue of atomic classes (mainly relevant to AtomicReference; AtomicStampedReference is the usual remedy).

To avoid the spinning:

LongAdder taxid = new LongAdder();
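A minimal sketch of the trade-off (the TxIdCounters class and its method names are illustrative, not part of the demo): AtomicLong.incrementAndGet() hands every caller a distinct value, so it can stand in for the locked taxid++ directly, while LongAdder spreads contention across internal cells and spins far less, but it only supports add-and-sum and cannot hand out unique ids.

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

public class TxIdCounters {
    // Option 1: lock-free unique transaction ids. incrementAndGet() may
    // CAS-spin under heavy contention, but every caller gets a distinct value.
    private final AtomicLong atomicTaxid = new AtomicLong(0);

    // Option 2: LongAdder spins far less under contention, but it only
    // supports add-and-sum -- it cannot return a unique id per call.
    private final LongAdder editCount = new LongAdder();

    public long nextTaxid() {
        return atomicTaxid.incrementAndGet();
    }

    public void recordEdit() {
        editCount.increment();
    }

    public long approximateEditCount() {
        // sum() is not an atomic snapshot while increments are in flight.
        return editCount.sum();
    }
}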

HDFS Locks

Locks in HDFS are generally heavy-weight; the goal here is to reduce their granularity.

Below we focus on BPOfferService.

class BPOfferService {
  // Read-write lock
  private final ReentrantReadWriteLock mReadWriteLock =
      new ReentrantReadWriteLock();
  // Read lock
  private final Lock mReadLock = mReadWriteLock.readLock();
  // Write lock
  private final Lock mWriteLock = mReadWriteLock.writeLock();
}

Reads and writes exclude each other, writes exclude writes, and only reads share with reads. My view is that this read-write lock inside BPOfferService sees high-frequency contention, because the read lock and the write lock are taken in many places.
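The methods quoted below take the lock through writeLock()/writeUnlock() and readLock()/readUnlock() helpers rather than touching mReadLock/mWriteLock directly. A minimal sketch of such wrappers, assuming they do nothing more than delegate to the two Lock fields declared above:

  // Assumed members of BPOfferService: thin wrappers around the Lock fields.
  void readLock() {
    mReadLock.lock();
  }

  void readUnlock() {
    mReadLock.unlock();
  }

  void writeLock() {
    mWriteLock.lock();
  }

  void writeUnlock() {
    mWriteLock.unlock();
  }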

Evidence:

Many places take the write lock:
(1) A BPServiceActor has obtained the NamespaceInfo and sets it.
(2) A BPServiceActor has registered successfully and sets the DatanodeRegistration.
(3) The DatanodeRegistration is created just before registering with the NameNode.
(4) When the DataNode shuts down, the corresponding BPServiceActor is shut down and some state is cleared.
(5) A heartbeat is sent to the NameNode every 3 seconds; if the NameNode's active/standby state has changed, some state is updated.
(6) Every heartbeat may bring back commands from the NameNode, for example telling the DataNode to replicate a block to another DataNode.
To be clear, the method in (6) processes the commands each DataNode receives. If there are many commands it is called at high frequency; if there are few, its impact is minor.

void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
  writeLock();
  try {
    if (this.bpNSInfo == null) {
      this.bpNSInfo = nsInfo;
      boolean success = false;

      // Now that we know the namespace ID, etc, we can pass this to the DN.
      // The DN can now initialize its local storage if we are the
      // first BP to handshake, etc.
      try {
        dn.initBlockPool(this);
        success = true;
      } finally {
        if (!success) {
          // The datanode failed to initialize the BP. We need to reset
          // the namespace info so that other BPService actors still have
          // a chance to set it, and re-initialize the datanode.
          this.bpNSInfo = null;
        }
      }
    } else {
      checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
          "Blockpool ID");
      checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
          "Namespace ID");
      checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
          "Cluster ID");
    }
  } finally {
    writeUnlock();
  }
}
void registrationSucceeded(BPServiceActor bpServiceActor,
    DatanodeRegistration reg) throws IOException {
  writeLock();
  try {
    if (bpRegistration != null) {
      checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(),
          reg.getStorageInfo().getNamespaceID(), "namespace ID");
      checkNSEquality(bpRegistration.getStorageInfo().getClusterID(),
          reg.getStorageInfo().getClusterID(), "cluster ID");
    } else {
      bpRegistration = reg;
    }

    dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
    // Add the initial block token secret keys to the DN's secret manager.
    if (dn.isBlockTokenEnabled) {
      dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(),
          reg.getExportedKeys());
    }
  } finally {
    writeUnlock();
  }
}
DatanodeRegistration createRegistration() {
  writeLock();
  try {
    Preconditions.checkState(bpNSInfo != null,
        "getRegistration() can only be called after initial handshake");
    return dn.createBPRegistration(bpNSInfo);
  } finally {
    writeUnlock();
  }
}
void shutdownActor(BPServiceActor actor) {
  writeLock();
  try {
    if (bpServiceToActive == actor) {
      bpServiceToActive = null;
    }
    bpServices.remove(actor);
    if (bpServices.isEmpty()) {
      dn.shutdownBlockPool(this);
    }
  } finally {
    writeUnlock();
  }
}
void updateActorStatesFromHeartbeat(
    BPServiceActor actor,
    NNHAStatusHeartbeat nnHaState) {
  writeLock();
  try {
    final long txid = nnHaState.getTxId();

    final boolean nnClaimsActive =
        nnHaState.getState() == HAServiceState.ACTIVE;
    final boolean bposThinksActive = bpServiceToActive == actor;
    final boolean isMoreRecentClaim = txid > lastActiveClaimTxId;

    if (nnClaimsActive && !bposThinksActive) {
      LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " +
          "txid=" + txid);
      if (!isMoreRecentClaim) {
        // Split-brain scenario - an NN is trying to claim active
        // state when a different NN has already claimed it with a higher
        // txid.
        LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" +
            txid + " but there was already a more recent claim at txid=" +
            lastActiveClaimTxId);
        return;
      } else {
        if (bpServiceToActive == null) {
          LOG.info("Acknowledging ACTIVE Namenode " + actor);
        } else {
          LOG.info("Namenode " + actor + " taking over ACTIVE state from " +
              bpServiceToActive + " at higher txid=" + txid);
        }
        bpServiceToActive = actor;
      }
    } else if (!nnClaimsActive && bposThinksActive) {
      LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " +
          "txid=" + nnHaState.getTxId());
      bpServiceToActive = null;
    }

    if (bpServiceToActive == actor) {
      assert txid >= lastActiveClaimTxId;
      lastActiveClaimTxId = txid;
    }
  } finally {
    writeUnlock();
  }
}
boolean processCommandFromActor(DatanodeCommand cmd,
    BPServiceActor actor) throws IOException {
  assert bpServices.contains(actor);
  if (cmd == null) {
    return true;
  }
  /*
   * Datanode Registration can be done asynchronously here. No need to hold
   * the lock. for more info refer HDFS-5014
   */
  if (DatanodeProtocol.DNA_REGISTER == cmd.getAction()) {
    // namenode requested a registration - at start or if NN lost contact
    // Just logging the claiming state is OK here instead of checking the
    // actor state by obtaining the lock
    LOG.info("DatanodeCommand action : DNA_REGISTER from " + actor.nnAddr
        + " with " + actor.state + " state");
    actor.reRegister();
    return false;
  }
  writeLock();
  try {
    if (actor == bpServiceToActive) {
      // If the command is, say, replicating a block, executing it can take
      // several seconds while the write lock is held.
      return processCommandFromActive(cmd, actor);
    } else {
      return processCommandFromStandby(cmd, actor);
    }
  } finally {
    writeUnlock();
  }
}

Many places take the read lock:

(1) Getting the block pool ID.
(2) Getting the NamespaceInfo.
(3) Getting the active NameNode's information.
The last two methods are not called very often; the first is called frequently, e.g. from sendHeartBeat(), which runs every time the 3-second heartbeat is sent.

String getBlockPoolId() {
  readLock();
  try {
    if (bpNSInfo != null) {
      return bpNSInfo.getBlockPoolID();
    } else {
      LOG.warn("Block pool ID needed, but service not yet registered with NN",
          new Exception("trace"));
      return null;
    }
  } finally {
    readUnlock();
  }
}
NamespaceInfo getNamespaceInfo() {
  readLock();
  try {
    return bpNSInfo;
  } finally {
    readUnlock();
  }
}
DatanodeProtocolClientSideTranslatorPB getActiveNN() {
  readLock();
  try {
    if (bpServiceToActive != null) {
      return bpServiceToActive.bpNamenode;
    } else {
      return null;
    }
  } finally {
    readUnlock();
  }
}

Read-write lock contention

Under normal conditions this is not a big problem. There is one special case, though.

Suppose the cluster decommissions some nodes. That generates a large number of replication tasks, which the NameNode sends to the DataNodes as commands. Another case is load balancing: say the cluster has 100 servers and the company brings in 10 new ones; the first thing you do is rebalance, and rebalancing likewise generates a large number of replication tasks.
In these abnormal situations, then, read-write lock contention appears. If we keep the reads from competing for the lock, performance comes back up.

Eliminating the read-lock acquisition

String getBlockPoolId() {
  readLock();
  try {
    if (bpNSInfo != null) {
      return bpNSInfo.getBlockPoolID();
    } else {
      LOG.warn("Block pool ID needed, but service not yet registered with NN",
          new Exception("trace"));
      return null;
    }
  } finally {
    readUnlock();
  }
}

Solution approach:

Add a new field:

class BPOfferService {
  ......
  // volatile guarantees visibility of this field across threads.
  volatile String blockPoolId;
  ......
}

When the BPServiceActor has obtained the NamespaceInfo, set this field:

void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
  writeLock();
  try {
    if (this.bpNSInfo == null) {
      this.bpNSInfo = nsInfo;
      boolean success = false;

      // Now that we know the namespace ID, etc, we can pass this to the DN.
      // The DN can now initialize its local storage if we are the
      // first BP to handshake, etc.
      try {
        dn.initBlockPool(this);
        success = true;
      } finally {
        if (!success) {
          // The datanode failed to initialize the BP. We need to reset
          // the namespace info so that other BPService actors still have
          // a chance to set it, and re-initialize the datanode.
          this.bpNSInfo = null;
        }
      }
    } else {
      // If we reach this branch, the block pool ID has definitely been obtained.
      checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
          "Blockpool ID");
      checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
          "Namespace ID");
      checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
          "Cluster ID");
      // Reaching this point means the info has been obtained from both
      // NameNodes and the information from the two NameNodes matches.
      // *************************
      if (blockPoolId == null) {
        this.blockPoolId = bpNSInfo.getBlockPoolID();
      }
      // *************************
    }
  } finally {
    writeUnlock();
  }
}
String getBlockPoolId() {
  // Fast path: read the volatile field without taking any lock.
  if (this.blockPoolId != null) {
    return this.blockPoolId;
  }
  // Once registration has succeeded, the code below never runs.
  readLock();
  try {
    if (bpNSInfo != null) {
      return bpNSInfo.getBlockPoolID();
    } else {
      LOG.warn("Block pool ID needed, but service not yet registered with NN",
          new Exception("trace"));
      return null;
    }
  } finally {
    readUnlock();
  }
}

The heartbeat is sent every three seconds; if it can avoid entering the read lock, read-write lock contention is avoided:

HeartbeatResponse sendHeartBeat() throws IOException {
  StorageReport[] reports =
      dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
  if (LOG.isDebugEnabled()) {
    LOG.debug("Sending heartbeat with " + reports.length +
        " storage reports from service actor: " + this);
  }

  VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
      .getVolumeFailureSummary();
  int numFailedVolumes = volumeFailureSummary != null ?
      volumeFailureSummary.getFailedStorageLocations().length : 0;
  // Send the heartbeat: get the NameNode proxy and call it.
  return bpNamenode.sendHeartbeat(bpRegistration,
      reports,
      dn.getFSDataset().getCacheCapacity(),
      dn.getFSDataset().getCacheUsed(),
      dn.getXmitsInProgress(),
      dn.getXceiverCount(),
      numFailedVolumes,
      volumeFailureSummary);
}

Further topics

1. JUC (java.util.concurrent)
2. Multithreading, concurrency, and locks (wait, sleep, synchronized, notify, notifyAll, etc.)
3. JVM internals
4. Design patterns
5. Disk I/O: NIO
6. Networking: sockets, Hadoop RPC, HTTP

Reposted from: https://blog.csdn.net/weixin_37850264/article/details/112427823
