异步任务执行之-队列

时间:2023-01-28 00:17:14
/**
 * A single-consumer work queue: requests handed to {@link #execute} are
 * appended to an unbounded {@link LinkedBlockingQueue} and processed one at a
 * time, in insertion order, by one background worker thread.
 *
 * <p>For every request taken from the queue the worker invokes
 * {@code request.getLazyExecutor().lazyExecute(request)}. Any exception thrown
 * by that call is logged and the worker moves on to the next request.
 *
 * @param <T> the request type handled by this queue
 */
public class WorkingQueue<T extends BaseRequest> {

    private static final Logger log = Logger.getLogger(WorkingQueue.class);

    private final BlockingQueue<T> queue;
    private final ThreadFactory threadFactory;
    private final AtomicBoolean started = new AtomicBoolean(false);

    // Written by start() and read by stop(), possibly on different threads,
    // hence volatile.
    private volatile Thread thread;
    private volatile boolean shouldContinue = false;

    /** Creates a queue whose worker comes from {@link Executors#defaultThreadFactory()}. */
    public WorkingQueue() {
        this(null);
    }

    /**
     * Creates a queue whose worker thread is produced by the given factory.
     *
     * @param tf factory for the worker thread; {@code null} selects
     *           {@link Executors#defaultThreadFactory()}
     */
    public WorkingQueue(final ThreadFactory tf) {
        this.queue = new LinkedBlockingQueue<T>();
        this.threadFactory = tf == null ? Executors.defaultThreadFactory() : tf;
        this.thread = null;
    }

    /**
     * Starts the worker thread. Calling this while the queue is already
     * started is a no-op.
     */
    public void start() {
        if (started.getAndSet(true)) {
            // I prefer if we throw a runtime IllegalStateException here,
            // but I want to maintain semantic backward compatibility.
            // So it is returning immediately here
            return;
        }
        shouldContinue = true;
        thread = threadFactory.newThread(new Runnable() {
            @Override
            @SuppressWarnings("unchecked") // BaseRequest exposes its executor as a raw type
            public void run() {
                while (shouldContinue) {
                    try {
                        T req = queue.take();
                        req.getLazyExecutor().lazyExecute(req);
                    } catch (InterruptedException e) {
                        if (!shouldContinue) {
                            // Expected: stop() interrupted us. Restore the flag
                            // and leave quietly instead of logging an error.
                            Thread.currentThread().interrupt();
                            return;
                        }
                        // Interrupted by someone other than stop(): keep the
                        // original behaviour of logging and carrying on.
                        log.error("Unexpected message caught... Shouldn't be here", e);
                    } catch (Exception e) {
                        log.error("Unexpected message caught... Shouldn't be here", e);
                    }
                }
            }
        });
        thread.start();
    }

    /**
     * Asks the worker to terminate and interrupts it so a blocked
     * {@code take()} returns promptly. Requests still in the queue are left
     * there. Safe to call when the queue was never started.
     */
    public void stop() {
        started.set(false);
        shouldContinue = false;
        final Thread worker = thread;
        if (worker != null) { // null when start() was never called
            worker.interrupt();
        }
    }

    /**
     * Enqueues a request for asynchronous processing, starting the worker
     * first if it is not running. Does not wait for the request to be handled.
     *
     * @param request the request to enqueue
     */
    public void execute(T request) {
        if (!started.get()) {
            start();
        }
        queue.add(request);
    }
}
 
<pre name="code" class="java">public class WorkingService<T extends BaseRequest> {private WorkingQueue<T> workingQueue;public WorkingService() {workingQueue = new WorkingQueue<T>();}public void start() {workingQueue.start();}public void execute(final T t, LazyExecutable<T> callback ) {t.setLazyExecutor(callback);workingQueue.execute(t);}public void stop() {if (workingQueue != null) {workingQueue.stop();}}}

/**
 * Callback that carries out the deferred work for a request.
 *
 * @param <T> the request type this callback handles
 */
public interface LazyExecutable<T extends BaseRequest> {

    /**
     * Performs the work associated with the given request.
     *
     * @param request the request to process
     */
    void lazyExecute(T request);
}

/**
 * Base class for requests; it simply holds the {@link LazyExecutable}
 * assigned to the request.
 *
 * <p>The executor is deliberately kept as a raw type, which is why each
 * member carries a {@code rawtypes} suppression.
 */
public class BaseRequest {

    /** The callback assigned to this request; {@code null} until one is set. */
    @SuppressWarnings("rawtypes")
    private LazyExecutable lazyExecutor;

    /**
     * Assigns the callback for this request, replacing any previous one.
     *
     * @param callback the executor to store
     */
    @SuppressWarnings("rawtypes")
    public void setLazyExecutor(LazyExecutable callback) {
        lazyExecutor = callback;
    }

    /**
     * Returns the callback last stored with {@link #setLazyExecutor}.
     *
     * @return the stored executor, or {@code null} if none was set
     */
    @SuppressWarnings("rawtypes")
    public LazyExecutable getLazyExecutor() {
        return lazyExecutor;
    }
}

调用的时候:
 
<pre name="code" class="java">public class Client {

public static void main(String[] args) {

WorkingService<WriteFileRequest> service = new WorkingService<WriteFileRequest>();
service.start();

WriteFileRequest request = new WriteFileRequest();
request.setFilename("file1.txt");

//这里不会阻塞
service.execute(request, new LazyExecutable<WriteFileRequest>(){
@Override
public void lazyExecute(WriteFileRequest request) {
//这里可以用线程池并发执行,也可以单线程的执行
System.out.println("wite file:"+request.getFilename());
}
});
System.out.println("over");
}

public static class WriteFileRequest extends BaseRequest{

private String filename;
private byte[] data;
public String getFilename() {
return filename;
}
public void setFilename(String filename) {
this.filename = filename;
}
public byte[] getData() {
return data;
}
public void setData(byte[] data) {
this.data = data;
}
}

}