JDK1.8的版本中ConcurrentHashMap原理解

在多线程环境下的HashMap就无法保证线程安全了,而HashTable又因为针对整个方法加锁而变得效率低下,这时就要考虑使用java.util.concurrent.ConcurrentHashMap类。

ConcurrentHashMap类依旧采用HashMap的结构,基本变量也和HashMap一致,因此接下来只说和HashMap不同之处。

ConcurrentHashMap用到乐观锁CAS,采用的是目前不开源的sun.misc.Unsafe 类,其是直接操作底层的类,常用到方法和含义如下:

  • compareAndSwapObject(Object o, long l, Object o1, Object o2):将对象o内偏移量为l的值和o1比较,如果一致则将原本值替换为o2
  • compareAndSwapInt(Object o, long l, int i, int i1):和compareAndSwapObject一致。
  • getObjectVolatile(Object o, long l):获取对象o偏移l位置的值,这个方法常常用来定位给定键在Table表内的位置,即定位到在哪个桶。

重要的变量

整个ConcurrentHashMap中的变量都加上了volatile,使得任何操作有可见,即每次读都可以保证是最新更新的的结果(即使是在读的过程中刚好写入)。

ConcurrentHashMap中新增了控制变量:

//MOVED是表示该节点处于转移状态,处于转移状态的node的hash值会变为-1。通常在增删节点的时候会用到。
static final int MOVED     = -1; 

//表示table中该节点是红黑树节点
static final int TREEBIN   = -2; 

//某个桶正在被扩容(resize)操作所保留,而不会存储任何元素。但是本身内部节点依旧可以被查询
static final int RESERVED  = -3; 

//只是用于计算Hash值
static final int HASH_BITS = 0x7fffffff; 

针对节点数的计算则引入了baseCountsizeCtl等变量:

//它在插入和删除元素时会被更新,用于统计当前哈希表的大小。但是在并发操作时可能不是实时更新的,估算用
private transient volatile long baseCount;

//大于0是表示扩容阈值,通常是table大小与Threshold的乘积,为负数则表示处于扩容操作中
private transient volatile int sizeCtl;

//CounterCell类是对于桶内元素的计数,该数组可以减少在高并发的删减节点是对于整体节点数修改的冲突
private transient volatile CounterCell[] counterCells;

ConcurrentHashMap存储节点还是像HashMap一样底层采用Node<K,V>数组的方式:

transient volatile Node<K,V>[] table;

但是新增了控制转移的节点类:ForwardingNode,其内部只有一个指向下一次扩容的数组:nextTable

static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ……
}

关键方法

  1. putVal方法 调用put方法时,本质都是调用putval方法:

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        //进入死循环,即自旋
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //1.懒加载,第一次初始化数据
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //2.且要存入的节点在找到的桶内且该桶无数据,用Unsafe类中compareAndSwapObject方法加入节点
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
                    break;   
            }
            //3.如果该节点是控制扩容的节点,则尝试去扩容
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                //4.针对单个节点加锁
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        //链表节点
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //该桶为红黑树结构,调用红黑树的putTreeVal方法。
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
    

    可以看出putval的流程和HashMap一致。但是在刚开始就进入了自旋状态(即for无线循环),插入过程中,通过 CAS 操作尝试更新节点,失败则重试;而在注释4处,其是针对单个桶加锁,即table数据中的单个节点。这也就意味着最多可以有table.size()个线程同时对ConcurrentHashMap进行处理。事实上,ConcurrentHashMap在控制锁粒度方面都是尽量做到最小。

    在锁的内部则是根据桶节点的hash值正负来判断是链表还是红黑树结构,然后就是老套的遍历+替换/新增节点了。

    在最后则是判断是否树化和节点数量自增并判断是否扩容。

    再对整个流程关键点逐个解析:

    • ConCurrentHashMap中将初始化和扩容方法分开(HashMap中是在方法 resize() 中的)。initTable() 是初始化方法:

      private final Node<K,V>[] initTable() {
          Node<K,V>[] tab; int sc;
          //只有第一次存数据时才会被调用
          while ((tab = table) == null || tab.length == 0) {
              //sizeCtl为负数是表示有别的线程在操作table数据,线程暂停
              if ((sc = sizeCtl) < 0)
                  Thread.yield(); 
              else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                  try {
                      if ((tab = table) == null || tab.length == 0) {
                          int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                          @SuppressWarnings("unchecked")
                          Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                          table = tab = nt;
                          sc = n - (n >>> 2);
                      }
                  } finally {
                      sizeCtl = sc;
                  }
                  break;
              }
          }
          return tab;
      }
      

      这里U.compareAndSwapInt(this, SIZECTL, sc, -1) 才是精髓,因为最开始的时候就将sizeCtl的值赋给了sc,所以这里两者是一定相等的,该方法直接将SIZECTL置为-1,使得其他线程调用该方法是会调用Thread.yield(),使其直接进入就绪状态,等到其再度争抢到cpu资源时获取ConCurrentHashMap已经被初始化了。

      随后就是根据原sizeCtl或者默认值的值创建数组并修改sizeCtl的值。

    • 在注释第2点里直接调用tabAt方法是直接根据键的hash定位到该键属于哪一个桶,而随后调用的casTabAt是直接在该位置直接创建新的节点,两个方法都是调用的Unsafe的方法:

      static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
          return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
      }
      
      static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                          Node<K,V> c, Node<K,V> v) {
      
    • 注释第3点的helpTransfer方法,其是协助去扩容,ConCurrentHashMap的扩容是可以多线程的。但是多线程数有上限。

        final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
            Node<K,V>[] nextTab; int sc;
            //当节点是ForwardingNode类且当前table数组和新生成的数组都存在时才会判断是否需要协助扩容
            if (tab != null && (f instanceof ForwardingNode) &&
                (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
                //生成扩容戳记,多线程下唯一
                int rs = resizeStamp(tab.length);
                //当前table数组处于扩容状态
                while (nextTab == nextTable && table == tab &&
                       (sc = sizeCtl) < 0) {
                        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                        //扩容方法
                        transfer(tab, nextTab);
                        break;
                    }
                }
                return nextTab;
            }
            return table;
        }
      

      该方法是针对节点是否是控制节点ForwardingNode进行判断的,开头也提及了,该类存放的是新的Table数组。

  2. replaceNode方法,该方法主要被repalce和remove方法调用。remove调用传入的是null

    //cv是作为期待的比较值存在的,虽然调用replace和remove是传入都是null,但是一旦传入就必须在满足cv=传入键的节点值才会执行替换操作
    final V replaceNode(Object key, V value, Object cv) {
        int hash = spread(key.hashCode());
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0 ||
                (f = tabAt(tab, i = (n - 1) & hash)) == null)
                break;
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                boolean validated = false;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        //链表桶
                        if (fh >= 0) {
                            validated = true;
                            for (Node<K,V> e = f, pred = null;;) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    V ev = e.val;
                                    if (cv == null || cv == ev ||
                                        (ev != null && cv.equals(ev))) {
                                        oldVal = ev;
                                        if (value != null)
                                            e.val = value;
                                        //传入的value为null,则将该节点的下个节点赋给上个节点的next
                                        else if (pred != null) 
                                            pred.next = e.next;
                                        else
                                            setTabAt(tab, i, e.next);
                                    }
                                    break;
                                }
                                pred = e;
                                if ((e = e.next) == null)
                                    break;
                            }
                        }
                        else if (f instanceof TreeBin) {
                            validated = true;
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> r, p;
                            if ((r = t.root) != null &&
                                (p = r.findTreeNode(hash, key, null)) != null) {
                                V pv = p.val;
                                if (cv == null || cv == pv ||
                                    (pv != null && cv.equals(pv))) {
                                    oldVal = pv;
                                    if (value != null)
                                        p.val = value;
                                    else if (t.removeTreeNode(p))
                                        setTabAt(tab, i, untreeify(t.first));
                                }
                            }
                        }
                    }
                }
                //找到指定key键的节点后会将validated置为true,然后节点数减一
                if (validated) {
                    if (oldVal != null) {
                        if (value == null)
                            addCount(-1L, -1);
                        return oldVal;
                    }
                    break;
                }
            }
        }
        return null;
    }
    

    逻辑上和putval的类似,排除掉当前table或当前键不存在的场景、或是需要扩容场景。在替换则是分为了链表和红黑树节点。

  3. get就很简单了,ConcurrentHashMap的读操作是不加锁的

     public V get(Object key) {
         Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
         int h = spread(key.hashCode());
         //先确定key的桶位置
         if ((tab = table) != null && (n = tab.length) > 0 &&
             (e = tabAt(tab, (n - 1) & h)) != null) {
             //桶的节点就直接返回
             if ((eh = e.hash) == h) {
                 if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                     return e.val;
             }
             //红黑树节点调用其find方法
             else if (eh < 0)
                 return (p = e.find(h, key)) != null ? p.val : null;
             //链表节点从头遍历
             while ((e = e.next) != null) {
                 if (e.hash == h &&
                     ((ek = e.key) == key || (ek != null && key.equals(ek))))
                     return e.val;
             }
         }
         return null;
     }
    

从这里来看,1.8版本的ConcurrentHashMap采用乐观锁结构,针对改操作加锁,读操作不加锁。且针对单个桶加锁,即满足了多线程的安全性又提高了并发性。因此涉及到多线程操作HashMap的时候更推荐用ConcurrentHashMap。

当然,要注意ConcurrentHashMap和ThreadLocal<HashMap>的区别。前者是多线程共享一个ConcurrentHashMap对象,后者是针对HashMap做线程内副本。

全部评论

相关推荐

bg:双非本,一段中小厂6个月测开实习今天发这个帖子主要是想聊一聊我秋招以来的一个发展我是在8月底辞职,打算秋招,可是看网上都说金九银十就想着自己就是一个普通本科生,现在九月份都是一些大神在争抢,所以9月份基本上没投,等到了10月份才开始秋招,可是这个时间好像已经有些晚了,今年秋招开启的格外早,提前到了7,8月份,我十月才开始,官网投了很多公司,没有任何一个面试机会,这个情况一直到了十月底才有了第一个面试,当时没有面试经验,所以不出意外的挂了后续就是漫长的投递,但是毫无例外没有面试,没有办法我只能另辟蹊径开始在BOSS上边投递,然后顺便也根据BOSS上边这个公司名称去浏览器搜索看看有没有官网投递渠道,毕竟官网上投递后还是可以第一时间被HR看到的,然后一直不停投递,一开始第一个星期基本上都是投的正式秋招岗位到了第二个星期才开始实习和正式一起投,到十一月底的时候已经沟通了700➕才有一共1个正式的,5个要提前实习的,3个实习的面试,最后结果是过了1个要提前实习的和2个实习的每次面试我都会复盘,发现这些小公司面试官问的五花八门,有的专问基础,有的专问项目,有的啥都问,不过自己也是看出来了一下门道,就是小公司不像大公司面试官那样能力比较强基本上你简历上边的他都会,然后会根据简历来问,小公司面试官他们更多的是看自己会什么,然后看看你简历上边哪些他也是会的然后来问,经过不断的复盘加上背各种各样面试题,到了11月底12月初才有了1个要提前实习的offer还有2个实习的offer,而且薪资待遇对我来说已经很可观了可是啊,人总是这样得了千钱想万钱,我又开始不满现状,但是此时的我面试能力经过这么多面试和复盘已经很强了,然后在十二月份运气爆棚,被极兔和小鹏补录捞起来面试,还有个百度测开的实习面试,这个时候因为有了offer所以感觉有了底气,面试也很自信,最后结果是全部都过了那个时候我感觉自己真的很厉害,我问了极兔那边的HR像我这样的双非本收到offer的在极兔有多少?他告诉我产研岗90%都是硕士,10%里边基本上都是211,985,想我这样的很少很少,那一刻感觉自己超级牛逼,小鹏就更不用说了,最后也是不出意外选择了小鹏所以我就我个人经历想对和我学历履历差不多的牛友一些建议第一:秋招一定要趁早,真到了9,10月,那个时候可能你投的结果可能还不如7,8,11月,第二:最好先拿小公司实习或者正式练练手,提升一下面试能力,我个人觉得因为小公司问的五花八门所以你会更加横向去提升自己能力,而且大公司其实面试没有那么难,除了一些非常卷的岗位,公司大神比较多会问的很难,一般好点的公司都不会问的那么难,他们也知道都是应届生不会要求那么高第三:当有一定能力后,就是坚持了,对于我们这样的学历,没有特别强的履历情况下,就是要抓住提前批和补录的机会,这个时候各方面不会卡的很严,是我们很好很好的一个机会第四:就是运气也是很重要的一部分,不过这个很难去说什么最后祝各位牛友都能收获自己满意的offer😁😁😁
秋招,不懂就问
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务