Java实现多线程的三种方式

时间:2023-03-08 17:18:56
Java实现多线程的三种方式

  Java多线程实现方式主要有三种:继承Thread类、实现Runnable接口、使用ExecutorService、Callable、Future实现有返回结果的多线程。前两种方式启动的线程没有返回值,启动后与主线程没有任何关系,主线程也不知道子线程是否执行结束;后一种方式线程有返回值,启动后主线程可以根据线程对象来判断显示是否结束以及获取线程执行结果,前者多用于,当执行一个主要任务时需要执行一个辅助耗时任务,但是主任务并不关心辅助任务是否执行成功,执行成功的结果是什么,后者多用于执行主任务时需要执行一个耗时的辅助任务,但是主任务的执行结果或者执行流向依赖于辅助任务的执行结果。

  1、继承Thread的方式实现线程

 package com.luwei.test.thread;

 public class ThreadMain {

     public static void main(String[] args) {
System.out.println("主线程开始执行......");
SubThread subThread = new SubThread();
subThread.start();
System.out.println("主线程执行结束......");
}
} class SubThread extends Thread { @Override
public void run() {
// 执行子线程业务逻辑
System.out.println("子线程启动开始执行子线程业务");
} }

  2、实现Runnable接口实现线程

 package com.luwei.test.thread;

 public class ThreadMain {

     public static void main(String[] args) {
System.out.println("主线程开始执行......");
SubThread subThread = new SubThread();
Thread thread = new Thread(subThread);
thread.start();
System.out.println("主线程执行结束......");
}
} class SubThread implements Runnable { @Override
public void run() {
// 执行子线程业务逻辑
System.out.println("子线程启动开始执行子线程业务");
} }

  3、实现Callable接口方式实现由返回值的线程

 package com.tinno.adsserver.biz;

 import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; import javax.annotation.Resource; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service; import com.tinno.adsserver.base.BaseOutput;
import com.tinno.adsserver.constants.EnumType.HoProtocol;
import com.tinno.adsserver.constants.EnumType.HoRevenueType;
import com.tinno.adsserver.constants.EnumType.HoStatus;
import com.tinno.adsserver.constants.HoApiConst;
import com.tinno.adsserver.constants.MainConst;
import com.tinno.adsserver.constants.OutputCodeConst;
import com.tinno.adsserver.model.ho.HoApiResult;
import com.tinno.adsserver.model.ho.HoThreadBack;
import com.tinno.adsserver.service.HoSyncService;
import com.tinno.adsserver.utils.DateUtil;
import com.tinno.adsserver.utils.HoApiUtil;
import com.tinno.network.core.paging.QueryFilter; /**
* <Description> HasOffer操作Biz<br>
*
* @author lu.wei<br>
* @email 1025742048@qq.com<br>
* @date 2016年12月29日 <br>
* @version 1.0<br>
* @since V1.0<br>
* @see com.tinno.adsserver.biz <br>
*/
@Service("hasOfferOperateBiz")
public class HasOfferOperateBiz {
private Logger logger = LoggerFactory.getLogger(getClass()); @Resource(name = "hoSyncService")
private HoSyncService hoSyncService; public BaseOutput syncOfferToHasOffer(QueryFilter filter) throws InterruptedException, ExecutionException {
BaseOutput out = new BaseOutput(); Date now = new Date();
long tm = now.getTime();
int sum = 0;
int addCnt = 0;
int disableCnt = 0;
int updateCnt = 0;
int addFailCnt = 0;
int disableFailCnt = 0;
int updateFailCnt = 0;
List<String> addFailIds = new ArrayList<String>();
List<String> disableFailIds = new ArrayList<String>();
List<String> updateFailIds = new ArrayList<String>(); filter.getParam().put(MainConst.KEY_TM, String.valueOf(tm));
// 为了防止长事务所表,将业务层的操作拆分出来
// 1、进行预占数据
int occupyCnt = hoSyncService.updateNotSyncOfferToOccupy(filter); // 2、获取占用数据
List<Map<String, Object>> changedOffers = new ArrayList<Map<String, Object>>();
if (occupyCnt > 0) {
changedOffers = hoSyncService.selectFullOccupyChangeOffer(filter);
} // 3、生成失效时间
Calendar calendar = Calendar.getInstance();
calendar.setTime(now);
calendar.add(Calendar.YEAR, 1);
Date yearDate = calendar.getTime();
String expirDate = DateUtil.transferDateToStr(yearDate, DateUtil.DF_PATTERN_YYYY_MM_DD_HHMMSS);
// 4、开始处理数据
if (null != changedOffers && !changedOffers.isEmpty()) {
sum = changedOffers.size();
// 计算需要启动的线程数量
// 每300条记录启动一个线程
int threadCnt = (int) Math.ceil(sum / 300.0);
ExecutorService pool = Executors.newFixedThreadPool(threadCnt);
List<Future<HoThreadBack>> futures = new ArrayList<Future<HoThreadBack>>();
for (int i = 0; i < threadCnt; i++) {
int startIndex = i * 300;
int endIndex = (i + 1) * 300;
if (endIndex > sum) {
endIndex = sum;
}
List<Map<String, Object>> subOfferData = changedOffers.subList(startIndex, endIndex); Future<HoThreadBack> future = getDealSyncOfferFuture(pool, subOfferData, filter, expirDate);
futures.add(future);
} int executeCnt = 0;
List<HoThreadBack> backs = new ArrayList<HoThreadBack>();
while (executeCnt < threadCnt) {
List<Future<HoThreadBack>> forEachFutures = new ArrayList<Future<HoThreadBack>>();
for (Future<HoThreadBack> future : futures) {
if (future.isDone()) {
backs.add(future.get());
forEachFutures.add(future);
executeCnt++;
}
}
futures.removeAll(forEachFutures);
}
pool.shutdown();
if (!backs.isEmpty()) {
for (HoThreadBack back : backs) {
if (OutputCodeConst.SUCCESS.equals(back.getCode())) {
addCnt += back.getAddCnt();
updateCnt += back.getUpdateCnt();
disableCnt += back.getDisableCnt();
addFailCnt += back.getAddFailCnt();
updateFailCnt += back.getUpdateFailCnt();
disableFailCnt += back.getDisableFailCnt(); addFailIds.addAll(back.getAddFailIds());
updateFailIds.addAll(back.getUpdateFailIds());
disableFailIds.addAll(back.getDisableFailIds());
}
else {
logger.error(back.getMsg());
}
}
}
} // 释放占用数据
int unOccupyCnt = hoSyncService.updateOccupyedOfferToReleased(filter); StringBuffer sb = new StringBuffer("");
sb.append(" 执行同步Offer数据如下: 需要同步的数据总量为:").append(occupyCnt);
sb.append("; 能够同步的数据量为:").append(sum);
sb.append("; 新增成功的数据量为:").append(addCnt);
sb.append("; 更新成功的数据量为:").append(updateCnt);
sb.append("; 禁用成功的数据量为:").append(disableCnt);
sb.append("; 新增失败的数据量为:").append(addFailCnt);
sb.append("; 更新失败的数据量为:").append(updateFailCnt);
sb.append("; 禁用失败的数据量为:").append(disableFailCnt);
sb.append("; 释放的数据量为:").append(unOccupyCnt);
sb.append("; 新增失败的Offer为:").append(addFailIds);
sb.append("; 更新失败的Offer为:").append(updateFailIds);
sb.append("; 禁用失败的Offer为:").append(disableFailIds);
out.setMsg(out.getMsg() + sb.toString()); return out;
} /**
*
* <Description> 获取执行Future<br>
*
* @author lu.wei<br>
* @email 1025742048@qq.com<br>
* @date 2016年12月29日 上午9:55:50
* @param threadPool
* @param offerDatas
* @param filter
* @param expirDate
* @return <br>
*/
private Future<HoThreadBack> getDealSyncOfferFuture(ExecutorService threadPool, final List<Map<String, Object>> offerDatas,
final QueryFilter filter, final String expirDate) {
return threadPool.submit(new Callable<HoThreadBack>() {
@Override
public HoThreadBack call() throws Exception {
return updateOfferToHO(offerDatas, filter, expirDate);
}
});
} /**
*
* <Description> 同步Offer到HasOffers<br>
*
* @author lu.wei<br>
* @email 1025742048@qq.com<br>
* @date 2016年12月29日 下午3:20:39
* @param offerDatas
* @param filter
* @param expirDate
* @return <br>
*/
private HoThreadBack updateOfferToHO(List<Map<String, Object>> offerDatas, QueryFilter filter, String expirDate) {
String threadName = Thread.currentThread().getName();
HoThreadBack threadBack = new HoThreadBack();
int sum = offerDatas.size();
int addCnt = 0;
int disableCnt = 0;
int updateCnt = 0;
int addFailCnt = 0;
int disableFailCnt = 0;
int updateFailCnt = 0;
List<Map<String, String>> newHasOfferIds = new ArrayList<Map<String, String>>();
List<String> needUpdateIds = new ArrayList<String>(); logger.info("-----------------------线程{}执行-------------------------------------Begin", threadName); BaseOutput out = new BaseOutput();
String hasOfferId = null;
int i = 1;
for (Map<String, Object> offerData : offerDatas) {
logger.info("线程 {} 总共需要执行的数据 {} 当前正在执行的是第{}个", threadName, sum, i);
String id = String.valueOf(offerData.get("id"));
filter.getParam().put("offerId", id);
Object hasofferIdObj = offerData.get("hasofferId");
// 执行新增操作
if (null == hasofferIdObj || "".equals(String.valueOf(hasofferIdObj))) {
long begin = System.currentTimeMillis();
// 稍后批量执行新增操作
// hasOfferId = hoSyncService.addRemoteHoOffer(offerData,
// expirDate);
hasOfferId = addRemoateHoOffer(offerData, expirDate);
logger.info("线程 {} 执行新增Offer({})到 HasOffer耗时{}", threadName, id, (System.currentTimeMillis() - begin));
if (null != hasOfferId) {
addCnt += 1;
Map<String, String> idMap = new HashMap<String, String>();
idMap.put(MainConst.KEY_ID, id);
idMap.put(MainConst.KEY_CA_HASOFFERSOFFERID, hasOfferId);
newHasOfferIds.add(idMap);
}
else {
addFailCnt++;
threadBack.getAddFailIds().add(id);
}
}
// 执行更新操作
else {
hasOfferId = String.valueOf(hasofferIdObj);
String status = String.valueOf(offerData.get("status"));
// 修改Offer为禁用状态
if ("0".equals(status)) {
out = updateRemoteHoOfferToDisabled(offerData); if (out.getCode().equals(OutputCodeConst.SUCCESS)) {
disableCnt += 1;
needUpdateIds.add(id);
}
else {
disableFailCnt++;
threadBack.getDisableFailIds().add(id);
}
}
// 执行更新目前只更新Offer Url
else {
out = updateRemoteHoOffer(offerData);
if (out.getCode().equals(OutputCodeConst.SUCCESS)) {
updateCnt += 1;
needUpdateIds.add(id);
}
else {
updateFailCnt++;
threadBack.getUpdateFailIds().add(id);
}
}
}
i++;
} // 存在新增Offer
if (!newHasOfferIds.isEmpty()) {
filter.getExtraParam().put(MainConst.KEY_DATAS, newHasOfferIds);
hoSyncService.updateUnOfferdOfferToNotChange(filter);
} // 存在需要更新的Offer
if (!needUpdateIds.isEmpty()) {
filter.getExtraParam().put(MainConst.KEY_NEED_UPDATE_IDS, needUpdateIds);
hoSyncService.updateOfferdOfferToNotChange(filter);
} threadBack.setCode(out.getCode());
threadBack.setMsg(out.getMsg());
threadBack.setAddCnt(addCnt);
threadBack.setUpdateCnt(updateCnt);
threadBack.setDisableCnt(disableCnt);
threadBack.setAddFailCnt(addFailCnt);
threadBack.setDisableFailCnt(disableFailCnt);
threadBack.setUpdateFailCnt(updateFailCnt); logger.info(threadBack.toString());
logger.info("-----------------------线程{}执行-------------------------------------End", threadName);
return threadBack;
} /**
*
* <Description> 将Offer新增到HasOffer<br>
*
* @author lu.wei<br>
* @email 1025742048@qq.com<br>
* @date 2016年12月30日 下午2:09:52
* @param offerData
* @param expirDate
* @return <br>
*/
private String addRemoateHoOffer(Map<String, Object> offerData, String expirDate) {
Object idObj = offerData.get("id");
logger.info("---------------addRemoteHoOffer({})-----------------------Begin", idObj); String hasOfferId = null;
try {
Map<String, Object> params = new HashMap<String, Object>();
params.put(HoApiConst.KEY_ADVERTISER_ID, String.valueOf(offerData.get("advertiserId")));
params.put(MainConst.KEY_NAME, String.valueOf(offerData.get(MainConst.KEY_NAME)));
if (null != offerData.get(MainConst.KEY_DESCRIPTION)) {
params.put(MainConst.KEY_DESCRIPTION, String.valueOf(offerData.get(MainConst.KEY_DESCRIPTION)));
}
params.put(HoApiConst.KEY_PREVIEW_URL, String.valueOf(offerData.get("clickUrl")));
params.put(HoApiConst.KEY_OFFER_URL, String.valueOf(offerData.get("clickUrl")));
params.put(HoApiConst.KEY_STATUS, HoStatus.Active.getValue());
params.put(HoApiConst.KEY_EXPIRATION_DATE, expirDate);
params.put(MainConst.KEY_CURRENCY, String.valueOf(offerData.get(MainConst.KEY_CURRENCY)));
params.put(MainConst.KEY_PROTOCOL, HoProtocol.HttpiFramePixel.getValue());
logger.info(params.toString());
if (null != offerData.get("revenueType")) {
String localRevenueType = String.valueOf(offerData.get("revenueType"));
String revenueType = HoRevenueType.CPC.getValue();
if ("cpi".equals(localRevenueType)) {
revenueType = HoRevenueType.CPA_FLAT.getValue();
}
params.put(HoApiConst.KEY_REVENUE_TYPE, revenueType);
}
if (null != offerData.get("revenueRate")) {
params.put(HoApiConst.KEY_MAX_PAYOUT, String.valueOf(offerData.get("revenueRate")));
} HoApiResult addResult = HoApiUtil.getHoOperateResult(HoApiConst.OFFER_CREATE_OFFER, params);
String status = addResult.getResponse().getStatus();
if (null != status && HoApiConst.KEY_STATUS_SUCCESS.equals(status)) {
logger.info("HasOffers执行新增Offer成功!");
Map<String, String> data = addResult.getResponse().getDatas().get(0);
hasOfferId = data.get(MainConst.KEY_ID);
}
else {
logger.error("执行HasOffer接口失败: {} ", addResult.getResponse().getErrorMessage());
}
}
catch (Exception e) {
logger.error("执行addRemoteHoOffer({})接口失败: {} ", idObj, e.getMessage(), e);
}
logger.info("---------------addRemoteHoOffer({})-----------------------End", idObj);
return hasOfferId;
} /**
*
* <Description> 更新远程HasOffer信息<br>
*
* @author lu.wei<br>
* @email 1025742048@qq.com<br>
* @date 2016年12月30日 下午2:24:14
* @param offerData
* @return <br>
*/
private BaseOutput updateRemoteHoOffer(Map<String, Object> offerData) {
Object idObj = offerData.get("id");
logger.info("---------------updateRemoteHoOffer({})-----------------------Begin", idObj);
BaseOutput out = new BaseOutput();
try {
String hasOfferId = String.valueOf(offerData.get("hasofferId"));
Map<String, Object> dataParams = new HashMap<String, Object>();
dataParams.put(HoApiConst.KEY_ADVERTISER_ID, String.valueOf(offerData.get("advertiserId")));
dataParams.put(MainConst.KEY_NAME, String.valueOf(offerData.get(MainConst.KEY_NAME)));
if (null != offerData.get(MainConst.KEY_DESCRIPTION)) {
dataParams.put(MainConst.KEY_DESCRIPTION, String.valueOf(offerData.get(MainConst.KEY_DESCRIPTION)));
}
dataParams.put(HoApiConst.KEY_PREVIEW_URL, String.valueOf(offerData.get("clickUrl")));
dataParams.put(HoApiConst.KEY_OFFER_URL, String.valueOf(offerData.get("clickUrl")));
dataParams.put(HoApiConst.KEY_STATUS, HoStatus.Active.getValue());
dataParams.put(MainConst.KEY_CURRENCY, String.valueOf(offerData.get(MainConst.KEY_CURRENCY)));
dataParams.put(MainConst.KEY_PROTOCOL, HoProtocol.HttpiFramePixel.getValue()); if (null != offerData.get("revenueType")) {
String localRevenueType = String.valueOf(offerData.get("revenueType"));
String revenueType = HoRevenueType.CPC.getValue();
if ("cpi".equals(localRevenueType)) {
revenueType = HoRevenueType.CPA_FLAT.getValue();
}
dataParams.put(HoApiConst.KEY_REVENUE_TYPE, revenueType);
}
if (null != offerData.get("revenueRate")) {
dataParams.put(HoApiConst.KEY_DEFAULT_PAYOUT, String.valueOf(offerData.get("revenueRate")));
} Map<String, String> urlParams = new HashMap<String, String>();
urlParams.put(MainConst.KEY_ID, hasOfferId); HoApiResult addResult = HoApiUtil.getHoOperateResult(HoApiConst.OFFER_UPDATE_OFFER, urlParams, dataParams);
String status = addResult.getResponse().getStatus();
if (null != status && HoApiConst.KEY_STATUS_SUCCESS.equals(status)) {
logger.info("HasOffers执行修改Offer成功!");
}
else {
out.setCode(OutputCodeConst.UNKNOWN_ERROR);
out.setMsg("HasOffers执行修改Offer失败!");
logger.error("执行updateRemoteHoOffer({})接口失败: {} ", idObj, addResult.getResponse().getErrorMessage());
}
}
catch (Exception e) {
out.setCode("-999");
out.setMsg("HasOffers执行修改Offer状态失败!");
logger.error("执行updateRemoteHoOffer({})接口失败: {} ", idObj, e.getMessage(), e);
}
logger.info("---------------updateRemoteHoOffer({})-----------------------End", idObj);
return out;
} /**
*
* <Description> 禁用远程HasOffer<br>
*
* @author lu.wei<br>
* @email 1025742048@qq.com<br>
* @date 2016年12月30日 下午2:26:54
* @param offerData
* @return <br>
*/
private BaseOutput updateRemoteHoOfferToDisabled(Map<String, Object> offerData) {
Object idObj = offerData.get("id");
logger.info("---------------updateRemoteHoOfferToDisabled({})-----------------------Begin", idObj); BaseOutput out = new BaseOutput();
try {
String hasOfferId = String.valueOf(offerData.get("hasofferId")); Map<String, Object> dataParams = new HashMap<String, Object>();
dataParams.put(HoApiConst.KEY_STATUS, HoStatus.Expired.getValue()); Map<String, String> urlParams = new HashMap<String, String>();
urlParams.put(MainConst.KEY_ID, hasOfferId); HoApiResult addResult = HoApiUtil.getHoOperateResult(HoApiConst.OFFER_UPDATE_OFFER, urlParams, dataParams);
String status = addResult.getResponse().getStatus();
if (null != status && HoApiConst.KEY_STATUS_SUCCESS.equals(status)) {
logger.info("HasOffers执行修改Offer状态成功!");
}
else {
out.setCode("-999");
out.setMsg("HasOffers执行修改Offer状态失败!");
logger.error("执行updateRemoteHoOfferToDisabled({})接口失败: {} ", idObj, addResult.getResponse().getErrorMessage());
}
}
catch (Exception e) {
out.setCode("-999");
out.setMsg("HasOffers执行修改Offer状态失败!");
logger.error("执行updateRemoteHoOfferToDisabled({})接口失败: {} ", idObj, e.getMessage(), e);
}
logger.info("---------------updateRemoteHoOfferToDisabled({})-----------------------End", idObj);
return out;
}
}

说明:

  在使用多线程的过成功为了减少类的开发经常会使用匿名内部类的方式来启动线程,这样减少线程类的开发,同时还可以让匿名内部类的访问外部类的内容,如下

 @RequestMapping("/add")
@ResponseBody
public String add(final HttpServletRequest request, HttpServletResponse response) {
BaseOutput outPut = new BaseOutput();
try {
QueryFilter filter = new QueryFilter(request);
logger.info(filter.toString());
String country = filter.getParam().get(MainConst.KEY_COUNTRY);
String email = filter.getParam().get(MainConst.KEY_EMAIL);
String zipcode = filter.getParam().get(MainConst.KEY_ZIPCODE);
if(!StringUtil.isEmpty(country) && !StringUtil.isEmpty(email) && !StringUtil.isEmpty(zipcode)) {
filter.getParam().clear();
filter.getParam().put(MainConst.KEY_EMAIL, email);
int cnt = advertiserService.getAdvertiserCnt(filter);
if(cnt == 0) {
filter = new QueryFilter(request);
outPut = advertiserService.add(filter);
} else {
outPut.setCode(OutputCodeConst.EMAIL_IS_EXITS);
outPut.setMsg(OutputCodeConst.getMsg(OutputCodeConst.EMAIL_IS_EXITS));
}
}
else {
outPut.setCode(OutputCodeConst.INPUT_PARAM_IS_NOT_FULL);
outPut.setMsg("Country and email and zipcode is needed.");
}
} catch (Exception e) {
logger.error("新增异常!", e);
outPut.setCode(OutputCodeConst.UNKNOWN_ERROR);
outPut.setMsg("新增异常! " + e.getMessage());
} //新增成功后同步
if(OutputCodeConst.SUCCESS.equals(outPut.getCode())) {
//新增后同步到HasOffer
String hsSyncSwitch = applicationConfiConst.getHsSyncSwitch();
if(null != hsSyncSwitch && HoApiConst.HS_SYNC_SWITCH_OPEN.equals(hsSyncSwitch)) {
//多线程方式进行处理
new Thread(new Runnable() {
@Override
public void run() {
long begin = new Date().getTime();
logger.info("新增同步开始, 开始时间: {} ", begin);
try {
QueryFilter filter = new QueryFilter(request);
hasOfferOperateBiz.syncAdvertiserToHasOffer(filter);
} catch(Exception e) {
logger.error("新增同步广告商出错:{} ", e.getMessage(), e);
} long end = new Date().getTime();
logger.info("新增同步结束, 结束时间: {} ", end);
logger.info("新增同步累计耗时: {} ", (end - begin));
}
}).start(); }
} return JsonUtil.objectToJson(outPut);
}
 private Future<HoThreadBack> getDealSyncOfferFuture(ExecutorService threadPool, final List<Map<String, Object>> offerDatas,
final QueryFilter filter, final String expirDate) {
return threadPool.submit(new Callable<HoThreadBack>() {
@Override
public HoThreadBack call() throws Exception {
return updateOfferToHO(offerDatas, filter, expirDate);
}
});
}