Heritrix 3.1.0 Source Code Analysis (37)

Date: 2023-02-21 16:35:21

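Today I took another look at the thread pool source code in Heritrix 3.1.0. Heritrix does not use the concurrency framework from Java's java.util.concurrent package; it implements its thread pool with the ThreadGroup class instead. (A thread group resembles a tree: one group can contain several child groups as well as several threads. The data structure looks like the Composite pattern, except that the branch nodes (groups) and the leaf nodes (threads) do not implement a common interface the way a Composite would.)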
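To make the tree structure concrete, here is a minimal sketch that uses only the standard java.lang.ThreadGroup API. The class name ThreadGroupTreeDemo and the group and thread names are made up for illustration and do not come from Heritrix. It parks one thread in a parent group and one in a child group, then asks the parent to enumerate its live threads; enumeration is recursive by default, so the thread in the child group is visible too.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class, not part of Heritrix.
class ThreadGroupTreeDemo {

    /** Builds parent -> child groups and counts the live threads the parent sees. */
    static int countLiveThreads() {
        ThreadGroup parent = new ThreadGroup("parent");
        ThreadGroup child = new ThreadGroup(parent, "child"); // branch node

        // Both threads block on this latch so they stay alive while we enumerate.
        CountDownLatch release = new CountDownLatch(1);
        Runnable parked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread a = new Thread(parent, parked, "worker-in-parent"); // leaf node
        Thread b = new Thread(child, parked, "worker-in-child");   // leaf node
        a.start();
        b.start();

        // enumerate() silently drops threads that do not fit in the array,
        // so the array is oversized on purpose.
        Thread[] slots = new Thread[parent.activeCount() + 10];
        int found = parent.enumerate(slots); // recurses into child groups

        release.countDown();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return found;
    }

    public static void main(String[] args) {
        System.out.println("live threads under parent: " + countLiveThreads());
    }
}
```

Note that ThreadGroup and Thread share no common node type here: the group exposes enumerate() and activeCount(), while a thread exposes only getThreadGroup(), which is the difference from a textbook Composite mentioned above.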

The key classes are ToePool and ToeThread in the org.archive.crawler.framework package. The former extends ThreadGroup; the latter extends Thread.

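ToeThread is evidently the worker thread that carries out crawl tasks. Its constructor initializes the member variable CrawlController controller, which is used to obtain the Frontier object and the relevant processor chains.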

    private CrawlController controller;
    private String coreName;
    private CrawlURI currentCuri;

    /**
     * Create a ToeThread
     *
     * @param g ToeThreadGroup
     * @param sn serial number
     */
    public ToeThread(ToePool g, int sn) {
        // TODO: add crawl name?
        super(g,"ToeThread #" + sn);
        coreName="ToeThread #" + sn + ": ";
        controller = g.getController();
        serialNumber = sn;
        setPriority(DEFAULT_PRIORITY);
        int outBufferSize = controller.getRecorderOutBufferBytes();
        int inBufferSize = controller.getRecorderInBufferBytes();
        httpRecorder = new Recorder(controller.getScratchDir().getFile(),
            "tt" + sn + "http", outBufferSize, inBufferSize);
        lastFinishTime = System.currentTimeMillis();
    }

    /** (non-Javadoc)
     * @see java.lang.Thread#run()
     */
    public void run() {
        String name = controller.getMetadata().getJobName();
        logger.fine(getName()+" started for order '"+name+"'");
        Recorder.setHttpRecorder(httpRecorder);

        try {
            while ( true ) {
                ArchiveUtils.continueCheck();

                setStep(Step.ABOUT_TO_GET_URI, null);

                CrawlURI curi = controller.getFrontier().next();

                synchronized(this) {
                    ArchiveUtils.continueCheck();
                    setCurrentCuri(curi);
                    currentCuri.setThreadNumber(this.serialNumber);
                    lastStartTime = System.currentTimeMillis();
                    currentCuri.setRecorder(httpRecorder);
                }

                try {
                    KeyedProperties.loadOverridesFrom(curi);

                    controller.getFetchChain().process(curi,this);

                    controller.getFrontier().beginDisposition(curi);

                    controller.getDispositionChain().process(curi,this);

                } catch (RuntimeExceptionWrapper e) {
                    // Workaround to get cause from BDB
                    if(e.getCause() == null) {
                        e.initCause(e.getCause());
                    }
                    recoverableProblem(e);
                } catch (AssertionError ae) {
                    // This risks leaving crawl in fatally inconsistent state,
                    // but is often reasonable for per-Processor assertion problems
                    recoverableProblem(ae);
                } catch (RuntimeException e) {
                    recoverableProblem(e);
                } catch (InterruptedException e) {
                    if(currentCuri!=null) {
                        recoverableProblem(e);
                        Thread.interrupted(); // clear interrupt status
                    } else {
                        throw e;
                    }
                } catch (StackOverflowError err) {
                    recoverableProblem(err);
                } catch (Error err) {
                    // OutOfMemory and any others
                    seriousError(err);
                } finally {
                    httpRecorder.endReplays();
                    KeyedProperties.clearOverridesFrom(curi);
                }

                setStep(Step.ABOUT_TO_RETURN_URI, null);
                ArchiveUtils.continueCheck();

                synchronized(this) {
                    controller.getFrontier().finished(currentCuri);
                    controller.getFrontier().endDisposition();
                    setCurrentCuri(null);
                }
                curi = null;

                setStep(Step.FINISHING_PROCESS, null);
                lastFinishTime = System.currentTimeMillis();
                if(shouldRetire) {
                    break; // from while(true)
                }
            }
        } catch (InterruptedException e) {
            if(currentCuri!=null){
                logger.log(Level.SEVERE,"Interrupt leaving unfinished CrawlURI "+getName()+" - job may hang",e);
            }
            // thread interrupted, ok to end
            logger.log(Level.FINE,this.getName()+ " ended with Interruption");
        } catch (Exception e) {
            // everything else (including interruption)
            logger.log(Level.SEVERE,"Fatal exception in "+getName(),e);
        } catch (OutOfMemoryError err) {
            seriousError(err);
        } finally {
            controller.getFrontier().endDisposition();
        }

        setCurrentCuri(null);
        // Do cleanup so that objects can be GC.
        this.httpRecorder.closeRecorders();
        this.httpRecorder = null;

        logger.fine(getName()+" finished for order '"+name+"'");
        setStep(Step.FINISHED, null);
        controller = null;
    }
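The shape of that run() loop (block for the next item, process it, treat per-item failures as recoverable, then check a retire flag, with an interrupt ending the thread only while it is idle) can be reduced to a small standalone sketch. Everything here is an assumption made for illustration: the Worker class, the BlockingQueue standing in for Frontier.next(), and the "empty string means a bad item" rule are not Heritrix code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of Heritrix.
class RetirableWorkerDemo {

    /** Simplified stand-in for ToeThread. */
    static class Worker extends Thread {
        private final BlockingQueue<String> queue;   // stands in for the Frontier
        private final AtomicInteger processed;
        private final CountDownLatch handled;        // counts items seen, good or bad
        private volatile boolean shouldRetire = false;

        Worker(ThreadGroup group, int serial, BlockingQueue<String> queue,
               AtomicInteger processed, CountDownLatch handled) {
            super(group, "Worker #" + serial);
            this.queue = queue;
            this.processed = processed;
            this.handled = handled;
        }

        /** Ask the worker to leave after it finishes its current item. */
        void retire() {
            shouldRetire = true;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    String item = queue.take(); // blocks until work is available
                    try {
                        if (item.isEmpty()) {
                            throw new IllegalArgumentException("empty item");
                        }
                        processed.incrementAndGet();
                    } catch (RuntimeException e) {
                        // per-item failure: recoverable, keep the thread alive
                    } finally {
                        handled.countDown();
                    }
                    if (shouldRetire) {
                        break; // from while(true)
                    }
                }
            } catch (InterruptedException e) {
                // interrupted while waiting for work: fine to end
            }
        }
    }

    /** Feeds two good items and one bad one, then interrupts the idle worker. */
    static int runDemo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();
        CountDownLatch handled = new CountDownLatch(3);
        Worker worker = new Worker(new ThreadGroup("demo"), 1, queue, processed, handled);
        queue.add("a");
        queue.add("");  // triggers the recoverable failure path
        queue.add("b");
        worker.start();
        try {
            handled.await();    // all three items have been seen
            worker.interrupt(); // nudge the worker out of take()
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("processed: " + runDemo());
    }
}
```

The demo ends the worker with an interrupt because that is deterministic; retire() is the gentler alternative, since the flag is only read after the current item is done, so a retiring worker never abandons work halfway.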

ToePool is the thread group that manages the worker threads above: it creates them, enumerates the active ones, and interrupts or kills them.

    protected CrawlController controller;
    protected int nextSerialNumber = 1;
    protected int targetSize = 0;

    /**
     * Constructor. Creates a pool of ToeThreads.
     *
     * @param c A reference to the CrawlController for the current crawl.
     */
    public ToePool(AlertThreadGroup atg, CrawlController c) {
        // pass in the parent thread group
        super(atg, "ToeThreads");
        this.controller = c;
        setDaemon(true);
    }

    public void cleanup() {
        // force all Toes waiting on queues, etc to proceed
        Thread[] toes = getToes();
        for(Thread toe : toes) {
            if(toe!=null) {
                toe.interrupt();
            }
        }
        // this.controller = null;
    }

    /**
     * @return The number of ToeThreads that are not available (Approximation).
     */
    public int getActiveToeCount() {
        Thread[] toes = getToes();
        int count = 0;
        for (int i = 0; i < toes.length; i++) {
            if((toes[i] instanceof ToeThread) &&
                    ((ToeThread)toes[i]).isActive()) {
                count++;
            }
        }
        return count;
    }

    /**
     * @return The number of ToeThreads. This may include killed ToeThreads
     * that were not replaced.
     */
    public int getToeCount() {
        Thread[] toes = getToes();
        int count = 0;
        for (int i = 0; i<toes.length; i++) {
            if((toes[i] instanceof ToeThread)) {
                count++;
            }
        }
        return count;
    }
    // get the array of active threads
    private Thread[] getToes() {
        Thread[] toes = new Thread[activeCount()+10];
        this.enumerate(toes);
        return toes;
    }

    /**
     * Change the number of ToeThreads.
     *
     * @param newsize The new number of ToeThreads.
     */
    public void setSize(int newsize)
    {
        targetSize = newsize;
        int difference = newsize - getToeCount();
        if (difference > 0) {
            // must create threads
            for(int i = 1; i <= difference; i++) {
                // start a thread
                startNewThread();
            }
        } else {
            // must retire extra threads
            int retainedToes = targetSize;
            Thread[] toes = this.getToes();
            for (int i = 0; i < toes.length ; i++) {
                if(!(toes[i] instanceof ToeThread)) {
                    continue;
                }
                retainedToes--;
                if (retainedToes>=0) {
                    continue; // this toe is spared
                }
                // otherwise:
                ToeThread tt = (ToeThread)toes[i];
                tt.retire();
            }
        }
    }

    /**
     * Kills specified thread. Killed thread can be optionally replaced with a
     * new thread.
     *
     * <p><b>WARNING:</b> This operation should be used with great care. It may
     * destabilize the crawler.
     *
     * @param threadNumber Thread to kill
     * @param replace If true then a new thread will be created to take the
     *                killed threads place. Otherwise the total number of threads
     *                will decrease by one.
     */
    public void killThread(int threadNumber, boolean replace){
        Thread[] toes = getToes();
        for (int i = 0; i< toes.length; i++) {
            if(! (toes[i] instanceof ToeThread)) {
                continue;
            }
            ToeThread toe = (ToeThread) toes[i];
            if(toe.getSerialNumber()==threadNumber) {
                toe.kill();
            }
        }

        if(replace){
            // Create a new toe thread to take its place. Replace toe
            startNewThread();
        }
    }
    // synchronized, so that threads are never created concurrently
    private synchronized void startNewThread() {
        ToeThread newThread = new ToeThread(this, nextSerialNumber++);
        newThread.setPriority(DEFAULT_TOE_PRIORITY);
        newThread.start();
    }

    public void waitForAll() {
        while (true) try {
            if (isAllAlive(getToes())) {
                return;
            }
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    private static boolean isAllAlive(Thread[] threads) {
        for (Thread t: threads) {
            if ((t != null) && (!t.isAlive())) {
                return false;
            }
        }
        return true;
    }
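The resizing logic is easier to follow in isolation. The following is a rough sketch of the same idea, a ThreadGroup subclass that grows by starting threads and shrinks by asking surplus threads to retire. The names Pool, Toe and awaitToeCount are invented for this example, and the worker body is just a short sleep loop rather than real crawling.

```java
// Hypothetical demo class, not part of Heritrix.
class ResizablePoolDemo {

    /** Simplified worker: idles in short steps until asked to retire. */
    static class Toe extends Thread {
        private volatile boolean shouldRetire = false;

        Toe(ThreadGroup group, int serial) {
            super(group, "Toe #" + serial);
        }

        void retire() {
            shouldRetire = true;
        }

        @Override
        public void run() {
            try {
                while (!shouldRetire) {
                    Thread.sleep(10); // stands in for one unit of work
                }
            } catch (InterruptedException e) {
                // interrupted: fine to exit
            }
        }
    }

    /** Simplified pool: the group itself is the registry of live workers. */
    static class Pool extends ThreadGroup {
        private int nextSerial = 1;

        Pool() {
            super("Toes");
        }

        private Thread[] liveThreads() {
            // Oversized on purpose: enumerate() drops whatever does not fit.
            Thread[] slots = new Thread[activeCount() + 10];
            enumerate(slots);
            return slots; // unused trailing slots stay null
        }

        int getToeCount() {
            int count = 0;
            for (Thread t : liveThreads()) {
                if (t instanceof Toe) { // instanceof is false for null slots
                    count++;
                }
            }
            return count;
        }

        void setSize(int newSize) {
            int difference = newSize - getToeCount();
            if (difference > 0) {
                for (int i = 0; i < difference; i++) {
                    startNewThread();
                }
            } else {
                int retained = newSize;
                for (Thread t : liveThreads()) {
                    if (!(t instanceof Toe)) {
                        continue;
                    }
                    if (--retained >= 0) {
                        continue; // this worker is spared
                    }
                    ((Toe) t).retire();
                }
            }
        }

        private synchronized void startNewThread() {
            new Toe(this, nextSerial++).start();
        }
    }

    /** Polls for up to five seconds until the pool reports the expected size. */
    static int awaitToeCount(Pool pool, int expected) {
        long deadline = System.currentTimeMillis() + 5000;
        while (pool.getToeCount() != expected && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return pool.getToeCount();
    }

    /** Grows to 4, shrinks to 2, then drains; returns the observed sizes. */
    static int[] runDemo() {
        Pool pool = new Pool();
        pool.setSize(4);
        int grown = pool.getToeCount();
        pool.setSize(2);
        int shrunk = awaitToeCount(pool, 2);
        pool.setSize(0);
        int drained = awaitToeCount(pool, 0);
        return new int[] { grown, shrunk, drained };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(runDemo()));
    }
}
```

Shrinking is asynchronous in this design: setSize only raises a flag, and a retired worker still counts as a member of the group until its run() returns, which is why the demo waits before reading the new size.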

Finally, the thread group is initialized and the worker threads are managed from the corresponding methods of the CrawlController object.

    /**
     * Maximum number of threads processing URIs at the same time.
     */
    int maxToeThreads;
    public int getMaxToeThreads() {
        return maxToeThreads;
    }
    @Value("25")
    public void setMaxToeThreads(int maxToeThreads) {
        this.maxToeThreads = maxToeThreads;
        if(toePool!=null) {
            toePool.setSize(this.maxToeThreads);
        }
    }

    private transient ToePool toePool;

    /**
     * Called when the last toethread exits.
     */
    protected void completeStop() {
        LOGGER.fine("Entered complete stop.");

        statisticsTracker.getSnapshot(); // ???

        this.reserveMemory = null;
        if (this.toePool != null) {
            this.toePool.cleanup();
        }
        this.toePool = null;

        LOGGER.fine("Finished crawl.");

        try {
            appCtx.stop();
        } catch (RuntimeException re) {
            LOGGER.log(Level.SEVERE,re.getMessage(),re);
        }

        sendCrawlStateChangeEvent(State.FINISHED, this.sExit);

        // CrawlJob needs to be sure all beans have received FINISHED signal before teardown
        this.isStopComplete = true;
        appCtx.publishEvent(new StopCompleteEvent(this));
    }

    /**
     * Operator requested for crawl to stop.
     */
    public synchronized void requestCrawlStop() {
        if(state == State.STOPPING) {
            // second stop request; nudge the threads with interrupts
            getToePool().cleanup();
        }
        requestCrawlStop(CrawlStatus.ABORTED);
    }

    /**
     * @return Active toe thread count.
     */
    public int getActiveToeCount() {
        if (toePool == null) {
            return 0;
        }
        return toePool.getActiveToeCount();
    }

    protected void setupToePool() {
        toePool = new ToePool(alertThreadGroup,this);
        // TODO: make # of toes self-optimizing
        toePool.setSize(getMaxToeThreads());
        toePool.waitForAll();
    }

    /**
     * @return The number of ToeThreads
     *
     * @see ToePool#getToeCount()
     */
    public int getToeCount() {
        return this.toePool == null? 0: this.toePool.getToeCount();
    }

    /**
     * @return The ToePool
     */
    public ToePool getToePool() {
        return toePool;
    }

    /**
     * Kills a thread. For details see
     * {@link org.archive.crawler.framework.ToePool#killThread(int, boolean)
     * ToePool.killThread(int, boolean)}.
     * @param threadNumber Thread to kill.
     * @param replace Should thread be replaced.
     * @see org.archive.crawler.framework.ToePool#killThread(int, boolean)
     */
    public void killThread(int threadNumber, boolean replace){
        toePool.killThread(threadNumber, replace);
    }
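One detail worth isolating is how setMaxToeThreads behaves before and after the pool exists: before setupToePool() it only stores the value, afterwards it also resizes the live pool. The sketch below mimics that with invented names (ControllerSketch, Controller, RecordingPool); the pool here merely records the sizes it is asked for and starts no threads.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of Heritrix.
class ControllerSketch {

    /** Stand-in pool that only records the sizes it was asked for. */
    static class RecordingPool {
        final List<Integer> requestedSizes = new ArrayList<>();

        void setSize(int newSize) {
            requestedSizes.add(newSize);
        }
    }

    /** Stand-in controller with the same setter/setup split as above. */
    static class Controller {
        private int maxToeThreads = 25; // same default as the @Value("25") above
        private RecordingPool toePool;  // null until setupToePool() runs

        void setMaxToeThreads(int maxToeThreads) {
            this.maxToeThreads = maxToeThreads;
            if (toePool != null) {
                toePool.setSize(this.maxToeThreads); // live resize
            }
        }

        void setupToePool() {
            toePool = new RecordingPool();
            toePool.setSize(maxToeThreads);
        }

        List<Integer> requestedSizes() {
            return toePool == null ? new ArrayList<>() : toePool.requestedSizes;
        }
    }

    static List<Integer> runDemo() {
        Controller controller = new Controller();
        controller.setMaxToeThreads(10); // no pool yet: value is only stored
        controller.setupToePool();       // pool is created with the stored value
        controller.setMaxToeThreads(3);  // pool exists: resized immediately
        return controller.requestedSizes();
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

The null checks in the getters of the real controller follow the same reasoning: the pool is absent before the crawl starts and again after completeStop() sets it to null.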

That should be clear enough.

---------------------------------------------------------------------------

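This Heritrix 3.1.0 source code analysis series is my own original work.

My email: chenying998179@163#com (replace # with .)

When reposting, please credit the source: 博客园 (cnblogs), 刺猬的温驯

Link to this article: http://www.cnblogs.com/chenying99/p/3213556.html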
