国产成人精品久久免费动漫-国产成人精品天堂-国产成人精品区在线观看-国产成人精品日本-a级毛片无码免费真人-a级毛片毛片免费观看久潮喷

您的位置:首頁技術(shù)文章
文章詳情頁

Java并發(fā)編程之ConcurrentLinkedQueue源碼詳解

瀏覽:2日期:2022-08-12 15:21:34
目錄一、ConcurrentLinkedQueue介紹二、構(gòu)造方法三、入隊(duì) 四、出隊(duì)五、總結(jié)一、ConcurrentLinkedQueue介紹

并編程中,一般需要用到安全的隊(duì)列,如果要自己實(shí)現(xiàn)安全隊(duì)列,可以使用2種方式:方式1:加鎖,這種實(shí)現(xiàn)方式就是我們常說的阻塞隊(duì)列。方式2:使用循環(huán)CAS算法實(shí)現(xiàn),這種方式實(shí)現(xiàn)隊(duì)列稱之為非阻塞隊(duì)列。從點(diǎn)到面, 下面我們來看下非阻塞隊(duì)列經(jīng)典實(shí)現(xiàn)類:ConcurrentLinkedQueue (JDK1.8版)

ConcurrentLinkedQueue 是一個基于鏈接節(jié)點(diǎn)的無界線程安全的隊(duì)列。當(dāng)我們添加一個元素的時候,它會添加到隊(duì)列的尾部,當(dāng)我們獲取一個元素時,它會返回隊(duì)列頭部的元素。它采用了“wait-free”算法來實(shí)現(xiàn),用CAS實(shí)現(xiàn)了非阻塞的線程安全隊(duì)列。當(dāng)多個線程共享訪問一個公共 collection 時,ConcurrentLinkedQueue 是一個恰當(dāng)?shù)倪x擇。此隊(duì)列不允許使用 null 元素,因?yàn)橐瞥貢r實(shí)際是將節(jié)點(diǎn)中item置為null,如果元素本身為null,則跟刪除有沖突

我們首先看一下ConcurrentLinkedQueue的類圖結(jié)構(gòu)先,好有一個內(nèi)部邏輯有一個大概的印象,如下圖所示:

Java并發(fā)編程之ConcurrentLinkedQueue源碼詳解

主要屬性head節(jié)點(diǎn),tail節(jié)點(diǎn)

// 鏈表頭節(jié)點(diǎn)private transient volatile Node<E> head;// 鏈表尾節(jié)點(diǎn)private transient volatile Node<E> tail;

主要內(nèi)部類Node

類Node在static方法里獲取到item和next的內(nèi)存偏移量,之后通過casItem和casNext更改item值和next節(jié)點(diǎn)

private static class Node<E> { volatile E item; volatile Node<E> next; /** * Constructs a new node. Uses relaxed write because item can * only be seen after publication via casNext. */ Node(E item) {//將item存放在本節(jié)點(diǎn)的itemOffset偏移量位置的內(nèi)存里UNSAFE.putObject(this, itemOffset, item);//設(shè)置this對象的itemoffset位置 } //更新item值 boolean casItem(E cmp, E val) { //this對象的itemoffset位置存放的值如果和期望值cmp相等,則替換為valreturn UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) { //this對象的nextOffset位置存入valUNSAFE.putOrderedObject(this, nextOffset, val); } //更新next節(jié)點(diǎn)值 boolean casNext(Node<E> cmp, Node<E> val) { //this對象的nextOffset位置存放的值如果和期望值cmp相等,則替換為valreturn UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; //當(dāng)前節(jié)點(diǎn)存放的item的內(nèi)存偏移量 private static final long itemOffset; //當(dāng)前節(jié)點(diǎn)的next節(jié)點(diǎn)的內(nèi)存偏移量 private static final long nextOffset; static {try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField('item')); nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField('next'));} catch (Exception e) { throw new Error(e);} }}

concurrentlinkedqueue同樣在static方法里獲取到head和tail的內(nèi)存偏移量:之后通過casHead和casTail更改head節(jié)點(diǎn)和tail節(jié)點(diǎn)值

static { try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = ConcurrentLinkedQueue.class;headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField('head'));tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField('tail')); } catch (Exception e) {throw new Error(e); }} private boolean casTail(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);} private boolean casHead(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);}二、構(gòu)造方法 無參構(gòu)造函數(shù),head=tail=new Node<E>(null)=空節(jié)點(diǎn)(里面無item值) 集合構(gòu)造函數(shù)(集合中每個元素不能為null):就是將集合中的元素挨個鏈起來

//無參構(gòu)造函數(shù),head=tail=new Node<E>(null)=空節(jié)點(diǎn)//初始一個為空的ConcurrentLinkedQueue,此時head和tail都指向一個item為null的節(jié)點(diǎn)public ConcurrentLinkedQueue() { // 初始化頭尾節(jié)點(diǎn) head = tail = new Node<E>(null);} //集合構(gòu)造函數(shù):就是將集合中的元素挨個鏈起來public ConcurrentLinkedQueue(Collection<? extends E> c) { Node<E> h = null, t = null; for (E e : c) {checkNotNull(e);Node<E> newNode = new Node<E>(e);if (h == null) h = t = newNode;else { t.lazySetNext(newNode);//可以理解為一種懶加載, 將t的next值設(shè)置為newNode t = newNode;} } if (h == null)h = t = new Node<E>(null); head = h; tail = t;} private static void checkNotNull(Object v) { if (v == null)throw new NullPointerException();} //putObjectVolatile的內(nèi)存非立即可見版本,//寫后結(jié)果并不會被其他線程看到,通常是幾納秒后被其他線程看到,這個時間比較短,所以代價(jià)可以接收void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val);}三、入隊(duì)

獲取到當(dāng)前尾節(jié)點(diǎn)p=tail:

如果p.next=null,代表是真正的尾節(jié)點(diǎn),將新節(jié)點(diǎn)鏈入p.next=newNode。此時檢查tail是否還是p,如果不是p了,此時更新tail為最新的newNode(只有在tail節(jié)點(diǎn)后面tail.next成功添加的元素才不需要更新tail,其實(shí)更新不更新tail是交替的,即每添加倆次更新一次tail)。 如果p.next=p,此時其實(shí)是p.next==p==null,此時代表p被刪除了,此時需要從新的tail節(jié)點(diǎn)檢查,如果此時tail節(jié)點(diǎn)還是原來的tail(原來的tail在p前面,肯定也被刪除了),那就只能從head節(jié)點(diǎn)開始遍歷了 如果p.next!=null,代表有別的線程搶先添加元素了,此時需要繼續(xù)p=p.next遍歷獲取是null的節(jié)點(diǎn)(此時需要如果tail變了就使用新的tail往后遍歷)

public boolean offer (E e){ //先檢查元素是否為null,是null則拋出異常 不是null,則構(gòu)造新節(jié)點(diǎn)準(zhǔn)備入隊(duì) checkNotNull(e); final Node<E> newNode = new Node<E>(e); //初始p指針和t指針都指向尾節(jié)點(diǎn),p指針用來向隊(duì)列后面推移,t指針用來判斷尾節(jié)點(diǎn)是否改變 Node<E> t = tail, p = t; for (; ; ) {Node<E> q = p.next;if (q == null) {//p.next為null,則代表p為尾節(jié)點(diǎn),則將p.next指向新節(jié)點(diǎn) // p is last node if (p.casNext(null, newNode)) {/** * 如果p!=t,即p向后推移了,t沒動,則此時同時將tail更新 * 不符合條件不更新tail,這里可以看出并不是每入隊(duì)一個節(jié)點(diǎn)都會更新tail的 * 而此時真正的尾節(jié)點(diǎn)其實(shí)是newNode了,所以tail不一定是真正的尾節(jié)點(diǎn), * tail的更新具有滯后性,這樣設(shè)計(jì)提高了入隊(duì)的效率,不用每入隊(duì)一個,更新一次 *尾節(jié)點(diǎn) */if (p != t) casTail(t, newNode); // Failure is OK.return true; } // Lost CAS race to another thread; re-read next} else if (p == q)/** * 如果p.next和p相等,這種情況是出隊(duì)時的一種哨兵節(jié)點(diǎn)代表已被遺棄刪除, * 那就是有線程在一直刪除節(jié)點(diǎn),刪除到了p.next 那此時如果有線程已經(jīng)更新了tail,那就從p指向tail再開始繼續(xù)像后推移 * 如果始終沒有線程更新tail,則p指針從head開始向后推移 * * p從head開始推移的原因:tail沒有更新,以前的tail肯定在哨兵節(jié)點(diǎn)的前面(因?yàn)榇搜h(huán)是從tail向后推移到哨兵節(jié)點(diǎn)的), * 而head節(jié)點(diǎn)一定在哨兵節(jié)點(diǎn)的后面(出隊(duì)時只有更新了head節(jié)點(diǎn),才會把前面部分的某個節(jié)點(diǎn)置為哨兵節(jié)點(diǎn)) * 此時其實(shí)是一種tail在head之前,但實(shí)際上tail已經(jīng)無用了,哨兵之前的節(jié)點(diǎn)都無用了, * 等著其他線程入隊(duì)時更新尾節(jié)點(diǎn)tail,此時的tail才有用所以從head開始,從head開始可以找到任何節(jié)點(diǎn) * */ p = (t != (t = tail)) ? t : head;else/** * p.next和p不相等時,此時p應(yīng)該向后推移到p.next,即p=p.next, * 如果next一直不為null一直定位不到尾節(jié)點(diǎn),會一直next, * 但是中間會優(yōu)先判斷tail是否已更新,如果tail已更新則p直接從tail向后推移即可。就沒必要一直next了。 */ // Check for tail updates after two hops. p = (p != t && t != (t = tail)) ? t : q; }}四、出隊(duì)

poll出隊(duì):獲取到當(dāng)前頭節(jié)點(diǎn)p=head:如果成功設(shè)置了item為null,即p.catItem(item,null),如果此時被其他線程搶走消費(fèi)了,此時需要p=p.next,向后繼續(xù)爭搶消費(fèi),直到成功執(zhí)行p.catItem(item,null),此時檢查p是不是head節(jié)點(diǎn),如果不是更新p.next為頭結(jié)點(diǎn)

public E poll() { restartFromHead: for (;;) {// p節(jié)點(diǎn)表示首節(jié)點(diǎn),即需要出隊(duì)的節(jié)點(diǎn)for (Node<E> h = head, p = h, q;;) { E item = p.item; // 如果p節(jié)點(diǎn)的元素不為null,則通過CAS來設(shè)置p節(jié)點(diǎn)引用的元素為null,如果成功則返回p節(jié)點(diǎn)的元素 if (item != null && p.casItem(item, null)) {// Successful CAS is the linearization point// for item to be removed from this queue.// 如果p != h,則更新headif (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p);return item; } // 如果頭節(jié)點(diǎn)的元素為空或頭節(jié)點(diǎn)發(fā)生了變化,這說明頭節(jié)點(diǎn)已經(jīng)被另外一個線程修改了。 // 那么獲取p節(jié)點(diǎn)的下一個節(jié)點(diǎn),如果p節(jié)點(diǎn)的下一節(jié)點(diǎn)為null,則表明隊(duì)列已經(jīng)空了 else if ((q = p.next) == null) {// 更新頭結(jié)點(diǎn)updateHead(h, p);return null; } // p == q,則使用新的head重新開始 else if (p == q)continue restartFromHead; // 如果下一個元素不為空,則將頭節(jié)點(diǎn)的下一個節(jié)點(diǎn)設(shè)置成頭節(jié)點(diǎn) elsep = q;} }}五、總結(jié)

offer:

找到尾節(jié)點(diǎn),將新節(jié)點(diǎn)鏈入到尾節(jié)點(diǎn)后面,tail.next=newNode,

由于多線程操作,所以拿到p=tail后cas操作執(zhí)行p.next=newNode可能由于被其他線程搶去而執(zhí)行不成功,此時需要p=p.next向后遍歷,直到找到p.next=null的目標(biāo)節(jié)點(diǎn)。繼續(xù)嘗試向其后面添加元素,添加成功后檢查p是否是tail,如果不是tail,則更新tail=p,添加不成功繼續(xù)向后next遍歷

poll:

獲取到當(dāng)前頭節(jié)點(diǎn)p=head:如果成功設(shè)置了item為null,即p.catItem(item,null),

如果此時被其他線程搶走消費(fèi)了,此時需要p=p.next,向后繼續(xù)爭搶消費(fèi),直到成功執(zhí)行p.catItem(item,null),此時檢查p是不是head節(jié)點(diǎn),如果不是更新頭結(jié)點(diǎn)head=p.next(因?yàn)閜已經(jīng)刪除了)

更新tail和head:

不是每次添加都更新tail,而是間隔一次更新一次(head也是一樣道理):第一個搶到的線程拿到tail執(zhí)行成功tail.next=newNode1此時不更新tail,那么第二個線程再執(zhí)行成功添加p.next=newNode2會判斷出p是newNode1而不是tail,所以就更新tail為newNode2。

tail節(jié)點(diǎn)不總是最后一個,head節(jié)點(diǎn)不總是第一個設(shè)計(jì)初衷:

讓tail節(jié)點(diǎn)永遠(yuǎn)作為隊(duì)列的尾節(jié)點(diǎn),這樣實(shí)現(xiàn)代碼量非常少,而且邏輯非常清楚和易懂。但是這么做有個缺點(diǎn)就是每次都需要使用循環(huán)CAS更新tail節(jié)點(diǎn)。如果能減少CAS更新tail節(jié)點(diǎn)的次數(shù),就能提高入隊(duì)的效率。

在JDK 1.7的實(shí)現(xiàn)中,doug lea使用hops變量來控制并減少tail節(jié)點(diǎn)的更新頻率,并不是每次節(jié)點(diǎn)入隊(duì)后都將 tail節(jié)點(diǎn)更新成尾節(jié)點(diǎn),而是當(dāng)tail節(jié)點(diǎn)和尾節(jié)點(diǎn)的距離大于等于常量HOPS的值(默認(rèn)等于1)時才更新tail節(jié)點(diǎn),tail和尾節(jié)點(diǎn)的距離越長使用CAS更新tail節(jié)點(diǎn)的次數(shù)就會越少,但是距離越長帶來的負(fù)面效果就是每次入隊(duì)時定位尾節(jié)點(diǎn)的時間就越長,因?yàn)檠h(huán)體需要多循環(huán)一次來定位出尾節(jié)點(diǎn),但是這樣仍然能提高入隊(duì)的效率,因?yàn)閺谋举|(zhì)上來看它通過增加對volatile變量的讀操作來減少了對volatile變量的寫操作,而對volatile變量的寫操作開銷要遠(yuǎn)遠(yuǎn)大于讀操作,所以入隊(duì)效率會有所提升。

在JDK 1.8的實(shí)現(xiàn)中,tail的更新時機(jī)是通過p和t是否相等來判斷的,其實(shí)現(xiàn)結(jié)果和JDK 1.7相同,即當(dāng)tail節(jié)點(diǎn)和尾節(jié)點(diǎn)的距離大于等于1時,更新tail。

到此這篇關(guān)于Java并發(fā)編程之ConcurrentLinkedQueue源碼詳解的文章就介紹到這了,更多相關(guān)Java ConcurrentLinkedQueue源碼內(nèi)容請搜索好吧啦網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持好吧啦網(wǎng)!

標(biāo)簽: Java
相關(guān)文章:
主站蜘蛛池模板: 欧美日韩一区二区三区四区在线观看 | 成 人 动漫在线观看网站网站 | 久草在线在线 | 国产爱啪啪 | 免费观看一级一片 | 免费看片aⅴ免费大片 | 很黄很暴力深夜爽爽无遮挡 | 日本人的色道www免费一区 | 久草青青 | 亚洲欧洲无码一区二区三区 | 亚洲欧美在线观看视频 | 国产欧美一区二区三区精品 | 欧美久久久久 | 成人无遮挡毛片免费看 | 久久视频免费 | 精品成人免费一区二区在线播放 | 国产a∨一区二区三区香蕉小说 | 午夜影院福利社 | 日韩精品一区二三区中文 | 日韩中文字幕视频在线 | 在线播放免费播放av片 | 一个人免费观看日本www视频 | 亚洲成av人在线视 | 午夜日b视频 | 一本一本久久a久久精品综合麻豆 | 老司机毛片 | 国产高清精品自在线看 | 性欧美高清久久久久久久 | 99热久久国产综合精品久久国产 | 久久成人精品免费播放 | 精品免费久久久久欧美亚一区 | 国产精品国色综合久久 | 欠草视频| 古代级a毛片可以免费看 | 久草在线免费播放 | 99热com| 毛片美国 | 久草3| 一级视频免费观看 | 国产精品亚洲第一区柳州莫青 | 亚洲精品国产第一区第二区国 |