(5) Metadata Management and Development


Preface

This post walks through how the NameNode's edit log is written (FSEditLog.logEdit), how edits reach the local disk and the JournalNode cluster, and how the Standby NameNode tails those edits and checkpoints the namespace. We start from FSEditLog.logEdit:

void logEdit(final FSEditLogOp op) {
  synchronized (this) {
    assert isOpenForWrite() : "bad state: " + state;

    // Wait if an automatic sync is scheduled (not needed on the very first call).
    waitIfAutoSyncScheduled();

    // The key step: generate a globally unique, monotonically increasing
    // transaction ID for this edit. Segment files are named by txid range, e.g.
    //   editlog_0000000000_0000000012.log
    //   editlog_0000000013_0000000022.log
    // TODO Step 1: obtain the unique transaction ID
    long start = beginTransaction();
    op.setTransactionId(txid);

    try {
      /**
       * The single editLogStream actually wraps several streams:
       *   a) the NameNode's local edit-log file buffer
       *   b) the JournalNodes' in-memory buffers
       */
      // TODO Step 2: write the edit into the in-memory buffer
      editLogStream.write(op);
    } catch (IOException ex) {
      // All journals failed, it is handled in logSync.
    } finally {
      op.reset();
    }

    endTransaction(start);

    // Check if it is time to schedule an automatic sync:
    // shouldForceSync() returns true once the buffer exceeds its threshold (~512 KB).
    if (!shouldForceSync()) {
      return;
    }
    // Reaching here means the buffer is full.
    isAutoSyncScheduled = true;
  }

  // TODO Flush the buffered edits to disk (and to the JournalNodes).
  logSync();
}
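The pattern is worth spelling out: inside the synchronized block each edit gets a monotonically increasing transaction ID and is only appended to an in-memory buffer; the expensive flush is deferred to logSync, which is triggered once the buffer crosses a size threshold. Below is a stripped-down sketch of that write path, with illustrative names and an assumed threshold, not the real FSEditLog:

// Illustrative only: txid assignment plus in-memory buffering, with the flush
// deferred until the buffer crosses its threshold.
class EditLogSketch {
  private long txid = 0;
  private final StringBuilder buffer = new StringBuilder();   // stands in for the edit buffer
  private static final int AUTO_SYNC_THRESHOLD = 512 * 1024;  // assumed threshold, bytes

  void logEdit(String op) {
    boolean shouldForceSync;
    synchronized (this) {
      long myTxid = ++txid;                                       // step 1: unique, increasing txid
      buffer.append(myTxid).append(':').append(op).append('\n');  // step 2: buffer the edit
      shouldForceSync = buffer.length() > AUTO_SYNC_THRESHOLD;    // buffer full?
    }
    if (shouldForceSync) {
      logSync();                                                  // step 3: persist the batch
    }
  }

  synchronized void logSync() {
    // In the real FSEditLog this swaps a double buffer (shown later) and
    // flushes the ready half to the local edits file and the JournalNodes;
    // here we simply clear the buffer to keep the sketch self-contained.
    buffer.setLength(0);
  }
}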

Initializing editLogStream

try {
  // TODO This is called when the NameNode initializes.
  editLogStream = journalSet.startLogSegment(segmentTxId,
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
} catch (IOException ex) {
  throw new IOException("Unable to start log segment " + segmentTxId +
      ": too few journals successfully started.", ex);
}
@Override
public EditLogOutputStream startLogSegment(final long txId,
    final int layoutVersion) throws IOException {
  // Closure: apply startLogSegment to every journal.
  mapJournalsAndReportErrors(new JournalClosure() {
    @Override
    public void apply(JournalAndStream jas) throws IOException {
      jas.startLogSegment(txId, layoutVersion);
    }
  }, "starting log segment " + txId);

  // The returned JournalSetOutputStream wraps multiple streams.
  // The editLogStream.write(op) call seen earlier therefore fans out to
  //   - the NameNode's local stream
  //   - the JournalNode streams
  // held in a set-like collection.
  return new JournalSetOutputStream();
}
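The JournalSetOutputStream returned above is essentially a composite stream: a single write call is fanned out to every underlying journal stream (the local file stream and the JournalNode streams). A minimal sketch of that idea, with illustrative names rather than the real Hadoop classes:

import java.io.IOException;
import java.util.List;

// Illustrative only: a facade that forwards each record to several sinks,
// mirroring how JournalSetOutputStream fans writes out to all journals.
class CompositeEditStream {
  interface EditSink { void write(byte[] record) throws IOException; }

  private final List<EditSink> sinks;

  CompositeEditStream(List<EditSink> sinks) {
    this.sinks = sinks;
  }

  void write(byte[] record) throws IOException {
    for (EditSink sink : sinks) {
      sink.write(record);   // local file buffer, JournalNode channel, ...
    }
  }
}

mapJournalsAndReportErrors, used above to start the segment on every journal, is the helper that iterates over those journals and applies the error policy: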
private void mapJournalsAndReportErrors(
    JournalClosure closure, String status) throws IOException {
  List<JournalAndStream> badJAS = Lists.newLinkedList();

  // journals holds two kinds of managers:
  //   FileJournalManager   - writes edits to the NameNode's local disk
  //   QuorumJournalManager - writes edits to the JournalNodes
  // Its backing structure is a CopyOnWriteArrayList<JournalAndStream>,
  // a read-mostly, write-rarely data structure.
  for (JournalAndStream jas : journals) {
    try {
      // e.g. FileJournalManager
      closure.apply(jas);
    } catch (Throwable t) {
      if (jas.isRequired()) {
        final String msg = "Error: " + status + " failed for required journal ("
            + jas + ")";
        LOG.fatal(msg, t);
        // If we fail on *any* of the required journals, then we must not
        // continue on any of the other journals. Abort them to ensure that
        // retry behavior doesn't allow them to keep going in any way.
        abortAllJournals();
        // The current policy is to shut down the NN on errors to the shared
        // edits dir. There are many code paths to shared edits failures -
        // syncs, roll of edits etc. All of them go through this common
        // function where the isRequired() check is made. Applying exit
        // policy here to catch all code paths.
        terminate(1, msg);
      } else {
        LOG.error("Error: " + status + " failed for (journal " + jas + ")", t);
        badJAS.add(jas);
      }
    }
  }
  disableAndReportErrorOnJournals(badJAS);
  if (!NameNodeResourcePolicy.areResourcesAvailable(journals,
      minimumRedundantJournals)) {
    String message = status + " failed for too many journals";
    LOG.error("Error: " + message);
    throw new IOException(message);
  }
}
The journals list above is a read-mostly, copy-on-write data structure. New entries go through JournalSet.add, and after initialization it holds two kinds of journal managers, FileJournalManager and QuorumJournalManager (see the CopyOnWriteArrayList sketch after the code below):
void add(JournalManager j, boolean required, boolean shared) {
  JournalAndStream jas = new JournalAndStream(j, required, shared);
  // After initialization, journals therefore contains two entries:
  //   FileJournalManager
  //   QuorumJournalManager
  journals.add(jas);
}
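As a quick aside, CopyOnWriteArrayList is the standard JDK structure for this read-mostly pattern: every mutation copies the backing array, so iteration (the hot path here) never takes a lock. A small self-contained example:

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CowListDemo {
  public static void main(String[] args) {
    List<String> journals = new CopyOnWriteArrayList<>();
    journals.add("FileJournalManager");
    journals.add("QuorumJournalManager");

    // Iteration works on a snapshot of the backing array, so concurrent
    // adds never throw ConcurrentModificationException and readers take no lock.
    for (String j : journals) {
      journals.add("another");   // safe, but not visible to this iteration
      System.out.println(j);     // prints only the two original entries
    }
  }
}

initJournals is where these journal managers are created and added: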
private synchronized void initJournals(List<URI> dirs) {
  int minimumRedundantJournals = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY,
      DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT);

  synchronized (journalSetLock) {
    journalSet = new JournalSet(minimumRedundantJournals);
    // The important part: dirs is parsed from the HDFS configuration files.
    for (URI u : dirs) {
      // Is this directory listed as a required edits dir in the configuration?
      boolean required = FSNamesystem.getRequiredNamespaceEditsDirs(conf)
          .contains(u);
      if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
        // Local file system: create a FileJournalManager, which manages
        // writing edits to the NameNode's own disk.
        StorageDirectory sd = storage.getStorageDirectory(u);
        if (sd != null) {
          journalSet.add(new FileJournalManager(conf, sd, storage),
              required, sharedEditsDirs.contains(u));
        }
      } else {
        // Not a local file system: create a journal manager for the
        // JournalNodes, which manages writing edits to the JournalNode cluster.
        journalSet.add(createJournal(u), required,
            sharedEditsDirs.contains(u));
      }
    }
  }
}
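The URIs above come from the edits-dir settings in the HDFS configuration. In a typical HA deployment (an assumed example, not taken from this article) the local directory uses a file:// URI while the shared directory uses a qjournal:// URI, and the scheme alone decides which manager is created. A tiny sketch of that dispatch:

import java.net.URI;

// Illustrative only: scheme-based choice between a local and a quorum journal,
// mirroring the if/else in initJournals above.
public class JournalDispatchDemo {
  public static void main(String[] args) {
    URI local  = URI.create("file:///data/hadoop/name");                        // assumed local path
    URI shared = URI.create("qjournal://jn1:8485;jn2:8485;jn3:8485/mycluster"); // assumed quorum URI

    for (URI u : new URI[] { local, shared }) {
      if ("file".equals(u.getScheme())) {
        System.out.println(u + " -> FileJournalManager (local edits dir)");
      } else {
        System.out.println(u + " -> QuorumJournalManager (JournalNode cluster)");
      }
    }
  }
}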

For each journal, apply is invoked, and apply simply calls that journal's own startLogSegment (the method passed in via the closure above).

Two classes provide this startLogSegment implementation, FileJournalManager and QuorumJournalManager:

synchronized public EditLogOutputStream startLogSegment(long txid,
    int layoutVersion) throws IOException {
  try {
    currentInProgress = NNStorage.getInProgressEditsFile(sd, txid);
    // For FileJournalManager the stream is an EditLogFileOutputStream,
    // whose write() puts edits into the NameNode's local edit-log buffer.
    EditLogOutputStream stm = new EditLogFileOutputStream(conf,
        currentInProgress, outputBufferCapacity);
    stm.create(layoutVersion);
    return stm;
  } catch (IOException e) {
    LOG.warn("Unable to start log segment " + txid + " at " +
        currentInProgress + ": " + e.getLocalizedMessage());
    errorReporter.reportErrorOnFile(currentInProgress);
    throw e;
  }
}
@Override
public EditLogOutputStream startLogSegment(long txId, int layoutVersion)
    throws IOException {
  Preconditions.checkState(isActiveWriter,
      "must recover segments before starting a new one");
  QuorumCall<AsyncLogger, Void> q = loggers.startLogSegment(txId, layoutVersion);
  loggers.waitForWriteQuorum(q, startSegmentTimeoutMs,
      "startLogSegment(" + txId + ")");
  return new QuorumOutputStream(loggers, txId, outputBufferCapacity,
      writeTxnsTimeoutMs);
}

This is step two of logEdit: writing the edit into the in-memory buffer.

The two output-stream classes returned above (EditLogFileOutputStream and QuorumOutputStream) both implement write; the edit goes into a double buffer:

@Override
public void write(FSEditLogOp op) throws IOException {
  // Double buffer: the op goes into the in-memory buffer first.
  doubleBuf.writeOp(op);
}
public void writeOp(FSEditLogOp op) throws IOException {
  // Write the edit into the current in-memory buffer (bufCurrent).
  bufCurrent.writeOp(op);
}
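The double buffer behind these calls keeps two in-memory buffers: writers always append to the current one, and a sync swaps the buffers so the "ready" half can be flushed while new edits keep arriving. A simplified sketch of that pattern; the method names mirror EditsDoubleBuffer, but the bodies are reduced to the essentials:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Illustrative double buffer: writes go to bufCurrent; a sync swaps the
// buffers and drains bufReady, so writers are only blocked for the swap.
class DoubleBufferSketch {
  private ByteArrayOutputStream bufCurrent = new ByteArrayOutputStream();
  private ByteArrayOutputStream bufReady   = new ByteArrayOutputStream();

  synchronized void writeOp(byte[] serializedOp) {
    bufCurrent.write(serializedOp, 0, serializedOp.length);
  }

  synchronized void setReadyToFlush() {
    ByteArrayOutputStream tmp = bufReady;   // swap: current becomes ready
    bufReady = bufCurrent;
    bufCurrent = tmp;
  }

  void flushTo(OutputStream out) throws IOException {
    bufReady.writeTo(out);                  // drain the ready half to disk
    bufReady.reset();
  }
}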

Similarly, the final step of logEdit, logSync() (persisting the buffered data to disk), ends up calling flush on the stream:

public void flush(boolean durable) throws IOException {
  numSync++;
  long start = monotonicNow();
  flushAndSync(durable);
  long end = monotonicNow();
  totalTimeSync += (end - start);
}

flushAndSync(durable) is abstract here and is implemented by the two stream classes:

public void flushAndSync(boolean durable) throws IOException {
  if (fp == null) {
    throw new IOException("Trying to use aborted output stream");
  }
  if (doubleBuf.isFlushed()) {
    LOG.info("Nothing to flush");
    return;
  }
  preallocate(); // preallocate file if necessary
  doubleBuf.flushTo(fp);
  if (durable && !shouldSkipFsyncForTests && !shouldSyncWritesAndSkipFsync) {
    fc.force(false); // metadata updates not needed
  }
}
protected void flushAndSync(boolean durable) throws IOException {
  int numReadyBytes = buf.countReadyBytes();
  if (numReadyBytes > 0) {
    int numReadyTxns = buf.countReadyTxns();
    long firstTxToFlush = buf.getFirstReadyTxId();

    assert numReadyTxns > 0;

    // Copy from our double-buffer into a new byte array. This is for
    // two reasons:
    // 1) The IPC code has no way of specifying to send only a slice of
    //    a larger array.
    // 2) because the calls to the underlying nodes are asynchronous, we
    //    need a defensive copy to avoid accidentally mutating the buffer
    //    before it is sent.
    DataOutputBuffer bufToSend = new DataOutputBuffer(numReadyBytes);
    buf.flushTo(bufToSend);
    assert bufToSend.getLength() == numReadyBytes;
    byte[] data = bufToSend.getData();
    assert data.length == bufToSend.getLength();

    // Send the edits to the JournalNodes.
    QuorumCall<AsyncLogger, Void> qcall = loggers.sendEdits(
        segmentTxId, firstTxToFlush, numReadyTxns, data);
    // Wait for a write quorum of JournalNodes to acknowledge the batch.
    loggers.waitForWriteQuorum(qcall, writeTimeoutMs, "sendEdits");

    // Since we successfully wrote this batch, let the loggers know. Any future
    // RPCs will thus let the loggers know of the most recent transaction, even
    // if a logger has fallen behind.
    loggers.setCommittedTxId(firstTxToFlush + numReadyTxns - 1);
  }
}

loggers was initialized earlier; its sendEdits fans the batch out to every JournalNode:

public QuorumCall<AsyncLogger, Void> sendEdits(
    long segmentTxId, long firstTxnId, int numTxns, byte[] data) {
  Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
  // Iterate over all loggers; each AsyncLogger represents one JournalNode.
  for (AsyncLogger logger : loggers) {
    // Send the edits to that JournalNode asynchronously.
    ListenableFuture<Void> future =
        logger.sendEdits(segmentTxId, firstTxnId, numTxns, data);
    calls.put(logger, future);
  }
  return QuorumCall.create(calls);
}
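The QuorumCall returned here is what waitForWriteQuorum blocks on: the write counts as successful once a majority of the JournalNodes have acknowledged it, so one slow or dead node does not stall the Active NameNode. Below is a minimal sketch of that majority-wait idea using plain CompletableFutures; the names are illustrative and this is not the real QuorumCall API:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: complete "quorumReached" once a majority of the
// per-node futures succeed, roughly what waitForWriteQuorum waits for.
class QuorumWaitSketch {
  static void waitForWriteQuorum(List<CompletableFuture<Void>> perNodeAcks,
                                 long timeoutMs) throws Exception {
    int needed = perNodeAcks.size() / 2 + 1;        // majority, e.g. 2 of 3
    AtomicInteger acks = new AtomicInteger();
    CompletableFuture<Void> quorumReached = new CompletableFuture<>();

    for (CompletableFuture<Void> ack : perNodeAcks) {
      ack.thenRun(() -> {
        if (acks.incrementAndGet() >= needed) {
          quorumReached.complete(null);             // enough JournalNodes acked
        }
      });
    }
    try {
      quorumReached.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      throw new Exception("timed out waiting for a write quorum", e);
    }
  }
}

The real QuorumCall also counts failed responses so it can fail fast once a majority can no longer be reached; that bookkeeping is omitted in the sketch.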

sendEdits() on each logger is implemented by IPCLoggerChannel:

public ListenableFuture<Void> sendEdits(
    final long segmentTxId, final long firstTxnId,
    final int numTxns, final byte[] data) {
  try {
    reserveQueueSpace(data.length);
  } catch (LoggerTooFarBehindException e) {
    return Futures.immediateFailedFuture(e);
  }

  // When this batch is acked, we use its submission time in order
  // to calculate how far we are lagging.
  final long submitNanos = System.nanoTime();

  ListenableFuture<Void> ret = null;
  try {
    // Asynchronous send: the task is queued on this channel's
    // single-threaded executor.
    ret = singleThreadExecutor.submit(new Callable<Void>() {
      @Override
      public Void call() throws IOException {
        throwIfOutOfSync();

        long rpcSendTimeNanos = System.nanoTime();
        try {
          // getProxy() returns an RPC client proxy for the JournalNode's
          // JournalRpcServer; journal(...) is the remote call that ships
          // this batch of edits to that JournalNode.
          getProxy().journal(createReqInfo(),
              segmentTxId, firstTxnId, numTxns, data);
        } catch (IOException e) {
          QuorumJournalManager.LOG.warn(
              "Remote journal " + IPCLoggerChannel.this + " failed to " +
              "write txns " + firstTxnId + "-" + (firstTxnId + numTxns - 1) +
              ". Will try to write to this JN again after the next " +
              "log roll.", e);
          synchronized (IPCLoggerChannel.this) {
            outOfSync = true;
          }
          throw e;
        } finally {
          long now = System.nanoTime();
          long rpcTime = TimeUnit.MICROSECONDS.convert(
              now - rpcSendTimeNanos, TimeUnit.NANOSECONDS);
          long endToEndTime = TimeUnit.MICROSECONDS.convert(
              now - submitNanos, TimeUnit.NANOSECONDS);
          metrics.addWriteEndToEndLatency(endToEndTime);
          metrics.addWriteRpcLatency(rpcTime);
          if (rpcTime / 1000 > WARN_JOURNAL_MILLIS_THRESHOLD) {
            QuorumJournalManager.LOG.warn(
                "Took " + (rpcTime / 1000) + "ms to send a batch of " +
                numTxns + " edits (" + data.length + " bytes) to " +
                "remote journal " + IPCLoggerChannel.this);
          }
        }
        synchronized (IPCLoggerChannel.this) {
          highestAckedTxId = firstTxnId + numTxns - 1;
          lastAckNanos = submitNanos;
        }
        return null;
      }
    });
  } finally {
    if (ret == null) {
      // it didn't successfully get submitted,
      // so adjust the queue size back down.
      unreserveQueueSpace(data.length);
    } else {
      // It was submitted to the queue, so adjust the length
      // once the call completes, regardless of whether it
      // succeeds or fails.
      Futures.addCallback(ret, new FutureCallback<Void>() {
        @Override
        public void onFailure(Throwable t) {
          unreserveQueueSpace(data.length);
        }
        @Override
        public void onSuccess(Void t) {
          unreserveQueueSpace(data.length);
        }
      });
    }
  }
  return ret;
}
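Two details are worth calling out: each JournalNode gets its own single-threaded executor, so batches arrive at that node strictly in submission (txid) order, and queue space is reserved per batch so a node that has fallen too far behind is rejected rather than buffering edits without bound. A minimal sketch of that per-node, in-order async send, using illustrative names and plain java.util.concurrent instead of Guava:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: one single-threaded executor per remote node keeps
// batches in order; a byte budget bounds how far a slow node may lag.
class PerNodeChannelSketch {
  private final ExecutorService singleThreadExecutor =
      Executors.newSingleThreadExecutor();
  private final AtomicLong queuedBytes = new AtomicLong();
  private final long maxQueuedBytes = 10 * 1024 * 1024;   // assumed 10 MB budget

  Future<Void> sendEdits(final byte[] data) {
    if (queuedBytes.addAndGet(data.length) > maxQueuedBytes) {
      queuedBytes.addAndGet(-data.length);
      // The real code returns a failed future instead of throwing.
      throw new IllegalStateException("this node is too far behind");
    }
    // Tasks submitted to a single-threaded executor run strictly in
    // submission order, so edits reach this node in txid order.
    return singleThreadExecutor.submit(() -> {
      try {
        // ... the RPC to the remote node would happen here ...
        return null;
      } finally {
        queuedBytes.addAndGet(-data.length);   // release the queue budget
      }
    });
  }
}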

The remote call lands in the JournalNode's RPC server, whose journal method looks up the right journal and appends the records:

public void journal(RequestInfo reqInfo,
    long segmentTxId, long firstTxnId,
    int numTxns, byte[] records) throws IOException {
  jn.getOrCreateJournal(reqInfo.getJournalId())
      .journal(reqInfo, segmentTxId, firstTxnId, numTxns, records);
}

Inside Journal.journal(reqInfo, segmentTxId, firstTxnId, numTxns, records), the records are written and flushed to the JournalNode's disk:

curSegment.writeRaw(records, 0, records.length);
curSegment.setReadyToFlush();
StopWatch sw = new StopWatch();
sw.start();
curSegment.flush(shouldFsync);
sw.stop();

StandbyCheckpointer

StandbyCheckpointer is a thread that runs on the Standby NameNode. It periodically checkpoints the namespace (that is, it persists the in-memory directory tree to disk as an fsimage) and uploads the result to the Active NameNode, replacing the Active's fsimage.

Compare this with Hadoop 1.x: the SecondaryNameNode had to fetch the fsimage and edit logs from the NameNode, merge them locally, and then push the merged fsimage back to replace the one on the NameNode's disk. The Standby NameNode, by contrast, does not need to read any metadata from the Active in order to checkpoint: the metadata is already in its own memory, so it only has to write its in-memory state to disk and upload the resulting fsimage to the Active NameNode to replace the Active's copy.
private void doWork() {
  final long checkPeriod = 1000 * checkpointConf.getCheckPeriod();
  // Reset checkpoint time so that we don't always checkpoint
  // on startup.
  lastCheckpointTime = monotonicNow();
  while (shouldRun) {
    boolean needRollbackCheckpoint = namesystem.isNeedRollbackFsImage();
    if (!needRollbackCheckpoint) {
      try {
        // TODO Every 60 seconds, check whether a checkpoint is needed.
        Thread.sleep(checkPeriod);
      } catch (InterruptedException ie) {
      }
      if (!shouldRun) {
        break;
      }
    }
    try {
      // We may have lost our ticket since last checkpoint, log in again, just in case
      if (UserGroupInformation.isSecurityEnabled()) {
        UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
      }

      final long now = monotonicNow();
      // TODO Checkpoint condition 1: how far behind the last checkpoint we are,
      // i.e. how many edits have not been checkpointed yet.
      final long uncheckpointed = countUncheckpointedTxns();
      // TODO Checkpoint condition 2: current time minus the time of the last
      // checkpoint, i.e. how long it has been since the last checkpoint.
      final long secsSinceLast = (now - lastCheckpointTime) / 1000;

      boolean needCheckpoint = needRollbackCheckpoint;
      if (needCheckpoint) {
        LOG.info("Triggering a rollback fsimage for rolling upgrade.");
      // TODO Condition 1: more than 1,000,000 un-checkpointed transactions.
      } else if (uncheckpointed >= checkpointConf.getTxnCount()) {
        LOG.info("Triggering checkpoint because there have been " +
            uncheckpointed + " txns since the last checkpoint, which " +
            "exceeds the configured threshold " +
            checkpointConf.getTxnCount());
        needCheckpoint = true;
      // TODO Condition 2: more than an hour since the last checkpoint.
      } else if (secsSinceLast >= checkpointConf.getPeriod()) {
        LOG.info("Triggering checkpoint because it has been " +
            secsSinceLast + " seconds since the last checkpoint, which " +
            "exceeds the configured interval " + checkpointConf.getPeriod());
        needCheckpoint = true;
      }

      synchronized (cancelLock) {
        if (now < preventCheckpointsUntil) {
          LOG.info("But skipping this checkpoint since we are about to failover!");
          canceledCount++;
          continue;
        }
        assert canceler == null;
        canceler = new Canceler();
      }

      // TODO One of the conditions is satisfied:
      if (needCheckpoint) {
        // TODO perform the checkpoint
        doCheckpoint();
        // reset needRollbackCheckpoint to false only when we finish a ckpt
        // for rollback image
        if (needRollbackCheckpoint
            && namesystem.getFSImage().hasRollbackFSImage()) {
          namesystem.setCreatedRollbackImages(true);
          namesystem.setNeedRollbackFsImage(false);
        }
        lastCheckpointTime = now;
      }
    } catch (SaveNamespaceCancelledException ce) {
      LOG.info("Checkpoint was cancelled: " + ce.getMessage());
      canceledCount++;
    } catch (InterruptedException ie) {
      LOG.info("Interrupted during checkpointing", ie);
      // Probably requested shutdown.
      continue;
    } catch (Throwable t) {
      LOG.error("Exception in doCheckpoint", t);
    } finally {
      synchronized (cancelLock) {
        canceler = null;
      }
    }
  }
}
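Boiled down, the loop wakes up periodically and checkpoints when either the transaction-count threshold or the time threshold is exceeded. A minimal sketch of just that predicate, with illustrative names; the thresholds of 1,000,000 transactions and 3,600 seconds follow the defaults referenced in the comments above:

// Illustrative predicate mirroring the two trigger conditions in doWork():
// checkpoint when enough transactions have piled up OR enough time has passed.
class CheckpointTriggerSketch {
  private final long txnThreshold = 1_000_000L;  // assumed default txn count
  private final long periodSecs   = 3_600L;      // assumed default period (1 hour)

  boolean needCheckpoint(long uncheckpointedTxns, long secsSinceLastCheckpoint) {
    return uncheckpointedTxns >= txnThreshold
        || secsSinceLastCheckpoint >= periodSecs;
  }
}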

EditLogTailer

EditLogTailer is a background thread. Once started, it periodically reads edit-log entries from the JournalNode cluster and applies them to the Standby NameNode's own metadata (both in memory and on disk).
private void doWork() {
  while (shouldRun) {
    try {
      if (tooLongSinceLastLoad() &&
          lastRollTriggerTxId < lastLoadedTxnId) {
        // Ask the Active NameNode to roll its current edit-log segment.
        triggerActiveLogRoll();
      }
      if (!shouldRun) {
        break;
      }
      namesystem.cpLockInterruptibly();
      try {
        // TODO The important call:
        doTailEdits();
      } finally {
        namesystem.cpUnlock();
      }
    } catch (EditLogInputException elie) {
      LOG.warn("Error while reading edits from disk. Will try again.", elie);
    } catch (InterruptedException ie) {
      // interrupter should have already set shouldRun to false
      continue;
    } catch (Throwable t) {
      LOG.fatal("Unknown error encountered while tailing edits. " +
          "Shutting down standby NN.", t);
      terminate(1, t);
    }
    try {
      // TODO Every 60 seconds the Standby NameNode fetches edits
      // from the JournalNodes.
      Thread.sleep(sleepTimeMs);
    } catch (InterruptedException e) {
      LOG.warn("Edit log tailer interrupted", e);
    }
  }
}
void doTailEdits() throws IOException, InterruptedException {
  // Write lock needs to be interruptible here because the
  // transitionToActive RPC takes the write lock before calling
  // tailer.stop() -- so if we're not interruptible, it will
  // deadlock.
  namesystem.writeLockInterruptibly();
  try {
    // TODO The Standby's own view of the metadata.
    FSImage image = namesystem.getFSImage();
    // TODO The transaction ID of the last edit already applied locally,
    // e.g. 1000.
    long lastTxnId = image.getLastAppliedTxId();

    if (LOG.isDebugEnabled()) {
      LOG.debug("lastTxnId: " + lastTxnId);
    }
    Collection<EditLogInputStream> streams;
    try {
      // The important part: read edits from the JournalNodes.
      // If our current transaction ID is 1000, we only need the edits
      // starting from 1001.
      // TODO Open input streams for fetching edits from the JournalNodes.
      streams = editLog.selectInputStreams(lastTxnId + 1, 0, null, false);
    } catch (IOException ioe) {
      // This is acceptable. If we try to tail edits in the middle of an edits
      // log roll, i.e. the last one has been finalized but the new inprogress
      // edits file hasn't been started yet.
      LOG.warn("Edits tailer failed to find any streams. Will try again " +
          "later.", ioe);
      return;
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("edit streams to load from: " + streams.size());
    }

    // Once we have streams to load, errors encountered are legitimate cause
    // for concern, so we don't catch them here. Simple errors reading from
    // disk are ignored.
    long editsLoaded = 0;
    try {
      // TODO Load the edits fetched from the JournalNodes.
      editsLoaded = image.loadEdits(streams, namesystem);
    } catch (EditLogInputException elie) {
      editsLoaded = elie.getNumEditsLoaded();
      throw elie;
    } finally {
      if (editsLoaded > 0 || LOG.isDebugEnabled()) {
        LOG.info(String.format("Loaded %d edits starting from txid %d ",
            editsLoaded, lastTxnId));
      }
    }

    if (editsLoaded > 0) {
      lastLoadTimeMs = monotonicNow();
    }
    lastLoadedTxnId = image.getLastAppliedTxId();
  } finally {
    namesystem.writeUnlock();
  }
}

Loading the edits from the JournalNodes (image.loadEdits):

private long loadEdits(Iterable<EditLogInputStream> editStreams,
    FSNamesystem target, StartupOption startOpt,
    MetaRecoveryContext recovery) throws IOException {
  LOG.debug("About to load edits:\n  " + Joiner.on("\n  ").join(editStreams));
  StartupProgress prog = NameNode.getStartupProgress();
  prog.beginPhase(Phase.LOADING_EDITS);

  long prevLastAppliedTxId = lastAppliedTxId;
  try {
    // TODO Build an FSEditLogLoader.
    FSEditLogLoader loader = new FSEditLogLoader(target, lastAppliedTxId);

    // Load latest edits
    for (EditLogInputStream editIn : editStreams) {
      LOG.info("Reading " + editIn + " expecting start txid #" +
          (lastAppliedTxId + 1));
      try {
        // TODO Load the edits from this stream.
        loader.loadFSEdits(editIn, lastAppliedTxId + 1, startOpt, recovery);
      } finally {
        // Update lastAppliedTxId even in case of error, since some ops may
        // have been successfully applied before the error.
        // TODO Record the last applied transaction ID, e.g. 1000 -> 2000.
        lastAppliedTxId = loader.getLastAppliedTxId();
      }
      // If we are in recovery mode, we may have skipped over some txids.
      if (editIn.getLastTxId() != HdfsConstants.INVALID_TXID) {
        lastAppliedTxId = editIn.getLastTxId();
      }
    }
  } finally {
    FSEditLog.closeAllStreams(editStreams);
    // update the counts
    updateCountForQuota(target.getBlockManager().getStoragePolicySuite(),
        target.dir.rootDir);
  }
  prog.endPhase(Phase.LOADING_EDITS);
  return lastAppliedTxId - prevLastAppliedTxId;
}
long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
    StartupOption startOpt, MetaRecoveryContext recovery) throws IOException {
  StartupProgress prog = NameNode.getStartupProgress();
  Step step = createStartupProgressStep(edits);
  prog.beginStep(Phase.LOADING_EDITS, step);
  fsNamesys.writeLock();
  try {
    long startTime = monotonicNow();
    FSImage.LOG.info("Start loading edits file " + edits.getName());
    // TODO The important call:
    long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
        startOpt, recovery);
    FSImage.LOG.info("Edits file " + edits.getName() + " of size " +
        edits.length() + " edits # " + numEdits + " loaded in " +
        (monotonicNow() - startTime) / 1000 + " seconds");
    return numEdits;
  } finally {
    edits.close();
    fsNamesys.writeUnlock();
    prog.endStep(Phase.LOADING_EDITS, step);
  }
}
long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
    long expectedStartingTxId, StartupOption startOpt,
    MetaRecoveryContext recovery) throws IOException {
  FSDirectory fsDir = fsNamesys.dir;

  EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
      new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class);

  if (LOG.isTraceEnabled()) {
    LOG.trace("Acquiring write lock to replay edit log");
  }

  fsNamesys.writeLock();
  fsDir.writeLock();

  long recentOpcodeOffsets[] = new long[4];
  Arrays.fill(recentOpcodeOffsets, -1);

  long expectedTxId = expectedStartingTxId;
  long numEdits = 0;
  long lastTxId = in.getLastTxId();
  long numTxns = (lastTxId - expectedStartingTxId) + 1;
  StartupProgress prog = NameNode.getStartupProgress();
  Step step = createStartupProgressStep(in);
  prog.setTotal(Phase.LOADING_EDITS, step, numTxns);
  Counter counter = prog.getCounter(Phase.LOADING_EDITS, step);
  long lastLogTime = monotonicNow();
  long lastInodeId = fsNamesys.dir.getLastInodeId();

  try {
    while (true) {
      try {
        FSEditLogOp op;
        try {
          /**
           * TODO Read one operation from the input stream.
           * The op may be, for example, a file-write edit or a
           * create-directory edit; this walkthrough follows the
           * create-directory case.
           */
          op = in.readOp();
          if (op == null) {
            break;
          }
        } catch (Throwable e) {
          // Handle a problem with our input
          check203UpgradeFailure(in.getVersion(true), e);
          String errorMessage =
              formatEditLogReplayError(in, recentOpcodeOffsets, expectedTxId);
          FSImage.LOG.error(errorMessage, e);
          if (recovery == null) {
            // We will only try to skip over problematic opcodes when in
            // recovery mode.
            throw new EditLogInputException(errorMessage, e, numEdits);
          }
          MetaRecoveryContext.editLogLoaderPrompt(
              "We failed to read txId " + expectedTxId,
              recovery, "skipping the bad section in the log");
          in.resync();
          continue;
        }
        recentOpcodeOffsets[(int)(numEdits % recentOpcodeOffsets.length)] =
            in.getPosition();
        if (op.hasTransactionId()) {
          if (op.getTransactionId() > expectedTxId) {
            MetaRecoveryContext.editLogLoaderPrompt("There appears " +
                "to be a gap in the edit log. We expected txid " +
                expectedTxId + ", but got txid " +
                op.getTransactionId() + ".", recovery, "ignoring missing " +
                " transaction IDs");
          } else if (op.getTransactionId() < expectedTxId) {
            MetaRecoveryContext.editLogLoaderPrompt("There appears " +
                "to be an out-of-order edit in the edit log. We " +
                "expected txid " + expectedTxId + ", but got txid " +
                op.getTransactionId() + ".", recovery,
                "skipping the out-of-order edit");
            continue;
          }
        }
        try {
          if (LOG.isTraceEnabled()) {
            LOG.trace("op=" + op + ", startOpt=" + startOpt +
                ", numEdits=" + numEdits + ", totalEdits=" + totalEdits);
          }
          // TODO Apply this edit to the Standby's own metadata.
          long inodeId = applyEditLogOp(op, fsDir, startOpt,
              in.getVersion(true), lastInodeId);
          if (lastInodeId < inodeId) {
            lastInodeId = inodeId;
          }
        } catch (RollingUpgradeOp.RollbackException e) {
          throw e;
        } catch (Throwable e) {
          LOG.error("Encountered exception on operation " + op, e);
          if (recovery == null) {
            throw e instanceof IOException ? (IOException) e :
                new IOException(e);
          }
          MetaRecoveryContext.editLogLoaderPrompt("Failed to " +
              "apply edit log operation " + op + ": error " +
              e.getMessage(), recovery, "applying edits");
        }
        // Now that the operation has been successfully decoded and
        // applied, update our bookkeeping.
        incrOpCount(op.opCode, opCounts, step, counter);
        if (op.hasTransactionId()) {
          lastAppliedTxId = op.getTransactionId();
          expectedTxId = lastAppliedTxId + 1;
        } else {
          expectedTxId = lastAppliedTxId = expectedStartingTxId;
        }
        // log progress
        if (op.hasTransactionId()) {
          long now = monotonicNow();
          if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
            long deltaTxId = lastAppliedTxId - expectedStartingTxId + 1;
            int percent = Math.round((float) deltaTxId / numTxns * 100);
            LOG.info("replaying edit log: " + deltaTxId + "/" + numTxns +
                " transactions completed. (" + percent + "%)");
            lastLogTime = now;
          }
        }
        numEdits++;
        totalEdits++;
      } catch (RollingUpgradeOp.RollbackException e) {
        LOG.info("Stopped at OP_START_ROLLING_UPGRADE for rollback.");
        break;
      } catch (MetaRecoveryContext.RequestStopException e) {
        MetaRecoveryContext.LOG.warn("Stopped reading edit log at " +
            in.getPosition() + "/" + in.length());
        break;
      }
    }
  } finally {
    fsNamesys.dir.resetLastInodeId(lastInodeId);
    if (closeOnExit) {
      in.close();
    }
    fsDir.writeUnlock();
    fsNamesys.writeUnlock();

    if (LOG.isTraceEnabled()) {
      LOG.trace("replaying edit log finished");
    }

    if (FSImage.LOG.isDebugEnabled()) {
      dumpOpCounts(opCounts);
    }
  }
  return numEdits;
}

Inside applyEditLogOp, the key code for our create-directory example applies the edit to the Standby's in-memory namespace, much like the path we saw on the Active side:

// ...
// TODO An edit that creates a directory:
case OP_MKDIR: {
  // By the opcode matching rules, this edit is a create-directory edit.
  MkdirOp mkdirOp = (MkdirOp) op;
  inodeId = getAndUpdateLastInodeId(mkdirOp.inodeId, logVersion,
      lastInodeId);
  // TODO Apply the edit to the Standby's own metadata (directory tree).
  FSDirMkdirOp.mkdirForEditLog(fsDir, inodeId,
      renameReservedPathsOnUpgrade(mkdirOp.path, logVersion),
      mkdirOp.permissions, mkdirOp.aclEntries, mkdirOp.timestamp);
  break;
}

At this point, let's step back: how does the loader actually read the edit-log records?

In the loadEditRecords() method above, the read happens here:

try {
  /**
   * TODO Read one operation from the input stream.
   * The op may be a file-write edit or a create-directory edit;
   * we are following the create-directory case.
   */
  op = in.readOp();
public FSEditLogOp readOp() throws IOException {
  FSEditLogOp ret;
  if (cachedOp != null) {
    ret = cachedOp;
    cachedOp = null;
    return ret;
  }
  // TODO The important call:
  return nextOp();
}

For the edit-log input stream used here, nextOp() looks like this:

@Override
protected FSEditLogOp nextOp() throws IOException {
  // TODO The important call:
  return nextOpImpl(false);
}
private FSEditLogOp nextOpImpl(boolean skipBrokenEdits) throws IOException {
  FSEditLogOp op = null;
  switch (state) {
  case UNINIT:
    try {
      init(true);
    } catch (Throwable e) {
      LOG.error("caught exception initializing " + this, e);
      if (skipBrokenEdits) {
        return null;
      }
      Throwables.propagateIfPossible(e, IOException.class);
    }
    Preconditions.checkState(state != State.UNINIT);
    return nextOpImpl(skipBrokenEdits);
  case OPEN:
    // TODO Read the next op through the reader.
    op = reader.readOp(skipBrokenEdits);
    if ((op != null) && (op.hasTransactionId())) {
      long txId = op.getTransactionId();
      if ((txId >= lastTxId) &&
          (lastTxId != HdfsConstants.INVALID_TXID)) {
        //
        // Sometimes, the NameNode crashes while it's writing to the
        // edit log. In that case, you can end up with an unfinalized edit log
        // which has some garbage at the end.
        // JournalManager#recoverUnfinalizedSegments will finalize these
        // unfinished edit logs, giving them a defined final transaction
        // ID. Then they will be renamed, so that any subsequent
        // readers will have this information.
        //
        // Since there may be garbage at the end of these "cleaned up"
        // logs, we want to be sure to skip it here if we've read everything
        // we were supposed to read out of the stream.
        // So we force an EOF on all subsequent reads.
        //
        long skipAmt = log.length() - tracker.getPos();
        if (skipAmt > 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("skipping " + skipAmt + " bytes at the end " +
                "of edit log '" + getName() + "': reached txid " + txId +
                " out of " + lastTxId);
          }
          tracker.clearLimit();
          IOUtils.skipFully(tracker, skipAmt);
        }
      }
    }
    break;
  case CLOSED:
    break; // return null
  }
  return op;
}

Under the hood, the reader wraps an HTTP-backed input stream: the Standby NameNode fetches the edit segments from the JournalNodes over HTTP, in effect a remote call to the JournalNode's getJournal endpoint rather than a plain local file read.

Overall flow

[Figure: overall flow diagram]

Reprinted from: https://blog.csdn.net/weixin_37850264/article/details/112408255
