(3) Anatomy of the DataNode


Table of Contents

  • Overall flow
  • DataNode initialization
  • DataNode registration

This part covers the DataNode.
1) DataNode initialization: as noted earlier when we looked at jps, any service visible in jps can be treated as an RPC server. When we set up a cluster, jps shows a DataNode process, so the DataNode must be an RPC server.
2) DataNode registration: HDFS is a master/slave architecture. The NameNode is the master and the DataNodes are slaves, so every slave must register with the master when it starts. This point matters because the vast majority of big-data systems are master/slave; once you understand this code, the same reasoning carries over to Flink, Spark, and the rest.
3) DataNode heartbeat: slaves must send heartbeats, because without them the master has no way of knowing whether a slave is still alive.

Background

How HDFS high availability works

  • Hadoop 1.x: a single NameNode plus the DataNodes
  • NameNode: manages the metadata
  • DataNode: stores the data; for safety, each block (64 MB by default in Hadoop 1.x) is kept as 3 replicas
    Problems with this NameNode:
    1) it is a single point of failure;
    2) it is a stateful service (it holds the metadata), so it cannot simply be replaced on the fly.
    These are the problems the Hadoop team set out to solve.

Solving the single point of failure requires two things (a configuration sketch follows):

1) keeping the metadata of the two NameNodes consistent at all times;
2) automatic failover.
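For concreteness, here is a minimal sketch of how such an HA nameservice is usually described in configuration, read back through Hadoop's Configuration API. The keys are the standard HDFS HA/federation keys; the nameservice ID (ns1), NameNode IDs (nn1, nn2) and hostnames are invented examples, not values from this article's cluster.

import org.apache.hadoop.conf.Configuration;

public class HaConfigSketch {
    public static Configuration haPair() {
        Configuration conf = new Configuration();
        conf.set("dfs.nameservices", "ns1");                          // one nameservice per federation member
        conf.set("dfs.ha.namenodes.ns1", "nn1,nn2");                  // the active/standby pair inside it
        conf.set("dfs.namenode.rpc-address.ns1.nn1", "hadoop1:8020"); // NameNode 1
        conf.set("dfs.namenode.rpc-address.ns1.nn2", "hadoop2:8020"); // NameNode 2
        return conf;
    }
}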

Like the NameNode, the DataNode is a server, so it boots from main():

public static void main(String args[]) {
  if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
    System.exit(0);
  }
  //TODO core entry point
  secureMain(args, null);
}

public static void secureMain(String args[], SecureResources resources) {
  int errorCode = 0;
  try {
    StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
    //TODO the important call: initialize the DataNode
    DataNode datanode = createDataNode(args, null, resources);
    if (datanode != null) {
      //TODO block here until shutdown
      datanode.join();
    } else {
      errorCode = 1;
    }
  } catch (Throwable e) {
    LOG.fatal("Exception in secureMain", e);
    terminate(1, e);
  } finally {
    // We need to terminate the process here because either shutdown was called
    // or some disk related conditions like volumes tolerated or volumes required
    // condition was not met. Also, in secure mode, control will go to Jsvc
    // and Datanode process hangs if it does not exit.
    LOG.warn("Exiting Datanode");
    terminate(errorCode);
  }
}

public static DataNode createDataNode(String args[], Configuration conf,
    SecureResources resources) throws IOException {
  //TODO instantiate the DataNode
  DataNode dn = instantiateDataNode(args, conf, resources);
  if (dn != null) {
    //TODO start the DataNode daemon threads (important)
    dn.runDatanodeDaemon();
  }
  return dn;
}

public static DataNode instantiateDataNode(String args[], Configuration conf,
    SecureResources resources) throws IOException {
  if (conf == null)
    conf = new HdfsConfiguration();
  if (args != null) {
    // parse generic hadoop options
    GenericOptionsParser hParser = new GenericOptionsParser(conf, args);
    args = hParser.getRemainingArgs();
  }
  if (!parseArguments(args, conf)) {
    printUsage(System.err);
    return null;
  }
  Collection<StorageLocation> dataLocations = getStorageLocations(conf);
  UserGroupInformation.setConfiguration(conf);
  SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
      DFS_DATANODE_KERBEROS_PRINCIPAL_KEY);
  //TODO the important call
  return makeInstance(dataLocations, conf, resources);
}

makeInstance() then constructs a DataNode(conf, locations, resources) object:

DataNode(final Configuration conf,
         final List<StorageLocation> dataDirs,
         final SecureResources resources) throws IOException {
  super(conf);
  this.blockScanner = new BlockScanner(this, conf);
  this.lastDiskErrorCheck = 0;
  this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
      DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
  this.usersWithLocalPathAccess = Arrays.asList(
      conf.getTrimmedStrings(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY));
  this.connectToDnViaHostname = conf.getBoolean(
      DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
      DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
  this.getHdfsBlockLocationsEnabled = conf.getBoolean(
      DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
      DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
  this.supergroup = conf.get(DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
      DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
  this.isPermissionEnabled = conf.getBoolean(
      DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
      DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
  this.pipelineSupportECN = conf.getBoolean(
      DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED,
      DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED_DEFAULT);
  confVersion = "core-" + conf.get("hadoop.common.configuration.version", "UNSPECIFIED")
      + ",hdfs-" + conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED");

  // Determine whether we should try to pass file descriptors to clients.
  if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
      DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT)) {
    String reason = DomainSocket.getLoadingFailureReason();
    if (reason != null) {
      LOG.warn("File descriptor passing is disabled because " + reason);
      this.fileDescriptorPassingDisabledReason = reason;
    } else {
      LOG.info("File descriptor passing is enabled.");
      this.fileDescriptorPassingDisabledReason = null;
    }
  } else {
    this.fileDescriptorPassingDisabledReason =
        "File descriptor passing was not configured.";
    LOG.debug(this.fileDescriptorPassingDisabledReason);
  }

  try {
    hostName = getHostName(conf);
    LOG.info("Configured hostname is " + hostName);
    //TODO start the DataNode
    startDataNode(conf, dataDirs, resources);
  } catch (IOException ie) {
    shutdown();
    throw ie;
  }

  final int dncCacheMaxSize =
      conf.getInt(DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY,
          DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT);
  //TODO builder design pattern
  datanodeNetworkCounts =
      CacheBuilder.newBuilder()
          .maximumSize(dncCacheMaxSize)
          .build(new CacheLoader<String, Map<String, Long>>() {
            @Override
            public Map<String, Long> load(String key) throws Exception {
              final Map<String, Long> ret = new HashMap<String, Long>();
              ret.put("networkErrors", 0L);
              return ret;
            }
          });
}
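The datanodeNetworkCounts field above is a Guava LoadingCache assembled with the builder pattern. As a standalone sketch of how such a cache behaves (the cache size and host name here are invented; the networkErrors default entry mirrors the constructor above):

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

public class NetworkCountsSketch {
    public static void main(String[] args) throws ExecutionException {
        LoadingCache<String, Map<String, Long>> counts = CacheBuilder.newBuilder()
            .maximumSize(64)                        // evict beyond 64 hosts, like dncCacheMaxSize
            .build(new CacheLoader<String, Map<String, Long>>() {
                @Override
                public Map<String, Long> load(String host) {
                    Map<String, Long> m = new HashMap<>();
                    m.put("networkErrors", 0L);     // default entry, as in the DataNode constructor
                    return m;
                }
            });
        // the first get() materializes the default map via load()
        counts.get("hadoop1").merge("networkErrors", 1L, Long::sum);
        System.out.println(counts.get("hadoop1")); // {networkErrors=1}
    }
}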

Starting the DataNode

void startDataNode(Configuration conf,
                   List<StorageLocation> dataDirs,
                   SecureResources resources) throws IOException {
  // settings global for all BPs in the Data Node
  this.secureResources = resources;
  synchronized (this) {
    this.dataDirs = dataDirs;
  }
  this.conf = conf;
  this.dnConf = new DNConf(conf);
  checkSecureConfig(dnConf, conf, resources);
  this.spanReceiverHost = SpanReceiverHost.getInstance(conf);

  if (dnConf.maxLockedMemory > 0) {
    if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) {
      throw new RuntimeException(String.format(
          "Cannot start datanode because the configured max locked memory"
          + " size (%s) is greater than zero and native code is not available.",
          DFS_DATANODE_MAX_LOCKED_MEMORY_KEY));
    }
    if (Path.WINDOWS) {
      NativeIO.Windows.extendWorkingSetSize(dnConf.maxLockedMemory);
    } else {
      long ulimit = NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
      if (dnConf.maxLockedMemory > ulimit) {
        throw new RuntimeException(String.format(
            "Cannot start datanode because the configured max locked memory"
            + " size (%s) of %d bytes is more than the datanode's available"
            + " RLIMIT_MEMLOCK ulimit of %d bytes.",
            DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
            dnConf.maxLockedMemory, ulimit));
      }
    }
  }
  LOG.info("Starting DataNode with maxLockedMemory = " + dnConf.maxLockedMemory);

  storage = new DataStorage();

  // global DN settings
  registerMXBean();
  //TODO initialize the DataXceiver
  initDataXceiver(conf);
  //TODO start the HttpServer
  startInfoServer(conf);
  pauseMonitor = new JvmPauseMonitor(conf);
  pauseMonitor.start();

  // BlockPoolTokenSecretManager is required to create ipc server.
  this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();

  // Login is done by now. Set the DN user name.
  dnUserName = UserGroupInformation.getCurrentUser().getShortUserName();
  LOG.info("dnUserName = " + dnUserName);
  LOG.info("supergroup = " + supergroup);
  //TODO initialize the RPC server
  initIpcServer(conf);

  metrics = DataNodeMetrics.create(conf, getDisplayName());
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

  //TODO create the BlockPoolManager
  //A plain cluster has exactly one block pool.
  //Under federation there are multiple namenodes and hence multiple
  //nameservices; each federation member is one block pool.
  //Example cluster with 4 NameNodes = 2 federation members:
  //member one: hadoop1 (Active) + hadoop2 (Standby) -> same block pool
  //member two: hadoop3 (Active) + hadoop4 (Standby) -> same block pool
  blockPoolManager = new BlockPoolManager(this);
  //TODO important: this is where registration and heartbeats are kicked off
  blockPoolManager.refreshNamenodes(conf);

  // Create the ReadaheadPool from the DataNode context so we can
  // exit without having to explicitly shutdown its thread pool.
  readaheadPool = ReadaheadPool.getInstance();
  saslClient = new SaslDataTransferClient(dnConf.conf,
      dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
  saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
}

DataXceiver is the service the DataNode uses to receive data streamed in from clients and from other DataNodes.

private void initDataXceiver(Configuration conf) throws IOException {
  // find free port or use privileged port provided
  //TODO this server accepts TCP requests
  TcpPeerServer tcpPeerServer;
  if (secureResources != null) {
    tcpPeerServer = new TcpPeerServer(secureResources);
  } else {
    tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
        DataNode.getStreamingAddr(conf));
  }
  tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
  streamingAddr = tcpPeerServer.getStreamingAddr();
  LOG.info("Opened streaming server at " + streamingAddr);
  this.threadGroup = new ThreadGroup("dataXceiverServer");
  //TODO important: instantiate the DataXceiverServer,
  //the service that receives data from clients and other DataNodes
  xserver = new DataXceiverServer(tcpPeerServer, conf, this);
  //run it as a daemon thread
  this.dataXceiverServer = new Daemon(threadGroup, xserver);
  this.threadGroup.setDaemon(true); // auto destroy when empty

  if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT)
      || conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
          DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
    DomainPeerServer domainPeerServer =
        getDomainPeerServer(conf, streamingAddr.getPort());
    if (domainPeerServer != null) {
      this.localDataXceiverServer = new Daemon(threadGroup,
          new DataXceiverServer(domainPeerServer, conf, this));
      LOG.info("Listening on UNIX domain socket: "
          + domainPeerServer.getBindPath());
    }
  }
  this.shortCircuitRegistry = new ShortCircuitRegistry(conf);
}
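Structurally, DataXceiverServer is a socket accept loop running as a daemon thread, handing each incoming connection to a worker (one DataXceiver per peer). A minimal plain-JDK sketch of that shape, not Hadoop's API (all names invented):

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class AcceptLoopSketch {
    public static void main(String[] args) throws IOException {
        final ServerSocket server = new ServerSocket(0);   // like TcpPeerServer on the streaming port
        System.out.println("streaming server on port " + server.getLocalPort());
        Thread acceptLoop = new Thread(() -> {
            while (!server.isClosed()) {
                try {
                    Socket peer = server.accept();                     // a client or another DN connects
                    Thread handler = new Thread(() -> handle(peer));   // like one DataXceiver per peer
                    handler.setDaemon(true);
                    handler.start();
                } catch (IOException e) {
                    break;                                             // server closed
                }
            }
        }, "dataXceiverServer-sketch");
        acceptLoop.setDaemon(true);    // like new Daemon(threadGroup, xserver): dies with the JVM
        acceptLoop.start();
    }

    private static void handle(Socket peer) {
        // the real DataXceiver reads an opcode plus block data here
        try { peer.close(); } catch (IOException ignored) {}
    }
}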

Starting the HttpServer works much like it does on the NameNode:

private void startInfoServer(Configuration conf) throws IOException {
  Configuration confForInfoServer = new Configuration(conf);
  confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
  //the NameNode also starts an HttpServer2 at startup
  //TODO it serves HTTP requests
  HttpServer2.Builder builder = new HttpServer2.Builder()
      .setName("datanode")
      .setConf(conf)
      .setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
      .addEndpoint(URI.create("http://localhost:0"))
      .setFindPort(true);
  this.infoServer = builder.build();

  //TODO bind several servlets onto this HttpServer
  this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
  //serves file checksums
  this.infoServer.addInternalServlet(null, "/getFileChecksum/*",
      FileChecksumServlets.GetServlet.class);
  this.infoServer.setAttribute("datanode", this);
  this.infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
  this.infoServer.addServlet(null, "/blockScannerReport",
      BlockScanner.Servlet.class);
  //TODO start the HTTP service
  this.infoServer.start();
  InetSocketAddress jettyAddr = infoServer.getConnectorAddress(0);

  // SecureDataNodeStarter will bind the privileged port to the channel if
  // the DN is started by JSVC, pass it along.
  ServerSocketChannel httpServerChannel = secureResources != null
      ? secureResources.getHttpServerChannel() : null;
  this.httpServer = new DatanodeHttpServer(conf, jettyAddr, httpServerChannel);
  httpServer.start();
  if (httpServer.getHttpAddress() != null) {
    infoPort = httpServer.getHttpAddress().getPort();
  }
  if (httpServer.getHttpsAddress() != null) {
    infoSecurePort = httpServer.getHttpsAddress().getPort();
  }
}
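The pattern here is "build a server, bind handlers to paths, start it". For intuition only, the same shape with the JDK's built-in HTTP server rather than Hadoop's HttpServer2 (path and body are invented):

import com.sun.net.httpserver.HttpServer;
import java.io.OutputStream;
import java.net.InetSocketAddress;

public class InfoServerSketch {
    public static void main(String[] args) throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress(0), 0); // port 0 = find a free port
        server.createContext("/blockScannerReport", exchange -> {           // like addServlet(...)
            byte[] body = "scanner report placeholder".getBytes();
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) { os.write(body); }
        });
        server.start();                                                      // like infoServer.start()
        System.out.println("listening on " + server.getAddress());
    }
}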

Starting the RPC server

private void initIpcServer(Configuration conf) throws IOException {
  InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
      conf.getTrimmed(DFS_DATANODE_IPC_ADDRESS_KEY));

  // Add all the RPC protocols that the Datanode implements
  RPC.setProtocolEngine(conf, ClientDatanodeProtocolPB.class,
      ProtobufRpcEngine.class);
  ClientDatanodeProtocolServerSideTranslatorPB clientDatanodeProtocolXlator =
      new ClientDatanodeProtocolServerSideTranslatorPB(this);
  //handles requests between clients and the datanode
  BlockingService service = ClientDatanodeProtocolService
      .newReflectiveBlockingService(clientDatanodeProtocolXlator);
  //this is where the RPC server is created
  ipcServer = new RPC.Builder(conf)
      .setProtocol(ClientDatanodeProtocolPB.class)
      .setInstance(service)
      .setBindAddress(ipcAddr.getHostName())
      .setPort(ipcAddr.getPort())
      .setNumHandlers(
          conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
              DFS_DATANODE_HANDLER_COUNT_DEFAULT))
      .setVerbose(false)
      .setSecretManager(blockPoolTokenSecretManager)
      .build();

  InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator =
      new InterDatanodeProtocolServerSideTranslatorPB(this);
  //the protocol datanodes use to talk to each other
  service = InterDatanodeProtocolService
      .newReflectiveBlockingService(interDatanodeProtocolXlator);
  DFSUtil.addPBProtocol(conf, InterDatanodeProtocolPB.class, service,
      ipcServer);

  //and one more protocol
  TraceAdminProtocolServerSideTranslatorPB traceAdminXlator =
      new TraceAdminProtocolServerSideTranslatorPB(this);
  BlockingService traceAdminService = TraceAdminService
      .newReflectiveBlockingService(traceAdminXlator);
  DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class, traceAdminService,
      ipcServer);

  LOG.info("Opened IPC server at " + ipcServer.getListenerAddress());

  // set service-level authorization security policy
  if (conf.getBoolean(
      CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
    ipcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
  }
}
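Setting the protobuf plumbing aside, the structure of initIpcServer() is "one RPC server, several protocol interfaces, each with its own implementation". A toy, purely illustrative model of that dispatch using plain JDK reflection (all names invented; this is not how Hadoop's ProtobufRpcEngine actually works on the wire):

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

public class MultiProtocolServerSketch {
    // protocol interface name -> implementation, like the services added to ipcServer
    private final Map<String, Object> protocols = new HashMap<>();

    public <T> void addProtocol(Class<T> iface, T impl) {
        protocols.put(iface.getName(), impl);      // like DFSUtil.addPBProtocol(...)
    }

    // dispatch a call the way a reflective BlockingService would
    public Object invoke(String protocolName, String methodName, Object... args)
            throws Exception {
        Object impl = protocols.get(protocolName);
        if (impl == null) {
            throw new IllegalArgumentException("unknown protocol: " + protocolName);
        }
        for (Method m : impl.getClass().getMethods()) {
            if (m.getName().equals(methodName) && m.getParameterCount() == args.length) {
                return m.invoke(impl, args);
            }
        }
        throw new NoSuchMethodException(protocolName + "#" + methodName);
    }
}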

Heartbeat-related machinery

void refreshNamenodes(Configuration conf) throws IOException {
  LOG.info("Refresh request received for nameservices: "
      + conf.get(DFSConfigKeys.DFS_NAMESERVICES));
  Map<String, Map<String, InetSocketAddress>> newAddressMap =
      DFSUtil.getNNServiceRpcAddressesForCluster(conf);
  synchronized (refreshNamenodesLock) {
    //TODO the important call
    doRefreshNamenodes(newAddressMap);
  }
}

private void doRefreshNamenodes(
    Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
  assert Thread.holdsLock(refreshNamenodesLock);

  Set<String> toRefresh = Sets.newLinkedHashSet();
  Set<String> toAdd = Sets.newLinkedHashSet();
  Set<String> toRemove;

  /* HDFS is a distributed file system; a nameservice is a namespace (our
   * directory tree). Under HA, the active and standby NameNodes manage the
   * same nameservice. Under federation there are several HA pairs, e.g.
   * NameNodes (hadoop1, hadoop2) and (hadoop3, hadoop4). */
  synchronized (this) {
    // Step 1. For each of the new nameservices, figure out whether
    // it's an update of the set of NNs for an existing NS,
    // or an entirely new nameservice.
    //TODO an HDFS cluster is usually one HA pair: one nameservice (hadoop1, hadoop2).
    //Under federation there will be several nameservices here.
    for (String nameserviceId : addrMap.keySet()) {
      if (bpByNameserviceId.containsKey(nameserviceId)) {
        toRefresh.add(nameserviceId);
      } else {
        //TODO toAdd collects the new nameservices; one federation member = one nameservice
        toAdd.add(nameserviceId);
      }
    }

    // Step 2. Any nameservices we currently have but are no longer present
    // need to be removed.
    toRemove = Sets.newHashSet(Sets.difference(
        bpByNameserviceId.keySet(), addrMap.keySet()));

    assert toRefresh.size() + toAdd.size() == addrMap.size() :
        "toAdd: " + Joiner.on(",").useForNull("<default>").join(toAdd)
        + "  toRemove: " + Joiner.on(",").useForNull("<default>").join(toRemove)
        + "  toRefresh: " + Joiner.on(",").useForNull("<default>").join(toRefresh);

    // Step 3. Start new nameservices
    if (!toAdd.isEmpty()) {
      LOG.info("Starting BPOfferServices for nameservices: "
          + Joiner.on(",").useForNull("<default>").join(toAdd));
      //TODO iterate over the new nameservices; each holds two NameNodes (HA)
      for (String nsToAdd : toAdd) {
        ArrayList<InetSocketAddress> addrs =
            Lists.newArrayList(addrMap.get(nsToAdd).values());
        //TODO the key relationship:
        //one nameservice (federation member) -> one BPOfferService
        //one NameNode inside it -> one BPServiceActor
        //so one BPOfferService normally owns two BPServiceActors
        BPOfferService bpos = createBPOS(addrs);
        bpByNameserviceId.put(nsToAdd, bpos);
        offerServices.add(bpos);
      }
    }
    //TODO the DataNode registers and heartbeats with the NameNodes here
    startAll();
  }

  // Step 4. Shut down old nameservices. This happens outside
  // of the synchronized(this) lock since they need to call
  // back to .remove() from another thread
  if (!toRemove.isEmpty()) {
    LOG.info("Stopping BPOfferServices for nameservices: "
        + Joiner.on(",").useForNull("<default>").join(toRemove));
    for (String nsToRemove : toRemove) {
      BPOfferService bpos = bpByNameserviceId.get(nsToRemove);
      bpos.stop();
      bpos.join();
      // they will call remove on their own
    }
  }

  // Step 5. Update nameservices whose NN list has changed
  if (!toRefresh.isEmpty()) {
    LOG.info("Refreshing list of NNs for nameservices: "
        + Joiner.on(",").useForNull("<default>").join(toRefresh));
    for (String nsToRefresh : toRefresh) {
      BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);
      ArrayList<InetSocketAddress> addrs =
          Lists.newArrayList(addrMap.get(nsToRefresh).values());
      bpos.refreshNNList(addrs);
    }
  }
}
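Steps 1 through 5 above form a generic reconcile-to-desired-state pattern: diff the desired set of nameservices against the currently running one into toAdd/toRemove/toRefresh and act on each subset. The same idea distilled with plain JDK sets (illustrative names):

import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

public class ReconcileSketch {
    static void reconcile(Set<String> current, Set<String> desired) {
        Set<String> toAdd = new LinkedHashSet<>(desired);
        toAdd.removeAll(current);               // desired but not running -> start
        Set<String> toRemove = new HashSet<>(current);
        toRemove.removeAll(desired);            // running but no longer desired -> stop
        Set<String> toRefresh = new LinkedHashSet<>(desired);
        toRefresh.retainAll(current);           // in both -> refresh the NN list
        toAdd.forEach(ns -> System.out.println("start BPOfferService for " + ns));
        toRemove.forEach(ns -> System.out.println("stop BPOfferService for " + ns));
        toRefresh.forEach(ns -> System.out.println("refresh NN list for " + ns));
    }

    public static void main(String[] args) {
        reconcile(new HashSet<>(Arrays.asList("ns1", "ns2")),
                  new HashSet<>(Arrays.asList("ns2", "ns3")));
    }
}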
startAll() is where the DataNode registers and starts heartbeating with the NameNodes:
synchronized void startAll() throws IOException {
  try {
    UserGroupInformation.getLoginUser().doAs(
        new PrivilegedExceptionAction<Object>() {
          @Override
          public Object run() throws Exception {
            //TODO iterate over all BPOfferServices, i.e. all federation members
            for (BPOfferService bpos : offerServices) {
              //TODO important
              bpos.start();
            }
            return null;
          }
        });
  } catch (InterruptedException ex) {
    IOException ioe = new IOException();
    ioe.initCause(ex.getCause());
    throw ioe;
  }
}
void start() {
  //TODO one BPOfferService owns several actors
  for (BPServiceActor actor : bpServices) {
    //TODO each actor performs registration and heartbeats
    actor.start();
  }
}
void start() {
  if ((bpThread != null) && (bpThread.isAlive())) {
    //Thread is started already
    return;
  }
  bpThread = new Thread(this, formatThreadName()); //runs run()
  bpThread.setDaemon(true); // needed for JUnit testing
  //TODO start the thread, so next we look at the run() method
  bpThread.start();
}

Starting a thread means its run() method gets executed:

public void run() {
  LOG.info(this + " starting to offer service");
  /**
   * TODO this code is worth learning from: registration absolutely must
   * succeed, so it sits inside a while loop and only breaks out once it
   * has succeeded.
   */
  try {
    //keep retrying until the code below succeeds
    while (true) {
      // init stuff
      try {
        //TODO the core registration code
        connectToNNAndHandshake();
        break;
      } catch (IOException ioe) {
        // Initial handshake, storage recovery or registration failed
        runningState = RunningState.INIT_FAILED;
        if (shouldRetryInit()) {
          // Retry until all namenodes of BPOS failed initialization
          LOG.error("Initialization failed for " + this + " "
              + ioe.getLocalizedMessage());
          //TODO on failure, sleep 5 seconds and retry
          sleepAndLogInterrupts(5000, "initializing");
        } else {
          runningState = RunningState.FAILED;
          LOG.fatal("Initialization failed for " + this + ". Exiting. ", ioe);
          return;
        }
      }
    }

    //registration is done
    runningState = RunningState.RUNNING;
    while (shouldRun()) {
      try {
        //TODO send heartbeats
        offerService();
      } catch (Exception ex) {
        LOG.error("Exception in BPOfferService for " + this, ex);
        sleepAndLogInterrupts(5000, "offering service");
      }
    }
    runningState = RunningState.EXITED;
  } catch (Throwable ex) {
    LOG.warn("Unexpected exception in block pool " + this, ex);
    runningState = RunningState.FAILED;
  } finally {
    LOG.warn("Ending block pool service for: " + this);
    cleanUp();
  }
}
private void connectToNNAndHandshake() throws IOException {
  //TODO get a proxy to the namenode
  //datanode -> namenode: the datanode calls namenode methods to store
  //its own information on the namenode
  bpNamenode = dn.connectToNN(nnAddr);

  // First phase of the handshake with NN - get the namespace info.
  NamespaceInfo nsInfo = retrieveNamespaceInfo();

  // Verify that this matches the other NN in this HA pair.
  // This also initializes our block pool in the DN if we are
  // the first NN connection for this BP.
  bpos.verifyAndSetNamespaceInfo(nsInfo);

  // Second phase of the handshake with the NN.
  //TODO register
  register(nsInfo);
}

void register(NamespaceInfo nsInfo) throws IOException {
  // The handshake() phase loaded the block pool storage
  // off disk - so update the bpRegistration object from that info
  //TODO build the registration info (worth inspecting)
  bpRegistration = bpos.createRegistration();

  LOG.info(this + " beginning handshake with NN");

  while (shouldRun()) {
    try {
      // Use returned registration from namenode with updated fields
      //TODO call registerDatanode on the server side
      bpRegistration = bpNamenode.registerDatanode(bpRegistration);
      //reaching this line means registration has completed
      bpRegistration.setNamespaceInfo(nsInfo);
      break;
    } catch (EOFException e) {
      // namenode might have just restarted
      LOG.info("Problem connecting to server: " + nnAddr + " :"
          + e.getLocalizedMessage());
      sleepAndLogInterrupts(1000, "connecting to server");
    } catch (SocketTimeoutException e) {
      // namenode is busy
      LOG.info("Problem connecting to server: " + nnAddr);
      sleepAndLogInterrupts(1000, "connecting to server");
    }
  }

  LOG.info("Block pool " + this + " successfully registered with NN");
  bpos.registrationSucceeded(this, bpRegistration);

  // random short delay - helps scatter the BR from all DNs
  scheduleBlockReport(dnConf.initialBlockReportDelay);
}

What actually runs is the following method on NameNodeRpcServer:

public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
    throws IOException {
  //check that the NameNode is up
  checkNNStartup();
  verifySoftwareVersion(nodeReg);
  //TODO register the DataNode
  namesystem.registerDatanode(nodeReg);
  return nodeReg;
}

void registerDatanode(DatanodeRegistration nodeReg) throws IOException {
  writeLock();
  try {
    //TODO the DatanodeManager handles everything DataNode-related
    getBlockManager().getDatanodeManager().registerDatanode(nodeReg);
    checkSafeMode();
  } finally {
    writeUnlock();
  }
}

The key code inside DatanodeManager.registerDatanode is:

networktopology.add(nodeDescr);
nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion());

// register new datanode
//TODO register the DataNode
addDatanode(nodeDescr);

// also treat the registration message as a heartbeat
// no need to update its timestamp
// because it is done when the descriptor is created
//TODO hand the newly registered DataNode to the HeartbeatManager,
//which takes over heartbeat bookkeeping from here on
heartbeatManager.addDatanode(nodeDescr);
incrementVersionCount(nodeReg.getSoftwareVersion());
startDecommissioningIfExcluded(nodeDescr);
/** Add a datanode. */
//TODO registering a DataNode boils down to inserting its info
//into a handful of in-memory data structures
void addDatanode(final DatanodeDescriptor node) {
  // To keep host2DatanodeMap consistent with datanodeMap,
  // remove from host2DatanodeMap the datanodeDescriptor removed
  // from datanodeMap before adding node to host2DatanodeMap.
  synchronized (datanodeMap) {
    //TODO insert into datanodeMap
    host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
  }
  //TODO insert into the network topology structure
  networktopology.add(node); // may throw InvalidTopologyException
  //TODO insert into the in-memory host map
  host2DatanodeMap.add(node);
  //once these in-memory structures are updated,
  //registration is complete
  checkIfClusterIsNowMultiRack(node);

  if (LOG.isDebugEnabled()) {
    LOG.debug(getClass().getSimpleName() + ".addDatanode: "
        + "node " + node + " is added to datanodeMap.");
  }
}

The heartbeat machinery records the DataNode's information as well:

synchronized void addDatanode(final DatanodeDescriptor d) {
  // update in-service node count
  //store the node in the various bookkeeping structures
  stats.add(d);
  //append the datanode's info to the datanodes list
  datanodes.add(d);
  d.isAlive = true;
}
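So "registering a DataNode" really is just inserting a descriptor into several in-memory structures. A hypothetical, stripped-down model of that idea (all names here are made up; the real DatanodeManager and HeartbeatManager are far richer):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ToyDatanodeManager {
    static class NodeDescriptor {
        final String uuid, host;
        volatile boolean alive;
        volatile long lastHeartbeatMs;
        NodeDescriptor(String uuid, String host) { this.uuid = uuid; this.host = host; }
    }

    private final Map<String, NodeDescriptor> datanodeMap = new ConcurrentHashMap<>();      // uuid -> node
    private final Map<String, NodeDescriptor> host2DatanodeMap = new ConcurrentHashMap<>(); // host -> node
    private final List<NodeDescriptor> heartbeatList = new ArrayList<>();                   // nodes to monitor

    synchronized void register(NodeDescriptor node) {
        datanodeMap.put(node.uuid, node);       // the "datanodeMap" step
        host2DatanodeMap.put(node.host, node);  // the "host2DatanodeMap" step
        heartbeatList.add(node);                // hand the node over to heartbeat monitoring
        node.alive = true;
        node.lastHeartbeatMs = System.currentTimeMillis();
    }
}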

Registration complete

//registration is done
runningState = RunningState.RUNNING;
while (shouldRun()) {
  try {
    //TODO send heartbeats
    offerService();
  } catch (Exception ex) {
    LOG.error("Exception in BPOfferService for " + this, ex);
    sleepAndLogInterrupts(5000, "offering service");
  }
}
private void offerService() throws Exception {
  LOG.info("For namenode " + nnAddr + " using"
      + " DELETEREPORT_INTERVAL of " + dnConf.deleteReportInterval + " msec "
      + " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msec"
      + " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msec"
      + " Initial delay: " + dnConf.initialBlockReportDelay + "msec"
      + "; heartBeatInterval=" + dnConf.heartBeatInterval);

  //
  // Now loop for a long time....
  //
  //TODO runs periodically
  while (shouldRun()) {
    try {
      final long startTime = monotonicNow();

      //
      // Every so often, send heartbeat or block-report
      //
      //TODO a heartbeat is sent every 3 seconds
      if (startTime - lastHeartbeat >= dnConf.heartBeatInterval) {
        //
        // All heartbeat messages include following info:
        // -- Datanode name
        // -- data transfer port
        // -- Total capacity
        // -- Bytes remaining
        //
        lastHeartbeat = startTime;
        if (!dn.areHeartbeatsDisabledForTests()) {
          //The NameNode never connects to the DataNode directly.
          //The DataNode sends a heartbeat; the NameNode replies with
          //commands, and the DataNode acts on those commands.
          //TODO send the heartbeat; the response carries the NameNode's commands
          HeartbeatResponse resp = sendHeartBeat();
          assert resp != null;
          dn.getMetrics().addHeartbeat(monotonicNow() - startTime);

          // If the state of this NN has changed (eg STANDBY->ACTIVE)
          // then let the BPOfferService update itself.
          //
          // Important that this happens before processCommand below,
          // since the first heartbeat to a new active might have commands
          // that we should actually process.
          bpos.updateActorStatesFromHeartbeat(
              this, resp.getNameNodeHaState());
          state = resp.getNameNodeHaState().getState();
          if (state == HAServiceState.ACTIVE) {
            handleRollingUpgradeStatus(resp);
          }

          long startProcessCommands = monotonicNow();
          //handle the commands the namenode sent back
          //TODO this uses the command design pattern
          if (!processCommand(resp.getCommands()))
            continue;
          long endProcessCommands = monotonicNow();
          if (endProcessCommands - startProcessCommands > 2000) {
            LOG.info("Took " + (endProcessCommands - startProcessCommands)
                + "ms to process " + resp.getCommands().length
                + " commands from NN");
          }
        }
      }
      if (sendImmediateIBR
          || (startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
        reportReceivedDeletedBlocks();
        lastDeletedReport = startTime;
      }

      List<DatanodeCommand> cmds = blockReport();
      processCommand(cmds == null ? null
          : cmds.toArray(new DatanodeCommand[cmds.size()]));

      DatanodeCommand cmd = cacheReport();
      processCommand(new DatanodeCommand[]{ cmd });

      //
      // There is no work to do; sleep until heartbeat timer elapses,
      // or work arrives, and then iterate again.
      //
      long waitTime = dnConf.heartBeatInterval
          - (monotonicNow() - lastHeartbeat);
      synchronized (pendingIncrementalBRperStorage) {
        if (waitTime > 0 && !sendImmediateIBR) {
          try {
            pendingIncrementalBRperStorage.wait(waitTime);
          } catch (InterruptedException ie) {
            LOG.warn("BPOfferService for " + this + " interrupted");
          }
        }
      } // synchronized
    } catch (RemoteException re) {
      String reClass = re.getClassName();
      if (UnregisteredNodeException.class.getName().equals(reClass)
          || DisallowedDatanodeException.class.getName().equals(reClass)
          || IncorrectVersionException.class.getName().equals(reClass)) {
        LOG.warn(this + " is shutting down", re);
        shouldServiceRun = false;
        return;
      }
      LOG.warn("RemoteException in offerService", re);
      try {
        long sleepTime = Math.min(1000, dnConf.heartBeatInterval);
        Thread.sleep(sleepTime);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
      }
    } catch (IOException e) {
      LOG.warn("IOException in offerService", e);
    }
    processQueueMessages();
  } // while (shouldRun())
} // offerService
HeartbeatResponse sendHeartBeat() throws IOException {
  StorageReport[] reports =
      dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
  if (LOG.isDebugEnabled()) {
    LOG.debug("Sending heartbeat with " + reports.length
        + " storage reports from service actor: " + this);
  }

  VolumeFailureSummary volumeFailureSummary =
      dn.getFSDataset().getVolumeFailureSummary();
  int numFailedVolumes = volumeFailureSummary != null
      ? volumeFailureSummary.getFailedStorageLocations().length : 0;
  //TODO send the heartbeat through the NameNode proxy
  return bpNamenode.sendHeartbeat(bpRegistration,
      reports,
      dn.getFSDataset().getCacheCapacity(),
      dn.getFSDataset().getCacheUsed(),
      dn.getXmitsInProgress(),
      dn.getXceiverCount(),
      numFailedVolumes,
      volumeFailureSummary);
}

This in turn calls sendHeartbeat() on NameNodeRpcServer:

@Override // DatanodeProtocol
public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
    int xmitsInProgress, int xceiverCount,
    int failedVolumes, VolumeFailureSummary volumeFailureSummary)
    throws IOException {
  checkNNStartup();
  verifyRequest(nodeReg);
  //TODO handle the heartbeat sent by the DataNode
  return namesystem.handleHeartbeat(nodeReg, report,
      dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
      failedVolumes, volumeFailureSummary);
}

HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] reports, long cacheCapacity, long cacheUsed,
    int xceiverCount, int xmitsInProgress, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary) throws IOException {
  readLock();
  try {
    //get datanode commands
    final int maxTransfer = blockManager.getMaxReplicationStreams()
        - xmitsInProgress;
    //TODO the NameNode processes the DataNode's heartbeat
    DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
        nodeReg, reports, blockPoolId, cacheCapacity, cacheUsed,
        xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary);

    //create ha status
    final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
        haContext.getState().getServiceState(),
        getFSImage().getLastAppliedOrWrittenTxId());
    //TODO return the response to the DataNode
    return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo);
  } finally {
    readUnlock();
  }
}

The important part of handleHeartbeat:

synchronized (heartbeatManager) {
  synchronized (datanodeMap) {
    DatanodeDescriptor nodeinfo = null;
    try {
      //TODO look the reporting DataNode up in datanodeMap;
      //if it is found, the node has registered before; on first
      //contact there is no entry yet
      nodeinfo = getDatanode(nodeReg);
    } catch (UnregisteredNodeException e) {
      return new DatanodeCommand[]{ RegisterCommand.REGISTER };
    }

    // Check if this datanode should actually be shutdown instead.
    if (nodeinfo != null && nodeinfo.isDisallowed()) {
      setDatanodeDead(nodeinfo);
      throw new DisallowedDatanodeException(nodeinfo);
    }

    if (nodeinfo == null || !nodeinfo.isAlive) {
      return new DatanodeCommand[]{ RegisterCommand.REGISTER };
    }

    //TODO update the heartbeat bookkeeping
    heartbeatManager.updateHeartbeat(nodeinfo, reports,
        cacheCapacity, cacheUsed, xceiverCount, failedVolumes,
        volumeFailureSummary);

    // If we are in safemode, do not send back any recovery / replication
    // requests. Don't even drain the existing queue of work.
    if (namesystem.isInSafeMode()) {
      return new DatanodeCommand[0];
    }

Updating the heartbeat state boils down to this call chain:

1. heartbeatManager.updateHeartbeat(...), which calls
2. node.updateHeartbeat(), which calls
3. updateHeartbeatState(), whose essential work is:

//TODO update the storage statistics
setCacheCapacity(cacheCapacity);
setCacheUsed(cacheUsed);
setXceiverCount(xceiverCount);
//TODO record the time of this heartbeat
setLastUpdate(Time.now());
setLastUpdateMonotonic(Time.monotonicNow());

How does the heartbeat decide whether a DataNode is still alive? If now − lastUpdate exceeds the expiry interval, the node is declared dead; with the Hadoop 2.x defaults the interval is 2 × the 5-minute recheck interval plus 10 × the 3-second heartbeat interval, about 10.5 minutes.
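Under those defaults, the liveness rule can be written down directly. A sketch; both intervals are configurable via dfs.heartbeat.interval and dfs.namenode.heartbeat.recheck-interval:

public class HeartbeatExpirySketch {
    // HDFS 2.x defaults
    static final long HEARTBEAT_INTERVAL_MS = 3_000;          // dfs.heartbeat.interval = 3 s
    static final long RECHECK_INTERVAL_MS   = 5 * 60 * 1_000; // dfs.namenode.heartbeat.recheck-interval = 5 min
    // 2 * 300,000 + 10 * 3,000 = 630,000 ms = 10.5 minutes
    static final long EXPIRE_MS = 2 * RECHECK_INTERVAL_MS + 10 * HEARTBEAT_INTERVAL_MS;

    static boolean isDead(long nowMs, long lastUpdateMs) {
        return nowMs - lastUpdateMs > EXPIRE_MS;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(isDead(now, now - 11 * 60 * 1_000)); // true: 11 min of silence
        System.out.println(isDead(now, now - 3_000));           // false: heartbeat 3 s ago
    }
}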

Summary

So far we have covered:

1) Hadoop RPC
2) the NameNode startup flow
3) the DataNode (1. initialization, 2. registration, 3. heartbeat)

Detailed process

(figure: detailed flow of DataNode initialization, registration, and heartbeat)

Next we will analyze how the NameNode manages its metadata. As always when reading source code, we will let concrete scenarios drive the analysis.

