java 注解实现一个可配置线程池的方法示例

时间:2021-09-05 05:12:31

前言

项目需要多线程执行一些task,为了方便各个服务的使用。特意封装了一个公共工具类,下面直接撸代码:

poolconfig(线程池核心配置参数):

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
 * <h1>线程池核心配置(<b style="color:#cd0000">基本线程池数量、最大线程池数量、队列初始容量、线程连接保持活动秒数(默认60s)</b>)</h1>
 *
 * <blockquote><code>
 * <table border="1px" style="border-color:gray;" width="100%"><tbody>
 * <tr><th style="color:green;text-align:left;">
 * 属性名称
 * </th><th style="color:green;text-align:left;">
 * 属性含义
 * </th></tr>
 * <tr><td>
 * queuecapacity
 * </td><td>
 * 基本线程池数量
 * </td></tr>
 * <tr><td>
 * count
 * </td><td>
 * 最大线程池数量
 * </td></tr>
 * <tr><td>
 * maxcount
 * </td><td>
 * 队列初始容量
 * </td></tr>
 * <tr><td>
 * alivesec
 * </td><td>
 * 线程连接保持活动秒数(默认60s)
 * </td></tr>
 * </tbody></table>
 * </code></blockquote>
 
 */
public class poolconfig {
 
 private int queuecapacity = 200;
 
 private int count = 0;
 
 private int maxcount = 0;
 
 private int alivesec;
 
 public int getqueuecapacity() {
 return queuecapacity;
 }
 
 public void setqueuecapacity(int queuecapacity) {
 this.queuecapacity = queuecapacity;
 }
 
 public void setcount(int count) {
 this.count = count;
 }
 
 public void setmaxcount(int maxcount) {
 this.maxcount = maxcount;
 }
 
 public void setalivesec(int alivesec) {
 this.alivesec = alivesec;
 }
 
 public int getcount() {
 return count;
 }
 
 public int getmaxcount() {
 return maxcount;
 }
 
 public int getalivesec() {
 return alivesec;
 }
}

threadpoolconfig(线程池配置 yml配置项以thread开头):

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import java.util.arraylist;
import java.util.hashmap;
import java.util.list;
import java.util.map;
 
import org.springframework.boot.context.properties.configurationproperties;
import org.springframework.stereotype.component;
 
/**
 * <h1>线程池配置(<b style="color:#cd0000">线程池核心配置、各个业务处理的任务数量</b>)</h1>
 *
 * <blockquote><code>
 * <table border="1px" style="border-color:gray;" width="100%"><tbody>
 * <tr><th style="color:green;text-align:left;">
 * 属性名称
 * </th><th style="color:green;text-align:left;">
 * 属性含义
 * </th></tr>
 * <tr><td>
 * pool
 * </td><td>
 * 线程池核心配置
 * 【{@link poolconfig}】
 * </td></tr>
 * <tr><td>
 * count
 * </td><td>
 * 线程池各个业务任务初始的任务数
 * </td></tr>
 * </tbody></table>
 * </code></blockquote>
 
 */
@component
@configurationproperties(prefix="thread")
public class threadpoolconfig {
 
 private poolconfig pool = new poolconfig();
 
 map<string, integer> count = new hashmap<>();
 
 public poolconfig getpool() {
 return pool;
 }
 
 public void setpool(poolconfig pool) {
 this.pool = pool;
 }
 
 public map<string, integer> getcount() {
 return count;
 }
 
}

定义task注解,方便使用:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@target(elementtype.type)
@retention(retentionpolicy.runtime)
@documented
@component
public @interface excutortask {
 
 /**
 * the value may indicate a suggestion for a logical excutortask name,
 * to be turned into a spring bean in case of an autodetected excutortask .
 * @return the suggested excutortask name, if any
 */
 string value() default "";
 
}

通过反射获取使用task注解的任务集合:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class beans {
 
 private static final char prefix = '.';
 
 public static concurrentmap<string, string> scanbeanclassnames(){
 concurrentmap<string, string> beanclassnames = new concurrenthashmap<>();
 classpathscanningcandidatecomponentprovider provider = new classpathscanningcandidatecomponentprovider(false);
   provider.addincludefilter(new annotationtypefilter(excutortask.class));
   for(package pkg : package.getpackages()){
   string basepackage = pkg.getname();
     set<beandefinition> components = provider.findcandidatecomponents(basepackage);
     for (beandefinition component : components) {
     string beanclassname = component.getbeanclassname();
     try {
    class<?> clazz = class.forname(component.getbeanclassname());
    boolean isannotationpresent = clazz.isannotationpresent(zimatask.class);
    if(isannotationpresent){
     zimatask task = clazz.getannotation(excutortask.class);
     string aliasname = task.value();
     if(aliasname != null && !"".equals(aliasname)){
     beanclassnames.put(aliasname, component.getbeanclassname());
     }
    }
    } catch (classnotfoundexception e) {
    e.printstacktrace();
    }
     beanclassnames.put(beanclassname.substring(beanclassname.lastindexof(prefix) + 1), component.getbeanclassname());
     }
   }
   return beanclassnames;
  }
}

 线程执行类taskpool:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
@component
public class taskpool {
 
 public threadpooltaskexecutor pooltaskexecutor;
 
 @autowired
 private threadpoolconfig threadpoolconfig;
 
 @autowired
 private applicationcontext context;
 
 private final integer max_pool_size = 2000;
 
 private poolconfig poolcfg;
 
 private map<string, integer> taskscount;
 
 private concurrentmap<string, string> beanclassnames;
 
 @postconstruct
  public void init() {
 
 beanclassnames = beans.scanbeanclassnames();
   
   pooltaskexecutor = new threadpooltaskexecutor();
   
   poolcfg = threadpoolconfig.getpool();
 
 taskscount = threadpoolconfig.getcount();
 
 int corepoolsize = poolcfg.getcount(),
  maxpoolsize = poolcfg.getmaxcount(),
  queuecapacity = poolcfg.getqueuecapacity(),
  minpoolsize = 0, maxcount = (corepoolsize << 1);
 
 for(string taskname : taskscount.keyset()){
  minpoolsize += taskscount.get(taskname);
 }
 
 if(corepoolsize > 0){
  if(corepoolsize <= minpoolsize){
  corepoolsize = minpoolsize;
  }
 }else{
  corepoolsize = minpoolsize;
 }
 
 if(queuecapacity > 0){
  pooltaskexecutor.setqueuecapacity(queuecapacity);
 }
 
 if(corepoolsize > 0){
  if(max_pool_size < corepoolsize){
  corepoolsize = max_pool_size;
  }
  pooltaskexecutor.setcorepoolsize(corepoolsize);
 }
 
 if(maxpoolsize > 0){
  if(maxpoolsize <= maxcount){
  maxpoolsize = maxcount;
  }
  if(max_pool_size < maxpoolsize){
  maxpoolsize = max_pool_size;
  }
  pooltaskexecutor.setmaxpoolsize(maxpoolsize);
 }
 
 if(poolcfg.getalivesec() > 0){
  pooltaskexecutor.setkeepaliveseconds(poolcfg.getalivesec());
 }
 
 pooltaskexecutor.initialize();
  }
  
 public void execute(class<?>... clazz){
 int i = 0, len = taskscount.size();
 for(; i < len; i++){
  integer taskcount = taskscount.get(i);
  for(int t = 0; t < taskcount; t++){
  try{
   object taskobj = context.getbean(clazz[i]);
   if(taskobj != null){
   pooltaskexecutor.execute((runnable) taskobj);
   }
  }catch(exception ex){
   ex.printstacktrace();
  }
  }
 }
  }
  
 public void execute(string... args){
   int i = 0, len = taskscount.size();
 for(; i < len; i++){
  integer taskcount = taskscount.get(i);
  for(int t = 0; t < taskcount; t++){
  try{
   object taskobj = null;
   if(context.containsbean(args[i])){
   taskobj = context.getbean(args[i]);
   }else{
   if(beanclassnames.containskey(args[i].tolowercase())){
    class<?> clazz = class.forname(beanclassnames.get(args[i].tolowercase()));
    taskobj = context.getbean(clazz);
   }
   }
   if(taskobj != null){
   pooltaskexecutor.execute((runnable) taskobj);
   }
  }catch(exception ex){
   ex.printstacktrace();
  }
  }
 }
  }
 
 public void execute(){
 for(string taskname : taskscount.keyset()){
  integer taskcount = taskscount.get(taskname);
  for(int t = 0; t < taskcount; t++){
  try{
   object taskobj = null;
   if(context.containsbean(taskname)){
   taskobj = context.getbean(taskname);
   }else{
   if(beanclassnames.containskey(taskname)){
    class<?> clazz = class.forname(beanclassnames.get(taskname));
    taskobj = context.getbean(clazz);
   }
   }
   if(taskobj != null){
   pooltaskexecutor.execute((runnable) taskobj);
   }
  }catch(exception ex){
   ex.printstacktrace();
  }
  }
 }
  }
  
}

如何使用?(做事就要做全套 ^_^)

1.因为使用的springboot项目,需要在application.properties 或者 application.yml 添加

?
1
2
3
4
5
#配置执行的task线程数
thread.count.needexcutortask=4
#最大存活时间
thread.pool.alivesec=300000
#其他配置同理

2.将我们写的线程配置进行装载到我们的项目中

?
1
2
3
4
5
6
7
8
9
10
11
@configuration
public class taskmanager {
 
 @resource
 private taskpool taskpool;
 
 @postconstruct
 public void executor(){
 taskpool.execute();
 }
}

3.具体使用

?
1
2
3
4
5
6
7
8
@excutortask
public class needexcutortask implements runnable{
  @override
 public void run() {
    thread.sleep(1000l);
    log.info("====== 任务执行 =====")
  }
}

以上就是创建一个可扩展的线程池相关的配置(望指教~~~)。希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:https://blog.csdn.net/u011663149/article/details/86497456