java并发编程之美-阅读记录4
java并發包中的原子操作類,這些類都是基于非阻塞算法CAS實現的。
4.1原子變量操作類
AtomicInteger/AtomicLong/AtomicBoolean等原子操作類
AtomicLong類:
public class AtomicLong extends Number implements java.io.Serializable {// 基于硬件的原子操作類private static final Unsafe unsafe = Unsafe.getUnsafe();// 存放value的偏移地址private static final long valueOffset;//判斷jvm是否支持Long類型無鎖CASstatic final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();private static native boolean VMSupportsCS8();// 初始化value字段的偏移量static {try {valueOffset = unsafe.objectFieldOffset(AtomicLong.class.getDeclaredField("value"));} catch (Exception ex) { throw new Error(ex); }}// 存放具體的值private volatile long value; ......... }雖然該類提供了原子操作(雖然是無阻塞的CAS操作,相對于阻塞算法提升了很火),但是在高并發情況下,會競爭更新同一個原子變量,仍然會有效率問題。
導致問題的原因也就是CAS操作:
public final long getAndAddLong(Object paramObject , long paramLongl , long paramLong2)long l ;do {1 = getLongvolatile(paramObject , paramLongl) ;// .compareAndSwapLong比較交換操作,該方法就是CAS的核心方法,但是該方法在高并發情況下,會競爭更新原子變量,最終只有一個線程更新成功,其他線程會循環多次CAS操作,浪費了cpu資源,降低了效率) while (!compareAndSwapLong(param0bject , paramLongl , 1, 1 + paramLong2) );return l ; } compareAndSwapLong:paramObject操作對象,paramLong1 value的偏移量,第三個參數為except表示當前是否是該值,最后一個參數就是要修改成的值?
jdk8新增的原子操作類LongAdder
? 為了解決高并發情況下多線程對一個共享變量的CAS爭奪失敗后進行自旋而造成的降低并發性能的問題,LongAdder在內部維護了一個cell元素(一個動態的cell數組)來分擔對單個變量進行爭奪的開銷,也就是將對一個變量的爭奪分配到對多個變量的爭奪上。
LongAdder結構:繼承Striped64
Striped64類,Cell類上有一個Contented注解,作用是避免偽共享問題
// 一個內部類,LongAdder就是通過Cell對象來提高性能,降低自選的CAS操作@sun.misc.Contended static final class Cell {volatile long value;Cell(long x) { value = x; }final boolean cas(long cmp, long val) {return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);}// Unsafe類,基于硬件的原子操作類private static final sun.misc.Unsafe UNSAFE;// value在Cell對象內存地址中的偏移量private static final long valueOffset;// 靜態代碼塊 初始化Unsafe類,和偏移量valueOfsetstatic {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> ak = Cell.class;valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value"));} catch (Exception e) {throw new Error(e);}}}/** Number of CPUS, to place bound on table size */static final int NCPU = Runtime.getRuntime().availableProcessors();// cell數組,每一個線程會爭奪一個cell對象,cell數組變相的降低了爭奪一個變量的性能問題transient volatile Cell[] cells;// LongAdder的真實值就是base+cell[0]...+cell[n]transient volatile long base;// 為了實現自旋鎖,狀態值只有0和1transient volatile int cellsBusy;LongAdder重點方法add():
當內部increment或decrement自增或自減操作是,內部調用的就是add方法,當cell數組為空時,默認走的就是casBase方法(此時和AtomicLong操作相同,即都是在base的基礎上進行累加或累減操作的)。
進入第3行if判斷的情況:
1:cells不為空,會進入if
假如cells不為空進入的if判斷,則第5行結果為false,第6行會判斷Thread對象中的threadLocalRandomProbe的值是否存在,不存在,則直接調用longAccumulate方法,存在的話,會進一步判斷a.cas操作是否成功,不成功則調用longAccumulate方法,成功則代表add方法累加成功
2:casBase操作失敗,會進入if
如果cells為空,casBase方法操作失敗,則進入if判斷中,且第5行為true,則直接調用longAccumulate方法
1 public void add(long x) {2 Cell[] as; long b, v; int m; Cell a;3 if ((as = cells) != null || !casBase(b = base, b + x)) {4 boolean uncontended = true;5 if (as == null || (m = as.length - 1) < 0 ||6 (a = as[getProbe() & m]) == null ||7 !(uncontended = a.cas(v = a.value, v + x)))8 longAccumulate(x, null, uncontended);9 } 10 } 11 12 /** 13 * Equivalent to {@code add(1)}. 14 */ 15 public void increment() { 16 add(1L); 17 } 18 19 /** 20 * Equivalent to {@code add(-1)}. 21 */ 22 public void decrement() { 23 add(-1L); 24 }?longAccumulate方法是進行cells數組初始化和擴容的地方
final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended) {int h;// 初始化probe,也就是Thread類中的threadlocalsRandomProbe變量if ((h = getProbe()) == 0) {ThreadLocalRandom.current(); // force initializationh = getProbe();wasUncontended = true;}boolean collide = false; // True if last slot nonempty// 死循環for (;;) {Cell[] as; Cell a; int n; long v;// 這里和上邊add方法的兩種情況對應,一種是cells不為null,一種是未null,當前if為cells不為null的時候會進入if ((as = cells) != null && (n = as.length) > 0) {// as[(n-1) & h] 里邊的索引和上一步add方法中的相同,都是通過probe的值 并上 cells數組的長度減1, 這一塊就是獲取線程應該訪問的Cell對象(相當于AtomicLong對象中的共享變量)if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { // Try to attach new Cell 當前索引處無Cell對象,并且沒有線程在執行CAS操作,則會新建一個Cell對象Cell r = new Cell(x); // Optimistically createif (cellsBusy == 0 && casCellsBusy()) {boolean created = false;try { // Recheck under lockCell[] rs; int m, j;if ((rs = cells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) {rs[j] = r;created = true;}} finally {cellsBusy = 0;}if (created)break;continue; // Slot is now non-empty}}collide = false;}else if (!wasUncontended) // CAS already known to failwasUncontended = true; // Continue after rehashelse if (a.cas(v = a.value, ((fn == null) ? v + x :fn.applyAsLong(v, x))))break;else if (n >= NCPU || cells != as)collide = false; // At max size or staleelse if (!collide)collide = true;else if (cellsBusy == 0 && casCellsBusy()) {try {if (cells == as) { // Expand table unless staleCell[] rs = new Cell[n << 1];for (int i = 0; i < n; ++i)rs[i] = as[i];cells = rs;}} finally {cellsBusy = 0;}collide = false;continue; // Retry with expanded table}h = advanceProbe(h);}// 以下判斷是當cells為null時,并且cellsBusy為0,并且能夠將cellsBusy自增為1時,也就是能夠加鎖時,進入判斷內else if (cellsBusy == 0 && cells == as && casCellsBusy()) {boolean init = false;try { // Initialize tableif (cells == as) { // 默認初始化Cell數組大小為2Cell[] rs = new Cell[2];rs[h & 1] = new Cell(x);cells = rs;init = true;}} finally {cellsBusy = 0;}if (init)break;}// 以上條件都不滿足,如果指定了operation方法,則使用operation進行操作,沒有的話,則進行累加操作(這塊是針對自定義operation的)else if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x))))break; // Fall back on using base}}?
?LongAccumulator類:LongAdder是LongAccumulator的一個特例,后者提供了更加強大的功能,可以讓用戶自定義累加規則
public LongAccumulator(LongBinaryOperator accumulatorFunction,long identity) {this.function = accumulatorFunction;base = this.identity = identity;}// java8中的函數式接口可以自定義操作規則,累加、累乘等操作 @FunctionalInterface public interface LongBinaryOperator {/*** Applies this operator to the given operands.** @param left the first operand* @param right the second operand* @return the operator result*/long applyAsLong(long left, long right); }?
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的java并发编程之美-阅读记录4的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java并发编程之美-阅读记录3
- 下一篇: java并发编程之美-阅读记录5