Java多线程Master-Worker模式

时间:2023-03-10 00:45:06
Java多线程Master-Worker模式

Java多线程Master-Worker模式,多适用于需要大量重复工作的场景中。

例如:使用Master-Worker计算0到100所有数字的立方的和

  1.Master接收到100个任务,每个任务需要0到100中每个数字的立方,这里为了效果,每个任务再sleep一秒,

   Master需要将这些任务放到一个支持高并发的非阻塞队列queue中如:ConcurrentLinkedQueue<E>。

  2.Master创建10个worker去执行这100个任务,并准备一个支持高并发且线程安全的hashMap作为结果集的容器如:ConcurrentHashMap。

  3.每个worker需要循环的从queue中获取任务然后执行,执行完毕后把结果放到hashMap中,直到queue为空,所有任务执行完毕后退出。

  4.Master循环判断结果集hashMap中是否有已经执行完毕的结果,如果有就使用,使用完毕就立即移除该结果,直到所有的线程都退出。

  5.所有任务执行完毕,Master也处理完所有任务的结果,程序结束

Master不需要等待所有的任务执行完毕就可以处理已完成的任务结果,Master和worker可以同时进行工作,这样节省了大量等待worker执行结束的时间

Master源码

package masterWorker;

import java.lang.Thread.State;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import org.junit.Test; public class Master {
public static void main(String[] args) {
long befor = System.currentTimeMillis(); //任务队列
Queue<Integer> taskQueue = new ConcurrentLinkedQueue<>();
//工人map
Map<String,Thread> workers = new HashMap<>();
//结果集
Map<String,Long> resultMap = new ConcurrentHashMap<>();
//最终结果
long result = 0; //提交100个任务
for (int i = 0; i < 100; i++) {
taskQueue.add(i);
} //添加10个工人
for (int i = 0; i < 10; i++) {
workers.put(i+"", new Thread(new Worker(taskQueue,resultMap),"工人"+i));
} //启动所有工人线程
Collection<Thread> threads = workers.values();
for (Thread thread : threads) {
thread.start();
} while(resultMap.size() > 0 || !isComplete(workers)){
Set<String> keySet = resultMap.keySet(); //每次从resultMap中取一个结果出来进行使用
String key = null;
for (String string : keySet) {
if(string != null){
key = string;
break;
}
}
Long value = null;
if(key != null){
value = resultMap.get(key);
} //能取到结果就使用,没有结果继续循环
if(value != null){
//获取到一个运算结果就使用
result = result+value;
//使用后从结果集中移除
resultMap.remove(key);
} }
long after = System.currentTimeMillis();
System.out.println("结果耗时:"+(after - befor));
System.out.println(result); } /**
* 判断所有的工人是否已经完成工作
* @param workers
* @return
*/
private static boolean isComplete(Map<String,Thread> workers){
for (Entry<String, Thread> entry : workers.entrySet()) {
if(entry.getValue().getState() != State.TERMINATED){
return false;
}
}
return true;
} @Test
public void test() throws InterruptedException{
long befor = System.currentTimeMillis();
long result = 0; for (int i = 0; i < 100; i++) { long cube = i*i*i;
result = result+cube;
Thread.sleep(100); }
long after = System.currentTimeMillis();
System.out.println("结果耗时:"+(after - befor));
System.out.println(result);
}
}

  

  

Worker源码

package masterWorker;

import java.util.Map;
import java.util.Queue; public class Worker implements Runnable{
private Queue<Integer> queue;
private Map<String,Long> resultMap; public Worker(Queue<Integer> queue,Map<String,Long> resultMap) {
this.queue = queue;
this.resultMap = resultMap;
} @Override
public void run() {
//不断循环从队列中取出任务进行运算,直到队列为空
while(true){
if(queue.peek() != null){
String name = Thread.currentThread().getName();
int poll =(int) queue.poll();
long result = poll*poll*poll;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
resultMap.put(poll+"", result);
System.out.println(name+"完成"+poll+"的运算,结果为:"+result);
}else{
break;
}
}
}
}

  

经典的Master-Worker源码实例

package masterWorker.classics;
import java.util.Map;
import java.util.Set; public class Main { /**
* @param args
*/
public static void main(String[] args) {
//固定使用5个Worker,并指定Worker
Master m = new Master(new PlusWorker(), Runtime.getRuntime().availableProcessors());
//提交100个子任务
for(int i=0;i<100;i++){
m.submit(i);
}
//开始计算
m.execute();
int re= 0;
//保存最终结算结果
Map<String ,Object> resultMap =m.getResultMap(); //不需要等待所有Worker都执行完成,即可开始计算最终结果
while(resultMap.size()>0 || !m.isComplete()){
Set<String> keys = resultMap.keySet();
String key =null;
for(String k:keys){
key=k;
break;
}
Integer i =null;
if(key!=null){
i=(Integer)resultMap.get(key);
}
if(i!=null){
//最终结果
re+=i;
}
if(key!=null){
//移除已经被计算过的项
resultMap.remove(key);
} }
System.out.println(re);
} } package masterWorker.classics;
import java.util.Map;
import java.util.Queue; public class Worker implements Runnable{ //任务队列,用于取得子任务
protected Queue<Object> workQueue;
//子任务处理结果集
protected Map<String ,Object> resultMap;
public void setWorkQueue(Queue<Object> workQueue){
this.workQueue= workQueue;
} public void setResultMap(Map<String ,Object> resultMap){
this.resultMap=resultMap;
}
//子任务处理的逻辑,在子类中实现具体逻辑
public Object handle(Object input){
return input;
} @Override
public void run() { while(true){
//获取子任务
Object input= workQueue.poll();
if(input==null){
break;
}
//处理子任务
Object re = handle(input);
resultMap.put(Integer.toString(input.hashCode()), re);
}
} } package masterWorker.classics;
public class PlusWorker extends Worker { @Override
public Object handle(Object input) { Integer i =(Integer)input;
return i*i*i;
} } package masterWorker.classics;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; public class Master { // 任务队列
protected Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>();
// Worker进程队列
protected Map<String, Thread> threadMap = new HashMap<String, Thread>();
// 子任务处理结果集
protected Map<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
// 是否所有的子任务都结束了
public boolean isComplete() {
for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
if (entry.getValue().getState() != Thread.State.TERMINATED) {
return false;
} }
return true;
} // Master的构造,需要一个Worker进程逻辑,和需要Worker进程数量
public Master(Worker worker, int countWorker) { worker.setWorkQueue(workQueue);
worker.setResultMap(resultMap);
for (int i = 0; i < countWorker; i++) {
threadMap.put(Integer.toString(i),
new Thread(worker, Integer.toString(i)));
} } // 提交一个任务
public void submit(Object job) {
workQueue.add(job);
} // 返回子任务结果集
public Map<String, Object> getResultMap() {
return resultMap;
} // 开始运行所有的Worker进程,进行处理
public void execute() {
for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
entry.getValue().start(); }
} }