Enterprise Search Engine Development: The Connector (Part 28)
Published: 2021-09-08 01:45:19 Category: Technical articles

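Normally each SnapshotRepository object corresponds to one DocumentSnapshotRepositoryMonitor object and also to one snapshot store object. The association among the three is established by the monitor manager, DocumentSnapshotRepositoryMonitorManagerImpl.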

To see what behavior DocumentSnapshotRepositoryMonitorManagerImpl has to implement, first look at the method contract defined by the interface it implements, DocumentSnapshotRepositoryMonitorManager.

/**
 * Management interface to {@link DocumentSnapshotRepositoryMonitor} threads.
 *
 * @since 2.8
 */
public interface DocumentSnapshotRepositoryMonitorManager {
  /**
   * Ensures all monitor threads are running.
   *
   * @param checkpoint for the last completed document or null if none have
   *        been completed.
   * @throws RepositoryException
   */
  void start(String checkpoint) throws RepositoryException;

  /**
   * Stops all the configured {@link DocumentSnapshotRepositoryMonitor} threads.
   */
  void stop();

  /**
   * Removes persisted state for {@link DocumentSnapshotRepositoryMonitor}
   * threads. After calling this {@link DocumentSnapshotRepositoryMonitor}
   * threads will no longer be able to resume from where they left off last
   * time.
   */
  void clean();

  /**
   * Returns the number of {@link DocumentSnapshotRepositoryMonitor} threads
   * that are alive. This method is for testing purposes.
   */
  int getThreadCount();

  /**
   * Returns the {@link CheckpointAndChangeQueue} for this
   * {@link DocumentSnapshotRepositoryMonitorManager}
   */
  CheckpointAndChangeQueue getCheckpointAndChangeQueue();

  /** Returns whether we are after a start() call and before a stop(). */
  boolean isRunning();

  /**
   * Receives information specifying what is guaranteed to be delivered to GSA.
   * Every entry in passed in Map is a monitor name and MonitorCheckpoint.
   * The monitor of that name can expect that all documents before and including
   * document related with MonitorCheckpoint will be delivered to GSA.
   * This information is for the convenience and efficiency of the Monitor so
   * that it knows how many changes it has to resend. It's valid for a monitor
   * to ignore these updates if it feels like it for some good reason.
   * FileConnectorSystemMonitor instances use this information to trim their
   * file system snapshots.
   */
  void acceptGuarantees(Map<String, MonitorCheckpoint> guarantees);

  /**
   * Receives {@link TraversalSchedule} from TraversalManager which is
   * {@link TraversalScheduleAware}.
   */
  void setTraversalSchedule(TraversalSchedule traversalSchedule);
}
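The Javadoc of acceptGuarantees says that every map entry pairs a monitor name with a checkpoint meant for that monitor. The article does not show how the implementation routes these entries, so the following is only a minimal sketch of one way to do it. RecordingMonitor, GuaranteeDispatchSketch and acceptGuarantee are hypothetical names, and a plain String stands in for MonitorCheckpoint.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a monitor; it records the guarantees it receives. */
final class RecordingMonitor {
  final List<String> received = new ArrayList<>();

  void acceptGuarantee(String checkpoint) {
    received.add(checkpoint);
  }
}

/** Hypothetical dispatcher: routes each guarantee to the monitor of the same name. */
final class GuaranteeDispatchSketch {
  private final Map<String, RecordingMonitor> monitorsByName = new HashMap<>();

  void register(String monitorName, RecordingMonitor monitor) {
    monitorsByName.put(monitorName, monitor);
  }

  /** Returns how many guarantees reached a registered monitor. */
  int acceptGuarantees(Map<String, String> guarantees) {
    int delivered = 0;
    for (Map.Entry<String, String> entry : guarantees.entrySet()) {
      RecordingMonitor monitor = monitorsByName.get(entry.getKey());
      if (monitor != null) {
        monitor.acceptGuarantee(entry.getValue());
        delivered++;
      }
      // Entries for unknown monitor names are skipped in this sketch.
    }
    return delivered;
  }
}
```

Keying everything by monitor name is what lets a single map carry guarantees for all monitors at once.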

Next, let's see how DocumentSnapshotRepositoryMonitorManagerImpl implements the behavior defined in this interface.

First, the relevant fields and how they are initialized:

private volatile TraversalSchedule traversalSchedule;

// Monitor threads
private final List<Thread> threads =
    Collections.synchronizedList(new ArrayList<Thread>());

// Monitors keyed by monitor name
private final Map<String, DocumentSnapshotRepositoryMonitor> fileSystemMonitorsByName =
    Collections.synchronizedMap(
        new HashMap<String, DocumentSnapshotRepositoryMonitor>());

private boolean isRunning = false;  // Monitor threads start in off state.

private final List<? extends SnapshotRepository<? extends DocumentSnapshot>> repositories;

private final File snapshotDir;

private final ChecksumGenerator checksumGenerator;

// Container of CheckpointAndChange objects (a List)
private final CheckpointAndChangeQueue checkpointAndChangeQueue;

// Container of Change objects (a blocking queue)
private final ChangeQueue changeQueue;

private final DocumentSnapshotFactory documentSnapshotFactory;

/**
 * Constructs {@link DocumentSnapshotRepositoryMonitorManagerImpl}
 * for the {@link DiffingConnector}.
 *
 * @param repositories a {@code List} of {@link SnapshotRepository
 *        SnapshotRepositorys}
 * @param documentSnapshotFactory a {@link DocumentSnapshotFactory}
 * @param snapshotDir directory to store {@link SnapshotRepository}
 * @param checksumGenerator a {@link ChecksumGenerator} used to
 *        detect changes in a document's content
 * @param changeQueue a {@link ChangeQueue}
 * @param checkpointAndChangeQueue a
 *        {@link CheckpointAndChangeQueue}
 */
public DocumentSnapshotRepositoryMonitorManagerImpl(
    List<? extends SnapshotRepository<? extends DocumentSnapshot>> repositories,
    DocumentSnapshotFactory documentSnapshotFactory,
    File snapshotDir, ChecksumGenerator checksumGenerator,
    ChangeQueue changeQueue,
    CheckpointAndChangeQueue checkpointAndChangeQueue) {
  this.repositories = repositories;
  this.documentSnapshotFactory = documentSnapshotFactory;
  this.snapshotDir = snapshotDir;
  this.checksumGenerator = checksumGenerator;
  this.changeQueue = changeQueue;
  this.checkpointAndChangeQueue = checkpointAndChangeQueue;
}
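The thread list and the monitor map above are wrapped with Collections.synchronizedList and Collections.synchronizedMap. Those wrappers make single calls such as add or get thread-safe, but the JDK documentation requires callers to hold the wrapper's lock themselves while iterating. The following sketch shows that idiom; SynchronizedListSketch and namesOf are hypothetical names and are not part of the connector code.

```java
import java.util.ArrayList;
import java.util.List;

final class SynchronizedListSketch {
  /**
   * Collects thread names from a list created with
   * Collections.synchronizedList, holding the list's own lock while iterating.
   */
  static List<String> namesOf(List<Thread> synchronizedThreads) {
    List<String> names = new ArrayList<>();
    synchronized (synchronizedThreads) {
      for (Thread thread : synchronizedThreads) {
        names.add(thread.getName());
      }
    }
    return names;
  }
}
```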

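Now the start method. It does three things in order: it calls start on the checkpointAndChangeQueue object, initializes the SnapshotStore associated with each repository, and finally starts the monitor instance for each repository.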

/** Go from "cold" to "warm" including CheckpointAndChangeQueue. */
public void start(String connectorManagerCheckpoint)
    throws RepositoryException {
  try {
    // Start the queue. Main action: load monitorPoints and the
    // checkpointAndChangeList queue from the JSON queue file.
    checkpointAndChangeQueue.start(connectorManagerCheckpoint);
  } catch (IOException e) {
    throw new RepositoryException("Failed starting CheckpointAndChangeQueue.",
        e);
  }

  // MonitorCheckpoint objects keyed by monitor name
  Map<String, MonitorCheckpoint> monitorPoints =
      checkpointAndChangeQueue.getMonitorRestartPoints();

  Map<String, SnapshotStore> snapshotStores = null;

  // Build the monitorName -> SnapshotStore map
  try {
    snapshotStores =
        recoverSnapshotStores(connectorManagerCheckpoint, monitorPoints);
  } catch (SnapshotStoreException e) {
    throw new RepositoryException("Snapshot recovery failed.", e);
  } catch (IOException e) {
    throw new RepositoryException("Snapshot recovery failed.", e);
  } catch (InterruptedException e) {
    throw new RepositoryException("Snapshot recovery interrupted.", e);
  }

  // Start the monitor threads
  startMonitorThreads(snapshotStores, monitorPoints);
  isRunning = true;
}

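When the SnapshotStore for each repository is initialized, the associated MonitorCheckpoint instance is passed in as well, and the snapshot files are repaired if necessary.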

/**
 * For each start path gets its monitor recovery files in a state where the
 * monitor can be started. Builds the monitorName -> SnapshotStore map.
 *
 * @param connectorManagerCheckpoint
 * @param monitorPoints
 * @return
 * @throws IOException
 * @throws SnapshotStoreException
 * @throws InterruptedException
 */
private Map<String, SnapshotStore> recoverSnapshotStores(
    String connectorManagerCheckpoint,
    Map<String, MonitorCheckpoint> monitorPoints)
    throws IOException, SnapshotStoreException, InterruptedException {
  Map<String, SnapshotStore> snapshotStores =
      new HashMap<String, SnapshotStore>();
  for (SnapshotRepository<? extends DocumentSnapshot> repository
      : repositories) {
    String monitorName = makeMonitorNameFromStartPath(repository.getName());
    File dir = new File(snapshotDir, monitorName);

    boolean startEmpty = (connectorManagerCheckpoint == null)
        || (!monitorPoints.containsKey(monitorName));
    if (startEmpty) {
      LOG.info("Deleting " + repository.getName()
          + " global checkpoint=" + connectorManagerCheckpoint
          + " monitor checkpoint=" + monitorPoints.get(monitorName));
      // Delete this snapshot directory
      delete(dir);
    } else {
      // Repair this snapshot directory
      SnapshotStore.stitch(dir, monitorPoints.get(monitorName),
          documentSnapshotFactory);
    }

    SnapshotStore snapshotStore =
        new SnapshotStore(dir, documentSnapshotFactory);
    snapshotStores.put(monitorName, snapshotStore);
  }
  return snapshotStores;
}
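The core of the method above is the startEmpty decision: a monitor starts from scratch when there is no global checkpoint or when no restart point was saved under its name. Otherwise its existing snapshot directory is repaired. The sketch below isolates just that rule so it can be tried on its own. SnapshotRecoverySketch, Action and decide are hypothetical names, and String values stand in for MonitorCheckpoint objects.

```java
import java.util.Map;

final class SnapshotRecoverySketch {
  enum Action { DELETE_AND_START_EMPTY, STITCH_EXISTING }

  /**
   * Mirrors the startEmpty condition: start empty when there is no global
   * checkpoint, or when no restart point exists for this monitor name.
   */
  static Action decide(String globalCheckpoint,
      Map<String, String> monitorPoints, String monitorName) {
    boolean startEmpty = (globalCheckpoint == null)
        || !monitorPoints.containsKey(monitorName);
    return startEmpty ? Action.DELETE_AND_START_EMPTY : Action.STITCH_EXISTING;
  }
}
```

Note that a saved restart point is not enough on its own: with a null global checkpoint the snapshot directory is still discarded.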

Next, follow the method that starts the monitor threads.

/**
 * Starts the monitor threads (MonitorCheckpoint, SnapshotStore and monitor
 * appear to be associated with one another by monitor name).
 * Creates a {@link DocumentSnapshotRepositoryMonitor} thread for each
 * startPath.
 *
 * @throws RepositoryDocumentException if any of the threads cannot be
 *         started.
 */
private void startMonitorThreads(Map<String, SnapshotStore> snapshotStores,
    Map<String, MonitorCheckpoint> monitorPoints)
    throws RepositoryDocumentException {
  for (SnapshotRepository<? extends DocumentSnapshot> repository
      : repositories) {
    String monitorName = makeMonitorNameFromStartPath(repository.getName());
    // Look up the snapshot store (reader/writer) by monitorName
    SnapshotStore snapshotStore = snapshotStores.get(monitorName);
    // Create the monitor thread
    Thread monitorThread = newMonitorThread(repository, snapshotStore,
        monitorPoints.get(monitorName));
    threads.add(monitorThread);

    LOG.info("starting monitor for <" + repository.getName() + ">");
    monitorThread.setName(repository.getName());
    monitorThread.setDaemon(true);
    monitorThread.start();
  }
}
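The thread handling in this method follows a common pattern: one thread per repository, named after the repository, marked as a daemon, then started. The sketch below reproduces only that pattern with a placeholder loop in place of the real monitor. MonitorThreadStartSketch and startAll are hypothetical names, and repositories are reduced to plain name strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class MonitorThreadStartSketch {
  /** Starts one named daemon thread per repository name and returns them all. */
  static List<Thread> startAll(List<String> repositoryNames) {
    List<Thread> threads =
        Collections.synchronizedList(new ArrayList<Thread>());
    for (String name : repositoryNames) {
      Thread thread = new Thread(() -> {
        try {
          // Placeholder for the monitor loop: idle until interrupted.
          Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      threads.add(thread);
      thread.setName(name);
      thread.setDaemon(true);  // setDaemon must be called before start()
      thread.start();
    }
    return threads;
  }
}
```

Daemon threads do not keep the JVM alive, so a monitor that fails to stop cannot block process exit.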

The monitor objects themselves are created in the following method.

/**
 * Creates the monitor thread.
 * Creates a {@link DocumentSnapshotRepositoryMonitor} thread for the provided
 * folder.
 *
 * @throws RepositoryDocumentException if {@code startPath} is not readable,
 *         or if there is any problem reading or writing snapshots.
 */
private Thread newMonitorThread(
    SnapshotRepository<? extends DocumentSnapshot> repository,
    SnapshotStore snapshotStore, MonitorCheckpoint startCp)
    throws RepositoryDocumentException {
  // Note the monitorName
  String monitorName = makeMonitorNameFromStartPath(repository.getName());
  // Documents are processed inside the monitor thread
  DocumentSnapshotRepositoryMonitor monitor =
      new DocumentSnapshotRepositoryMonitor(monitorName, repository,
          snapshotStore, changeQueue.newCallback(), DOCUMENT_SINK, startCp,
          documentSnapshotFactory);
  monitor.setTraversalSchedule(traversalSchedule);
  LOG.fine("Adding a new monitor for " + monitorName + ": " + monitor);
  fileSystemMonitorsByName.put(monitorName, monitor);
  return new Thread(monitor);
}

The stop method stops the monitor threads.

/**
 * Flags all monitors to stop.
 */
private void flagAllMonitorsToStop() {
  for (SnapshotRepository<? extends DocumentSnapshot> repository
      : repositories) {
    String monitorName = makeMonitorNameFromStartPath(repository.getName());
    DocumentSnapshotRepositoryMonitor monitor =
        fileSystemMonitorsByName.get(monitorName);
    if (null != monitor) {
      monitor.shutdown();
    } else {
      LOG.fine("Unable to stop non existent monitor thread for "
          + monitorName);
    }
  }
}

/**
 * Stops the monitor threads.
 */
/* @Override */
public synchronized void stop() {
  for (Thread thread : threads) {
    thread.interrupt();
  }
  for (Thread thread : threads) {
    try {
      thread.join(MAX_SHUTDOWN_MS);
      if (thread.isAlive()) {
        LOG.warning("failed to stop background thread: " + thread.getName());
      }
    } catch (InterruptedException e) {
      // Mark this thread as interrupted so it can be dealt with later.
      Thread.currentThread().interrupt();
    }
  }
  threads.clear();

  /* in case thread.interrupt doesn't stop monitors */
  flagAllMonitorsToStop();

  fileSystemMonitorsByName.clear();
  changeQueue.clear();
  this.isRunning = false;
}

flagAllMonitorsToStop() calls monitor.shutdown() on each monitor object, which sets the monitor's flag field:

/* The monitor should exit voluntarily if set to false */
private volatile boolean isRunning = true;
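Taken together, stop() and the volatile flag form a two-step shutdown: interrupt the thread and wait for it with a timeout, then clear the flag as a fallback in case the interrupt did not end the loop. The following is a minimal sketch of that pattern under simplifying assumptions. StoppableMonitorSketch and its static stop helper are hypothetical, and a short sleep stands in for one pass over the repository.

```java
final class StoppableMonitorSketch implements Runnable {
  /* The loop exits voluntarily once this is set to false. */
  private volatile boolean isRunning = true;

  void shutdown() {
    isRunning = false;
  }

  @Override
  public void run() {
    while (isRunning) {
      try {
        Thread.sleep(10);  // placeholder for one pass over the repository
      } catch (InterruptedException e) {
        return;  // interrupted: leave the loop immediately
      }
    }
  }

  /**
   * Interrupts the thread, waits up to timeoutMs for it to finish, then
   * flags the monitor as a fallback. Returns true if the thread has stopped.
   */
  static boolean stop(Thread thread, StoppableMonitorSketch monitor,
      long timeoutMs) {
    thread.interrupt();
    try {
      thread.join(timeoutMs);
    } catch (InterruptedException e) {
      // Preserve the caller's interrupt status so it can be handled later.
      Thread.currentThread().interrupt();
    }
    monitor.shutdown();
    return !thread.isAlive();
  }
}
```

The flag has to be volatile so that a write from the stopping thread is visible to the monitor thread on its next loop check.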

---------------------------------------------------------------------------

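This series, Enterprise Search Engine Development: The Connector, is my own original work.

Please credit the source when reposting: 博客园 (cnblogs), 刺猬的温驯

My email: chenying998179@163#com (replace # with .)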

Reposted from: https://blog.csdn.net/weixin_34310369/article/details/85634213