Java应用【二】Java 并发编程与任务调度详解

时间:2023-02-23 12:03:44

第一部分:并发编程基础

在讨论并发编程之前,首先需要理解一些基本概念和原理。

什么是线程

线程是计算机操作系统中的基本执行单位之一。它是进程中的一部分,是操作系统调度的基本单位。一个进程可以包含多个线程,每个线程执行自己的代码,但是它们共享进程的内存和其他资源。

线程通常比进程更轻量级,可以更快地创建和销毁,并且可以更有效地利用系统资源。不同线程之间可以并发地执行,从而使得多个任务可以同时进行。线程可以在单个 CPU 上运行,也可以在多个 CPU 上并发地执行,从而实现并行计算。

线程是并发编程中重要的概念,通常用于创建多任务应用程序。例如,在一个 Web 服务器中,每个请求可以被分配给一个单独的线程进行处理,以便服务器可以同时处理多个请求。

在 Java 中,线程是通过 Thread 类来实现的。线程可以并行运行,也可以协作运行,共同完成一项任务。

什么是并发

在 Java 中,并发是指在一个时间段内多个线程在同一个程序中执行。当多个线程同时执行时,它们可能会共享一些资源(如内存或文件),这可能导致数据竞争和其他问题。Java 提供了多种机制来协调和控制多个线程之间的执行,以确保它们可以正确地访问共享资源。

什么是同步

同步是一种机制,它用于确保多个线程不会同时访问共享资源。在 Java 中,可以使用 synchronized 关键字将代码块标记为同步代码块,从而确保在任何时候只有一个线程可以访问该代码块。synchronized 关键字可以用于实例方法、静态方法和代码块。

什么是互斥

互斥是一种机制,它用于确保在任何时候只有一个线程可以访问共享资源。在 Java 中,可以使用锁(Lock)对象来实现互斥。Lock 对象可以保证只有一个线程可以持有锁,并且只有在持有锁的线程释放锁之后,其他线程才能获取锁并访问共享资源。与 synchronized 关键字不同,Lock 对象可以提供更高级的功能,如可重入锁和公平锁。

第二部分:JDK原生线程和任务调度

Java 提供了丰富的并发编程 API,包括线程、锁、信号量、阻塞队列等。本部分将深入介绍 JDK 原生的线程和任务调度。

创建线程

在 Java 中,创建线程有两种方式,一种是继承 Thread 类,另一种是实现 Runnable 接口。

public class MyThread extends Thread {
public void run() {
// 线程执行的代码
}
}

public class MyRunnable implements Runnable {
public void run() {
// 线程执行的代码
}
}

// 创建线程并启动
MyThread thread = new MyThread();
thread.start();

MyRunnable runnable = new MyRunnable();
Thread thread = new Thread(runnable);
thread.start();

线程状态

在 Java 中,线程有多种状态,包括新建、运行、阻塞、等待、计时等待和终止等状态。

  • NEW:新建状态,线程对象已经创建,但是还没有调用 start() 方法启动线程。
  • RUNNABLE:运行状态,线程正在运行或者等待 CPU 时间片。
  • BLOCKED:阻塞状态,线程正在等待获取一个排他锁,或者等待 I/O 操作完成。
  • WAITING:等待状态,线程正在等待其他线程的通知,处于无限期等待状态。
  • TIMED_WAITING:计时等待状态,线程正在等待其他线程的通知,但是最多等待一段时间。
  • TERMINATED:终止状态,线程已经执行完成或者被中断。

线程状态图:

+----------------+     +----------------+     +-----------------+
| NEW | --> | RUNNABLE | --> | BLOCKED |
| | | | | |
| | | | | waiting for |
| | | | | monitor lock |
+----------------+ +----------------+ +-----------------+
| | |
| | |
| | |
| | |
V V V
+----------------+ +----------------+ +-----------------+
| TERMINATED | <-- | TIMED_WAITING | <-- | WAITING |
| | | | | |
| | | | | waiting for |
| | | | | another thread |
| | | | | or I/O to finish|
+----------------+ +----------------+ +-----------------+

线程同步

线程同步是保证多个线程之间共享资源的正确访问顺序的一种机制。Java 提供了多种方式实现线程同步,包括 synchronized、Lock、Semaphore 等。

synchronized

synchronized 是 Java 中最常用的线程同步机制,它可以保证同一时间只有一个线程访问共享资源。synchronized 可以用在方法或代码块中,它通过获取对象的锁来实现同步。

public class Counter {
private int count = 0;
public synchronized void increment() {
count++;
}
public synchronized void decrement() {
count--;
}
}

上面的代码中,increment() 和 decrement() 方法都是同步的,它们会获取 Counter 对象的锁,然后执行 count++ 或者 count-- 操作。这样可以保证多个线程对 count 变量的访问是安全的。

Lock

Lock 是 Java 中另一种常用的线程同步机制,它提供了更多的功能和灵活性。Lock 可以用来实现同步,也可以用来实现等待/通知机制。与 synchronized 不同,Lock 不是 Java 语言层面的关键字,而是一个 Java 类。Lock 提供了多种方法实现线程同步,包括 lock()、tryLock()、lockInterruptibly()、newCondition() 等。

public class Counter {
private int count = 0;
private Lock lock = new ReentrantLock();
public void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
public void decrement() {
lock.lock();
try {
count--;
} finally {
lock.unlock();
}
}
}

上面的代码中,increment() 和 decrement() 方法都是使用 Lock 实现的。它们会获取一个锁对象,然后执行 count++ 或者 count-- 操作。与 synchronized 不同,使用 Lock 需要显式地获取锁和释放锁。

Semaphore

Semaphore 是 Java 中一种常用的同步工具,它可以控制同一时间有多少个线程访问共享资源。Semaphore 提供了两个常用的方法 acquire() 和 release(),用来获取和释放许可。

下面是一个使用 Semaphore 的简单示例,其中有一个共享资源,多个线程需要访问该资源。但是,每次只允许两个线程同时访问该资源。

import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
// 定义 Semaphore,初始许可数为 2
private static final Semaphore sem = new Semaphore(2);

public static void main(String[] args) {
// 创建 5 个线程
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
// 尝试获取许可
sem.acquire();
System.out.println("Thread " + Thread.currentThread().getName() + " acquired permit.");
// 模拟线程执行一段时间
Thread.sleep(2000);
System.out.println("Thread " + Thread.currentThread().getName() + " releasing permit.");
// 释放许可
sem.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
}

在上面的示例中,Semaphore 的初始许可数为 2,表示最多只有两个线程可以同时访问共享资源。每个线程首先调用 acquire() 方法尝试获取许可,如果没有许可可用,线程将阻塞直到许可可用为止。一旦线程获取到许可,它将执行一段模拟工作的代码,然后释放许可。当线程释放许可时,另一个线程就有机会获取许可并继续执行。由于 Semaphore 限制了许可的数量,因此只有两个线程可以同时访问共享资源。


线程池

线程池是一种常用的线程管理方式,它可以避免线程频繁创建和销毁的开销,同时可以控制线程的数量,保证系统的稳定性和效率。Java 提供了 ThreadPoolExecutor 类来实现线程池功能。

public class ThreadPoolDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
Runnable task = new Task();
executor.execute(task);
}
executor.shutdown();
}
}

上面的代码中,创建了一个大小为 5 的线程池,并提交了 10 个任务。每个任务都是一个实现了 Runnable 接口的类,执行 executor.execute(task) 方法后,线程池会自动调度线程执行任务。执行 executor.shutdown() 方法后,线程池会关闭并等待所有任务执行完成。

任务调度

任务调度是指在特定的时间或者条件下执行指定的任务。Java 提供了多种任务调度方式,包括 Timer、ScheduledExecutorService 等。

Timer

Timer 是 Java 中自带的一个任务调度器,它可以在指定的时间执行指定的任务。Timer 提供了 schedule() 和 scheduleAtFixedRate() 两种方法,用来执行一次性任务和循环任务。

public class TimerDemo {
public static void main(String[] args) {
TimerTask task = new TimerTask() {
public void run() {
System.out.println("task is running");
}
};
Timer timer = new Timer();
timer.schedule(task, 5000);
}
}

上面的代码中,创建了一个 Timer 对象和一个 TimerTask 对象,然后调用 timer.schedule(task, 5000) 方法,在 5 秒后执行任务。Timer 还提供了其他方法,比如 scheduleAtFixedRate() 用来执行循环任务。

ScheduledExecutorService

ScheduledExecutorService 是 Java 中推荐使用的任务调度器,它提供了比 Timer 更灵活和可靠的任务调度方式。ScheduledExecutorService 提供了 schedule() 和 scheduleAtFixedRate() 两种方法,用来执行一次性任务和循环任务。

public class ScheduledExecutorDemo {
public static void main(String[] args) {
Runnable task = new Task();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.schedule(task, 5, TimeUnit.SECONDS);
executor.shutdown();
}
}

上面的代码中,创建了一个 ScheduledExecutorService 对象和一个任务对象,然后调用 executor.schedule(task, 5, TimeUnit.SECONDS) 方法,在 5 秒后执行任务。ScheduledExecutorService 还提供了其他方法,比如 scheduleWithFixedDelay() 用来执行循环任务。

使用三方类库

除了 JDK 自带的线程和任务调度功能,Java 还有很多优秀的第三方类库可以用来实现并发编程和任务调度。常见的三方类库有:

  • Apache Commons Lang
  • Guava
  • Quartz
  • Spring Task

这里以 Quartz 和 Spring Task 为例,介绍如何使用这两个类库实现并发编程和任务调度。

Quartz

Quartz 是一个开源的任务调度框架,它提供了更丰富和灵活的任务调度功能,支持分布式任务调度和集群任务调度。Quartz 可以用来执行一次性任务和循环任务,也可以执行复杂的任务,比如发送邮件、备份数据等。

Quartz 的特点包括:

  1. 灵活性高:Quartz 可以根据各种触发器类型(如简单触发器、cron 触发器等)来调度任务,也可以配置任务执行的优先级、执行次数和时间等属性。
  2. 高可靠性:Quartz 提供了错误处理和恢复机制,可以在任务执行过程中出现故障时自动进行处理和恢复。
  3. 分布式和集群支持:Quartz 支持分布式任务调度和集群任务调度,可以部署在多台机器上,从而提高任务的可靠性和扩展性。

Quartz 调度定时任务

下面是一个使用 Quartz 进行任务调度的 Java 示例,它使用 Quartz 调度一个定时任务,每隔 10 秒钟输出一句话:

import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

import java.util.Date;

public class QuartzDemo implements Job {
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
System.out.println("Hello Quartz! " + new Date());
}

public static void main(String[] args) throws SchedulerException, InterruptedException {
// 创建 Scheduler 工厂
StdSchedulerFactory factory = new StdSchedulerFactory();
// 创建 Scheduler
Scheduler scheduler = factory.getScheduler();
// 创建 JobDetail
JobDetail jobDetail = JobBuilder.newJob(QuartzDemo.class).build();
// 创建触发器
Trigger trigger = TriggerBuilder.newTrigger()
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
.build();
// 将任务和触发器加入 Scheduler
scheduler.scheduleJob(jobDetail, trigger);
// 启动 Scheduler
scheduler.start();
// 让程序休眠 60 秒
Thread.sleep(60000);
// 停止 Scheduler
scheduler.shutdown();
}
}

 在上面的示例中,我们首先创建了一个 Job 类(QuartzDemo),它实现了 Quartz 的 Job 接口,并重写了 execute() 方法,这个方法中定义了需要执行的任务。

然后,我们使用 Quartz 的 SchedulerFactory 创建了一个 Scheduler,并创建了一个 JobDetail 对象和一个 Trigger 对象。JobDetail 定义了需要执行的 Job,而 Trigger 定义了 Job 的触发条件。

最后,我们将 JobDetail 和 Trigger 加入 Scheduler 中,并启动 Scheduler。程序休眠 60 秒钟后,我们停止了 Scheduler。在程序执行过程中,每隔 10 秒钟就会执行一次 Job 中定义的任务。

使用Quartz的Cron表达式

除了定时任务之外,Quartz 还提供了其他类型的任务调度,例如:

  1. Cron 表达式触发器:使用类似于 Linux 的 Cron 表达式来定义任务执行时间,可以灵活地定义任务的执行时间和频率。
  2. 日历触发器:使用日历来定义任务的执行时间,可以通过排除某些特定的时间来调度任务。
  3. 监听器:Quartz 提供了多种监听器,可以在任务执行前后或触发器触发前后执行特定的逻辑。
  4. 集群任务调度:Quartz 支持多个 Scheduler 实例之间的集群任务调度,从而提高任务的可靠性和扩展性。
  5. 分布式任务调度:Quartz 还支持分布式任务调度,可以将任务调度到远程的节点执行。

下面是一个使用 Quartz 的 Cron 表达式触发器进行任务调度的 Java 示例,它使用 Quartz 调度一个定时任务,每天上午 9 点钟执行一次:

import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;

import java.util.Date;

public class QuartzDemo implements Job {
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
System.out.println("Hello Quartz! " + new Date());
}

public static void main(String[] args) throws SchedulerException, InterruptedException {
// 创建 Scheduler 工厂
StdSchedulerFactory factory = new StdSchedulerFactory();
// 创建 Scheduler
Scheduler scheduler = factory.getScheduler();
// 创建 JobDetail
JobDetail jobDetail = JobBuilder.newJob(QuartzDemo.class).build();
// 创建触发器
Trigger trigger = TriggerBuilder.newTrigger()
.withSchedule(CronScheduleBuilder.cronSchedule("0 0 9 * * ?"))
.build();
// 将任务和触发器加入 Scheduler
scheduler.scheduleJob(jobDetail, trigger);
// 启动 Scheduler
scheduler.start();
// 让程序休眠 60 秒
Thread.sleep(60000);
// 停止 Scheduler
scheduler.shutdown();
}
}

在上面的示例中,我们使用 Quartz 的 CronScheduleBuilder 创建了一个 Cron 表达式触发器,这个触发器每天上午 9 点钟触发一次任务。

细说Quartz的Cron表达式

Quartz 的 Cron 表达式是一种用于定义任务调度时间的表达式,它使用类似于 Linux 的 Cron 语法来定义任务执行的时间和频率。Cron 表达式的语法非常复杂,但是掌握一些基本的语法规则就足够了。下面是一个简单的 Cron 表达式示例:

0 0 0/1 * * ?   // 每小时执行一次

上面的 Cron 表达式由 6 个字段组成,分别代表秒、分、时、日、月和星期。下面是每个字段的语法规则:

  1. 秒(Seconds):0~59,可选。
  2. 分钟(Minutes):0~59,必须指定。
  3. 小时(Hours):0~23,必须指定。
  4. 日(Day of month):1~31,必须指定。
  5. 月(Month):112 或 JANDEC,必须指定其中之一。
  6. 星期(Day of week):07 或 SUNSAT(0 和 7 都表示周日),必须指定其中之一。

其中,月和星期两个字段是互斥的,也就是说,一个任务只能在指定的月份或指定的星期中执行。

除了基本的语法规则之外,Cron 表达式还支持一些特殊符号和关键词,用于表示一些常用的时间段和逻辑条件。下面是一些常用的特殊符号和关键词:

  1. 星号(*):表示任意值,可以用在任何字段中。
  2. 问号(?):表示不指定值,可以用在日和星期字段中。
  3. 斜线(/):用于指定增量,例如,*/5 表示每 5 个单位执行一次。
  4. 逗号(,):用于指定多个值,例如,1,2,3 表示 1、2、3 三个值都可以执行。
  5. 连字符(-):用于指定范围,例如,10-15 表示 10 到 15 的范围内的值都可以执行。

下面是一些示例 Cron 表达式:

  • ​0 0 0/1 * * ?​​:每小时执行一次。
  • ​0 0 12 ? * MON-FRI​​:每个工作日中午 12 点执行一次。
  • ​0 0 1 ? * SAT#3​​:每个月的第三个星期六凌晨 1 点执行一次。

需要注意的是,Cron 表达式的语法非常复杂,需要根据具体需求来编写,建议参考官方文档进行学习和使用。


Spring Task

Spring Task 是 Spring Framework 中的一个基于注解的任务调度框架,它提供了简单的任务调度功能,支持一次性任务和循环任务。Spring Task 的主要组成部分是任务调度器(TaskScheduler)和任务执行器(TaskExecutor),其中任务调度器负责管理任务的调度,而任务执行器负责执行任务。

Spring Task 的核心特性包括:

  1. 支持定时任务和周期性任务:Spring Task 支持多种任务触发方式,包括固定延迟、固定速率和 Cron 表达式等。
  2. 支持异步执行:Spring Task 支持在单独的线程中执行任务,不会阻塞主线程。
  3. 支持任务过滤:Spring Task 支持基于任务名称、组名和 Cron 表达式等方式进行任务过滤。

固定延迟任务、固定速率任务和 Cron 表达式任务

下面是一个简单的 Spring Task 示例代码:

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class MyTask {

@Scheduled(fixedDelay = 1000)
public void myFixedDelayTask() {
System.out.println("Fixed delay task - " + System.currentTimeMillis());
}

@Scheduled(fixedRate = 2000)
public void myFixedRateTask() {
System.out.println("Fixed rate task - " + System.currentTimeMillis());
}

@Scheduled(cron = "0/5 * * * * ?")
public void myCronTask() {
System.out.println("Cron task - " + System.currentTimeMillis());
}
}

上面的代码定义了一个名为 MyTask 的 Spring Task,其中包含三个方法,分别用于定义固定延迟任务、固定速率任务和 Cron 表达式任务。这些方法都使用了 Spring Task 提供的 @Scheduled 注解,用于指定任务的触发方式。

其中,@Scheduled 注解中的 fixedDelay 和 fixedRate 属性分别用于指定固定延迟和固定速率任务的触发间隔,单位为毫秒。而 cron 属性则用于指定 Cron 表达式任务的触发时间。在上面的示例中,固定延迟任务和固定速率任务分别会每隔 1 秒和 2 秒执行一次,而 Cron 表达式任务会每隔 5 秒执行一次。

需要注意的是,为了启用 Spring Task 功能,需要在 Spring Boot 应用程序的主类上添加 @EnableScheduling 注解。这个注解可以在应用程序启动时自动启用 Spring Task。


自定义任务调度器和任务执行器

我们还可以使用 Spring Task 提供的 TaskScheduler 和 TaskExecutor 接口来自定义任务调度器和任务执行器。这样可以更灵活地管理任务调度和任务执行。下面是一个使用 TaskScheduler 和 TaskExecutor 接口自定义任务调度器和任务执行器的示例代码:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;

@Configuration
public class MyTaskConfig {

@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(10);
scheduler.setThreadNamePrefix("my-task-scheduler-");
return scheduler;
}

@Bean
public SimpleAsyncTaskExecutor taskExecutor() {
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
executor.setConcurrencyLimit(5);
return executor;
}

@Bean
public MyTask myTask() {
return new MyTask();
}

public class MyTask {
public void runTask() {
System.out.println("Task is running - " + System.currentTimeMillis());
}
}

public void scheduleTask() {
TaskScheduler scheduler = taskScheduler();
SimpleAsyncTaskExecutor executor = taskExecutor();
MyTask task = myTask();
scheduler.schedule(() -> executor.execute(task::runTask), new CronTrigger("0/5 * * * * ?"));
}
}

在上面的示例代码中,我们首先定义了一个 TaskScheduler 和一个 TaskExecutor,分别用于自定义任务调度器和任务执行器。其中,TaskScheduler 使用了 ThreadPoolTaskScheduler 实现,可以配置线程池大小和线程名前缀等参数;而 TaskExecutor 使用了 SimpleAsyncTaskExecutor 实现,可以配置并发限制等参数。

接下来,我们定义了一个 MyTask 类,其中包含一个 runTask() 方法,用于执行具体的任务逻辑。然后,我们通过 @Bean 注解将 MyTask 类注册为 Spring Bean。

最后,我们定义了一个 scheduleTask() 方法,用于启动任务调度。这个方法使用 TaskScheduler 和 CronTrigger 来定义任务调度的触发方式,使用 TaskExecutor 来执行任务。在这个示例中,我们定义了一个 Cron 表达式,用于每隔 5 秒触发一次任务调度。任务调度被触发时,会调用 MyTask 的 runTask() 方法来执行具体的任务逻辑。

需要注意的是,为了启用自定义的任务调度器和任务执行器,我们需要将 MyTaskConfig 类注册为 Spring Bean,并在应用程序启动时通过 @PostConstruct 注解来调用 scheduleTask() 方法来启动任务调度。


结语

本文介绍了如何在 Java 中使用线程进行并发编程和任务调度。使用线程可以提高程序的并发处理能力,使用任务调度可以提高程序的自动化处理能力。本文介绍了 JDK 原生的线程和任务调度功能,以及常用的三方类库 Quartz 和 Spring Task,希望能够对读者有所帮助。

在使用线程和任务调度的过程中,需要注意线程安全和任务重复执行等问题,尤其是在多线程环境下,需要加锁保证线程安全。同时还需要注意内存泄漏和资源消耗等问题,尽量避免在程序中出现死循环和大量创建线程的情况,以免导致系统崩溃。

线程和任务调度是 Java 中非常重要的功能,需要仔细学习和实践,才能掌握其核心技术和优化技巧,提高程序的性能和可靠性。