一、熟悉Java自带的并发编程集合
在java.util.concurrent包里有很多并发编程的常用工具类。
package com.ietree.basicskill.mutilthread.concurrent; import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*; /**
* Created by Root on 5/10/2017.
*/
public class UseQueue {
public static void main(String[] args) throws Exception { /**
* 一个基于链接节点的*线程安全队列,高性能无阻塞*队列:ConcurrentLinkedQueue
* 此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。
* 新的元素插入到队列的尾部,队列获取操作从队列头部获得元素。
* 当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许使用 null 元素。
*
* 此实现采用了有效的“无等待 (wait-free)”算法.
* 需要小心的是,与大多数 collection 不同,size 方法不是 一个固定时间操作。由于这些队列的异步特性,确定当前元素的数量需要遍历这些元素。
* 内存一致性效果:当存在其他并发 collection 时,将对象放入 ConcurrentLinkedQueue 之前的线程中的操作 happen-before 随后通过另一线程从 ConcurrentLinkedQueue 访问或移除该元素的操作。
*/
ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>();
q.offer("a");
q.offer("b");
q.offer("c");
q.offer("d");
q.add("e"); System.out.println(q.poll()); //a 从头部取出元素,并从队列里删除
System.out.println(q.size()); //
System.out.println(q.peek()); //b
System.out.println(q.size()); // /**
* 一个由数组支持的有界阻塞队列
* 这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。
* 一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。
* 此类支持对等待的生产者线程和使用者线程进行排序的可选公平策略。默认情况下,不保证是这种排序。
* 然而,通过将公平性 (fairness) 设置为 true 而构造的队列允许按照 FIFO 顺序访问线程。公平性通常会降低吞吐量,但也减少了可变性和避免了“不平衡性”。
*/
ArrayBlockingQueue<String> array = new ArrayBlockingQueue<String>(5);
array.put("a");
array.put("b");
array.add("c");
array.add("d");
array.add("e");
array.add("f");
//System.out.println(array.offer("a", 3, TimeUnit.SECONDS)); /**
* 一个基于已链接节点的、范围任意的 blocking queue。阻塞队列
* 链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。
* 可选的容量范围构造方法参数作为防止队列过度扩展的一种方法。如果未指定容量,
* 则它等于 Integer.MAX_VALUE。除非插入节点会使队列超出容量,否则每次插入后会动态地创建链接节点。
*/
LinkedBlockingQueue<String> lbq = new LinkedBlockingQueue<String>();
lbq.offer("a");
lbq.offer("b");
lbq.offer("c");
lbq.offer("d");
lbq.offer("e");
lbq.add("f");
List<String> list = new ArrayList<String>();
// 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。
System.out.println(lbq.drainTo(list, 3));
//System.out.println(q.size()); for (Iterator iterator = lbq.iterator(); iterator.hasNext(); ) {
String string = (String) iterator.next();
System.out.println(string);
} /**
* SynchronousQueue:一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。
* 同步队列没有任何内部容量,甚至连一个队列的容量都没有。不能在同步队列上进行 peek,因为仅在试图要移除元素时,该元素才存在;
* 除非另一个线程试图移除某个元素,否则也不能(使用任何方法)插入元素;也不能迭代队列,因为其中没有元素可用于迭代。
* 队列的头 是尝试添加到队列中的首个已排队插入线程的元素;如果没有这样的已排队线程,则没有可用于移除的元素并且 poll() 将会返回 null。
* 对于其他 Collection 方法(例如 contains),SynchronousQueue 作为一个空 collection。此队列不允许 null 元素。
*
* 同步队列类似于 CSP 和 Ada 中使用的 rendezvous 信道。它非常适合于传递性设计,在这种设计中,
* 在一个线程中运行的对象要将某些信息、事件或任务传递给在另一个线程中运行的对象,它就必须与该对象同步。
* 对于正在等待的生产者和使用者线程而言,此类支持可选的公平排序策略。默认情况下不保证这种排序。
* 但是,使用公平设置为 true 所构造的队列可保证线程以 FIFO 的顺序进行访问。
*/
final SynchronousQueue<String> sq = new SynchronousQueue<String>();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(sq.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
Thread t2 = new Thread(new Runnable() { @Override
public void run() {
sq.add("asdasd");
}
});
t2.start(); /**
* 一个基于已链接节点的、任选范围的阻塞双端队列。
* 可选的容量范围构造方法参数是一种防止过度膨胀的方式。如果未指定容量,那么容量将等于 Integer.MAX_VALUE。
* 只要插入元素不会使双端队列超出容量,每次插入后都将动态地创建链接节点。
* 大多数操作都以固定时间运行(不计阻塞消耗的时间)。
* 异常包括 remove、removeFirstOccurrence、removeLastOccurrence、contains、iterator.remove() 以及批量操作,它们均以线性时间运行。
*/
LinkedBlockingDeque<String> dq = new LinkedBlockingDeque<String>(10);
// 在不违反容量限制的情况下,将指定的元素插入此双端队列的末尾。
dq.add("1");
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的开头;如果当前没有空间可用,则抛出 IllegalStateException。
dq.addFirst("a");
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的末尾;如果当前没有空间可用,则抛出 IllegalStateException。
dq.addLast("b");
// 获取,但不移除此双端队列的第一个元素;如果此双端队列为空,则返回 null。
System.out.println("查看头元素:" + dq.peekFirst());
// 获取并移除此双端队列的最后一个元素;如果此双端队列为空,则返回 null。
System.out.println("获取尾元素:" + dq.pollLast());
Object[] objs = dq.toArray();
for (int i = 0; i < objs.length; i++) {
System.out.println(objs[i]);
} /**
* 一个*阻塞队列,它使用与类 PriorityQueue 相同的顺序规则,并且提供了阻塞获取操作。
* 虽然此队列逻辑上是*的,但是资源被耗尽时试图执行 add 操作也将失败(导致 OutOfMemoryError)。此类不允许使用 null 元素。
* 依赖自然顺序的优先级队列也不允许插入不可比较的对象(这样做会导致抛出 ClassCastException)。
* 此类及其迭代器可以实现 Collection 和 Iterator 接口的所有可选 方法。iterator() 方法中提供的迭代器并不 保证以特定的顺序遍历 PriorityBlockingQueue 的元素。
* 如果需要有序地进行遍历,则应考虑使用 Arrays.sort(pq.toArray())。此外,可以使用方法 drainTo 按优先级顺序移除 全部或部分元素,并将它们放在另一个 collection 中。
*
* 在此类上进行的操作不保证具有同等优先级的元素的顺序。如果需要实施某一排序,那么可以定义自定义类或者比较器,比较器可使用修改键断开主优先级值之间的联系。
* 例如,以下是应用先进先出 (first-in-first-out) 规则断开可比较元素之间联系的一个类。要使用该类,则需要插入一个新的 FIFOEntry(anEntry) 来替换普通的条目对象。
*/
PriorityBlockingQueue<Task> pbq = new PriorityBlockingQueue<Task>(); Task tq1 = new Task();
tq1.setId(3);
tq1.setName("id为3");
Task tq2 = new Task();
tq2.setId(4);
tq2.setName("id为4");
Task tq3 = new Task();
tq3.setId(1);
tq3.setName("id为1"); //return this.id > task.id ? 1 : 0;
pbq.add(tq1); //
pbq.add(tq2); //
pbq.add(tq3); //1 // 1 3 4
System.out.println("容器:" + pbq);
System.out.println(pbq.take().getId());
System.out.println("容器:" + pbq); /**
* 支持获取的完全并发和更新的所期望可调整并发的哈希表。
* 此类遵守与 Hashtable 相同的功能规范,并且包括对应于 Hashtable 的每个方法的方法版本。
* 不过,尽管所有操作都是线程安全的,但获取操作不 必锁定,并且不 支持以某种防止所有访问的方式锁定整个表。
* 此类可以通过程序完全与 Hashtable 进行互操作,这取决于其线程安全,而与其同步细节无关。
*
* 获取操作(包括 get)通常不会受阻塞,因此,可能与更新操作交迭(包括 put 和 remove)。获取会影响最近完成的 更新操作的结果。
* 对于一些聚合操作,比如 putAll 和 clear,并发获取可能只影响某些条目的插入和移除。
* 类似地,在创建迭代器/枚举时或自此之后,Iterators 和 Enumerations 返回在某一时间点上影响哈希表状态的元素。
* 它们不会 抛出 ConcurrentModificationException。不过,迭代器被设计成每次仅由一个线程使用。
*
* 这允许通过可选的 concurrencyLevel 构造方法参数(默认值为 16)来引导更新操作之间的并发,该参数用作内部调整大小的一个提示。
* 表是在内部进行分区的,试图允许指示无争用并发更新的数量。因为哈希表中的位置基本上是随意的,所以实际的并发将各不相同。
* 理想情况下,应该选择一个尽可能多地容纳并发修改该表的线程的值。使用一个比所需要的值高很多的值可能会浪费空间和时间,而使用一个显然低很多的值可能导致线程争用。
* 对数量级估计过高或估计过低通常都会带来非常显著的影响。当仅有一个线程将执行修改操作,而其他所有线程都只是执行读取操作时,才认为某个值是合适的。
* 此外,重新调整此类或其他任何种类哈希表的大小都是一个相对较慢的操作,因此,在可能的时候,提供构造方法中期望表大小的估计值是一个好主意。
*/
ConcurrentHashMap<String, Object> chm = new ConcurrentHashMap<String, Object>();
chm.put("k1", "v1");
chm.put("k2", "v2");
chm.put("k3", "v3");
chm.putIfAbsent("k4", "vvvv");
//System.out.println(chm.get("k2"));
//System.out.println(chm.size()); for (Map.Entry<String, Object> me : chm.entrySet()) {
System.out.println("key:" + me.getKey() + ",value:" + me.getValue());
} /**
* ArrayList 的一个线程安全的变体,其中所有可变操作(add、set 等等)都是通过对底层数组进行一次新的复制来实现的。
*
* 这一般需要很大的开销,但是当遍历操作的数量大大超过可变操作的数量时,这种方法可能比其他替代方法更有效。
* 在不能或不想进行同步遍历,但又需要从并发线程中排除冲突时,它也很有用。“快照”风格的迭代器方法在创建迭代器时使用了对数组状态的引用。
* 此数组在迭代器的生存期内不会更改,因此不可能发生冲突,并且迭代器保证不会抛出 ConcurrentModificationException。
* 创建迭代器以后,迭代器就不会反映列表的添加、移除或者更改。在迭代器上进行的元素更改操作(remove、set 和 add)不受支持。
* 这些方法将抛出 UnsupportedOperationException。
*
* 允许使用所有元素,包括 null。
*
* 内存一致性效果:当存在其他并发 collection 时,将对象放入 CopyOnWriteArrayList 之前的线程中的操作 happen-before 随后通过另一线程从 CopyOnWriteArrayList 中访问或移除该元素的操作。
*/
CopyOnWriteArrayList<String> cwal = new CopyOnWriteArrayList<String>();
/**
* 对其所有操作使用内部 CopyOnWriteArrayList 的 Set。它共享以下相同的基本属性:
*
* 它最适合于具有以下特征的应用程序:set 大小通常保持很小,只读操作远多于可变操作,需要在遍历期间防止线程间的冲突。
* 它是线程安全的。
* 因为通常需要复制整个基础数组,所以可变操作(add、set 和 remove 等等)的开销很大。
* 迭代器不支持可变 remove 操作。
* 使用迭代器进行遍历的速度很快,并且不会与其他线程发生冲突。在构造迭代器时,迭代器依赖于不变的数组快照。
*/
CopyOnWriteArraySet<String> cwas = new CopyOnWriteArraySet<String>();
}
}
DelayQueue例子
创建一个网吧类、网民类,模拟上机、下机操作:
package com.ietree.basicskill.mutilthread.concurrent; import java.util.concurrent.DelayQueue; /**
* Created by Root on 5/11/2017.
*/
public class WangBa implements Runnable {
/**
* Delayed 元素的一个*阻塞队列
*
* 只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。
* 如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。
* 当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。
* 即使无法使用 take 或 poll 移除未到期的元素,也不会将这些元素作为正常元素对待。
* 例如,size 方法同时返回到期和未到期元素的计数。此队列不允许使用 null 元素。
*/
private DelayQueue<Wangmin> queue = new DelayQueue<Wangmin>(); public boolean yinye = true; public void shangji(String name, String id, int money) {
Wangmin man = new Wangmin(name, id, 1000 * money + System.currentTimeMillis());
System.out.println("网名" + man.getName() + " 身份证" + man.getId() + "交钱" + money + "块,开始上机...");
this.queue.add(man);
} public void xiaji(Wangmin man) {
System.out.println("网名" + man.getName() + " 身份证" + man.getId() + "时间到下机...");
} @Override
public void run() {
while (yinye) {
try {
Wangmin man = queue.take();
xiaji(man);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} public static void main(String args[]) {
try {
System.out.println("网吧开始营业");
WangBa siyu = new WangBa();
Thread shangwang = new Thread(siyu);
shangwang.start(); siyu.shangji("路人甲", "123", 1);
siyu.shangji("路人乙", "234", 10);
siyu.shangji("路人丙", "345", 5);
} catch (Exception e) {
e.printStackTrace();
} }
}
package com.ietree.basicskill.mutilthread.concurrent; import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit; /**
* Created by Root on 5/11/2017.
*/
public class Wangmin implements Delayed {
private String name;
//身份证
private String id;
//截止时间
private long endTime;
//定义时间工具类
private TimeUnit timeUnit = TimeUnit.SECONDS; public Wangmin(String name,String id,long endTime){
this.name=name;
this.id=id;
this.endTime = endTime;
} public String getName(){
return this.name;
} public String getId(){
return this.id;
} /**
* 用来判断是否到了截止时间
*/
@Override
public long getDelay(TimeUnit unit) {
//return unit.convert(endTime, TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
return endTime - System.currentTimeMillis();
} /**
* 相互批较排序用
*/
@Override
public int compareTo(Delayed delayed) {
Wangmin w = (Wangmin)delayed;
return this.getDelay(this.timeUnit) - w.getDelay(this.timeUnit) > 0 ? 1:0;
}
}
程序输出:
网吧开始营业
网名路人甲 身份证123交钱1块,开始上机...
网名路人乙 身份证234交钱10块,开始上机...
网名路人丙 身份证345交钱5块,开始上机...
网名路人甲 身份证123时间到下机...
网名路人丙 身份证345时间到下机...
网名路人乙 身份证234时间到下机...
例子:简单线程同步问题
package com.ietree.multithread.sync; import java.util.Vector; public class Tickets {
public static void main(String[] args) { // 初始化火车票池并添加火车票:避免线程同步可采用Vector替代ArrayList HashTable替代HashMap
final Vector<String> tickets = new Vector<String>(); // Map<String, String> map = Collections.synchronizedMap(new HashMap<String, String>()); for (int i = 1; i <= 1000; i++) {
tickets.add("火车票" + i);
} // for (Iterator iterator = tickets.iterator(); iterator.hasNext();) {
// String string = (String) iterator.next();
// tickets.remove(20);
// } for (int i = 1; i <= 10; i++) {
new Thread("线程" + i) {
public void run() {
while (true) {
if (tickets.isEmpty()) {
break;
}
System.out.println(Thread.currentThread().getName() + "---" + tickets.remove(0));
}
}
}.start();
}
}
}