目录

ThreadLocal

对于ThreadLocal,大家的第一反应可能是很简单呀,线程的变量副本,每个线程隔离。那这里有几个问题大家可以思考一下:

  • ThreadLocal的key是弱引用,那么在 threadLocal.get()的时候,发生GC之后,key是否为null
  • ThreadLocalThreadLocalMap数据结构
  • ThreadLocalMapHash算法
  • ThreadLocalMapHash冲突如何解决?
  • ThreadLocalMap扩容机制?
  • ThreadLocalMap中过期key的清理机制?探测式清理启发式清理流程?
  • ThreadLocalMap.set()方法实现原理?
  • ThreadLocalMap.get()方法实现原理?
  • 项目中ThreadLocal使用情况?遇到的坑?

什么是ThreadLocal

ThreadLocal类顾名思义可以理解为线程本地变量。也就是说如果定义了一个ThreadLocal,每个线程往这个ThreadLocal中读写是线程隔离,互相之间不会影响的。它提供了一种将可变数据通过每个线程有自己的独立副本从而实现线程封闭的机制。

大致实现思路

Thread类有一个类型为ThreadLocal.ThreadLocalMap的实例变量threadLocals,也就是说每个线程有一个自己的ThreadLocalMap。ThreadLocalMap有自己的独立实现,可以简单地将它的key视作ThreadLocal,value为代码中放入的值(实际上key并不是ThreadLocal本身,而是它的一个弱引用)。每个线程在往某个ThreadLocal里塞值的时候,都会往自己的ThreadLocalMap里存,读也是以某个ThreadLocal作为引用,在自己的map里找对应的key,从而实现了线程隔离。

ThreadLocal的API

https://gitee.com/lienhui68/picStore/raw/master/null/20200731081114.png

ThreadLocalMap的源码实现

ThreadLocalMap的源码实现,才是我们读ThreadLocal源码真正要领悟的。看看大师Doug Lea和Joshua Bloch的鬼斧神工之作。

https://gitee.com/lienhui68/picStore/raw/master/null/20200731081144.png

ThreadLocalMap提供了一种为ThreadLocal定制的高效实现,并且自带一种基于弱引用的垃圾清理机制。下面从基本结构开始一点点解读。

存储结构

既然是个map(注意不要与java.util.map混为一谈,这里指的是概念上的map),当然得要有自己的key和value,上面回答的问题2中也已经提及,我们可以将其简单视作key为ThreadLocal,value为实际放入的值。之所以说是简单视作,因为实际上ThreadLocal中存放的是ThreadLocal的弱引用。我们来看看ThreadLocalMap里的节点是如何定义的。

https://gitee.com/lienhui68/picStore/raw/master/null/20200728213809.png

1
2
3
4
5
6
7
8
9
static class Entry extends WeakReference<java.lang.ThreadLocal<?>> {
    // 往ThreadLocal里实际塞入的值
    Object value;

    Entry(java.lang.ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}

Entry便是ThreadLocalMap里定义的节点,它继承了WeakReference类,定义了一个类型为Object的value,用于存放塞到ThreadLocal里的值。

为什么是弱引用

读到这里,如果不问不答为什么是这样的定义形式,为什么要用弱引用,等于没读懂源码。 因为如果这里使用普通的key-value形式来定义存储结构,实质上就会造成节点的生命周期与线程强绑定,只要线程没有销毁,那么节点在GC分析中一直处于可达状态,没办法被回收,而程序本身也无法判断是否可以清理节点。弱引用是Java中四档引用的第三档,比软引用更加弱一些,如果一个对象没有强引用链可达,那么一般活不过下一次GC。当某个ThreadLocal已经没有强引用可达,则随着它被垃圾回收,在ThreadLocalMap里对应的Entry的键值会失效,这为ThreadLocalMap本身的垃圾清理提供了便利

类成员变量与相应方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
/**
 * 初始容量,必须为2的幂
 */
private static final int INITIAL_CAPACITY = 16;

/**
 * Entry表,大小必须为2的幂
 */
private Entry[] table;

/**
 * 表里entry的个数
 */
private int size = 0;

/**
 * 重新分配表大小的阈值,默认为0
 */
private int threshold; 

可以看到,ThreadLocalMap维护了一个Entry表或者说Entry数组,并且要求表的大小必须为2的幂,同时记录表里面entry的个数以及下一次需要扩容的阈值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
/**
 * 设置resize阈值以维持最坏2/3的装载因子
 */
private void setThreshold(int len) {
    threshold = len * 2 / 3;
}

/**
 * 环形意义的下一个索引
 */
private static int nextIndex(int i, int len) {
    return ((i + 1 < len) ? i + 1 : 0);
}

/**
 * 环形意义的上一个索引
 */
private static int prevIndex(int i, int len) {
    return ((i - 1 >= 0) ? i - 1 : len - 1);
}

ThreadLocal需要维持一个最坏2/3的负载因子,对于负载因子相信应该不会陌生,在HashMap中就有这个概念。

ThreadLocal有两个方法用于得到上一个/下一个索引,注意这里实际上是环形意义下的上一个与下一个。

由于ThreadLocalMap使用线性探测法来解决散列冲突,所以实际上Entry[]数组在程序逻辑上是作为一个环形存在的。

解决哈希冲突的方法一般有:开放寻址法、链地址法(拉链法)、再哈希法和建立公共溢出区等方法。在 Java中为了解决Hash碰撞,ThreadLocalMap采用线性探测再散列的开放寻址法,LinkedHashMap采用链表法。

开放寻址法:又称开放定址法,当哈希碰撞发生时,从发生碰撞的那个单元起,按照一定的次序,从哈希表中寻找一个空闲的单元,然后把发生冲突的元素存入到该单元。这个空闲单元又称为开放单元或者空白单元。

与二次探测和双散列一样,线性探测是一种开放寻址的策略。在这些策略里,散列表的每个单元都存储一对键值对。当散列函数对一个给定值产生一个键,并且这个键指向散列表中某个已经被另一个键值对所占用的单元时,线性探测用于解决此时产生的冲突:查找散列表中离冲突单元最近的空闲单元,并且把新的键插入这个空闲单元。同样的,查找也同插入如出一辙:从散列函数给出的散列值对应的单元开始查找,直到找到与键对应的值或者是找到空单元。

至此,我们已经可以大致勾勒出ThreadLocalMap的内部存储结构。下面是我绘制的示意图。虚线表示弱引用,实线表示强引用。

https://gitee.com/lienhui68/picStore/raw/master/null/20200731083029.png

ThreadLocalMap维护了Entry环形数组,数组中元素Entry的逻辑上的key为某个ThreadLocal对象(实际上是指向该ThreadLocal对象的弱引用),value为代码中该线程往该ThreadLoacl变量实际塞入的值。

构造函数

好的,接下来再来看看ThreadLocalMap的一个构造函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
/**
 * 构造一个包含firstKey和firstValue的map。
 * ThreadLocalMap是惰性构造的,所以只有当至少要往里面放一个元素的时候才会构建它。
 */
ThreadLocalMap(java.lang.ThreadLocal<?> firstKey, Object firstValue) {
    // 初始化table数组
    table = new Entry[INITIAL_CAPACITY];
    // 用firstKey的threadLocalHashCode与初始大小16取模得到哈希值
    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
    // 初始化该节点
    table[i] = new Entry(firstKey, firstValue);
    // 设置节点表大小为1
    size = 1;
    // 设定扩容阈值
    setThreshold(INITIAL_CAPACITY);
}

这个构造函数在set和get的时候都可能会被间接调用以初始化线程的ThreadLocalMap。

作者:时之令 链接:https://www.jianshu.com/p/a1953f00bfbb 来源:简书 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

ThreadLocalMap Hash函数

既然是Map结构,那么ThreadLocalMap当然也要实现自己的hash算法来解决散列表数组冲突问题。

1
int i = key.threadLocalHashCode & (len-1);

ThreadLocalMaphash算法很简单,这里i就是当前key在散列表中对应的数组下标位置。

这里最关键的就是threadLocalHashCode值的计算,ThreadLocal中有一个属性为HASH_INCREMENT = 0x61c88647

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ThreadLocal<T> {
    private final int threadLocalHashCode = nextHashCode();

    private static AtomicInteger nextHashCode = new AtomicInteger();

    private static final int HASH_INCREMENT = 0x61c88647;

    private static int nextHashCode() {
        return nextHashCode.getAndAdd(HASH_INCREMENT);
    }

    static class ThreadLocalMap {
        ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
            table = new Entry[INITIAL_CAPACITY];
            int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);

            table[i] = new Entry(firstKey, firstValue);
            size = 1;
            setThreshold(INITIAL_CAPACITY);
        }
    }
}

每当创建一个ThreadLocal对象,这个ThreadLocal.nextHashCode 这个值就会增长 0x61c88647

这个值很特殊,它是斐波那契数 也叫 黄金分割数。通过理论与实践, hash增量为 这个数字,带来的好处就是 hash 分布非常均匀

0x61c88647是Integer有符号整数的0.618倍,既黄金比例,斐波拉契数列。使用这个比例,可以使key在数组上被更均匀的分散。

斐波拉契数列:随着斐波拉契数列的增长,前一个数除以后一个数无限接近黄金分割点

ThreadLocalMap中,形如key.threadLocalHashCode & (table.length - 1)(其中key为一个ThreadLocal实例)这样的代码片段实质上就是在求一个ThreadLocal实例的哈希值,只是在源码实现中没有将其抽为一个公用函数。

ThreadLocalMap Hash冲突

注明: 下面所有示例图中,绿色块Entry代表正常数据灰色块代表Entrykey值为null已被垃圾回收白色块表示Entrynull

虽然ThreadLocalMap中使用了黄金分隔数来作为hash计算因子,大大减少了Hash冲突的概率,但是仍然会存在冲突。

HashMap中解决冲突的方法是在数组上构造一个链表结构,冲突的数据挂载到链表上,如果链表长度超过一定数量则会转化成红黑树

ThreadLocalMap中并没有链表结构,所以这里不能适用HashMap解决冲突的方式了。

ThreadLocalMap使用的是线性探测法,均匀分布的好处在于很快就能探测到下一个临近的可用slot,从而保证效率。

https://gitee.com/lienhui68/picStore/raw/master/null/20200729163538.png

如上图所示,如果我们插入一个value=27的数据,通过hash计算后应该落入第4个槽位中,而槽位4已经有了Entry数据。

此时就会线性向后查找,一直找到Entrynull的槽位才会停止查找,将当前元素放入此槽位中。当然迭代过程中还有其他的情况,比如遇到了Entry不为nullkey值相等的情况,还有Entry中的key值为null的情况等等都会有不同的处理,后面会一一详细讲解。

这里还画了一个Entry中的keynull的数据(Entry=2的灰色块数据),因为key值是弱引用类型,所以会有这种数据存在。在set过程中,如果遇到了key过期的Entry数据,实际上是会进行一轮探测式清理操作的,具体操作方式后面会讲到。

ThreadLocalMap因为使用了弱引用,所以其实每个slot的状态有三种也即有效(value未回收),无效(value已回收),空(entry==null)。

源码分析

getEntry方法

1
2
3
4
5
6
ThreadLocalMap
-get
-- getEntry
--- getEntryAfterMiss
---- expungeStaleEntry // 探测式清理

这个方法会被ThreadLocal的get方法直接调用,用于获取map中某个ThreadLocal存放的值。

getEntry

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
private Entry getEntry(ThreadLocal<?> key) {
    // 根据key这个ThreadLocal的ID来获取索引,也即哈希值
    int i = key.threadLocalHashCode & (table.length - 1);
    Entry e = table[i];
    // 对应的entry存在且未失效且弱引用指向的ThreadLocal就是key,则命中返回
    if (e != null && e.get() == key) {
        return e;
    } else {
        // 因为用的是线性探测,所以往后找还是有可能能够找到目标Entry的。
        return getEntryAfterMiss(key, i, e);
    }
}

getEntryAfterMiss

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/*
 * 调用getEntry未直接命中的时候调用此方法
 */
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
    Entry[] tab = table;
    int len = tab.length;

    // 基于线性探测法不断向后探测直到遇到空entry。
    while (e != null) {
        ThreadLocal<?> k = e.get();
        // 找到目标
        if (k == key) {
            return e;
        }
        if (k == null) {
            // 该entry对应的ThreadLocal已经被回收,调用expungeStaleEntry来清理无效的entry
          	// 重要:清理无效的entry, 将后面的有效entry往前挪(rehash),确保具有相同hash值的entry在一个连续的段(中间不能有空entry)
            expungeStaleEntry(i);
        } else {
            // 环形意义下往后面走
            i = nextIndex(i, len);
        }
        e = tab[i];
    }
    return null;
}

expungeStaleEntry

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
 * 这个函数是ThreadLocal中核心清理函数,它做的事情很简单:
 * 就是从staleSlot开始遍历,将无效(弱引用指向对象被回收)清理,即对应entry中的value置为null,将指向这个entry的table[i]置为null,直到扫到空entry。
 * 另外,在过程中还会对非空的entry作rehash。
 * 可以说这个函数的作用就是从staleSlot开始清理连续段中的slot(断开强引用,rehash slot等)
 */
private int expungeStaleEntry(int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;

    // 因为entry对应的ThreadLocal已经被回收,value设为null,显式断开强引用
    tab[staleSlot].value = null;
    // 显式设置该entry为null,以便垃圾回收
    tab[staleSlot] = null;
    size--;

    Entry e;
    int i;
    for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
        ThreadLocal<?> k = e.get();
        // 清理对应ThreadLocal已经被回收的entry
        if (k == null) {
            e.value = null;
            tab[i] = null;
            size--;
        } else {
            /*
             * 对于还没有被回收的情况,需要做一次rehash。
             * 
             * 如果对应的ThreadLocal的ID对len取模出来的索引h不为当前位置i,
             * 则从h向后线性探测到第一个空的slot,把当前的entry给挪过去。
             */
            int h = k.threadLocalHashCode & (len - 1);
            if (h != i) {
                tab[i] = null;

                /*
                 * 在原代码的这里有句注释值得一提,原注释如下:
                 *
                 * Unlike Knuth 6.4 Algorithm R, we must scan until
                 * null because multiple entries could have been stale.
                 *
                 * 这段话提及了Knuth高德纳的著作TAOCP(《计算机程序设计艺术》)的6.4章节(散列)
                 * 中的R算法。R算法描述了如何从使用线性探测的散列表中删除一个元素。
                 * R算法维护了一个上次删除元素的index,当在非空连续段中扫到某个entry的哈希值取模后的索引
                 * 还没有遍历到时,会将该entry挪到index那个位置,并更新当前位置为新的index,
                 * 继续向后扫描直到遇到空的entry。
                 *
                 * ThreadLocalMap因为使用了弱引用,所以其实每个slot的状态有三种也即
                 * 有效(value未回收),无效(value已回收),空(entry==null)。
                 * 正是因为ThreadLocalMap的entry有三种状态,所以不能完全套高德纳原书的R算法。
                 *
                 * 因为expungeStaleEntry函数在扫描过程中还会对无效slot清理将之转为空slot,
                 * 如果直接套用R算法,可能会出现具有相同哈希值的entry之间断开(中间有空entry)。
                 */
                while (tab[h] != null) {
                    h = nextIndex(h, len);
                }
                tab[h] = e;
            }
        }
    }
    // 返回staleSlot之后第一个空的slot索引
    return i;
}

我们来回顾一下从ThreadLocal读一个值可能遇到的情况: 根据入参threadLocal的threadLocalHashCode对表容量取模得到index

  • 如果index对应的slot就是要读的threadLocal,则直接返回结果
  • 调用getEntryAfterMiss线性探测,过程中每碰到无效slot,调用expungeStaleEntry进行段清理;如果找到了key,则返回结果entry
  • 没有找到key,返回null

set方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
ThreadLocalMap
- set
-- replaceStaleEntry // 替换失效的entry
--- cleanSomeSlots // 启发式清理slot
--- expungeStaleEntry // 探测式清理
-- cleanSomeSlots
--- expungeStaleEntry
-- rehash
--- expungeStaleEntries // 全量清理
--- resize // 扩容,因为需要保证table的容量len为2的幂,所以扩容即扩大2倍
---- setThreshold

set

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void set(ThreadLocal<?> key, Object value) {

    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len - 1);
    // 线性探测
    for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
        ThreadLocal<?> k = e.get();
        // 找到对应的entry
        if (k == key) {
            e.value = value;
            return;
        }
        // 替换失效的entry
        if (k == null) {
            replaceStaleEntry(key, value, i);
            return;
        }
    }

    tab[i] = new Entry(key, value);
    int sz = ++size;
    if (!cleanSomeSlots(i, sz) && sz >= threshold) {
        rehash();
    }
}

replaceStaleEntry

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                               int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;
    Entry e;

    // 向前扫描,查找最前的一个无效slot
    int slotToExpunge = staleSlot;
    for (int i = prevIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = prevIndex(i, len)) {
        if (e.get() == null) {
            slotToExpunge = i;
        }
    }

    // 向后遍历table
    for (int i = nextIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = nextIndex(i, len)) {
        ThreadLocal<?> k = e.get();

        // 找到了key,将其与无效的slot交换
        if (k == key) {
            // 更新对应slot的value值
            e.value = value;

            tab[i] = tab[staleSlot];
            tab[staleSlot] = e;

            /*
             * 如果在整个扫描过程中(包括函数一开始的向前扫描与i之前的向后扫描)
             * 找到了之前的无效slot则以那个位置作为清理的起点,
             * 否则则以当前的i作为清理起点
             */
            if (slotToExpunge == staleSlot) {
                slotToExpunge = i;
            }
            // 从slotToExpunge开始做一次连续段的清理,再做一次启发式清理
            cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
            return;
        }

        // 如果当前的slot已经无效,并且向前扫描过程中没有无效slot,则更新slotToExpunge为当前位置
        if (k == null && slotToExpunge == staleSlot) {
            slotToExpunge = i;
        }
    }

    // 如果key在table中不存在,则在原地放一个即可
    tab[staleSlot].value = null;
    tab[staleSlot] = new Entry(key, value);

    // 在探测过程中如果发现任何无效slot,则做一次清理(连续段清理+启发式清理)
    if (slotToExpunge != staleSlot) {
        cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
    }
}

cleanSomeSlots

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * 启发式地清理slot,
 * i对应entry是非无效(指向的ThreadLocal没被回收&&后一个entry为空,或者entry本身为空)
 * n是用于控制控制扫描次数的
 * 正常情况下如果log n次扫描没有发现无效slot,函数就结束了
 * 但是如果发现了无效的slot,将n置为table的长度len,做一次连续段的清理
 * 再从下一个空的slot开始继续扫描
 * 
 * 这个函数有两处地方会被调用,一处是插入的时候可能会被调用,另外个是在替换无效slot的时候可能会被调用,
 * 区别是前者传入的n为元素个数,后者为table的容量
 */
private boolean cleanSomeSlots(int i, int n) {
    boolean removed = false;
    Entry[] tab = table;
    int len = tab.length;
    do {
        // i在任何情况下自己都不会是一个无效slot,所以从下一个开始判断
        i = nextIndex(i, len);
        Entry e = tab[i];
        if (e != null && e.get() == null) {
            // 扩大扫描控制因子
            n = len;
            removed = true;
            // 清理一个连续段
            i = expungeStaleEntry(i);
        }
    } while ((n >>>= 1) != 0);
    return removed;
}

rehash

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
private void rehash() {
    // 做一次全量清理
    expungeStaleEntries();

    /*
     * 因为做了一次全量清理,所以size很可能会变小。
     * ThreadLocalMap这里的实现是调低阈值来判断是否需要扩容,
     * threshold默认为len*2/3,所以这里的threshold - threshold / 4相当于len/2
     */
    if (size >= threshold - threshold / 4) {
        resize();
    }
}

expungeStaleEntries

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
/*
 * 做一次全量清理
 */
private void expungeStaleEntries() {
    Entry[] tab = table;
    int len = tab.length;
    for (int j = 0; j < len; j++) {
        Entry e = tab[j];
        if (e != null && e.get() == null) {
            /*
             * 个人觉得这里可以取返回值,如果大于j的话取了用,这样也是可行的。
             * 因为expungeStaleEntry执行过程中是把连续段内所有无效slot都清理了一遍了。
             */
            expungeStaleEntry(j);
        }
    }
}

resize

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
 * 扩容,因为需要保证table的容量len为2的幂,所以扩容即扩大2倍
 */
private void resize() {
    Entry[] oldTab = table;
    int oldLen = oldTab.length;
    int newLen = oldLen * 2;
    Entry[] newTab = new Entry[newLen];
    int count = 0;

    for (int j = 0; j < oldLen; ++j) {
        Entry e = oldTab[j];
        if (e != null) {
            ThreadLocal<?> k = e.get();
            if (k == null) {
                e.value = null; 
            } else {
                // 线性探测来存放Entry
                int h = k.threadLocalHashCode & (newLen - 1);
                while (newTab[h] != null) {
                    h = nextIndex(h, newLen);
                }
                newTab[h] = e;
                count++;
            }
        }
    }

    setThreshold(newLen);
    size = count;
    table = newTab;
}

我们来回顾一下ThreadLocal的set方法可能会有的情况

  • 探测过程中slot都不无效,并且顺利找到key所在的slot,直接替换即可
  • 探测过程中发现有无效slot,调用replaceStaleEntry,效果是最终一定会把key和value放在这个slot,并且会尽可能清理无效slot
    • 在replaceStaleEntry过程中,如果找到了key,则做一个swap把它放到那个无效slot中,value置为新值
    • 在replaceStaleEntry过程中,没有找到key,直接在无效slot原地放entry
  • 探测没有发现key,则在连续段末尾的后一个空位置放上entry,这也是线性探测法的一部分。放完后,做一次启发式清理,如果没清理出去key,并且当前table大小已经超过阈值了,则做一次rehash,rehash函数会调用一次全量清理slot方法也即expungeStaleEntries,如果全量清理完了之后容量超过了threshold - threshold / 4 (1/2*len, 相当于全量清零掉超过1/4 threshold就不做扩容),则进行扩容2倍

remove

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
/**
 * 从map中删除ThreadLocal
 */
private void remove(ThreadLocal<?> key) {
    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len - 1);
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        if (e.get() == key) {
            // 显式断开弱引用
          	// 相当于 Entry e = e.get(); e = null;
            e.clear();
            // 进行段清理
            expungeStaleEntry(i);
            return;
        }
    }
}

remove方法相对于getEntry和set方法比较简单,直接在table中找key,如果找到了,把弱引用断了做一次段清理。

ThreadLocal与内存泄露

关于ThreadLocal是否会引起内存泄漏也是一个比较有争议性的问题,其实就是要看对内存泄漏的准确定义是什么。 认为ThreadLocal会引起内存泄漏的说法是因为如果一个ThreadLocal对象被回收了,我们往里面放的value对于【当前线程->当前线程的threadLocals(ThreadLocal.ThreadLocalMap对象)->Entry数组->某个entry.value】这样一条强引用链是可达的,因此value不会被回收。 认为ThreadLocal不会引起内存泄漏的说法是因为ThreadLocal.ThreadLocalMap源码实现中自带一套自我清理的机制。

之所以有关于内存泄露的讨论是因为在有线程复用如线程池的场景中,一个线程的寿命很长,大对象长期不被回收影响系统运行效率与安全。如果线程不会复用,用完即销毁了也不会有ThreadLocal引发内存泄露的问题。《Effective Java》一书中的第6条对这种内存泄露称为unintentional object retention(无意识的对象保留)。

当我们仔细读过ThreadLocalMap的源码,我们可以推断,如果在使用的ThreadLocal的过程中,显式地进行remove是个很好的编码习惯,这样是不会引起内存泄漏。 那么如果没有显式地进行remove呢?只能说如果对应线程之后调用ThreadLocal的get和set方法都有很高的概率会顺便清理掉无效对象,断开value强引用,从而大对象被收集器回收。

但无论如何,我们应该考虑到何时调用ThreadLocal的remove方法。一个比较熟悉的场景就是对于一个请求一个线程的server如tomcat,在代码中对web api作一个切面,存放一些如用户名等用户信息,在连接点方法结束后,再显式调用remove。

内存泄露

ThreadLocalMap的生命周期是与线程一样的,但是ThreadLocal却不一定,可能ThreadLocal使用完了就想要被回收,但是此时线程可能不会立即终止,还会继续运行(比如线程池中线程重复利用),如果ThreadLocal对象只存在弱引用,那么在下一次垃圾回收的时候必然会被清理掉。

如果ThreadLocal没有被外部强引用的情况下,在垃圾回收的时候会被清理掉的,这样一来ThreadLocalMap中使用这个ThreadLocal的key也会被清理掉。但是,value 是强引用,不会被清理,这样一来就会出现key为null的value值。

在ThreadLocalMap中,调用 set()、get()、remove()方法的时候,会清理掉key为null的记录。在ThreadLocal设置为null之后,ThreadLocalMap中存在key为null的值,那么就可能发生内存泄漏,只有手动调用remove()方法来避免。

所以我们在使用完ThreadLocal变量时,尽量用threadLocal.remove()来清除,避免threadLocal=null的操作。remove方法是彻底地回收该对象,而通过threadLocal=null只是释放掉了ThreadLocal的引用,但是在ThreadLocalMap中却还存在其Entry,后续还需处理。

解决方案

  1. 使用完ThreadLocal对象后,总是调用其remove方法

  2. 自定义线程池

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    
    public class MyThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
    
        @Override
        public void execute(Runnable runnable) {
            Map<String, String> context = MDC.getCopyOfContextMap();
            super.execute(() -> run(runnable, context));
        }
    
        @Override
        private void run(Runnable runnable, Map<String, String> context) {
            if (context != null) {
                MDC.setContextMap(context);
            }
            try {
                runnable.run();
            } finally {
                MDC.remove();
            }
        }
    }
    

为什么ThreadLocalMap自带清理机制还需要remove

ThreadLocalMap内部Entry中key使用的是对ThreadLocal对象的弱引用,这为避免内存泄露是一个进步,因为如果是强引用,那么即使其他地方没有对ThreadLocal对象的引用,ThreadLocalMap中的ThreadLocal对象还是不会被回收,而如果是弱引用则这时候ThreadLocal引用是会被回收掉的,虽然对于的value还是不能被回收,这时候ThreadLocalMap里面就会存在key为null但是value不为null的entry项,虽然ThreadLocalMap提供了set,get,remove方法在一些时机下会对这些Entry项进行清理,但是这是不及时的,也不是每次都会执行的,所以一些情况下还是会发生内存泄露,比如上面讲到的在线程池中使用ThreadLocal,所以在使用完毕后即使调用remove方法才是解决内存泄露的王道。

ThreadLocal与线程池结合使用需要注意的地方

线程池中的线程在任务执行完成后会被复用,所以在线程执行完成时,要对 ThreadLocal 进行清理(清除掉与本线程相关联的 value 对象)。不然,被复用的线程去执行新的任务时会使用被上一个线程操作过的 value 对象,从而产生不符合预期的结果。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.calvin.currency.ThreadLocal;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadLocalVariableHolder {

    private static ThreadLocal<Integer> variableHolder = new ThreadLocal<Integer>() {
        @Override
        protected Integer initialValue() {
            return 0;
        }
    };

    private static void remove() {
        variableHolder.remove();
    }

    private static int getValue() {
        return variableHolder.get();
    }

    private static void increment() {
        variableHolder.set(variableHolder.get() + 1);
    }

    public static void main(String[] args) {
        ExecutorService threadPool = new ThreadPoolExecutor(2, 5,
                3, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy());
        for (int i = 0; i < 10; i++) {
            threadPool.execute(() -> {
                try {
                    long threadId = Thread.currentThread().getId();
                    int before = getValue();
                    increment();
                    int after = getValue();
                    System.out.println("threadId: " + threadId + ", before: " + before + ", after: " + after);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // 清理线程本地存储
                    remove();
                }
            });
        }
        threadPool.shutdown();
    }
}

InheritableThreadLocal

inheritable: 可遗传的;可继承的

源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
/**
* 使用InheritableThreadLocal来创建共享变量的线程副本,容器是Thread.inheritableThreadLocals
* 当创建子对象时,会将inheritableThreadLocals中有效的entry复制到子线程的inheritableThreadLocals中
*/
public class InheritableThreadLocal<T> extends ThreadLocal<T> {
    protected T childValue(T parentValue) {
        return parentValue;
    }

    ThreadLocalMap getMap(Thread t) {
       return t.inheritableThreadLocals;
    }

    void createMap(Thread t, T firstValue) {
        t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
    }
}

对于InheritableThreadLocal,本文不作过多介绍,只是简单略过。 ThreadLocal本身是线程隔离的,InheritableThreadLocal提供了一种父子线程之间的数据共享机制。

它的具体实现是在Thread类中除了threadLocals外还有一个inheritableThreadLocals对象。

1
2
3
4
5
6
7
8
9
    /* ThreadLocal values pertaining to this thread. This map is maintained
     * by the ThreadLocal class. */
    ThreadLocal.ThreadLocalMap threadLocals = null;

    /*
     * InheritableThreadLocal values pertaining to this thread. This map is
     * maintained by the InheritableThreadLocal class.
     */
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;

在线程对象初始化的时候,会调用ThreadLocal的createInheritedMap从父线程的inheritableThreadLocals中把有效的entry都拷过来。

https://gitee.com/lienhui68/picStore/raw/master/null/20200731142930.png

可以看一下其中的具体实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private ThreadLocalMap(ThreadLocalMap parentMap) {
    Entry[] parentTable = parentMap.table;
    int len = parentTable.length;
    setThreshold(len);
    table = new Entry[len];

    for (int j = 0; j < len; j++) {
        Entry e = parentTable[j];
        if (e != null) {
            @SuppressWarnings("unchecked")
            ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
            if (key != null) {
                // 这里的childValue方法在InheritableThreadLocal中默认实现为返回本身值,可以被重写
                Object value = key.childValue(e.value);
                Entry c = new Entry(key, value);
                int h = key.threadLocalHashCode & (len - 1);
                while (table[h] != null)
                    h = nextIndex(h, len);
                table[h] = c;
                size++;
            }
        }
    }
}

还是比较简单的,做的事情就是以父线程的inheritableThreadLocalMap为数据源,过滤出有效的entry,初始化到自己的inheritableThreadLocalMap中。其中childValue可以被重写。

需要注意的地方是InheritableThreadLocal只是在子线程创建的时候会去拷一份父线程的inheritableThreadLocals。如果父线程是在子线程创建后再set某个InheritableThreadLocal对象的值,对子线程是不可见的。

ThreadLocal使用场景

  1. Spring里的事务, 同一个线程执行的方法里拿到的connection得保证是一样的

  2. 日志里的traceId

    我们现在项目中日志记录用的是ELK+Logstash,最后在Kibana中进行展示和检索。

    现在都是分布式系统统一对外提供服务,项目间调用的关系可以通过traceId来关联,但是不同项目之间如何传递traceId呢?

    这里我们使用org.slf4j.MDC来实现此功能,内部就是通过ThreadLocal来实现的,具体实现如下:

    当前端发送请求到服务A时,服务A会生成一个类似UUIDtraceId字符串,将此字符串放入当前线程的ThreadLocal中,在调用服务B的时候,将traceId写入到请求的Header中,服务B在接收请求时会先判断请求的Header中是否有traceId,如果存在则写入自己线程的ThreadLocal中。

    一般使用参数requestId作为我们各个系统链路关联的traceId,系统间互相调用,通过这个requestId即可找到对应链路,这里还有会有一些其他场景:

    https://gitee.com/lienhui68/picStore/raw/master/null/20200731144604.png

    针对于这些场景,我们都可以有相应的解决方案,如下所示

    Feign远程调用解决方案

    服务发送请求:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    
    @Component
    @Slf4j
    public class FeignInvokeInterceptor implements RequestInterceptor {
    
        @Override
        public void apply(RequestTemplate template) {
            String requestId = MDC.get("requestId");
            if (StringUtils.isNotBlank(requestId)) {
                template.header("requestId", requestId);
            }
        }
    }
    

    服务接收请求:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    
    @Slf4j
    @Component
    public class LogInterceptor extends HandlerInterceptorAdapter {
    
        @Override
        public void afterCompletion(HttpServletRequest arg0, HttpServletResponse arg1, Object arg2, Exception arg3) {
            MDC.remove("requestId");
        }
    
        @Override
        public void postHandle(HttpServletRequest arg0, HttpServletResponse arg1, Object arg2, ModelAndView arg3) {
        }
    
        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
    
            String requestId = request.getHeader(BaseConstant.REQUEST_ID_KEY);
            if (StringUtils.isBlank(requestId)) {
                requestId = UUID.randomUUID().toString().replace("-", "");
            }
            MDC.put("requestId", requestId);
            return true;
        }
    }
    

    线程池异步调用,requestId传递

    因为MDC是基于ThreadLocal去实现的,异步过程中,子线程并没有办法获取到父线程ThreadLocal存储的数据,所以这里可以自定义线程池执行器,修改其中的run()方法:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    
    public class MyThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
    
        @Override
        public void execute(Runnable runnable) {
            Map<String, String> context = MDC.getCopyOfContextMap();
            super.execute(() -> run(runnable, context));
        }
    
        @Override
        private void run(Runnable runnable, Map<String, String> context) {
            if (context != null) {
                MDC.setContextMap(context);
            }
            try {
                runnable.run();
            } finally {
                MDC.remove();
            }
        }
    }
    

    使用MQ发送消息给第三方系统

    在MQ发送的消息体中自定义属性requestId,接收方消费消息后,自己解析requestId使用即可。

参考

面试官:小伙子,听说你看过ThreadLocal源码?万字图文深度解析ThreadLocal

https://www.jianshu.com/p/a1953f00bfbb

https://www.jianshu.com/p/230d14dc8ab8

https://blog.csdn.net/qq_41066066/article/details/107116449

https://zhuanlan.zhihu.com/p/149060245