目录

cas

定义

CAS(Compare and swap)直译过来就是比较和替换,也有人叫compare and exchange,是一种通过硬件实现并发安全的常用工作,底层通过利用CPU的CAS指令对缓存加锁或总线加锁的方式来实现多处理器之间的原子操作。仔细观察J.U.C包中类的实现代码,会发现这些类中大量使用到了CAS,所以CAS是Java并发包的实现基础。它的实现过程是,有3个操作数,内存值V,旧的预期值E,要修改的新值N,当且仅当预期值E和内存值V相同时,才将内存值V修改为N,否则什么都不做。

当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其余均会失败,但失败的线程并不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作

一图看懂CAS的操作流程

https://gitee.com/lienhui68/picStore/raw/master/null/20200728084708.png

剖析cas

下面看两个测试代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.eh.ftd.jvm;

import java.util.concurrent.CountDownLatch;

public class Some {

    static int m;


    public static void main(String[] args) {
        CountDownLatch downLatch = new CountDownLatch(100);

        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                for (int j = 1; j <= 10000; j++) {
                    synchronized (Some.class) {
                        m++;
                    }
                }
                downLatch.countDown();

            }).start();
        }
        try {
            downLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("总计:" + m);
    }
}

代码很简单,每个线程都对m做++操作,众所周知,由于m++不是原子操作,从CPU级别来看,m++经历了3步,取,加,存3步操作,所以在这之间就有可能出现线程并发的问题。加上一个synchronized 重量级锁就避免了这个问题。

如果不想用synchronized,不想加锁,可能很多人都用过AtomicInteger。实现如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.eh.ftd.jvm;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class Some {

    static AtomicInteger m = new AtomicInteger(0);


    public static void main(String[] args) {
        CountDownLatch downLatch = new CountDownLatch(100);

        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                for (int j = 1; j <= 10000; j++) {
                    m.getAndAdd(1);
                }
                downLatch.countDown();

            }).start();
        }
        try {
            downLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("总计:" + m);
    }
}

AtomicInteger 是JUC出来后为大家提供的原子类,这里面的操作都是原子操作, m.incrementAndGet()增加并获取,这就是一个CAS操作,我们再也不用加锁了。

PS:很多人喜欢叫CAS为无锁,我很不喜欢这个叫法,准确的说CAS是自旋锁。为什么说不能叫无锁呢?

我们从源码探究一下CAS底层是怎么实现的:从代码示例2的 m.getAndAdd()我们跟进去。

1
2
3
4
5
6
7
8
9
/**
     * Atomically adds the given value to the current value.
     *
     * @param delta the value to add
     * @return the previous value
     */
    public final int getAndAdd(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta);
    }
  1. 使用了 sun.misc.Unsafe 对象,这个类提供了一系列直接操作内存对象的方法,只是在 jdk 内部使用,不建议开发者使用;

  2. value 表示实际值,这里的 value 定义为 volatile,因为 volatile 可以保证内存可见性,也就是 value 值只要发生变化,其他线程是马上可以看到变化后的值的;

  3. valueOffset 是 value 字段相对Java对象的“起始地址”的偏移量,用 unsafe.objectFieldOffset 方法获得,用作后面的 compareAndSet 方法;一个java对象可以看成是一段内存,各个字段都得按照一定的顺序放在这段内存里,同时考虑到对齐要求,可能这些字段不是连续放置的,用这个方法能准确地告诉你某个字段相对于对象的起始内存地址的字节偏移量,因为是相对偏移量,所以它其实跟某个具体对象又没什么太大关系,跟class的定义和虚拟机的内存模型的实现细节更相关。

getAndAdd你会发现它调用了一个类,这个类叫unsafe,调用了unsafe类里面的getAndAddInt。我们跟着进入到getAndAddInt方法中。

1
2
3
4
5
6
7
8
public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
    }

这时候你发现getAndAddInt方法中调用了compareAndSwapInt方法,从名字不难看出这是一个CAS操作,操作的什么类型呢?Int类型,那么具体怎么操作呢,我们进入到这个方法内。

1
 public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

当你跟到这里的时候,你发现这是native 修饰的方法,native什么意思啊,native说明已经跟到C++的代码了。我们就来看一下C++中是怎么实现的CAS。下面是unsafe.cpp中代码实现。

1
2
3
4
5
6
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

所以你在java中调用了compareAndSwapInt的话,实际上是调用了Unsafe_CompareAndSwapInt名字都一样,由于调用链很长我就不一一贴代码了,感兴趣的朋友可以自己找源码跟一下,我们直接跳到最根上。

jdk8u:atomic_linux_x86.inline.hpp 的93行

1
2
3
4
5
6
7
8
inline jint  Atomic::cmpxchg(jint exchange_value, volatile jint* dest, jint  compare_value) {
  int mp = os::is_MP();
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
  return exchange_value;
}

我们从文件名字 atomic_linux_x86.inline.hpp 中就能获取到一些信息,说明到现在位置已经跟具体的平台和具体的cpu的型号关系了,在x86平台上linux版本的实现,那么CAS是怎么实现的呢?

看这条指令__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)" 不知道你明白没有,看到这我们可以下一条结论,原来在CPU层级有一条指令,叫scmpxchg

到这一步,你是不是觉得自己已经对于CAS已经理解的很透彻了呢?

你有没有想过cmpxchg这条指令是原子操作吗?

https://gitee.com/lienhui68/picStore/raw/master/null/20200728085708.png

这个图怎么解释呢,就是一个线程a 取到了0的值,把0改为了1 ,正要把新值写回内存的时候,线程2抢先一步修改了内存中0的值,会不会有这样的情况发生呢,答案是肯定的,在并发层级很高的情况下,这种情况是完全可能发生的。问题就在于cmpxchg这条指令是不是原子性的。虽然在汇编层级有一条指令支持CAS,但是很遗憾它不是原子性的,那么怎么保证这条指令是原子性的呢?

不知道你们有没有注意到LOCK_IF_MP(%4),从外观来看像是加锁的操作,我们进到LOCK_IF_MP这个方法内

1
#define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "

前面的汇编我们忽略,你一定注意到了lock;1这是什么意思呢,从方法名字看lock if mp ,mp的全称是Multi Processor,多cpu,意思是什么呢,就是看你操作系统有多少个处理器,若果只有一个cpu一核的话就不需要原子性了,一定是顺序执行的,如果是多核心多cpu前面就要加lock,所以最红能够实现CAS的汇编指令就被我们揪出来了。最终的汇编指令是lock cmpxchg 指令,lock指令在执行后面指令的时候锁定一个北桥信号(一个电信号,锁定北桥信号比锁定总线轻量一些,感兴趣的自己百度)。

所以如果你是多核或者多个cpu,CPU在执行cmpxchg指令之前会执行lock锁定总线,实际是锁定北桥信号。我不释放这把锁谁也过不去,以此来保证cmpxchg的原子性。

当cpu发现lock指令会立即做两件事

  1. 将当前内核中线程工作内存中该共享变量刷新到主存;
  2. 通知其他内核里缓存的该共享变量内存地址无效;

底层有两种实现方式

  1. 缓存一致性协议,如mesi

  2. 锁总线

总结 CAS并不是真的无锁。 记住这条指令lock cmpxch!lock cmpxchg!lock cmpxchg!重要的事情说三遍!面试中这就是你的亮点!

使用场景

  • CAS 适合简单对象的操作,比如布尔值、整型值等;

  • CAS 适合冲突较少的情况,如果太多线程在同时自旋,那么长时间循环会导致 CPU 开销很大;

    数据库里的乐观锁也是用到了cas, update customer_addition set integral=integral + 2,version = version+1 where id=#{id} and version= #{version};

ABA问题

CAS 存在一个问题,就是一个值从 A 变为 B ,又从 B 变回了 A,这种情况下,CAS 会认为值没有发生过变化,但实际上是有变化的。对此,并发包下倒是有 AtomicStampedReference 提供了根据版本号判断的实现,可以解决一部分问题。基础类型简单值不需要版本号

这个类的compareAndSet方法的作用是首先检查当前引用是否等于预期引用,并且检查当前标志是否等于预期标志,如果二者都相等,才使用CAS设置为新的值和标志。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public boolean compareAndSet(V   expectedReference,
                             V   newReference,
                             int expectedStamp,
                             int newStamp) {
    Pair<V> current = pair;
    return
        expectedReference == current.reference &&
        expectedStamp == current.stamp &&
        ((newReference == current.reference &&
          newStamp == current.stamp) ||
         casPair(current, Pair.of(newReference, newStamp)));
}

cas和mesi

在x86架构上,CAS被翻译为”lock cmpxchg…“,当两个core同时执行针对同一地址的CAS指令时,其实他们是在试图修改每个core自己持有的Cache line,

假设两个core都持有相同地址对应cacheline,且各自cacheline 状态为S, 这时如果要想成功修改,就首先需要把S转为E或者M, 则需要向其它core invalidate 这个地址的cacheline,则两个core都会向ring bus(环形总线))发出 invalidate这个操作, 那么在ringbus上就会根据特定的设计协议仲裁是core0,还是core1能赢得这个invalidate, 胜者完成操作, 失败者需要接受结果, invalidate自己对应的cacheline,再读取胜者修改后的值, 回到起点.

对于我们的CAS操作来说, 其实锁并没有消失,只是转嫁到了ring bus的总线仲裁协议中. 而且大量的多核同时针对一个地址的CAS操作会引起反复的互相invalidate 同一cacheline, 造成pingpong效应, 同样会降低性能(参考[9])。当然如果真的有性能问题,我觉得这可能会在ns级别体现了,一般的应用程序中使用CAS应该不会引起性能问题

循环时间长开销大

CAS多与自旋结合。如果自旋CAS长时间不成功,会占用大量的CPU资源。

解决思路是让JVM支持处理器提供的pause指令

pause指令能让自旋失败时cpu睡眠一小段时间再继续自旋,从而使得读操作的频率低很多,为解决内存顺序冲突而导致的CPU流水线重排的代价也会小很多。

参考

CAS你以为你真的懂?