18.2.2 ConcurrentHashMap线程安全实现机制
1. ConcurrentHashMap概述
1.1 设计目标
ConcurrentHashMap是Java并发包中提供的线程安全的哈希表实现,旨在解决HashMap在多线程环境下的安全问题,同时提供比Hashtable更好的并发性能。
1.2 核心特点
- 线程安全,支持高并发访问
- 分段锁机制(JDK7)/ CAS + synchronized(JDK8)
- 不允许null键和null值
- 支持完全并发的读操作
- 支持可配置的并发写操作
2. JDK7中的ConcurrentHashMap
2.1 分段锁机制
JDK7采用分段锁(Segment)机制,将整个哈希表分成多个段,每个段独立加锁。
// JDK7 ConcurrentHashMap核心结构
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V> {
// 默认并发级别
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 段数组
final Segment<K,V>[] segments;
// Segment继承ReentrantLock,每个段都是一个独立的锁
static final class Segment<K,V> extends ReentrantLock implements Serializable {
// 段内的哈希表
transient volatile HashEntry<K,V>[] table;
// 段内元素数量
transient int count;
// 哈希表节点
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value; // volatile保证可见性
volatile HashEntry<K,V> next; // volatile保证链表结构的可见性
}
}
}
2.2 JDK7的put操作流程
public class JDK7ConcurrentHashMapPut {
public V put(K key, V value) {
// 1. 计算hash值定位到具体的段
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
// 2. 获取段,如果不存在则创建
Segment<K,V> s = ensureSegment(j);
// 3. 在段内执行put操作
return s.put(key, hash, value, false);
}
// Segment内的put方法
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 尝试获取锁,如果获取失败则自旋等待
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
// 遍历链表查找或插入
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
// 插入新节点
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock(); // 释放锁
}
return oldValue;
}
}
2.3 JDK7的get操作特点
public class JDK7ConcurrentHashMapGet {
public V get(Object key) {
// 无需加锁,通过volatile保证可见性
int hash = hash(key);
long u = (((hash >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// 通过UNSAFE直接访问内存
Segment<K,V> s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u);
if (s != null && s.table != null) {
// 通过volatile读取链表头节点
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(s.table, ((long)(((s.table.length - 1) & hash)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == hash && key.equals(k)))
return e.value;
}
}
return null;
}
}
3. JDK8中的ConcurrentHashMap
3.1 数据结构变化
JDK8废弃了分段锁,采用CAS + synchronized的方式实现线程安全。
// JDK8 ConcurrentHashMap核心结构
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V> {
// 存储数据的数组
transient volatile Node<K,V>[] table;
// 控制表初始化和扩容的标志
private transient volatile int sizeCtl;
// 元素个数,使用LongAdder的思想
private transient volatile long baseCount;
private transient volatile CounterCell[] counterCells;
// 普通节点
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}
// 红黑树节点
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent;
TreeNode<K,V> left;
TreeNode<K,V> right;
boolean red;
}
// 转发节点,用于扩容
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
}
}
3.2 JDK8的put操作流程
public class JDK8ConcurrentHashMapPut {
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i;
// 1. 如果表为空,初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 2. 如果桶为空,使用CAS插入
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // CAS成功,跳出循环
}
// 3. 如果正在扩容,帮助扩容
else if (f.hash == MOVED)
tab = helpTransfer(tab, f);
// 4. 桶不为空,需要加锁处理
else {
V oldVal = null;
synchronized (f) { // 锁住链表头节点
if (tabAt(tab, i) == f) {
if (f.hash >= 0) {
// 链表处理
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash && ((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, val
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
Java面试圣经 文章被收录于专栏
Java面试圣经,带你练透java圣经

查看19道真题和解析