前言

Exchanger应该算并发包中工具使用相对少的,因为它主要用于线程之间交换数据,它的用法比较简单在不同线程之间使用exchange方法交换数据,但是内部实现比较巧妙,使用了unsafe的CAS原子操作、自旋来解决冲突问题,下面我们通过源码一探究竟。

源码

先看看源码注释中关于核心算法的介绍

复制代码
        for (;;) {             if (slot is empty) {                  // slot为空时,将item 设置到Node 中                                   place item in a Node;                 if (can CAS slot from empty to node) {                     // 当将node通过CAS交换到slot中时,挂起线程等待被唤醒                    wait for release;                     // 被唤醒后返回node中匹配到的item                    return matching item in node;                 }             } else if (can CAS slot from node to empty) { // release                 // 将slot设置为空                 // 获取node中的item,将需要交换的数据设置到匹配的item                get the item in node;                 set matching item in node;                 // 唤醒等待的线程                release waiting thread;             }             // else retry on CAS failure        }
复制代码

比如有2条线程A和B,A线程交换数据时,发现slot为空,则将需要交换的数据放在slot中等待其它线程进来交换数据,等线程B进来,读取A设置的数据,然后设置线程B需要交换的数据,然后唤醒A线程,原理就是这么简单。当时当多个线程之间进行交换数据时就会出现问题,所以Exchanger加入了slot数组。

Exchanger 属性及构造器

复制代码
    // 用于左移Node数组下标,从而得出数据在内存中的偏移量来获取数据,避免伪共享    private static final int ASHIFT = 7;     // note数组最大下标    private static final int MMASK = 0xff;     // 用于递增bound,每次加一个SEQ    private static final int SEQ = MMASK + 1;     // CPU核心数    private static final int NCPU = Runtime.getRuntime().availableProcessors();     // 当前数组最大的下标(多处理器情况下)    static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;     // 自旋次数,CPU核心为1个时,自旋被禁用    private static final int SPINS = 1 << 10;     // 空对象,用于当线程exchange方法中参数为null时传递给其他线程的对象    private static final Object NULL_ITEM = new Object();     // 用于超时时传递的对象    private static final Object TIMED_OUT = new Object();     // Participa