Gobrs-Async 1.2.9-RELEASE 高性能并发编程框架

时间:2023-01-03 12:11:58

Gobrs-Async 1.2.9-RELEASE 高性能并发编程框架

前言

大家好! 元旦快乐!很高兴又和大家见面啦!在新的一年里祝大家元旦快乐 大吉大利!心想事成!平平安安!万事如意!

Gobrs-Async 又给大家带来了诸多好用、高效、实用的功能啦。新的一年用全新的开发思维去迎接全新的美好未来吧!

各位技术小伙伴们,系好安全带准备发车啦 Let's Go !!!

超时任务

顾名思义其实就是单一任务可以通过设置超时时间决定该任务是否可以继续执行。开发者在开发 IO、CPU计算任务时是非常有用的。

使用方式

使用 @Task注解中的 timeoutInMilliseconds 属性进行配置。

  • timeoutInMilliseconds 固定使用毫秒数
package com.gobrs.async.test.task.timeout;

import com.gobrs.async.core.TaskSupport;
import com.gobrs.async.core.anno.Task;
import com.gobrs.async.core.task.AsyncTask;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Task(timeoutInMilliseconds = 300)
public class CaseTimeoutTaskA extends AsyncTask {
    int i = 10000;

    @Override
    public void prepare(Object o) {
        log.info(this.getName() + " 使用线程---" + Thread.currentThread().getName());
    }

    @SneakyThrows
    @Override
    public String task(Object o, TaskSupport support) {
        System.out.println("CaseTimeoutTaskA Begin");
        Thread.sleep(400);
        for (int i1 = 0; i1 < i; i1++) {
            i1 += i1;
        }
        System.out.println("CaseTimeoutTaskA Finish");
        return "result";
    }
}

超时监听线程池配置

所谓的监听线程池配置即:监听任务超时的线程池的核心线程数,关于为什么要配置该参数。请查看下方的 特别说明。

gobrs:
  async:
    config:
      rules:
        - name: "chain1"
          content: "taskA->taskB->taskC"
      timeout-core-size: 200 # 核心线程数量

特别说明

  • 超时任务不支持线程复用,因为需要通过控制线程超时来进行逻辑判断,如果支持线程复用,可能会出现中断正在复用线程的任务执行
  • 熔断降级原理参考Hystrix 方式处理,如果对Hystrix不熟悉可以借鉴 Hystrix 熔断降级原理解析
  • 因线程调度的原因,超时时间可能存在10ms内的误差,可忽略!

线程复用

小提问

试想下方任务流程中,在理想情况下最少用几个线程即可完成流程的执行? taskA->taskB,TaskC->taskD->taskE,taskF 是6个线程 还是5、4、3、2 你可能已经想到了答案是 2个线程(main线程除外)即可完成该复杂的多线程流程。 为什么呢? 因为在并发执行时, TaskB、TaskC 又或者是 TaskE、TaskF 都只有两个并发的任务同时存在,所以决定使用线程数量个数的根本条件是有多少个并发任务同时执行 那么看下gobrs-async 此时有多少个线程在执行

Gobrs-Async 1.2.9-RELEASE 高性能并发编程框架

测试用例

地址

运行结果

主线程使用main
使用main
TaskA
2022-12-11 13:58:22.581  INFO 8039 --- [           main] com.gobrs.async.core.task.AsyncTask      : <11780902254572224> [taskA] execution
使用main
TaskC
使用pool-2-thread-1
TaskB
2022-12-11 13:58:22.694  INFO 8039 --- [           main] com.gobrs.async.core.task.AsyncTask      : <11780902254572224> [TaskC] execution
2022-12-11 13:58:22.694  INFO 8039 --- [pool-2-thread-1] com.gobrs.async.core.task.AsyncTask      : <11780902254572224> [taskB] execution
使用pool-2-thread-1
TaskD
2022-12-11 13:58:22.804  INFO 8039 --- [pool-2-thread-1] com.gobrs.async.core.task.AsyncTask      : <11780902254572224> [taskD] execution
使用pool-2-thread-1
使用pool-2-thread-2
TaskE
2022-12-11 13:58:22.909  INFO 8039 --- [pool-2-thread-2] com.gobrs.async.core.task.AsyncTask      : <11780902254572224> [taskE] execution
TaskF
2022-12-11 13:58:22.910  INFO 8039 --- [pool-2-thread-1] com.gobrs.async.core.task.AsyncTask      : <11780902254572224> [taskF] execution
2022-12-11 13:58:22.913  INFO 8039 --- [           main] com.gobrs.async.core.TaskLoader          : 【ProcessTrace】Total cost: 440ms | traceId = 11780902254572224 | 【task】taskA cost :103ms【state】:success; ->【task】taskB cost :103ms【state】:success; ->【task】TaskC cost :103ms【state】:success; ->【task】taskD cost :106ms【state】:success; ->【task】taskE cost :101ms【state】:success; ->【task】taskF cost :101ms【state】:success; 
耗时462

结论

通过日志可以看到 TaskC使用了 TaskA的线程执行任务, 因TaskBTaskC 是并行的, 所以此时需要开辟新线程执行TaskB,等到TaskB执行完成后, TaskD继续使用 TaskB的 线程 pool-2-thread-1 执行任务, 此时TaskC执行完成后 发现其子任务已经被 TaskB释放后的线程拿到执行权,则不需要使用自身线程执行任务。 同理任务流程 继续往下执行。 整个流程中一共使用 3个线程(包含main线程)。

线程池隔离

介绍

Gobrs-Async 提供线程池配置隔离机制。 不同的规则可以使用不同的线程池处理任务,防止某任务规则出现线程池性能瓶颈后影响其他规则流程的运行。 如果规则不做线程池配置。 那么默认会使用统一的线程池配置。 如果也没有做统一的线程池配置。则SDK会默认使用 Executors.newCachedThreadPool() 作为默认的线程池。

自定义固定线程池(API方式)

Gobrs-Async 默认使用的是 Executors.newCachedThreadPool() 的线程池, 如果你想自定义线程池。满足自己的线程池需求。 只需要 继承GobrsThreadPoolConfiguration 重写doInitialize方法,如下配置:

@Configuration
public class ThreadPoolConfig extends GobrsThreadPoolConfiguration {

    @Override
    protected void doInitialize(GobrsAsyncThreadPoolFactory factory) {
        /**
         * 自定义线程池
         */
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(300, 500, 30, TimeUnit.SECONDS,
                new LinkedBlockingQueue());

        //  ExecutorService executorService = Executors.newCachedThreadPool();
        factory.setDefaultThreadPoolExecutor(threadPoolExecutor);
        //  factory.setThreadPoolExecutor("ruleName",threadPoolExecutor); // 设置规则隔离的线程池 ruleName 为 yml中配置的规则名称
    }
}

配置方式YML

默认线程池配置

gobrs:
  async:
    config:
      ## 如果规则没有制定 线程池 则使用 统一的线程池配置 如果通过 API 的方式动态更新了线程池 则使用动态更新 替换配置文件线程池配置 参见: ThreadPoolConfig
      thread-pool:
        core-pool-size: 1000
        max-pool-size: 2000

自定义规则配置

如果开发者针对流程规则做了单独的线程池配置。 那么会优先使用规则自定义的配置。也就是如下:caseOne流程 会使用 线程池配置为corePoolSize: 10 maxPoolSize: 20

gobrs:
  async:
    config:
      ## 如果规则没有制定 线程池 则使用 统一的线程池配置 如果通过 API 的方式动态更新了线程池 则使用动态更新 替换配置文件线程池配置 参见: ThreadPoolConfig
      thread-pool:
        core-pool-size: 1000
        max-pool-size: 2000
      rules:
        - name: "caseOne"
          content: "caseOneTaskA->caseOneTaskB,caseOneTaskC,caseOneTaskD"
          threadPool:
            corePoolSize: 10
            maxPoolSize: 20
        # 官方场景二 https://async.sizegang.cn/pages/2f844b/#%E5%9C%BA%E6%99%AF%E4%BA%8C
        - name: "caseTwo"
          content: "caseTwoTaskA->caseTwoTaskB->caseTwoTaskC,caseTwoTaskD"
          threadPool:
            corePoolSize: 30
            maxPoolSize: 40

热更新线程池配置

开发者可能有这种苦恼,线程池在运行时是在项目初始化的时候从application.yml中 加载的, 一旦程序运行起来之后,就无法修改使用的线程池了。 如果自己公司有分布式配置中心,可以实时更新程序内存的应用的话,那么gobrs也为你提供了入口。

在我们公司是有自己的热更新组件的,所有可以如下使用:

配置中心配置

{
corePoolSize: 210,
maxPoolSize: 600,
keepAliveTime: 30,
capacity: 10000,
threadNamePrefix: "m-detail"
rejectedExecutionHandler: "CallerRunsPolicy"
}
@Slf4j
@Configuration
public class ThreadPoolConfig {

    @Autowired
    private GobrsAsyncThreadPoolFactory factory;

    @Resource
    private DUCCConfigService duccConfigService;

    @PostConstruct
    public void gobrsThreadPoolExecutor() {
        // 从配置中心拿到 线程池配置规则 DuccConstant.GOBRS_ASYNC_THREAD_POOL 为线程池配置在配置中心的key
        String config = duccConfigService.getString(DuccConstant.GOBRS_ASYNC_THREAD_POOL);
        ThreadPool threadPool = JSONObject.parseObject(config, ThreadPool.class);
         
        // 通过gobrs-async 提供的构造器进行构造线程池
        ThreadPoolExecutor executor = ThreadPoolBuilder.buildByThreadPool(threadPool);
        factory.setDefaultThreadPoolExecutor(executor); // 设置默认线程池
        //     factory.setThreadPoolExecutor("ruleName",threadPoolExecutor);  // 设置规则隔离线程池
        
        listenerDucc();
    }
    
    // 监听配置中心 线程池改动
    private void listenerDucc() {
        duccConfigService.addListener(new DuccListener(DuccConstant.GOBRS_ASYNC_THREAD_POOL, property -> {
            log.warn("监听到DUCC配置GobrsAsync 线程池配置变更,property:{}", JSON.toJSONString(property.getValue()));
            ThreadPool threadPool = JSONObject.parseObject(property.getValue().toString(), ThreadPool.class);
            ThreadPoolExecutor executor = ThreadPoolBuilder.buildByThreadPool(threadPool);
            factory.setThreadPoolExecutor(executor);
            // 线程池更新成功
            log.warn("GobrsAsync thread pool update success");
        }));
    }

}

配置优先级

实时更新配置 > API配置 > (yml、yaml、properties) 配置

插件机制

Gobrs-Async 使用 SPI机制建设插件体系,使用者只需要引入 SDK提供的插件依赖即可完成插件介入。目前 所支持的插件包括以下两种。后续会持续更新!!!

监控系列

skywalking 插件

skywalking 是全链路监控平台,因为skywalking 不兼容多线程traceId,所以Gobrs-Async 提供skywalking插件

pom.xml

<dependency>
    <groupId>io.github.memorydoc</groupId>
    <artifactId>gobrs-async-skywalking-plugin</artifactId>
    <version>1.2.9-RELEASE</version>
</dependency>

只需引入依赖即可完成与skywalking完美适配。是不是感觉很神奇!

日志系列

全链路traceId插件

各位开发同学已经都知道,全链路traceId是打印在日志里的方便链路追踪的序列号。 有了它你可以轻松追踪线上问题,简单好用。

pom.xml

<dependency>
    <groupId>io.github.memorydoc</groupId>
    <artifactId>gobrs-async-trace-plugin</artifactId>
    <version>1.2.9-RELEASE</version>
</dependency>

静态注入

需要在SpringBoot启动类中编写

static {
    GobrsLogger.logger();
}

总结

Gobrs-Async 提供高性能多线程管理多线程并发编排的功能,后续会持续推出多线程任务可视化管理(线程池监控、日志分析、动态任务等),小伙伴们如果想在多线程编程的世界里畅游,请尽快关注Gobrs-Async 并且给项目一个小❤️记得star哦~

您的star是我最大的开源动力!!!

友情链接

更多关于 快速接入性能压测 的报告请访问官方查看。

官网地址

Gitee

GitHub