加载中…
个人资料
  • 博客等级:
  • 博客积分:
  • 博客访问:
  • 关注人气:
  • 获赠金笔:0支
  • 赠出金笔:0支
  • 荣誉徽章:
正文 字体大小:

java并发-18-ConcurrentLinkedList设计

(2018-04-22 19:44:26)
分类: java

 

 

一、           基本结构

在并发编程中,有时需要使用线程安全的队列。实现一个线程安全的队列有两种方式:一是使用阻塞方式,在入队、出队、读取加锁;另一种可以使用非阻塞方式,即利用CASDoug Lee就使用了非阻塞方式设计了ConcurrentLinkedList,类图如下:

http://s15/mw690/002i2qf8zy7jTyRs7Cude&690

ConcurrentLinkedQueuehead节点和tail节点组成,每个节点(Node)由节点元素(item)和指向下一个节点(next)的引用组成,节点与节点之间就是通过这个next关联起来,从而组成一张链表结构的队列。默认情况下head节点存储的元素为空,tail节点等于head节点。

下面为了简化,我们自己写了一个类似的队列。

private static class MyConcurrentLinkedQueue {
private transient volatile MyNodehead;
    private transient volatile
MyNodetail;

    private static long
headOffset;
    private static long
tailOffset;

    private static
Unsafe unsafe;
    static
{
try {
            Field field =Unsafe.
class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe = (Unsafe) field.get(null);
Class<?> k = MyConcurrentLinkedQueue.class;
headOffset= unsafe.objectFieldOffset
                    (k.getDeclaredField(
"head"));
tailOffset= unsafe.objectFieldOffset
                    (k.getDeclaredField(
"tail"));
} catch (NoSuchFieldException e) {
e.printStackTrace()
;
} catch (IllegalAccessException e) {
e.printStackTrace()
;
}
    }

MyConcurrentLinkedQueue() {
head = tail = new MyNode(null);
}

public void setFirstTail(MyNode node) {
tail.casNext(null, node);
System.out.println();
}

final void updateHead(MyNode h, MyNode p) {
if (h != p &&casHead(h, p))
h.lazySetNext(h)
;
}

public booleancasHead(MyNodecmp, MyNodeval) {
return unsafe.compareAndSwapObject(this, headOffset, cmp, val);
}

public booleancasTail(MyNodecmp, MyNodeval) {
return unsafe.compareAndSwapObject(this, tailOffset, cmp, val);
}

public booleanoffer(String e) {
final MyNodenewNode = new MyNode(e);
        for
(MyNode t = tail, p = t;;) {
MyNode q = p.
next;
            if
(q == null) {
// p is last node
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
if(p != t) // hop two nodes at a time
casTail(t, newNode);  // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
p = (t != (t = tail)) ? t :head;
            else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
    }

public String peek() {
restartFromHead:
for (;;) {
for(MyNode h = head, p = h, q;;) {
                String item = p.
item;
                if
(item != null || (q = p.next) == null) {
updateHead(h
, p);
                    return
item;
}
else if (p == q)
continue restartFromHead;
                else
p = q;
}
        }
    }
}

private static class MyNode {
private volatile String item;
    private volatile
MyNodenext;
    private static
Unsafe unsafe;

    private static long
itemOffset;
    private static long
nextOffset;

MyNode(String item) {
unsafe.putObject(this, itemOffset, item);
}

public booleancasNext(MyNodecmp, MyNodeval) {
return unsafe.compareAndSwapObject(this, nextOffset, cmp, val);
}

void lazySetNext(MyNodeval) {
unsafe.putOrderedObject(this, nextOffset, val);
}

public void setNext(MyNode next) {
this.next= next;
}

public void setItem(String s) {
item = s;
}
public String getItem() {
return item;
}

static {
try {
            Field field =Unsafe.
class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe = (Unsafe) field.get(null);
Class<?> k = MyNode.class;

itemOffset= unsafe.objectFieldOffset(k.getDeclaredField("item"));
nextOffset= unsafe.objectFieldOffset(k.getDeclaredField("next"));
} catch (NoSuchFieldException e) {
e.printStackTrace()
;
} catch (IllegalAccessException e) {
e.printStackTrace()
;
}
    }
}

 

二、           入队

http://s7/mw690/002i2qf8zy7jTySnnYa96&690

         由上面代码可以发现,offer中不是每次都设置新插入节点为tail。而是当前tail和新插入节点中间隔了一个节点才会重新调整tail所指引用。这样的好处是减少了每次cas设置tail的次数。

         测试代码:

@Test
public void test() {
ConcurrentLinkedQueue<String> queue =
new ConcurrentLinkedQueue<>();
queue.add("1");
System.out.println(queue.peek());
queue.add("2");
}
@Test
public void testSimulatedMyLinkedQueue() throws NoSuchFieldException, IllegalAccessException {
MyConcurrentLinkedQueuemyLinkedQueue =
new MyConcurrentLinkedQueue();
myLinkedQueue.offer("1");
System.out.println(myLinkedQueue.peek());
myLinkedQueue.offer("2");
System.out.println();
}

 

         这里有一个奇怪的现象无法解释:在testConcurrentLinkedQueue执行第一个add,从而执行p.casNext方法,导致headtail居然分道扬镳,cas没有成功对tailnext赋值为新的newNode,反而使得head的地址成为了新的newNodetailnext居然是原tail本身

所以我们在testSimulatedMyLinkedQueue模拟了这一过程,却无法还原这一奇怪现象。p.casNext成功将headtail所在地址的next改为newNodeitemnull。后面的peek方法正好又重新整合了队列,使得headitem"1"next仍然为空,原head所在地址(也就是tail)的值改为itemnullnext为该地址本身;而下一个offer("2")中,使得奇怪的tailitemnullnext为本身变成新增节点,从此开始队列一切正常。

 

三、           出队

http://s4/mw690/002i2qf8zy7jTyT1gpdc3&690

从图中可知,并不是每次出队时都更新head节点,当head节点里有元素时,直接弹出head 节点里的元素,而不会更新head节点。只有当head节点里没有元素时,出队操作才会更新head 节点。这种做法也是通过hops变量来减少使用CAS更新head节点的消耗,从而提高出队效率。

 


0

阅读 收藏 喜欢 打印举报/Report
  

新浪BLOG意见反馈留言板 欢迎批评指正

新浪简介 | About Sina | 广告服务 | 联系我们 | 招聘信息 | 网站律师 | SINA English | 产品答疑

新浪公司 版权所有