注意
想法及时记录,实现可以待做。
Map
Map常用有哪些呢?如图所示:
简介
HashMap是Java中用于映射(键值对)处理的数据类型。但是不能用于多线程的情况下,否则会出现线程安全的问题。那么多线程下用哪个呢?
ConcurrentHashMap便是最好的选择了,由Doug Lea推出来的并发编程包的一员。
HashMap允许键和值为null,但是ConcurrentHashMap不允许键和值为null,否则NPE伺候。
它的复杂度一般来说是常数级,也就是O(1),但是也不绝对,如果有hash碰撞了,有可能退化为O(n)或者是O(log n)。
如果不熟悉HashMap,可以看Java8之HashMap介绍
如果我们需要多线程环境下,还要保证有序数据存储,那么CHM也无法满足,此时,ConcurrentSkipListMap这种结构便有机会使用。
分析
ConcurrentSkipListMap是一个并发安全的,基于SkipList实现的有序存储的Map。
跳表是一个链表结构,写入和读取的时间复杂度在O(log n),利用空间换时间,增加多层索引,提升查找和写入效率。
跳表具有以下几个的特点:
- 最底层包含所有节点的一个有序链表
- 每一层都是有序的链表
- 每个节点都有两个指针,一个指向下一节点,一个指向下一层节点
- 如果一个元素出现在level(x)中,那么它肯定出现在x一下的所有level中
组成介绍
/**
* 用于标识基本级别标头的特殊值
* Special value used to identify base-level header
*/
private static final Object BASE_HEADER = new Object();
/**
* 跳表的最高头索引
* The topmost head index of the skiplist.
*/
private transient volatile HeadIndex<K,V> head;
/**
* 比较器用于维护此映射中的顺序,如果使用自然顺序,则为null。 (非私有可简化嵌套类的访问。)
* The comparator used to maintain order in this map, or null if
* using natural ordering. (Non-private to simplify access in
* nested classes.)
* @serial
*/
final Comparator<? super K> comparator;
/** Lazily initialized key set */
private transient KeySet<K> keySet;
/** Lazily initialized entry set */
private transient EntrySet<K,V> entrySet;
/** Lazily initialized values collection */
private transient Values<V> values;
/** Lazily initialized descending key set */
private transient ConcurrentNavigableMap<K,V> descendingMap;
关键的内部类
/**
* 头索引,扩展索引,多了一个字段,记录索引所在的层级
* Nodes heading each level keep track of their level.
*/
static final class HeadIndex<K,V> extends Index<K,V> {
final int level;
HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
super(node, down, right);
this.level = level;
}
}
/**
* 数据节点,存储数据,单链表结构
* Nodes hold keys and values, and are singly linked in sorted
* order, possibly with some intervening marker nodes. The list is
* headed by a dummy node accessible as head.node. The value field
* is declared only as Object because it takes special non-V
* values for marker and header nodes.
*/
static final class Node<K,V> {
// 节点的key是不可变的
final K key;
// 使用 volatile 修饰,保障 happens-before 语义以及线程间可见性。
volatile Object value;
volatile Node<K,V> next;
}
/**
* 索引,存储着对应的Node值、向右向下的索引
* Index nodes represent the levels of the skip list. Note that
* even though both Nodes and Indexes have forward-pointing
* fields, they have different types and are handled in different
* ways, that can't nicely be captured by placing field in a
* shared abstract class.
*/
static class Index<K,V> {
final Node<K,V> node;
final Index<K,V> down;
volatile Index<K,V> right;
/**
* Creates index node with given values.
*/
Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
this.node = node;
this.down = down;
this.right = right;
}
}
查找
/**
* 获取key对应的值,如果没有找到,就返回null
* Gets value for key. Almost the same as findNode, but returns
* the found value (to avoid retries during re-reads)
*
* @param key the key
* @return the value, or null if absent
*/
private V doGet(Object key) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
// b是key的前继节点,n是b的下一个节点,key对应的就是在 b和n之间
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
// 说明是链表的最后一个节点,为null,那自然没有找到了,直接挑出最外层循环
if (n == null)
break outer;
// f是n的下一个节点
Node<K,V> f = n.next;
// 两次读取的值不一致,说明有变化,进行查找重试
if (n != b.next) // inconsistent read
break;
// n的值如果是null,则删除,然后进行查找的重试
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
// 前继节点被删除了,重试
if (b.value == null || v == n) // b is deleted
break;
// 找到了key对应的值,返回
if ((c = cpr(cmp, key, n.key)) == 0) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
// 没有找到key,跳出外层循环,
if (c < 0)
break outer;
// key还要往下一个范围继续找
b = n;
n = f;
}
}
return null;
}
/**
* 查找key的前继节点,从head节点开始遍历,找到小于key的最大的一个索引节点指向的节点
* Returns a base-level node with key strictly less than given key,
* or the base-level header if there is no such node. Also
* unlinks indexes to deleted nodes found along the way. Callers
* rely on this side-effect of clearing indices to deleted nodes.
* @param key the key
* @return a predecessor of key
*/
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
if (key == null)
throw new NullPointerException(); // don't postpone errors
for (;;) {
// q是head节点,r是q的右节点
for (Index<K,V> q = head, r = q.right, d;;) {
// 右节点不为null
if (r != null) {
Node<K,V> n = r.node;
K k = n.key;
// r这个节点未nul,则会被删除
if (n.value == null) {
// 从q中删除,cas失败则重试
if (!q.unlink(r))
break; // restart
// 赋值新的right节点,继续循环
r = q.right; // reread r
continue;
}
// 比较器比较 key > k,则继续向右查找
if (cpr(cmp, key, k) > 0) {
q = r;
r = r.right;
continue;
}
}
// 右节点为null,则down下去,如果down下去节点未null,则返回q的节点
if ((d = q.down) == null)
return q.node;
// down下去的节点不为null,继续下一个迭代,更新down和right方向的节点
q = d;
r = d.right;
}
}
}
插入
/**
* Main insertion method. Adds element if not present, or
* replaces value if present and onlyIfAbsent is false.
* @param key the key
* @param value the value that must be associated with key
* @param onlyIfAbsent if should not insert if already present
* @return the old value, or null if newly inserted
*/
private V doPut(K key, V value, boolean onlyIfAbsent) {
Node<K,V> z; // added node
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
// 外层循环
outer: for (;;) {
// 根据 key,找到待插入的位置
// b 叫做前驱节点,将来作为新加入结点的前驱节点
// n 叫做后继结点,将来作为新加入结点的后继结点
// 也就是说,新节点将插入在 b 和 n 之间
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
// 如果 n 为 null,那么说明 b 是链表的最尾端的结点,这种情况比较简单,直接构建新节点插入即可
// n不为null,说明不是链表的最后一个节点
if (n != null) {
Object v; int c;
// f是n的下一个节点
Node<K,V> f = n.next;
// 如果 n 不再是 b 的后继结点了,说明有其他线程向 b 后面添加了新元素
// 那么我们直接退出内循环,重新计算新节点将要插入的位置
if (n != b.next) // inconsistent read
break;
// 如果n的值为null,说明n已经删除,这里则帮助删除,重试
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
// 前驱节点b删除,重试
if (b.value == null || v == n) // b is deleted
break;
// 如果 插入的key比 n的key还要大,说明n范围不够,需要继续往后找,将n作为前驱节点,f作为n,插入的key继续在此范围内寻找。
if ((c = cpr(cmp, key, n.key)) > 0) {
b = n;
n = f;
continue;
}
// n的key就是本次要插入的key
if (c == 0) {
// 如果要覆盖n的值,则cas赋值,成功则返回插入的值,否则因为cas竞争失败,则重试
if (onlyIfAbsent || n.casValue(v, value)) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
break; // restart if lost race to replace value
}
//
// else c < 0; fall through
}
// 没有找到,则本次新插入的节点
z = new Node<K,V>(key, value, n);
// n是链表的最后一个节点,此时cas往链表后追加,cas竞争失败,则重试
if (!b.casNext(n, z))
break; // restart if lost race to append to b
// 全部走完,跳出外层的for循环,不在重试
break outer;
}
}
// 获取一个线程无关的随机数,占四个字节,32 个比特位
int rnd = ThreadLocalRandom.nextSecondarySeed();
// 和 1000 0000 0000 0000 0000 0000 0000 0001 与
// 如果等于 0,说明这个随机数最高位和最低位都为 0,这种概率很大
// 如果不等于 0,那么将仅仅把新节点插入到最底层的链表中即可,不会往上层递归
if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
int level = 1, max;
// 用低位连续为 1 的个数作为 level 的值,也是一种概率策略
while (((rnd >>>= 1) & 1) != 0)
++level;
// 上面这段代码是获取 level 的, 我们这里只需要知道获取 level 就可以 (50%的几率返回0,25%的几率返回1,12.5%的几率返回2...最大返回31。)
Index<K,V> idx = null;
HeadIndex<K,V> h = head;
// 如果概率算得的 level 在当前跳表 level 范围内
// 构建一个从 1 到 level 的纵列 index 结点引用
// 初始化 max 的值, 若 level <= max , 创建一个通过down属性构成的Index链表
if (level <= (max = h.level)) {
for (int i = 1; i <= level; ++i)
// idx是作为新节点的down节点的
idx = new Index<K,V>(z, idx, null);
}
// 若 level > max 则只增加一层 index 索引层
else { // try to grow by one level
level = max + 1; // hold in array and later pick the one to use
// 创建一个Index数组
@SuppressWarnings("unchecked")Index<K,V>[] idxs =
(Index<K,V>[])new Index<?,?>[level+1];
// 初始化数组元素
for (int i = 1; i <= level; ++i)
idxs[i] = idx = new Index<K,V>(z, idx, null);
for (;;) {
h = head;
// 获取老的 level 层
int oldLevel = h.level;
// 初始状态下level肯定大于oldLevel
if (level <= oldLevel) // lost race to add level
break;
HeadIndex<K,V> newh = h;
Node<K,V> oldbase = h.node;
// 创建新层级对应的head节点,down节点指向原来的head,right节点指向新节点z对应的索引节点
// 通常oldLevel就比level小1,即只创建一个HeadIndex节点
for (int j = oldLevel+1; j <= level; ++j)
// idxs[j] 就是上面的 idxs中的最高层的索引
newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
// cas修改head,如果修改失败说明其他线程在修改head,则重新for循环读取head
if (casHead(h, newh)) {
h = newh;
idx = idxs[level = oldLevel];
break;
}
}
}
// 如果新节点z对应的level不是0,则创建对应层级的down链表,
// 如果层级大于head的还需要调整head的层级,创建新的head节点,新层级的head节点的right节点就指向新节点z
// find insertion points and splice in
splice: for (int insertionLevel = level;;) {
int j = h.level;
for (Index<K,V> q = h, r = q.right, t = idx;;) {
// 节点删除了,循环结束
if (q == null || t == null)
break splice;
if (r != null) {
Node<K,V> n = r.node;
// compare before deletion check avoids needing recheck
int c = cpr(cmp, key, n.key);
// r节点是无效的
if (n.value == null) {
// 将r从q的right链表中移除,移除失败重新读取q
if (!q.unlink(r))
break;
// 获取新的right节点
r = q.right;
continue;
}
// key大于n.key,说明前驱节点不对,需要向右继续遍历
if (c > 0) {
q = r;
r = r.right;
continue;
}
}
if (j == insertionLevel) {
// 将t插入到q的right链表中,插入到r的前面,失败则重试
if (!q.link(r, t))
break; // restart
// 正常不可能为null,极端并发下可能为null,因为此时已经将新节点插入到链表中了,有可能被其他某个线程给删除了
if (t.node.value == null) {
// 通过findNode将该节点移除,然后终止外层for循环
findNode(key);
break splice;
}
// 到底层节点了,终止外层for循环
if (--insertionLevel == 0)
break splice;
}
// 如果j大于level,说明未遍历到新节点所在的最高层级,小于level说明上一个层级的right节点处理好了,需要遍历下一个层级
if (--j >= insertionLevel && j < level)
t = t.down;
// 遍历下一个层级的节点
q = q.down;
r = q.right;
}
}
}
return null;
}
删除
/**
* Main deletion method. Locates node, nulls value, appends a
* deletion marker, unlinks predecessor, removes associated index
* nodes, and possibly reduces head index level.
*
* Index nodes are cleared out simply by calling findPredecessor.
* which unlinks indexes to deleted nodes found along path to key,
* which will include the indexes to this node. This is done
* unconditionally. We can't check beforehand whether there are
* index nodes because it might be the case that some or all
* indexes hadn't been inserted yet for this node during initial
* search for it, and we'd like to ensure lack of garbage
* retention, so must call to be sure.
*
* @param key the key
* @param value if non-null, the value that must be
* associated with key
* @return the node, or null if not found
*/
final V doRemove(Object key, Object value) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
// 外层循环
outer: for (;;) {
// 查找key的前驱节点
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
if (n == null)
break outer;
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n) // b is deleted
break;
// 正常情况下,key 应该等于 n.key
// key 大于 n.key 说明我们要找的结点可能在 n 的后面,往后递归即可
// key 小于 n.key 说明 key 所代表的结点根本不存在
if ((c = cpr(cmp, key, n.key)) < 0)
break outer;
if (c > 0) {
b = n;
n = f;
continue;
}
if (value != null && !value.equals(v))
break outer;
// 尝试将待删结点的 value 属性赋值 null,失败将退出重试
if (!n.casValue(v, null))
break;
// casValue成功,在n和f之间插入一个空节点,如果成功则将b的next节点修改成f
if (!n.appendMarker(f) || !b.casNext(n, f))
findNode(key); // retry via findNode
else {
// 修改成功,通过findPredecessor清理关联的Index节点
findPredecessor(key, cmp); // clean index
// 判断此次删除后是否导致某一索引层没有其他节点了,并适情况删除该层索引
if (head.right == null)
tryReduceLevel();
}
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
}
return null;
}
参考
- ConcurrentSkipListMap 源码解析
- ConcurrentSkipListMap 源码分析 (基于Java 8)
- 基于跳跃表的 ConcurrentSkipListMap 内部实现(Java 8)
- Java8 ConcurrentSkipListMap与ConcurrentSkipListSet (一) 源码解析