介紹
所以,我們知道 Java 中有線程,您可以在評論“你不能用線程破壞 Java:第一部分 - 線程”中閱讀有關線程的內容。 我們再看一下範例程式碼:public static void main(String []args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
正如我們所看到的,啟動任務的程式碼非常標準,但對於每次新的啟動,我們都必須重複它。例如,一種解決方案是將其移至單獨的方法中execute(Runnable runnable)
。但 Java 開發人員已經擔心我們了,並提出了一個介面Executor
:
public static void main(String []args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
正如你所看到的,程式碼變得更加簡潔,讓我們只需編寫程式碼即可Runnable
在執行緒中運行它。太棒了,不是嗎?但這只是個開始:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
有一個後代介面ExecutorService
。這個介面的 JavaDoc 說它ExecutorService
是一個特殊的Executor
“a”的描述,它提供了停止工作Executor
“a”的方法,並允許您追蹤java.util.concurrent.Future
執行的進度。之前,在「你不能用線程破壞 Java:第四部分 - Callable、Future 和 Friends」中,我們簡要回顧了可能性Future
。如果你忘記了或沒有讀過,我建議你重溫一下記憶;)JavaDoc 還有什麼有趣的地方?我們有一個特殊的工廠java.util.concurrent.Executors
,允許我們創建預設可用的實作ExecutorService
。
執行服務
讓我們再回憶一下。當建立執行緒的實作對我們隱藏時,我們必須Executor
在執行緒中執行(即執行)某個任務。我們有ExecutorService
一個特殊的Executor
,它具有一組管理執行進度的功能。我們有一家工廠Executors
可以讓您創造ExecutorService
。現在讓我們自己做:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
正如我們所看到的,我們指定了一個Fixed Thread Pool
大小為2的固定執行緒池( )。之後我們將任務一個接一個地傳送到執行緒池中。String
每個任務傳回一個包含執行緒名稱 ( ) 的字串( currentThread().getName()
)。最後關閉很重要ExecutorService
,否則我們的程式將不會退出。工廠裡Executors
還有其他工廠方法。例如,我們可以建立一個只有一個執行緒的池,newSingleThreadExecutor
或一個有快取的池newCachedThreadPool
,如果執行緒空閒 1 分鐘,就會從池中刪除它們。事實上,在這些後面ExecutorService
有一個阻塞隊列,任務被放入其中並從中執行這些任務。有關阻塞隊列的更多信息,請參閱視頻“阻塞隊列 - 集合 #5 - 高級 Java ”。您還可以閱讀評論“並發包的阻塞隊列”以及問題“何時更喜歡 LinkedBlockingQueue 而不是 ArrayBlockingQueue? ”的答案。超級簡化-BlockingQueue
(阻塞佇列)阻塞一個線程,有兩種情況:
- 線程正在嘗試從空隊列中獲取元素
- 線程正在嘗試將元素放入完整隊列中
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
或者
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
正如我們所看到的,實作是在工廠方法內創建的ExecutorService
。基本上就是這樣ThreadPoolExecutor
。僅影響工作的屬性變更。
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
執行緒池執行器
正如我們之前看到的,在工廠方法內部ThreadPoolExecutor
,. 該功能受到作為最大和最小線程傳遞的值以及使用的隊列的影響。並且可以使用該介面的任何實作java.util.concurrent.BlockingQueue
。說到ThreadPoolExecutor
“啊”,值得注意的是操作過程中有趣的功能。例如,ThreadPoolExecutor
如果那裡沒有空間,則無法將任務傳送到:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
此程式碼將失敗並出現以下錯誤:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
也就是說,task
你不能提交,因為 SynchronousQueue
它的設計方式使其實際上由一個元素組成,並且不允許您在其中放置更多元素。我們可以看到,queued tasks
這裡有0,這並沒有什麼奇怪的,因為 這是具體的SynchronousQueue
- 事實上,它是一個只有 1 個元素的隊列,並且始終為空。(!) 當一個執行緒將一個元素放入佇列時,它將等待,直到另一個執行緒從佇列中取出該元素。因此,我們可以替換為new LinkedBlockingQueue<>(1)
,錯誤中指示的內容將會改變queued tasks = 1
。因為 佇列只有 1 個元素,那麼我們無法加入第二個元素。我們將在這一點上失敗。繼續隊列的主題,值得注意的是該類別ThreadPoolExecutor
還有用於服務隊列的附加方法。例如,該方法threadPoolExecutor.purge()
將從佇列中刪除所有已取消的任務以釋放佇列中的空間。與佇列相關的另一個有趣的功能是不接受的任務處理程序:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
例如,處理程序只需Rejected
為每次拒絕接受佇列中的任務列印一個單字。方便不是嗎?此外,ThreadPoolExecutor
他還有一位有趣的繼承人——ScheduledThreadPoolExecutor
他就是ScheduledExecutorService
。它提供了在計時器上執行任務的能力。
預定執行服務
ExecutorService
類型ScheduledExecutorService
可讓您根據計劃運行任務。讓我們來看一個例子:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
這裡一切都很簡單。任務發送後,我們收到「計劃任務」java.util.concurrent.ScheduledFuture
。以下情況對於時間表也可能有用:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
這裡我們發送Runnable
要以固定速率執行的任務,並且有一定的延遲。在這種情況下,每2秒1秒後,開始執行任務。還有一個類似的選項:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
但這裡的任務是在不同任務的執行之間以給定的間隔執行的。也就是說,任務task
將在1秒內完成。接下來,一旦完成,2秒鐘過去,就會啟動新的任務。您可以閱讀以下有關此主題的資料:
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
工作偷池
除了上面提到的線程池之外,還有一種。可以說他有點特別。它的名字是工作竊取池。簡而言之,Work Stealing 是一種工作演算法,其中空閒執行緒開始從其他執行緒取得任務或從通用佇列中取得任務。讓我們來看一個例子:public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
如果我們運行這段程式碼,ExecutorService
它將創建 5 個線程,因為 每個執行緒都會加入物件所在位置的等待佇列lock
。我們已經在“你不能用線程破壞 Java:第二部分 - 同步”中討論了監視器和鎖。現在我們將其替換Executors.newCachedThreadPool
為Executors.newWorkStealingPool()
. 會發生什麼變化?我們將看到我們的任務不是在 5 個執行緒中執行,而是在更少的執行緒中執行。還記得cachedThreadPool
您為每個任務創建了自己的線程嗎?因為wait
它阻塞了線程,但接下來的任務想要執行,並且在池中為它們創建了新線程。對於StealingPool
執行緒來說,它們不會永遠空閒wait
,它們將開始執行相鄰的任務。這與其他線程池有何不同WorkStealingPool
?因為他的內心深處確實存在著某種神奇的東西ForkJoinPool
:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
實際上還有一個差別。預設情況下創建的線程ForkJoinPool
是守護線程,而不是透過常規ThreadPool
. 一般來說,值得記住守護線程,因為... 例如,CompletableFuture
也使用守護線程,如果您不指定自己的ThreadFactory
,這將建立非守護線程。這些驚喜會在意想不到的地方等著您!)
分叉/加入池
在這一部分中,我們將討論ForkJoinPool
生活在WorkStealingPool
. 一般來說,Fork Join框架出現在Java 1.7。即使 Java 11 已經出現,它仍然值得記住。這不是最常見的任務,但很有趣。網路上有一個關於這個主題的很好的評論:「Fork/Join Framework in Java 7」。 Fork/JoinPool
他的作品中運用了這樣的概念java.util.concurrent.RecursiveTask
。還有一個類似物 - java.util.concurrent.RecursiveAction
。RecursiveActions 不傳回結果。因此RecursiveTask
類似於Callable
,並且RecursiveAction
類似於Runnable
。好吧,看名字,我們看到兩個關鍵方法 -fork
和join
。該方法fork
在單獨的執行緒中異步運行任務。並且該方法join
允許您等待工作完成。有幾種使用方法: 這張圖片是 Alexey Shipilev 報告「 Fork/Join:實作、使用、效能」投影片的一部分。為了更清楚地說明這一點,值得一看他在 JEE CONF 上的報告:「Fork Join 實現功能」。
總結
所以,我們在這裡完成了評論的下一部分。我們弄清楚了我們首先想到的Executor
執行緒的方法。然後我們決定繼續這個想法並想出了它ExecutorService
。允許您使用和ExecutorService
發送要執行的任務,以及透過關閉服務來管理該服務。因為 '我們需要實現,我們編寫了一個帶有工廠方法的類別並調用它。它允許您創建線程池。同時,還有線程池也可以讓你指定執行的時間表,只不過是隱藏在後面的。我希望上面寫的內容不僅對您來說有趣,而且可以理解)我總是很高興收到建議和評論。#維亞切斯拉夫submit
invoke
ExecutorService
Executors
ThreadPoolExecutor
WorkStealingPool
ForkJoinPool
GO TO FULL VERSION