Java并发编程,除了被用于各种Web应用、分布式系统和大数据系统,构成高并发系统的核心基础外,其本身也蕴含着大量的设计模式思想在里面。这一系列文章主要是结合Java源码,对并发编程中使用到的、实现的各类设计模式做归纳总结,以便进一步沉淀对Java并发设计的理解。
模板设计模式
Thread类中run和start方法,就是一个典型的模板设计模式的实现,即:父类定义算法逻辑代码,子类实现其细节。
public synchronized void start() {
/**
* 线程对象新建后的New状态,其内部thereadStatus属性为0
*/
if (threadStatus != 0)
throw new IllegalThreadStateException(); /* 同时会被添加到一个ThreadGroup */
group.add(this); boolean started = false;
//调用JNI方法start0()来启动线程
try {
start0();
started = true;
} finally {
//线程结束之后,再次启动将抛出异常
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
下面以一个例子演示模板模式:
public class TemplateMethod {
//相当于Thread类的start方法, 用final修饰避免被更改
public final void print(String message) {
System.out.println("-------------");
wrapPrint(message);
System.out.println("-------------");
}
//相当于Thread的run方法, 用protected修饰限于子类重写
protected void wrapPrint(String message) { } public static void main(String[] args) {
//通过匿名内部子类, 重写父类的wrapPrint方法, 从而实现不同的输出模板
TemplateMethod t1 = new TemplateMethod() {
@Override
protected void wrapPrint(String message) {
System.out.println("111" + message + "111");
}
};
t1.print("Hello World!"); TemplateMethod t2 = new TemplateMethod() {
@Override
protected void wrapPrint(String message) {
System.out.println("222" + message + "222");
}
};
t2.print("Hello World!");
}
}
策略模式
创建Java多线程中,实现Runnable接口作为Target并传入Thread类的构造方法来生成线程对象的过程,就体现了GoF中策略模式的设计思想。下面是一个简单的示例:
首先,仿照Runnable接口的思想,定义一个用于处理数据库行的接口
/*
* RowHandler定义了对数据库查询返回结果操作的方法, 具体实现需要
* 实现类完成, 类似于Runnable接口
*/
public interface RowHandler<T> {
T handle(ResultSet rs);
}
然后,仿照Thread方法,定义数据库查询的工作类
public class RecordQuery { private final Connection connection; public RecordQuery(Connection connection) {
this.connection = connection;
}
//方法中传入RowHandler的实现类
public <T> T query(RowHandler<T> handler, String sql, Object... params) throws SQLException {
PreparedStatement stmt;
ResultSet resultSet;
stmt = connection.prepareStatement(sql);
int index = 1;
for (Object param : params) {
stmt.setObject(index++, param);
}
resultSet = stmt.executeQuery();
//调用实现类的handle方法来处理数据
return handler.handle(resultSet);
}
}
生产者-消费者模式
生产者-消费者模式是使用Java并发编程通信所实现的经典模式之一。该模式是通过队列这一数据结构来存储对象元素,由多线程分别充当生产者和消费者,生产者不断生成元素、消费者不断消费元素的过程。下面通过代码来演示:
实现一个带有入队和出队的队列
/*
* 通过一个生产者-消费者队列来说明线程通信的基本使用方法
*/
public class EventQueue {
//定义一个队列元素数量, 一旦赋值则不可更改
private final int max;
//定义一个空的内部类, 代表存储元素
static class Event{ }
//定义一个不可改的链表集合, 作为队列载体
private final LinkedList<Event> eventQueue = new LinkedList<>();
//如果不指定初始容量, 则容量默认为10
private final static int DEFAULT_MAX_EVENT = 10;
//使用自定义容量初始化队列
public EventQueue(int max) {
this.max = max;
}
//如果不指定初始容量, 则容量默认为10
public EventQueue() {
this(DEFAULT_MAX_EVENT);
}
//封装一个输出到控制台的方法
private void console(String message) {
System.out.printf("%s:%s\n",Thread.currentThread().getName(), message);
}
//定义入队方法
public void offer(Event event) {
//使用链表对象作为锁, 通过synchronized代码块实现同步
synchronized(eventQueue) {
//在循环中判断如果队列已满, 则调用锁的wait方法, 使生产者线程阻塞
while(eventQueue.size() >= max) {
try {
console(" the queue is full");
eventQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
console(" the new event is submitted");
eventQueue.addLast(event);
//唤醒所有等待中的消费者;注意如果此处使用notify(),可能导致线程不安全
this.eventQueue.notifyAll();
}
}
//定义出队方法
public Event take() {
//使用链表对象作为锁
synchronized(eventQueue) {
//在循环中判断如果队列已空, 则调用锁的wait方法, 使消费者线程阻塞
while(eventQueue.isEmpty()) {
try {
console(" the queue is empty.");
eventQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Event event = eventQueue.removeFirst();
//唤醒所有等待中的生产者;注意如果此处使用notify(),可能导致线程不安全
this.eventQueue.notifyAll();
console(" the event " + event + " is handled/taked.");
return event;
}
}
}
验证该队列的类
/*
* producer/client pattern
*/
public class EventClient { public static void main(String[] args) {
//定义不可变队列实例
final EventQueue eventQueue = new EventQueue();
//新建生产者线程, 可以设置多个
new Thread(()->{
while(true) {
eventQueue.offer(new EventQueue.Event());
}
}, "producer").start();
//新建消费者线程, 可以设置多个
new Thread(()->{
while(true) {
eventQueue.take();
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "consumer").start();
}
}