AsyncDispatcher,直接看代码
@Override
protected void serviceStart() throws Exception {
//start all the components
super.serviceStart();
eventHandlingThread = new Thread(createThread());
eventHandlingThread.setName(dispatcherThreadName);
eventHandlingThread.start();
}
Runnable createThread() {
return new Runnable() {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
drained = eventQueue.isEmpty();
// blockNewEvents is only set when dispatcher is draining to stop,
// adding this check is to avoid the overhead of acquiring the lock
// and calling notify every time in the normal run of the loop.
if (blockNewEvents) {
synchronized (waitForDrained) {
if (drained) {
waitForDrained.notify();
}
}
}
Event event;
try {
event = eventQueue.take();
} catch(InterruptedException ie) {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", ie);
}
return;
}
if (event != null) {
dispatch(event);
}
}
}
};
}
protected void dispatch(Event event) {
//all events go thru this loop
if (LOG.isDebugEnabled()) {
LOG.debug("Dispatching the event " + event.getClass().getName() + "."
+ event.toString());
} Class<? extends Enum> type = event.getType().getDeclaringClass(); try{
EventHandler handler = eventDispatchers.get(type);
if(handler != null) {
handler.handle(event);
} else {
throw new Exception("No handler for registered for " + type);
}
} catch (Throwable t) {
//TODO Maybe log the state of the queue
LOG.fatal("Error in dispatcher thread", t);
// If serviceStop is called, we should exit this thread gracefully.
if (exitOnDispatchException
&& (ShutdownHookManager.get().isShutdownInProgress()) == false
&& stopped == false) {
stopped = true;
Thread shutDownThread = new Thread(createShutDownThread());
shutDownThread.setName("AsyncDispatcher ShutDown handler");
shutDownThread.start();
}
}
}
-------------------------------------------------------------------------------------------------------------------------------------
一个handler的实现
public class JobHistoryEventHandler extends AbstractService implements EventHandler<JobHistoryEvent> {
protected BlockingQueue<JobHistoryEvent> eventQueue = new LinkedBlockingQueue<JobHistoryEvent>();
protected boolean handleTimelineEvent = false;
protected AsyncDispatcher atsEventDispatcher = null;
......
@Override
public void handle(JobHistoryEvent event) {
try {
eventQueue.put(event);
// Process it for ATS (if enabled)
if (handleTimelineEvent) {
atsEventDispatcher.getEventHandler().handle(event);
}
}
}
}