FastThreadLocal
ThreadLocal
使用場景
使用場景是在于同一個類,但是會開多個線程執(zhí)行,但是每一個線程可以保持不同的變量狀態(tài)。
做法如上圖,線程類Thread有成員變量ThreadLocal.ThreadLocalMap,用來存儲該線程中的所有的ThreadLocal變量,初始化是一個Entry數(shù)組。
內(nèi)存泄漏
static class Entry extends WeakReference<ThreadLocal<?>> {/** The value associated with this ThreadLocal. */Object value;Entry(ThreadLocal<?> k, Object v) {super(k);value = v;}}Entry繼承于WeakReference,簡單說一下四種引用。強引用,就是我們常規(guī)使用的new出來一個對象,這時候會有變量建立具體的對象聯(lián)系。軟引用,適用于cache類型的變量,當jvm內(nèi)存不夠時會釋放該引用。弱引用,只要發(fā)生gc就會回收。虛引用,沒有很深刻的體會。
SoftReference<Dog> softReference = new SoftReference<Dog>(new Dog("dd"));while (true){if(softReference.get() == null){System.out.println("null");break;}else {System.out.println("ok");}System.gc();}ok ok ok ... WeakReference<Dog> weakReference = new WeakReference<Dog>(new Dog("dd"));while (true){if(weakReference.get() == null){System.out.println("null");break;}else {System.out.println("ok");}System.gc();} ok nullEntry中的key設(shè)置為虛引用,那么gc時候會被回收,此時ThreadLocal進行清理的時候可以根據(jù)key是否為null進行判斷清除,防止內(nèi)存泄漏。
但是還是要調(diào)用remove函數(shù),這個在線程池中如果不執(zhí)行的話會造成內(nèi)存泄漏,因為線程不進行回收,那么ThreadLocalMap中會一直存在這些Entry,同時不進行remove的話就會一直占用內(nèi)存。
Hash原理
獲取線程對應(yīng)的ThreadLocal,是應(yīng)用hashcode在Map中定位,如果發(fā)生hash沖突使用的是線性尋地址法,即往下一位找,這種沖突方法有概率會導致死循環(huán)。所以如果變量過多,沖突很多,定位較慢。
private Entry getEntry(ThreadLocal<?> key) {int i = key.threadLocalHashCode & (table.length - 1);Entry e = table[i];if (e != null && e.get() == key)return e;elsereturn getEntryAfterMiss(key, i, e);}private void set(ThreadLocal<?> key, Object value) {// We don't use a fast path as with get() because it is at// least as common to use set() to create new entries as// it is to replace existing ones, in which case, a fast// path would fail more often than not.Entry[] tab = table;int len = tab.length;int i = key.threadLocalHashCode & (len-1);for (Entry e = tab[i];e != null;e = tab[i = nextIndex(i, len)]) {ThreadLocal<?> k = e.get();if (k == key) {e.value = value;return;}if (k == null) {replaceStaleEntry(key, value, i);return;}}tab[i] = new Entry(key, value);int sz = ++size;if (!cleanSomeSlots(i, sz) && sz >= threshold)rehash();}FastThreadLocal
使用場景
不同于JDK自帶的ThreadLocal,如果Thread是使用的FastThreadLocalThread,那么自帶有private InternalThreadLocalMap threadLocalMap,那么如果類中有用到FastThreadLocal會從threadLocalMap中獲取,netty中每一個FastThreadLocal都有全局唯一的index,所以是常數(shù)級從數(shù)組中定位獲取內(nèi)容,并且在set的同時會將該FastThreadLocal放到threadLocalMap中index為0的Set<FastThreadLocal<?>>上,這樣垃圾清理會比較簡單和快捷。
構(gòu)造函數(shù)
public FastThreadLocal() {index = InternalThreadLocalMap.nextVariableIndex();cleanerFlagIndex = InternalThreadLocalMap.nextVariableIndex();}由該函數(shù)保證了index的全局唯一性
public static int nextVariableIndex() {int index = nextIndex.getAndIncrement();if (index < 0) {nextIndex.decrementAndGet();throw new IllegalStateException("too many thread-local indexed variables");}return index;}set方法
public final void set(V value) {// 如果設(shè)置的非“空”if (value != InternalThreadLocalMap.UNSET) {// 獲取存儲的Map對象InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();// 如果當前設(shè)置的index上是unset(即我們已經(jīng)開始污染使用這個Map了),進行注冊清理操作(保證內(nèi)存清理)if (setKnownNotUnset(threadLocalMap, value)) {registerCleaner(threadLocalMap);}} else {// 如果設(shè)置UNSET,則進行清理操作remove();}}UNSET是一個new Object()對象,一方面用來填充整個空的Map,另一方面也是一個判斷是否使用的標志。
public static InternalThreadLocalMap get() {Thread thread = Thread.currentThread();if (thread instanceof FastThreadLocalThread) {// 直接會從FastThreadLocalThread中的成員變量中獲取InternalThreadLocalMapreturn fastGet((FastThreadLocalThread) thread);} else {// ThreadLocal<InternalThreadLocalMap>,用JDK的ThreadLocal代存InternalThreadLocalMapreturn slowGet();}} private boolean setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {// 數(shù)組定位進行替換,判斷原來的位置是否沒用過,即是否為unsetif (threadLocalMap.setIndexedVariable(index, value)) {// 存放到數(shù)組[0]上的set中,方便回收addToVariablesToRemove(threadLocalMap, this);return true;}return false;}public boolean setIndexedVariable(int index, Object value) {Object[] lookup = indexedVariables;if (index < lookup.length) {Object oldValue = lookup[index];lookup[index] = value;return oldValue == UNSET;} else {expandIndexedVariableTableAndSet(index, value);return true;}}private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);Set<FastThreadLocal<?>> variablesToRemove;// 判斷v有沒有設(shè)置為set,一般是首次會執(zhí)行這個if (v == InternalThreadLocalMap.UNSET || v == null) {variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);} else {// 已經(jīng)設(shè)置過了variablesToRemove = (Set<FastThreadLocal<?>>) v;}// 直接在set中加入當前的FastThreadLocalvariablesToRemove.add(variable);}注冊到ObjectClean中,保證會被清理內(nèi)存
private void registerCleaner(final InternalThreadLocalMap threadLocalMap) {Thread current = Thread.currentThread();if (FastThreadLocalThread.willCleanupFastThreadLocals(current) ||threadLocalMap.indexedVariable(cleanerFlagIndex) != InternalThreadLocalMap.UNSET) {return;}// removeIndexedVariable(cleanerFlagIndex) isn't necessary because the finally cleanup is tied to the lifetime// of the thread, and this Object will be discarded if the associated thread is GCed.threadLocalMap.setIndexedVariable(cleanerFlagIndex, Boolean.TRUE);// We will need to ensure we will trigger remove(InternalThreadLocalMap) so everything will be released// and FastThreadLocal.onRemoval(...) will be called.ObjectCleaner.register(current, new Runnable() {@Overridepublic void run() {remove(threadLocalMap);// It's fine to not call InternalThreadLocalMap.remove() here as this will only be triggered once// the Thread is collected by GC. In this case the ThreadLocal will be gone away already.}});}ObjectCleaner
這個和ThreadDeathWatcher的監(jiān)控大致原理是相似的,都是開啟一個監(jiān)控守護線程進行for循環(huán)拉取任務(wù),只是該類沒有取消任務(wù),所以直接一個并發(fā)安全的set就足夠。
注冊函數(shù)將需要進行清理的object對象設(shè)為虛引用,并保存了清理任務(wù),放入到任務(wù)集中,如果未開啟監(jiān)控線程就開啟。
監(jiān)控線程死循環(huán)拿出虛引用隊列,如果有引用拿到,說明該對象已經(jīng)被gc,此時執(zhí)行清理任務(wù),如果無任務(wù)了就關(guān)閉線程。反之繼續(xù)。
這么做能保證內(nèi)存釋放,即使是使用JDK的ThreadLocal,因為也是對象Map。
/*** Allows a way to register some {@link Runnable} that will executed once there are no references to an {@link Object}* anymore.*/ public final class eObjectCleaner {private static final int REFERENCE_QUEUE_POLL_TIMEOUT_MS =max(500, getInt("io.netty.util.internal.ObjectCleaner.refQueuePollTimeout", 10000));// Package-private for testingstatic final String CLEANER_THREAD_NAME = ObjectCleaner.class.getSimpleName() + "Thread";// This will hold a reference to the AutomaticCleanerReference which will be removed once we called cleanup()private static final Set<AutomaticCleanerReference> LIVE_SET = new ConcurrentSet<AutomaticCleanerReference>();private static final ReferenceQueue<Object> REFERENCE_QUEUE = new ReferenceQueue<Object>();private static final AtomicBoolean CLEANER_RUNNING = new AtomicBoolean(false);private static final Runnable CLEANER_TASK = new Runnable() {@Overridepublic void run() {boolean interrupted = false;for (;;) {// Keep on processing as long as the LIVE_SET is not empty and once it becomes empty// See if we can let this thread complete.while (!LIVE_SET.isEmpty()) {final AutomaticCleanerReference reference;try {// 從虛引用的隊列中獲取引用,這個是只有對象被回收才會放到這個隊列中,能獲取得到,說明該引用已經(jīng)被gcreference = (AutomaticCleanerReference) REFERENCE_QUEUE.remove(REFERENCE_QUEUE_POLL_TIMEOUT_MS);} catch (InterruptedException ex) {// Just consume and move oninterrupted = true;continue;}if (reference != null) {try {// 執(zhí)行引用的清理任務(wù),從而保證gc后也能清理reference.cleanup();} catch (Throwable ignored) {// ignore exceptions, and don't log in case the logger throws an exception, blocks, or has// other unexpected side effects.}// 從任務(wù)集中去除引用LIVE_SET.remove(reference);}}CLEANER_RUNNING.set(false);// Its important to first access the LIVE_SET and then CLEANER_RUNNING to ensure correct// behavior in multi-threaded environments.if (LIVE_SET.isEmpty() || !CLEANER_RUNNING.compareAndSet(false, true)) {// There was nothing added after we set STARTED to false or some other cleanup Thread// was started already so its safe to let this Thread complete now.break;}}if (interrupted) {// As we caught the InterruptedException above we should mark the Thread as interrupted.Thread.currentThread().interrupt();}}};/*** Register the given {@link Object} for which the {@link Runnable} will be executed once there are no references* to the object anymore.** This should only be used if there are no other ways to execute some cleanup once the Object is not reachable* anymore because it is not a cheap way to handle the cleanup.*/public static void register(Object object, Runnable cleanupTask) {AutomaticCleanerReference reference = new AutomaticCleanerReference(object,ObjectUtil.checkNotNull(cleanupTask, "cleanupTask"));// Its important to add the reference to the LIVE_SET before we access CLEANER_RUNNING to ensure correct// behavior in multi-threaded environments.// 任務(wù)集內(nèi)容,要保證并發(fā)安全LIVE_SET.add(reference);// Check if there is already a cleaner running.// 如果running標志沒開啟,CAS操作進行開啟一個守護線程執(zhí)行if (CLEANER_RUNNING.compareAndSet(false, true)) {final Thread cleanupThread = new FastThreadLocalThread(CLEANER_TASK);cleanupThread.setPriority(Thread.MIN_PRIORITY);// Set to null to ensure we not create classloader leaks by holding a strong reference to the inherited// classloader.// See:// - https://github.com/netty/netty/issues/7290// - https://bugs.openjdk.java.net/browse/JDK-7008595AccessController.doPrivileged(new PrivilegedAction<Void>() {@Overridepublic Void run() {cleanupThread.setContextClassLoader(null);return null;}});cleanupThread.setName(CLEANER_THREAD_NAME);// Mark this as a daemon thread to ensure that we the JVM can exit if this is the only thread that is// running.cleanupThread.setDaemon(true);cleanupThread.start();}}public static int getLiveSetCount() {return LIVE_SET.size();}private ObjectCleaner() {// Only contains a static method.}// 繼承軟引用,這里Object是thread,用來判斷thread是否被回收private static final class AutomaticCleanerReference extends WeakReference<Object> {// 存下任務(wù)private final Runnable cleanupTask;AutomaticCleanerReference(Object referent, Runnable cleanupTask) {super(referent, REFERENCE_QUEUE);this.cleanupTask = cleanupTask;}void cleanup() {cleanupTask.run();}@Overridepublic Thread get() {return null;}@Overridepublic void clear() {LIVE_SET.remove(this);super.clear();}} }remove方法
public final void remove(InternalThreadLocalMap threadLocalMap) {if (threadLocalMap == null) {return;}// lookup[index] = UNSET;制定位置置為unset,去除value強引用Object v = threadLocalMap.removeIndexedVariable(index);// set.remove(this),去除FastTheadLocal的強引用removeFromVariablesToRemove(threadLocalMap, this);if (v != InternalThreadLocalMap.UNSET) {try {// 可以重寫,做一些自己想做的事情。。onRemoval((V) v);} catch (Exception e) {PlatformDependent.throwException(e);}}}// 從set中取出所有的FastThreadLocal執(zhí)行remove,并且最后將Map置為空,all overpublic static void removeAll() {InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();if (threadLocalMap == null) {return;}try {Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);if (v != null && v != InternalThreadLocalMap.UNSET) {@SuppressWarnings("unchecked")Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;FastThreadLocal<?>[] variablesToRemoveArray =variablesToRemove.toArray(new FastThreadLocal[variablesToRemove.size()]);for (FastThreadLocal<?> tlv: variablesToRemoveArray) {tlv.remove(threadLocalMap);}}} finally {InternalThreadLocalMap.remove();}}get方法
public final V get() {InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();Object v = threadLocalMap.indexedVariable(index);if (v != InternalThreadLocalMap.UNSET) {return (V) v;}// 利用重寫的初始化函數(shù)進行初始化V value = initialize(threadLocalMap);registerCleaner(threadLocalMap);return value;}對比
- 都是Thead自己存儲自己的TheadLocal
- JDK的存儲使用線性探測法的Map,數(shù)量大容易造成沖突,性能下降很快,并且會有內(nèi)存泄漏的風險。
- FastTheadLocal快的原因改進了存儲方式,全局唯一index來標志一個ftl,當然這樣如果全局ftl很多會造成空間浪費,這是一種空間換時間的方式。同時它會進行內(nèi)存監(jiān)控清理防止內(nèi)存泄漏。
- 個人認為在TheadLocal不多的情況下其實兩種性能差不多(因為JDK自身不會hash沖突),但是Ftl更能保證內(nèi)存不泄漏,所以JDK調(diào)用的時候記得remove
reference
https://www.jianshu.com/p/3fc2fbac4bb7
轉(zhuǎn)載于:https://www.cnblogs.com/GrimReaper/p/10385325.html
總結(jié)
以上是生活随笔為你收集整理的FastThreadLocal的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: nodejs 开发,手把手开始第一个服务
- 下一篇: 设置宿舍路由器为多账号登录的方法