解决YED画图CPU占用100%的问题

时间:2023-01-21 23:33:55

背景

公司很多项目都会用到YED包来画关联方的关联图,当某个公司的4度关联方个数过多以及关联关系太过复杂时,使用YED画图会造成CPU占用一直100%并且长时间占用不释放CPU资源。所以在使用YED画图的代码流程设计需要一些技巧来保证YED画图不会阻塞主线程,并且确保当图片长时间无法绘制成功时,需要有一种机制强制终止绘制,避免CPU资源长时间无意义的浪费。

解决方案思考

  1. 找出YED无法完成绘制的原因。从根本上解决。
  2. 如果无法从根本上解决至少需要解决长时间CPU100%占用。

尝试解决

首先尝试了找寻问题的根本原因,提出3个假设:由于数据有问题?YED代码bug?数据量过大造成内存不足? 然而遗憾的是时间有限从这3方面入手都并没有进展,如果各位同事有兴趣可以尝试找出根本原因,有进展可与我联系。

所以开始尝试超时终止绘制方案,具体思路如下:

  1. 在需要YED画图时,使用别的线程进行绘制避免主线程阻塞
  2. 使用线程池管理线程,避免高并发时线程无限制增加
  3. 通过其他线程等待一定时间,如果绘制还是没有完成中断绘制尝试

阅读YED源代码发现,项目中使用的YED版本并不响应线程中断,所以只有使用stop()方法强制终止线程,这样我们就需要在线程池中取到需要终止的线程,考虑自定义一个连接池,连接池中维护一个以任务为key,以执行这个任务线程为value的map,让我们在需要时可以使用任务取到对应的线程并终止它。

同时应产品要求如果4度图片绘制失败需要尝试3度图片绘制,3度失败再尝试2度,所以提交任务时需要传入一个供用户回调的接口。

用以上思路代码实现大致如下:

自定义线程池如下:

public class TraceThreadPoolExecutor extends ThreadPoolExecutor { // 1 minutes private static final int MONITOR_SHUTDOWN_TIMEOUT = 60; private final Map<Runnable, Thread> tracer = new ConcurrentHashMap<>(); private final ExecutorService monitor; /** * Submit a job and monitor elapse time, if job elapse time greater than * timeout threshold, force stop it. * * @param job * @param timeOut * @param timeUnit * @param handler When job timeout, call it. * @return */ public Future<?> submitJob(Runnable job, int timeOut, TimeUnit timeUnit, TimeOutHandler handler) { Future<?> future = submit(job); monitor.submit(new MonitorJob(job, future, timeOut, timeUnit, handler)); return future; } /** * Stop executor & monitor, if timeout, force stop. */ public void stop() { /** Stop monitor executor */ shutdownExecutorQuietly(monitor); /** Stop worker executor */ shutdownExecutorQuietly(this); } private void shutdownExecutorQuietly(ExecutorService executor) { if (null != executor) { try { executor.shutdown(); if (!executor.isShutdown()) { try { executor.awaitTermination(MONITOR_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS); } catch (InterruptedException ignore) { } /** 等待超时或被中断,直接强制停止 */ if (!executor.isShutdown()) { executor.shutdownNow(); } } } catch (Exception ignore) { // do nothing } } } @Override protected void beforeExecute(Thread worker, Runnable job) { traceThread(job, worker); } @Override protected void afterExecute(Runnable job, Throwable error) { removeTraceThread(job); } @SuppressWarnings("rawtypes") private class MonitorJob implements Runnable { private final Future<?> future; private final Runnable originJob; private final int timeOut; private final TimeUnit timeUnit; private final TimeOutHandler handler; public MonitorJob(Runnable originJob, Future<?> future, int timeOut, TimeUnit timeUnit, TimeOutHandler handler) { this.originJob = originJob; this.future = future; this.timeOut = timeOut; this.timeUnit = timeUnit; this.handler = handler; } @Override public void run() { if (null != future) { try { future.get(timeOut, timeUnit); } catch (InterruptedException ignore) { } catch (ExecutionException ignore) { } catch (TimeoutException timeOut) { /** Stop worker thread force */ Thread worker = getTraceThread((FutureTask) future); if (null != worker && worker.isAlive()) { worker.stop(); } /** Do callback */ if (null != handler) { handler.doHandle(originJob); } } } } } private void traceThread(Runnable job, Thread worker) { tracer.put(job, worker); } private Thread getTraceThread(Runnable job) { return tracer.get(job); } private void removeTraceThread(Runnable job) { tracer.remove(job); } private TraceThreadPoolExecutor(int workerThreads, int monitorThreads) { super(workerThreads, workerThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); monitor = Executors.newFixedThreadPool(monitorThreads); } public static TraceThreadPoolExecutor createFixedThreadPool(int workerThreads, int monitorThreads) { return new TraceThreadPoolExecutor(workerThreads, monitorThreads); } } 

并发任务处理代码如下:

@Component
@Lazy(false) public class TasksHandler { @Value("${system.processor.num:2}") private int processors; @Value("${system.monitor.num:2}") private int monitorProcessors; /** * 日志 */ private final Logger logger = LoggerFactory.getLogger(TasksHandler.class); private TraceThreadPoolExecutor executor; @Autowired private JobScheduleService jobScheduleService; @Autowired private FileFactory fileFactory; @PostConstruct public void initialize() { executor = TraceThreadPoolExecutor.createFixedThreadPool(processors, monitorProcessors); List<JobSchedule> jobSchedules = jobScheduleService.selectByStatuses(Lists.newArrayList(NEW, CREATING)); jobSchedules.forEach(jobSchedule -> submitJob(jobSchedule)); } @PreDestroy public void stop() { executor.stop(); } public void submitJob(JobSchedule jobSchedule) { Map<Object, Object> params = new HashMap<>(); switch (jobSchedule.getJobType()) { case "DNA": params.put("dnaDeep", 4); break; case "HTML": break; case "PDF": params.put("bothTemplate", true); break; } executor.submitJob(new TaskThread(fileFactory, jobScheduleService, jobSchedule, params), 5, TimeUnit.MINUTES, job -> { switch (jobSchedule.getJobType()) { case "DNA": recursionHandler(jobSchedule, params); break; case "HTML": logger.info("HTML第一次生成超时,稍后进行重试"); retryHandler(jobSchedule, params); break; case "PDF": logger.info("PDF第一次生成超时,稍后进行重试"); retryHandler(jobSchedule, params); break; } }); } private void recursionHandler(JobSchedule jobSchedule, Map params) { if ((int) params.get("dnaDeep") == 1) return; logger.info("{}度DNA生成超时,开始尝试生成{}度DNA", params.get("dnaDeep"), (int) params.get("dnaDeep") - 1); params.put("dnaDeep", (int) params.get("dnaDeep") - 1); executor.submitJob(new TaskThread(fileFactory, jobScheduleService, jobSchedule, params), 2, TimeUnit.MINUTES, dnaJob -> { recursionHandler(jobSchedule, params); }); } private void retryHandler(JobSchedule jobSchedule, Map params) { executor.submitJob(new TaskThread(fileFactory, jobScheduleService, jobSchedule, params), 5, TimeUnit.MINUTES, job -> { switch (jobSchedule.getJobType()) { case "HTML": logger.info("HTML生成超时 id:{}, companyName:{}", jobSchedule.getId(), jobSchedule.getCompanyName()); break; case "PDF": logger.info("PDF生成超时 id:{}, companyName:{}", jobSchedule.getId(), jobSchedule.getCompanyName()); break; } }); } }