Java并發(fā)中的Fork/Join 框架機(jī)制詳解
Fork/Join 框架是一種在 JDk 7 引入的線程池,用于并行執(zhí)行把一個(gè)大任務(wù)拆成多個(gè)小任務(wù)并行執(zhí)行,最終匯總每個(gè)小任務(wù)結(jié)果得到大任務(wù)結(jié)果的特殊任務(wù)。通過(guò)其命名也很容易看出框架主要分為 Fork 和 Join 兩個(gè)階段,第一階段 Fork 是把一個(gè)大任務(wù)拆分為多個(gè)子任務(wù)并行的執(zhí)行,第二階段 Join 是合并這些子任務(wù)的所有執(zhí)行結(jié)果,最后得到大任務(wù)的結(jié)果。
這里不難發(fā)現(xiàn)其執(zhí)行主要流程:首先判斷一個(gè)任務(wù)是否足夠小,如果任務(wù)足夠小,則直接計(jì)算,否則,就拆分成幾個(gè)更小的小任務(wù)分別計(jì)算,這個(gè)過(guò)程可以反復(fù)的拆分成一系列小任務(wù)。Fork/Join 框架是一種基于 分治 的算法,通過(guò)拆分大任務(wù)成多個(gè)獨(dú)立的小任務(wù),然后并行執(zhí)行這些小任務(wù),最后合并小任務(wù)的結(jié)果得到大任務(wù)的最終結(jié)果,通過(guò)并行計(jì)算以提高效率。。
Fork/Join 框架使用示例下面通過(guò)一個(gè)計(jì)算列表中所有元素的總和的示例來(lái)看看 Fork/Join 框架是如何使用的,總的思路是:將這個(gè)列表分成許多子列表,然后對(duì)每個(gè)子列表的元素進(jìn)行求和,然后,我們?cè)儆?jì)算所有這些值的總和就得到原始列表的和了。Fork/Join 框架中定義了 ForkJoinTask 來(lái)表示一個(gè) Fork/Join 任務(wù),其提供了 fork()、join() 等操作,通常情況下,我們并不需要直接繼承這個(gè) ForkJoinTask 類,而是使用框架提供的兩個(gè) ForkJoinTask 的子類:
RecursiveAction 用于表示沒(méi)有返回結(jié)果的 Fork/Join 任務(wù)。 RecursiveTask 用于表示有返回結(jié)果的 Fork/Join 任務(wù)。很顯然,在這個(gè)示例中是需要返回結(jié)果的,可以定義 SumAction 類繼承自 RecursiveTask,代碼入下:
/** * @author mghio * @since 2021-07-25 */public class SumTask extends RecursiveTask<Long> { private static final int SEQUENTIAL_THRESHOLD = 50; private final List<Long> data; public SumTask(List<Long> data) { this.data = data; } @Override protected Long compute() { if (data.size() <= SEQUENTIAL_THRESHOLD) { long sum = computeSumDirectly(); System.out.format('Sum of %s: %dn', data.toString(), sum); return sum; } else { int mid = data.size() / 2; SumTask firstSubtask = new SumTask(data.subList(0, mid)); SumTask secondSubtask = new SumTask(data.subList(mid, data.size())); // 執(zhí)行子任務(wù) firstSubtask.fork(); secondSubtask.fork(); // 等待子任務(wù)執(zhí)行完成,并獲取結(jié)果 long firstSubTaskResult = firstSubtask.join(); long secondSubTaskResult = secondSubtask.join(); return firstSubTaskResult + secondSubTaskResult; } } private long computeSumDirectly() { long sum = 0; for (Long l : data) { sum += l; } return sum; } public static void main(String[] args) { Random random = new Random(); List<Long> data = random.longs(1_000, 1, 100).boxed().collect(Collectors.toList()); ForkJoinPool pool = new ForkJoinPool(); SumTask task = new SumTask(data); pool.invoke(task); System.out.println('Sum: ' + pool.invoke(task)); }}
這里當(dāng)列表大小小于 SEQUENTIAL_THRESHOLD 變量的值(閾值)時(shí)視為小任務(wù),直接計(jì)算求和列表元素結(jié)果,否則再次拆分為小任務(wù),運(yùn)行結(jié)果如下:
通過(guò)這個(gè)示例代碼可以發(fā)現(xiàn),F(xiàn)ork/Join 框架 中 ForkJoinTask 任務(wù)與平常的一般任務(wù)的主要不同點(diǎn)在于:ForkJoinTask 需要實(shí)現(xiàn)抽象方法 compute() 來(lái)定義計(jì)算邏輯,在這個(gè)方法里一般通用的實(shí)現(xiàn)模板是,首先先判斷當(dāng)前任務(wù)是否是小任務(wù),如果是,就執(zhí)行執(zhí)行任務(wù),如果不是小任務(wù),則再次拆分為兩個(gè)子任務(wù),然后當(dāng)每個(gè)子任務(wù)調(diào)用 fork() 方法時(shí),會(huì)再次進(jìn)入到 compute() 方法中,檢查當(dāng)前任務(wù)是否需要再拆分為子任務(wù),如果已經(jīng)是小任務(wù),則執(zhí)行當(dāng)前任務(wù)并返回結(jié)果,否則繼續(xù)分割,最后調(diào)用 join() 方法等待所有子任務(wù)執(zhí)行完成并獲得執(zhí)行結(jié)果。偽代碼如下:
if (problem is small) { directly solve problem.} else { Step 1. split problem into independent parts. Step 2. fork new subtasks to solve each part. Step 3. join all subtasks. Step 4. compose result from subresults.}Fork/Join 框架設(shè)計(jì)
Fork/Join 框架核心思想是把一個(gè)大任務(wù)拆分成若干個(gè)小任務(wù),然后匯總每個(gè)小任務(wù)的結(jié)果最終得到大任務(wù)的結(jié)果,如果讓你設(shè)計(jì)一個(gè)這樣的框架,你會(huì)如何實(shí)現(xiàn)呢?(建議思考一下),F(xiàn)ork/Join 框架的整個(gè)流程正如其名所示,分為兩個(gè)步驟:
大任務(wù)分割 需要有這么一個(gè)的類,用來(lái)將大任務(wù)拆分為子任務(wù),可能一次拆分后的子任務(wù)還是比較大,需要多次拆分,直到拆分出來(lái)的子任務(wù)符合我們定義的小任務(wù)才結(jié)束。 執(zhí)行任務(wù)并合并任務(wù)結(jié)果 第一步拆分出來(lái)的子任務(wù)分別存放在一個(gè)個(gè) 雙端隊(duì)列 里面(P.S. 這里為什么要使用雙端隊(duì)列請(qǐng)看下文),然后每個(gè)隊(duì)列啟動(dòng)一個(gè)線程從隊(duì)列中獲取任務(wù)執(zhí)行。這些子任務(wù)的執(zhí)行結(jié)果都會(huì)放到一個(gè)統(tǒng)一的隊(duì)列中,然后再啟動(dòng)一個(gè)線程從這個(gè)隊(duì)列中拿數(shù)據(jù),最后合并這些數(shù)據(jù)返回。Fork/Join 框架使用了如下兩個(gè)類來(lái)完成以上兩個(gè)步驟:
ForkJoinTask 類 在上文的實(shí)例中也有提到,表示 ForkJoin 任務(wù),在使用框架時(shí)首先必須先定義任務(wù),通常只需要繼承自 ForkJoinTask 類的子類 RecursiveAction(無(wú)返回結(jié)果) 或者 RecursiveTask(有返回結(jié)果)即可。 ForkJoinPool 從名字也可以猜到一二了,就是用來(lái)執(zhí)行 ForkJoinTask 的線程池。大任務(wù)拆分出的子任務(wù)會(huì)添加到當(dāng)前線程的雙端隊(duì)列的頭部。喜歡思考的你,心中想必會(huì)想到這么一種場(chǎng)景,當(dāng)我們需要完成一個(gè)大任務(wù)時(shí),會(huì)先把這個(gè)大任務(wù)拆分為多個(gè)獨(dú)立的子任務(wù),這些子任務(wù)會(huì)放到獨(dú)立的隊(duì)列中,并為每個(gè)隊(duì)列都創(chuàng)建一個(gè)單獨(dú)的線程去執(zhí)行隊(duì)列里的任務(wù),即這里線程和隊(duì)列時(shí)一對(duì)一的關(guān)系,那么當(dāng)有的線程可能會(huì)先把自己隊(duì)列的任務(wù)執(zhí)行完成了,而有的線程則沒(méi)有執(zhí)行完成,這就導(dǎo)致一些先執(zhí)行完任務(wù)的線程干等了,這是個(gè)好問(wèn)題。
既然是做并發(fā)的,肯定要最大程度壓榨計(jì)算機(jī)的性能,對(duì)于這種場(chǎng)景并發(fā)大師 Doug Lea 使用了工作竊取算法處理,使用工作竊取算法后,先完成自己隊(duì)列中任務(wù)的線程會(huì)去其它線程的隊(duì)列中”竊取“一個(gè)任務(wù)來(lái)執(zhí)行,哈哈,一方有難,八方支援。但是此時(shí)這個(gè)線程和隊(duì)列的持有線程會(huì)同時(shí)訪問(wèn)同一個(gè)隊(duì)列,所以為了減少竊取任務(wù)的線程和被竊取任務(wù)的線程之間的競(jìng)爭(zhēng),F(xiàn)orkJoin 選擇了雙端隊(duì)列這種數(shù)據(jù)結(jié)構(gòu),這樣就可以按照這種規(guī)則執(zhí)行任務(wù)了:被竊取任務(wù)的線程始終從隊(duì)列頭部獲取任務(wù)并執(zhí)行,竊取任務(wù)的線程使用從隊(duì)列尾部獲取任務(wù)執(zhí)行。這個(gè)算法在絕大部分情況下都可以充分利用多線程進(jìn)行并行計(jì)算,但是在雙端隊(duì)列里只有一個(gè)任務(wù)等極端情況下還是會(huì)存在一定程度的競(jìng)爭(zhēng)。
Fork/Join 框架實(shí)現(xiàn)原理
Fork/Join 框架的實(shí)現(xiàn)核心是 ForkJoinPool 類,該類的重要組成部分為 ForkJoinTask 數(shù)組和 ForkJoinWorkerThread 數(shù)組,其中 ForkJoinTask 數(shù)組用來(lái)存放框架使用者給提交給 ForkJoinPool 的任務(wù),F(xiàn)orkJoinWorkerThread 數(shù)組則負(fù)責(zé)執(zhí)行這些任務(wù)。任務(wù)有如下四種狀態(tài):
NORMAL 已完成
CANCELLED 被取消
SIGNAL 信號(hào)
EXCEPTIONAL 發(fā)生異常
下面來(lái)看看這兩個(gè)類的核心方法實(shí)現(xiàn)原理,首先來(lái)看 ForkJoinTask 的 fork() 方法,源碼如下:
方法對(duì)于 ForkJoinWorkerThread 類型的線程,首先會(huì)調(diào)用 ForkJoinWorkerThread 的 workQueue 的 push() 方法異步的去執(zhí)行這個(gè)任務(wù),然后馬上返回結(jié)果。繼續(xù)跟進(jìn) ForkJoinPool 的 push() 方法,源碼如下:
方法將當(dāng)前任務(wù)添加到 ForkJoinTask 任務(wù)隊(duì)列數(shù)組中,然后再調(diào)用 ForkJoinPool 的 signalWork 方法創(chuàng)建或者喚醒一個(gè)工作線程來(lái)執(zhí)行該任務(wù)。然后再來(lái)看看 ForkJoinTask 的 join() 方法,方法源碼如下:
方法首先調(diào)用了 doJoin() 方法,該方法返回當(dāng)前任務(wù)的狀態(tài),根據(jù)返回的任務(wù)狀態(tài)做不同的處理:
已完成狀態(tài)則直接返回結(jié)果 被取消狀態(tài)則直接拋出異常(CancellationException) 發(fā)生異常狀態(tài)則直接拋出對(duì)應(yīng)的異常繼續(xù)跟進(jìn) doJoin() 方法,方法源碼如下:
方法首先判斷當(dāng)前任務(wù)狀態(tài)是否已經(jīng)執(zhí)行完成,如果執(zhí)行完成則直接返回任務(wù)狀態(tài)。如果沒(méi)有執(zhí)行完成,則從任務(wù)數(shù)組中(workQueue)取出任務(wù)并執(zhí)行,任務(wù)執(zhí)行完成則設(shè)置任務(wù)狀態(tài)為 NORMAL,如果出現(xiàn)異常則記錄異常并設(shè)置任務(wù)狀態(tài)為 EXCEPTIONAL(在 doExec() 方法中)。
總結(jié)本文主要介紹了 Java 并發(fā)框架中的 Fork/Join 框架的基本原理和其使用的工作竊取算法(work-stealing)、設(shè)計(jì)方式和部分實(shí)現(xiàn)源碼。Fork/Join 框架在 JDK 的官方標(biāo)準(zhǔn)庫(kù)中也有應(yīng)用。比如 JDK 1.8+ 標(biāo)準(zhǔn)庫(kù)提供的 Arrays.parallelSort(array) 可以進(jìn)行并行排序,它的原理就是內(nèi)部通過(guò) Fork/Join 框架對(duì)大數(shù)組分拆進(jìn)行并行排序,可以提高排序的速度,還有集合中的 Collection.parallelStream() 方法底層也是基于 Fork/Join 框架實(shí)現(xiàn)的,最后就是定義小任務(wù)的閾值往往是需要通過(guò)測(cè)試驗(yàn)證才能合理給出,并且保證程序可以達(dá)到最好的性能。
到此這篇關(guān)于Java 并發(fā)中的Fork/Join 框架機(jī)制詳解的文章就介紹到這了,更多相關(guān)Java Fork/Join 框架內(nèi)容請(qǐng)搜索好吧啦網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持好吧啦網(wǎng)!
相關(guān)文章:
1. 基于 Python 實(shí)踐感知器分類算法2. Python如何批量生成和調(diào)用變量3. ASP.Net Core對(duì)USB攝像頭進(jìn)行截圖4. ajax動(dòng)態(tài)加載json數(shù)據(jù)并詳細(xì)解析5. Python 中如何使用 virtualenv 管理虛擬環(huán)境6. python利用opencv實(shí)現(xiàn)顏色檢測(cè)7. 通過(guò)CSS數(shù)學(xué)函數(shù)實(shí)現(xiàn)動(dòng)畫特效8. ASP.Net Core(C#)創(chuàng)建Web站點(diǎn)的實(shí)現(xiàn)9. ASP.NET MVC實(shí)現(xiàn)橫向展示購(gòu)物車10. windows服務(wù)器使用IIS時(shí)thinkphp搜索中文無(wú)效問(wèn)題
