java大并发数据保存方案

时间:2022-04-06 08:44:32

做了几年.net,如今终于要做java了。

  • 需求:

    线下终端会定时上传gps位置到服务端,服务端收到数据保存到mysql数据库,当线下终端过多时,问题出现了,首当其冲的是数据库连接池经常会崩溃,单个tomcat到100并发就会抛出异常。

  • 解决思路:

    原来是收到一条数据就保存一条数据,现在改为将收到的数据暂存到一个数据池,当满100条数据时再用saveBatch一次性保存,这样终端上传100次其实只建立了一次数据库连接,减轻数据库压力。如果只是这样还有一个不足,就是当数据池的数据一直都不满100条时,永远都不会保存到数据库,所以再加一个守护线程,每2分钟检查一次,如果数据池有数据,就全部保存到数据库。

java大并发数据保存方案

  • 该方案执行效率,如下图,共1000次请求,100次并发的情况

java大并发数据保存方案

  • 具体实现:

1、数据池代码

public class GpsPool {
private static final Logger log = Logger.getLogger(GpsPool.class.getName());
public static Queue<ClientGPS> queGps=new LinkedList<ClientGPS>();//暂存数据,等数据超过100条或者超过时间后自动保存
private static Object lock=new Object();
//private ExecutorService s=Executors.newSingleThreadExecutor();
private ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 10, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(1),new ThreadPoolExecutor.DiscardPolicy());
/**
* 一次性最多取出100个数据
* @return
*/
public static List<ClientGPS> poll100(){
List<ClientGPS> gpss=new ArrayList<ClientGPS>();
synchronized (lock) {
int length=100;
if(queGps.size()<100)
length=queGps.size();
for (int i = 0; i < length; i++) { gpss.add(queGps.poll());
}
}
return gpss;
}
public void saveGps(ClientGPS gps){
if (null==gps||queGps.size()>2000) {//大于2000暂不处理
return;
}
if (queGps.size()>100) {
queGps.offer(gps);
threadPool.execute(new GpsSaveThread());
System.out.println("add2pool");
}else{
queGps.offer(gps);
} }
}

2、数据保存线程

class GpsSaveThread implements Runnable{
private static final Logger log = Logger.getLogger(GpsTask.class.getName());
ClientGPSService gpsService;
@Override
public void run() {
gpsService=ServiceLocator.getBean(ClientGPSService.class);
try {
List<ClientGPS> gpss;
while (GpsPool.queGps.size() > 100) {
gpss = GpsPool.poll100();
if (gpss.size() > 0) {
try {
gpsService.saveBatch(gpss.toArray(), gpss.size());
} catch (Exception e) {
saveError(e);
try {//失败后再试一次
gpsService.saveBatch(gpss.toArray(), gpss.size());
} catch (Exception ex) {
}
}
} }
} catch (Exception e) {
saveError(e);
} } }

3、守护线程代码

public class GpsTask implements Runnable{

    private static final Logger log = Logger.getLogger(GpsTask.class.getName());
ClientGPSService gpsService;
@Override
public void run() {
gpsService=ServiceLocator.getBean(ClientGPSService.class);
while (true) {
try {
Thread.sleep(1000*60*2);//2分钟检查一次
List<ClientGPS> gpss= GpsPool.poll100();
GpsPool.queGps.clear();
if(gpss.size()>0){
try {
gpsService.saveBatch(gpss.toArray(),gpss.size());
} catch (Exception e) {
saveError(e);
}
}
} catch (InterruptedException e) {
saveError(e);
}catch(Exception e){
saveError(e);
}
} }
}

守护线程代码

  • 抛砖引玉

目前这个方案有点问题,就是并发过千时,cpu会达到100%,期待有更好的方案。