博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
并发-7-同步容器和ConcurrentHashMap
阅读量:5874 次
发布时间:2019-06-19

本文共 7300 字,大约阅读时间需要 24 分钟。

同步容器是什么:

JDK提供给了很多容器,其中有list,set,queue,map等。

这里我们挑出List单讲。

众所周知,很多书上,我们看到Arraylist并不是线程安全的,Vector是线程安全的。

那就从源码上分析一下:

ArrayList中,add方法如下:

public boolean add(E e) {        ensureCapacityInternal(size + 1);  // Increments modCount!!        elementData[size++] = e;        return true;    }复制代码

Vector中,add方法如下:

public synchronized boolean add(E e) {        modCount++;        ensureCapacityHelper(elementCount + 1);        elementData[elementCount++] = e;        return true;    }复制代码

对比发现,Vector之所以是线程安全的,是因为Vector对所有的方法使用synchronized进行了修饰。

不安全的同步容器:

public class SynchornizedVector {    public static void main(String[] agrs){        Vector vector = new Vector();        for(int i =0 ; i<10; i++){            vector.add(i,i);        }        new Thread(){            @Override            public void run() {                //vector共有10个元素,index对应0-9                //第一步:线程1执行到j=8,暂停;                for(int j = 0; j < vector.size(); j++){                    //第三部,线程1继续执行,要获取vector.get(8)的时候出错,因为vector的元素已经被线程2清空                    if(j == 8){                        try {                            Thread.currentThread().sleep(1000);                        } catch (InterruptedException e) {                            e.printStackTrace();                        }                    }                    System.out.println(vector.get(j));                }            }        }.start();        new Thread(){            @Override            public void run() {                //第二步:线程2获得时间片,立即执行,删除掉vector中所有的元素                for(int i = 0; i < vector.size(); i++){                    vector.remove(i);                }            }        }.start();    }}复制代码

需要对size()的地方进行同步互斥,才能确保容器是安全的,举例如下:

第39行和第17行

public class SynchornizedVector {    public static void main(String[] agrs) {        Vector vector = new Vector();        for (int i = 0; i < 10; i++) {            vector.add(i, i);        }        new Thread() {            @Override            public void run() {                //vector共有10个元素,index对应0-9                //第一步:线程1执行到j=8,暂停;                synchronized (SynchornizedVector.class) {                    for (int j = 0; j < vector.size(); j++) {                        //第三部,线程1继续执行,要获取vector.get(8)的时候出错,因为vector的元素已经被线程2清空                        if (j == 8) {                            try {                                Thread.currentThread().sleep(1000);                            } catch (InterruptedException e) {                                e.printStackTrace();                            }                        }                        System.out.println(vector.get(j));                    }                }            }        }.start();        new Thread() {            @Override            public void run() {                //第二步:线程2获得时间片,立即执行,删除掉vector中所有的元素                synchronized (SynchornizedVector.class) {                    for (int i = 0; i < vector.size(); i++) {                        vector.remove(i);                    }                }            }        }.start();    }}复制代码

工程中大量使用的同步容器ConcurrentHashMap

  众所周知,hashMap是根据散列值分段存储的,同步Map在同步的时候锁住了所有的段(粗粒度的锁)

  而ConcurrentHashMap根据散列值锁定了散列值对应的段,提高了并发性能(细粒度的锁)

  其数据结构如下:

  根据图中的数据结构:

  每次对key寻找到相应的位置需要两次定位:1.定位到Segment。2.定位到元素所在Segment中的具体链表的头部。

  对读操作不加锁,对写操作的锁的粒度细化到每个Segment

  支持的最大并发数就是Segment的数量

  

static final class Segment
extends ReentrantLock implements Serializable { transient volatile int count; transient int modCount; transient int threshold; transient volatile HashEntry
[] table; final float loadFactor;}复制代码

count:Segment中元素的数量

modCount:对table的大小造成影响的操作的数量,比如put(),remove()

threshold:扩容阈值

table:数组中每一个元素代表了一个链表的头部

loadFactor:用于确定threshold

get过程

static final class HashEntry
{ final K key; final int hash; volatile V value; final HashEntry
next;}复制代码
public ConcurrentHashMap(int initialCapacity,                         float loadFactor, int concurrencyLevel) {    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)        throw new IllegalArgumentException();      if (concurrencyLevel > MAX_SEGMENTS)        concurrencyLevel = MAX_SEGMENTS;      // Find power-of-two sizes best matching arguments    int sshift = 0;    int ssize = 1;    while (ssize < concurrencyLevel) {        ++sshift;        ssize <<= 1;    }    segmentShift = 32 - sshift;    segmentMask = ssize - 1;    this.segments = Segment.newArray(ssize);      if (initialCapacity > MAXIMUM_CAPACITY)        initialCapacity = MAXIMUM_CAPACITY;    int c = initialCapacity / ssize;    if (c * ssize < initialCapacity)        ++c;    int cap = 1;    while (cap < c)        cap <<= 1;      for (int i = 0; i < this.segments.length; ++i)        this.segments[i] = new Segment
(cap, loadFactor);}复制代码

initialCapcity:初始的容量

loadFactor:负载参数

concurrentLevel:Segment的数量,一旦设定不可改变,如果map容量不够,需要扩容,则增加Segment数组的大小,而不增加Segment的数量,这样就不需要对Map做rehash,只要对Segment中的元素做rehash

整个ConcurrentHashMap的初始化方法还是非常简单的,先是根据concurrentLevel来new出Segment,这里Segment的数量是不大于concurrentLevel的最大的2的指数,就是说Segment的数量永远是2的指数个,这样的好处是方便采用移位操作来进行hash,加快hash的过程。接下来就是根据intialCapacity确定Segment的容量的大小,每一个Segment的容量大小也是2的指数,同样使为了加快hash的过程。

public V get(Object key) {    int hash = hash(key.hashCode());    return segmentFor(hash).get(key, hash);}复制代码

第三行的作用是:把key对应的segment找出来

final Segment
segmentFor(int hash) { return segments[(hash >>> segmentShift) & segmentMask];}复制代码

采用移位的方式操作,可以加快计算速度

确定了具体的segment之后,就要确定segment中具体的链表位置

HashEntry
getFirst(int hash) { HashEntry
[] tab = table; return tab[hash & (tab.length - 1)];}复制代码
V get(Object key, int hash) {    if (count != 0) { // read-volatile        HashEntry
e = getFirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key)) { V v = e.value; if (v != null) return v; return readValueUnderLock(e); // recheck } e = e.next; } } return null;}复制代码

put过程:

V put(K key, int hash, V value, boolean onlyIfAbsent) {    lock();    try {        int c = count;        if (c++ > threshold) // ensure capacity            rehash();        HashEntry
[] tab = table; int index = hash & (tab.length - 1); HashEntry
first = tab[index]; HashEntry
e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue; if (e != null) { oldValue = e.value; if (!onlyIfAbsent) e.value = value; } else { oldValue = null; ++modCount; tab[index] = new HashEntry
(key, hash, first, value); count = c; // write-volatile } return oldValue; } finally { unlock(); }}复制代码

如果Segment中元素的数量超过了threshold就要进行rehash,如有key存在,则更新value值,否则新生成一个HashEntry加入到整个Segment的头部

注意:

ConcurrentHashMap 的 get 的操作在大多数情况下都是不加锁的,只有当找到的 HashEntry 的 value 是 null 时,才会再进行一次加锁的读操作,以保障读操作的一致性。通常这种情况发生在你找到的 HashEntry 恰是另一个线程在做 put 操作时创建的,且 value 恰好没有设置完成。这种情况不太容易发生。所以,对于 ConcurrentHashMap 来说,发生在同一个 Segment 的一个写和多个读操作是并不互斥的,所以 Segment 也就没有继承读写锁了,而且这种设计要比读写锁的并发能力更高

转载地址:http://ithnx.baihongyu.com/

你可能感兴趣的文章
Python 字符串、列表、字典 操作方法大全 & 正则re
查看>>
Vue.js 介绍及其脚手架工具搭建
查看>>
Register code
查看>>
oracle基础入门(二)
查看>>
java 基础知识-数组的7种算法(排序、求和、最值、遍历...)
查看>>
倒要看看你有啥本事
查看>>
bzu-java(三)
查看>>
【初体验】valgrind分析程序性能
查看>>
testlink(以及服务器)问题定位思路
查看>>
Liferay Portal使用MySQL数据库配置
查看>>
个人代码库の模拟QQ振屏功能
查看>>
51Nod:1268 和为K的组合
查看>>
计科1501韩猛实验8
查看>>
课堂练习 组合数据练习
查看>>
面向对象程序设计第二次作业
查看>>
STL库的内存配置器(allocator)
查看>>
NO3 cat-xargs-cp-mv-rm-find命令
查看>>
Performance Tuning
查看>>
Javascript 强制浏览器渲染Dom文档
查看>>
用HTML5 Canvas为网页添加动态波浪背景
查看>>