【JAVA】关于项目中的异步文件上传问题

时间:2024-03-21 19:13:27

【JAVA】关于项目中的异步文件上传问题

1.问题:
项目中,客户上传文件时 HTTP 请求的等待时间过长。考虑将文件业务处理部分改为异步方式实现:把原先文件处理的业务逻辑整体封装到异步任务中执行,主线程则及时向客户返回上传请求的结果。

请求示例:
【JAVA】关于项目中的异步文件上传问题
2.原本代码:

    /**
     * Handles a file upload request (original version, before the async change).
     *
     * @param file the uploaded file, bound from the request part named "file"
     * @param paramStr the request parameter named "param", received as a String
     * @return the result string produced by the processing steps elided below
     */
    @PostMapping("/fileAsyncUploadServlet")
    //the file is received as a MultipartFile
	//the JSON payload is received as a String
    public String upload(@RequestParam("file") MultipartFile file, @RequestParam("param") String paramStr) {

	//parameter validation business logic...
	
	//call the file-processing method (file analysis, upload to the cloud server, etc.)...
	//NOTE(review): in this version the processing appears to run before the response is returned - confirm
	
	return result;
	
}	

3.更新代码:

//在主启动类(或配置类)上添加 @EnableAsync 注解,开启异步支持
@EnableAsync

//在需要异步执行的方法(文件处理)上添加 @Async 注解
@Async

//修改application.yaml
# Thread-pool settings for async execution; each key below is injected into
# AsyncConfiguration through a matching @Value placeholder.
spring:
  async:
    core:
      pool:
        # spring.async.core.pool.size -> passed to setCorePoolSize
        size: 5
    max:
      pool:
        # spring.async.max.pool.size -> passed to setMaxPoolSize
        size: 50
    queue:
      # spring.async.queue.capacity -> passed to setQueueCapacity
      capacity: 500
//注:线程池参数设置技巧
//http://www.imooc.com/article/5887
      
package com.aisp.quality.check.config;

import java.util.concurrent.Executor;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.aop.interceptor.SimpleAsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * Spring configuration that switches on {@code @Async} and scheduling support and supplies the
 * executor plus the uncaught-exception handler used for asynchronous method calls.
 *
 * <p>Pool sizing is taken from the {@code spring.async.*} properties.
 */
@Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfiguration implements AsyncConfigurer {

  /** Injected from {@code spring.async.core.pool.size}. */
  @Value("${spring.async.core.pool.size}")
  private int coreThreads;

  /** Injected from {@code spring.async.max.pool.size}. */
  @Value("${spring.async.max.pool.size}")
  private int maxThreads;

  /** Injected from {@code spring.async.queue.capacity}. */
  @Value("${spring.async.queue.capacity}")
  private int pendingTaskLimit;

  /**
   * Exposes the async executor as the {@code taskExecutor} bean.
   *
   * @return the configured thread pool wrapped in an {@link ExceptionHandlingAsyncTaskExecutor}
   */
  @Override
  @Bean(name = "taskExecutor")
  public Executor getAsyncExecutor() {
    // The pool is handed to the wrapper as-is; the wrapper forwards the
    // InitializingBean / DisposableBean callbacks to it.
    return new ExceptionHandlingAsyncTaskExecutor(newThreadPool());
  }

  /**
   * Supplies the handler for exceptions thrown by async methods.
   *
   * @return a {@link SimpleAsyncUncaughtExceptionHandler}
   */
  @Override
  public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
    return new SimpleAsyncUncaughtExceptionHandler();
  }

  /** Builds the thread pool from the injected sizing properties. */
  private ThreadPoolTaskExecutor newThreadPool() {
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    pool.setThreadNamePrefix("async-executor-");
    pool.setAllowCoreThreadTimeOut(true);
    pool.setCorePoolSize(coreThreads);
    pool.setMaxPoolSize(maxThreads);
    pool.setQueueCapacity(pendingTaskLimit);
    return pool;
  }
}
package com.aisp.quality.check.config;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.task.AsyncTaskExecutor;

/**
 * {@link AsyncTaskExecutor} decorator that logs any exception escaping a task before the
 * wrapped executor sees it.
 *
 * <p>Tasks started through {@code execute} have no {@link Future}, so their exceptions are
 * logged and then swallowed. Tasks started through {@code submit} are logged and then rethrown,
 * so that the returned {@link Future} reflects the failure.
 *
 * <p>The {@link InitializingBean} and {@link DisposableBean} callbacks are forwarded to the
 * wrapped executor when it implements them.
 */
@Slf4j
public class ExceptionHandlingAsyncTaskExecutor implements AsyncTaskExecutor,
    InitializingBean, DisposableBean {

  private final AsyncTaskExecutor executor;

  /**
   * @param executor the executor that actually runs the tasks
   */
  public ExceptionHandlingAsyncTaskExecutor(AsyncTaskExecutor executor) {
    this.executor = executor;
  }

  /** Runs {@code task} on the wrapped executor; exceptions are logged and swallowed. */
  @Override
  public void execute(Runnable task) {
    executor.execute(createWrappedRunnable(task));
  }

  /** Same as {@link #execute(Runnable)}, passing {@code startTimeout} through unchanged. */
  @Override
  public void execute(Runnable task, long startTimeout) {
    executor.execute(createWrappedRunnable(task), startTimeout);
  }

  /** Wraps a callable so that its exception is logged and then rethrown. */
  private <T> Callable<T> createCallable(final Callable<T> task) {
    return () -> {
      try {
        return task.call();
      } catch (Exception e) {
        handle(e);
        throw e;
      }
    };
  }

  /** Wraps a runnable for {@code execute}: its exception is logged and swallowed. */
  private Runnable createWrappedRunnable(final Runnable task) {
    return () -> {
      try {
        task.run();
      } catch (Exception e) {
        handle(e);
      }
    };
  }

  /**
   * Wraps a runnable for {@code submit}: its exception is logged and rethrown, matching
   * {@link #createCallable(Callable)}, so the returned Future does not report success for a
   * task that failed.
   */
  private Runnable createRethrowingRunnable(final Runnable task) {
    return () -> {
      try {
        task.run();
      } catch (Exception e) {
        handle(e);
        // Precise rethrow: Runnable.run() declares no checked exceptions, so this compiles.
        throw e;
      }
    };
  }

  /**
   * Logs an exception thrown by a task. Subclasses may override to report it differently.
   *
   * @param e the exception thrown by the task
   */
  protected void handle(Exception e) {
    log.error("Caught async exception", e);
  }

  /**
   * Submits {@code task} to the wrapped executor.
   *
   * @return the Future from the wrapped executor; a task exception is logged and rethrown
   */
  @Override
  public Future<?> submit(Runnable task) {
    return executor.submit(createRethrowingRunnable(task));
  }

  /**
   * Submits {@code task} to the wrapped executor.
   *
   * @return the Future from the wrapped executor; a task exception is logged and rethrown
   */
  @Override
  public <T> Future<T> submit(Callable<T> task) {
    return executor.submit(createCallable(task));
  }

  /** Forwards destruction to the wrapped executor when it is a {@link DisposableBean}. */
  @Override
  public void destroy() throws Exception {
    if (executor instanceof DisposableBean) {
      DisposableBean bean = (DisposableBean) executor;
      bean.destroy();
    }
  }

  /** Forwards initialization to the wrapped executor when it is an {@link InitializingBean}. */
  @Override
  public void afterPropertiesSet() throws Exception {
    if (executor instanceof InitializingBean) {
      InitializingBean bean = (InitializingBean) executor;
      bean.afterPropertiesSet();
    }
  }
}

4.问题:

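使用 MultipartFile 接收文件后,将 file 传递给异步的文件处理业务时出现问题:file 经常传递失败(异步方法中接收到的 file 为 null,而原本同步处理文件时未发现该问题),对较大的文件尤为明显。其原因通常是:请求处理结束后,容器会清理 multipart 请求对应的临时文件,而此时异步线程可能尚未读取文件内容;较大的文件更容易被写入磁盘临时文件,因此更容易出现该问题。尝试改用字节数组或输入流的方式传递后,问题可以解决。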

//修改前
/**
 * File-processing entry point before the change: takes the uploaded file as a MultipartFile.
 *
 * @param file the uploaded file
 * @param fileName the file name
 * @param userFileInfo the user file information passed along with the file
 */
@Override
public void dealFile(MultipartFile file, String fileName, UserFileInfo userFileInfo) {
    //...
}

//修改后
/**
 * File-processing entry point after the change: annotated with {@code @Async} and given the
 * file content as an InputStream instead of a MultipartFile.
 *
 * NOTE(review): the elided body does not show who closes the stream - presumably this method
 * must close it once processing finishes; confirm against the caller.
 * NOTE(review): {@code @Async} presumably only applies when the call goes through the Spring
 * proxy rather than a direct call from within the same class - confirm how this is invoked.
 *
 * @param file the uploaded file content as a stream
 * @param fileName the file name
 * @param userFileInfo the user file information passed along with the file
 */
@Override
@Async
public void dealFile(InputStream file, String fileName, UserFileInfo userFileInfo) {
    //...
}