同步容器是什么:
JDK提供给了很多容器,其中有list,set,queue,map等。
这里我们挑出List单讲。
众所周知,很多书上,我们看到Arraylist并不是线程安全的,Vector是线程安全的。
那就从源码上分析一下:
ArrayList中,add方法如下:
public boolean add(E e) { ensureCapacityInternal(size + 1); // Increments modCount!! elementData[size++] = e; return true; }复制代码
Vector中,add方法如下:
public synchronized boolean add(E e) { modCount++; ensureCapacityHelper(elementCount + 1); elementData[elementCount++] = e; return true; }复制代码
对比发现,Vector之所以是线程安全的,是因为Vector对所有的方法使用synchronized进行了修饰。
不安全的同步容器:
public class SynchornizedVector { public static void main(String[] agrs){ Vector vector = new Vector(); for(int i =0 ; i<10; i++){ vector.add(i,i); } new Thread(){ @Override public void run() { //vector共有10个元素,index对应0-9 //第一步:线程1执行到j=8,暂停; for(int j = 0; j < vector.size(); j++){ //第三部,线程1继续执行,要获取vector.get(8)的时候出错,因为vector的元素已经被线程2清空 if(j == 8){ try { Thread.currentThread().sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(vector.get(j)); } } }.start(); new Thread(){ @Override public void run() { //第二步:线程2获得时间片,立即执行,删除掉vector中所有的元素 for(int i = 0; i < vector.size(); i++){ vector.remove(i); } } }.start(); }}复制代码
需要对size()的地方进行同步互斥,才能确保容器是安全的,举例如下:
第39行和第17行
public class SynchornizedVector { public static void main(String[] agrs) { Vector vector = new Vector(); for (int i = 0; i < 10; i++) { vector.add(i, i); } new Thread() { @Override public void run() { //vector共有10个元素,index对应0-9 //第一步:线程1执行到j=8,暂停; synchronized (SynchornizedVector.class) { for (int j = 0; j < vector.size(); j++) { //第三部,线程1继续执行,要获取vector.get(8)的时候出错,因为vector的元素已经被线程2清空 if (j == 8) { try { Thread.currentThread().sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(vector.get(j)); } } } }.start(); new Thread() { @Override public void run() { //第二步:线程2获得时间片,立即执行,删除掉vector中所有的元素 synchronized (SynchornizedVector.class) { for (int i = 0; i < vector.size(); i++) { vector.remove(i); } } } }.start(); }}复制代码
工程中大量使用的同步容器ConcurrentHashMap
众所周知,hashMap是根据散列值分段存储的,同步Map在同步的时候锁住了所有的段(粗粒度的锁)
而ConcurrentHashMap根据散列值锁定了散列值对应的段,提高了并发性能(细粒度的锁)
其数据结构如下:
根据图中的数据结构:每次对key寻找到相应的位置需要两次定位:1.定位到Segment。2.定位到元素所在Segment中的具体链表的头部。
对读操作不加锁,对写操作的锁的粒度细化到每个Segment
支持的最大并发数就是Segment的数量
static final class Segmentextends ReentrantLock implements Serializable { transient volatile int count; transient int modCount; transient int threshold; transient volatile HashEntry [] table; final float loadFactor;}复制代码
count:Segment中元素的数量
modCount:对table的大小造成影响的操作的数量,比如put(),remove()
threshold:扩容阈值
table:数组中每一个元素代表了一个链表的头部
loadFactor:用于确定threshold
get过程
static final class HashEntry{ final K key; final int hash; volatile V value; final HashEntry next;}复制代码
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } segmentShift = 32 - sshift; segmentMask = ssize - 1; this.segments = Segment.newArray(ssize); if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = 1; while (cap < c) cap <<= 1; for (int i = 0; i < this.segments.length; ++i) this.segments[i] = new Segment(cap, loadFactor);}复制代码
initialCapcity:初始的容量
loadFactor:负载参数
concurrentLevel:Segment的数量,一旦设定不可改变,如果map容量不够,需要扩容,则增加Segment数组的大小,而不增加Segment的数量,这样就不需要对Map做rehash,只要对Segment中的元素做rehash
整个ConcurrentHashMap的初始化方法还是非常简单的,先是根据concurrentLevel来new出Segment,这里Segment的数量是不大于concurrentLevel的最大的2的指数,就是说Segment的数量永远是2的指数个,这样的好处是方便采用移位操作来进行hash,加快hash的过程。接下来就是根据intialCapacity确定Segment的容量的大小,每一个Segment的容量大小也是2的指数,同样使为了加快hash的过程。
public V get(Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).get(key, hash);}复制代码
第三行的作用是:把key对应的segment找出来
final SegmentsegmentFor(int hash) { return segments[(hash >>> segmentShift) & segmentMask];}复制代码
采用移位的方式操作,可以加快计算速度
确定了具体的segment之后,就要确定segment中具体的链表位置
HashEntrygetFirst(int hash) { HashEntry [] tab = table; return tab[hash & (tab.length - 1)];}复制代码
V get(Object key, int hash) { if (count != 0) { // read-volatile HashEntrye = getFirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key)) { V v = e.value; if (v != null) return v; return readValueUnderLock(e); // recheck } e = e.next; } } return null;}复制代码
put过程:
V put(K key, int hash, V value, boolean onlyIfAbsent) { lock(); try { int c = count; if (c++ > threshold) // ensure capacity rehash(); HashEntry[] tab = table; int index = hash & (tab.length - 1); HashEntry first = tab[index]; HashEntry e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue; if (e != null) { oldValue = e.value; if (!onlyIfAbsent) e.value = value; } else { oldValue = null; ++modCount; tab[index] = new HashEntry (key, hash, first, value); count = c; // write-volatile } return oldValue; } finally { unlock(); }}复制代码
如果Segment中元素的数量超过了threshold就要进行rehash,如有key存在,则更新value值,否则新生成一个HashEntry加入到整个Segment的头部
注意:
ConcurrentHashMap 的 get 的操作在大多数情况下都是不加锁的,只有当找到的 HashEntry 的 value 是 null 时,才会再进行一次加锁的读操作,以保障读操作的一致性。通常这种情况发生在你找到的 HashEntry 恰是另一个线程在做 put 操作时创建的,且 value 恰好没有设置完成。这种情况不太容易发生。所以,对于 ConcurrentHashMap 来说,发生在同一个 Segment 的一个写和多个读操作是并不互斥的,所以 Segment 也就没有继承读写锁了,而且这种设计要比读写锁的并发能力更高