Java并发编程-------Executor框架、使用线程池的好处、创建线程池的四种方式

时间:2022-02-13 14:51:49

以下内容转自:https://www.cnblogs.com/vhua/p/5277694.html;https://www.cnblogs.com/Steven0805/p/6393443.html

 

Java是天生就支持并发的语言,支持并发意味着多线程,线程的频繁创建是非常消耗资源的,因此java提供了线程池。在jdk1.5以前的版本中,线程池的使用是极其简陋的,但是在JDK1.5后,有了很大的改善。JDK1.5之后加入了java.util.concurrent包,java.util.concurrent包的加入给予开发人员并发并发程序以及解决并发问题很大的帮助。

线程池

  对于数据库连接,我们经常听到数据库连接池这个概念。因为建立数据库连接时非常耗时的一个操作,其中涉及到网络IO的一些操作。因此就想出把连接通过一个连接池来管理。需要连接的话,就从连接池里取一个。当使用完了,就“关闭”连接,这不是正在意义上的关闭,只是把连接放回到我们的池里,供其他人在使用。所以对于线程,也有了线程池这个概念,其中的原理和数据库连接池是差不多的,因此java jdk中也提供了线程池的功能。

  线程池的作用:线程池就是限制系统中使用线程的数量以及更好的使用线程。

  根据系统的运行情况,可以自动或手动设置线程数量,打到运行的最佳效果:配置少了,将影响系统的执行效率,配置多了又会浪费系统的资源。用线程池配置数量,其他线程排队等候。当一个任务执行完,就从队列中取一个新任务执行,如果没有新任务,那么这个线程将等待。如果来了一个新任务,但是没有空闲线程的话,那么将任务加入到等待队列中。

 

new Thread的弊端:

  • 每次线程的频繁创建非常消耗系统资源;
  • 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或者oom;
  • 缺乏更多功能,如定时执行、定期执行、线程终端

使用线程池的好处:

  1.重用存在的线程,减少对象创建、消亡的开销,性能佳;

  2.可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞;

  3.提供定时执行、定期指定、单线程、并发数控制等功能

 

Java里面线程池的*接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。Executor类提了一系列工厂方法用于创建线程池,返回的线程池都实现ExecutorService接口。

 

Executor接口的API文档介绍:

Executor
子接口
|-----ExcutorService
|-----ScheduledExecutorService
已知实现子类
|______AbstractExecutorService
|______ScheduledThreadPoolExecutor
|______ThreadPoolExecutor

 

Executor接口,用于执行已提交的Runnable任务的对象,此接口提供一种将任务提交与每个任务如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用Executor而不是显式地创建线程。例如,可能会使用以下方法:

Executor executor = anExecutor;
 executor.execute(new RunnableTask1()); executor.execute(new RunnableTask2()); ...

 

而不是为一组任务中的每个任务调用 new Thread(new(RunnableTask())).start()

 

Executor接口并没有严格地要求执行是同步或是异步的:

同步:执行程序可以在调用者的线程中立即运行已提交的任务:

 class DirectExecutor implements Executor {
     public void execute(Runnable r) { r.run(); } }

 

异步:任务是在某个不是调用者线程的线程中执行的,例如以下执行程序将为每个任务生成一个新线程。

 class ThreadPerTaskExecutor implements Executor {
     public void execute(Runnable r) { new Thread(r).start(); } }

 

以下执行程序使任务提交与第二个执行程序保持连续,这是一个复合执行程序:

import java.util.ArrayDeque;
import java.util.Queue; import java.util.concurrent.Executor; class SerialExecutor implements Executor { final Queue<Runnable> tasks = new ArrayDeque<Runnable>(); final Executor executor; Runnable active; SerialExecutor(Executor executor) { this.executor = executor; } public synchronized void execute(final Runnable r) { tasks.offer(new Runnable() {//将指定的元素插入到此队列(如果立即可行且不会违反容量限制),当使用由容量限制 public void run() { //的队列时,此方法通常要优于add(E),add(E)可能无法插入元素,而只是抛出一个异常 try { r.run(); } finally { scheduleNext(); } } }); if (active == null) { scheduleNext(); } } protected synchronized void scheduleNext() { if ((active = tasks.poll()) != null) {//poll()获取并移除此队列的头,如果此队列为空,则返回null  executor.execute(active); } } }
 

 

一、Java通过Executors工具创建四种线程池,分别为:

  • newCachedThreadPool------ 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程;

 

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

//调用ThreadPoolExecutor()的构造方法为:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

 

  • newFixedThreadPool---------创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待;

 

 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

 

  • newScheduledThreadPool--------创建一个定长线程池,支持定时及周期性任务执行;

 

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

//调用的构造方法分别为:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

 

  • newSingleThreadExecutor----------创建一个单线程化对的线程池,它只会用唯一的工作线程来执行任务,保证所有的任务按照指定顺序(FIFO,LIFO,优先级)执行。
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }


可见四种构造方法底层调用的都是:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

 

 

 (1)newCachedThreadPool示例

 

package Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPool {
    public static void main(String[] args) {
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            try {
                Thread.sleep(index * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cachedThreadPool.execute(new Runnable() {
                public void run() {
                    System.out.println(index);
                    System.out.println("ThreadName-----"+Thread.currentThread().getName());
                }
            });
        }
    }
}

 

输出为:

0
ThreadName-----pool-1-thread-1
1
ThreadName-----pool-1-thread-1
2
ThreadName-----pool-1-thread-1
3
ThreadName-----pool-1-thread-1
4
ThreadName-----pool-1-thread-1
5
ThreadName-----pool-1-thread-1
6
ThreadName-----pool-1-thread-1
7
ThreadName-----pool-1-thread-1
8
ThreadName-----pool-1-thread-1
9
ThreadName-----pool-1-thread-1

 

可见全程只有1个线程在进行打印:线程池为无限大,当执行第二个任务时第一个任务已经完成,因此会复用执行第一个任务的线程,而不会每次新建线程。

 

(2)newFixedThreadPool示例

package Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPool {
    public static void main(String[] args){
        ExecutorService executorService= Executors.newFixedThreadPool(3);
        for(int i=0;i<10;i++){
            int index=i;
            try{
                Thread.sleep(100*i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("ThreadName----"+Thread.currentThread().getName());
                    System.out.println("index--------"+index);
                }
            });
        }
    }
}

输出为:

ThreadName----pool-1-thread-1
index--------0
ThreadName----pool-1-thread-2
index--------1
ThreadName----pool-1-thread-3
index--------2
ThreadName----pool-1-thread-1
index--------3
ThreadName----pool-1-thread-2
index--------4
ThreadName----pool-1-thread-3
index--------5
ThreadName----pool-1-thread-1
index--------6
ThreadName----pool-1-thread-2
index--------7
ThreadName----pool-1-thread-3
index--------8
ThreadName----pool-1-thread-1
index--------9

 

(3)newScheduledThreadPool示例:

      1.定时执行示例

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPool {
    public static void main(String[] args){
        ScheduledExecutorService scheduledThreadPool= Executors.newScheduledThreadPool(5);
        scheduledThreadPool.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("delay 3 seconds");
            }
        },3, TimeUnit.SECONDS);
    }
}
ScheduledExecutorService的schedule方法为:
public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

输出为:

会延迟3秒输出:

delay 3 seconds


(2)定期执行的示例
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPool {
    public static void main(String[] args){
        ScheduledExecutorService scheduledThreadPool= Executors.newScheduledThreadPool(5);
        scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println("delay 3 seconds");
            }
        },3,4,TimeUnit.SECONDS);
    }
}
 

调用方法为:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

输出为延迟3秒后每4秒输出一次


(4)
newSingleThreadExecutor示例:
package Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ScheduledThreadPool {
    public static void main(String[] args){
        ExecutorService singleThreadPool= Executors.newSingleThreadExecutor();
        for(int i=0;i<10;i++){
            int index=i;
            try{
                Thread.sleep(100*i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            singleThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("ThreadName----"+Thread.currentThread().getName());
                    System.out.println("index--------"+index);
                }
            });
        }
    }
}
 

输出为:

ThreadName----pool-1-thread-1
index--------0
ThreadName----pool-1-thread-1
index--------1
ThreadName----pool-1-thread-1
index--------2
ThreadName----pool-1-thread-1
index--------3
ThreadName----pool-1-thread-1
index--------4
ThreadName----pool-1-thread-1
index--------5
ThreadName----pool-1-thread-1
index--------6
ThreadName----pool-1-thread-1
index--------7
ThreadName----pool-1-thread-1
index--------8
ThreadName----pool-1-thread-1
index--------9