導入
そのため、Java にはスレッドがあることがわかりました。これについては、「スレッドで Java を台無しにすることはできません: パート I - スレッド」のレビューで読むことができます。 もう一度サンプルコードを見てみましょう。public static void main(String []args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
ご覧のとおり、タスクを起動するコードは非常に標準的ですが、新しく起動するたびにそれを繰り返す必要があります。1 つの解決策は、これを別のメソッド (たとえば、 ) に移動することですexecute(Runnable runnable)
。しかし、Java 開発者はすでに私たちのことを心配しており、次のようなインターフェースを考え出してくれていますExecutor
。
public static void main(String []args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
ご覧のとおり、コードはより簡潔になり、Runnable
スレッドで実行するためのコードを簡単に記述できるようになりました。素晴らしいですね。しかし、これはほんの始まりにすぎません。
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
子孫インターフェイスがありますExecutorService
。このインターフェースの JavaDoc には、これは作業「a」を停止するメソッドを提供し、実行の進行状況を追跡できるようにするExecutorService
特別な「a」の説明であると記載されています。以前、「スレッドで Java を台無しにすることはできません: パート IV - 呼び出し可能、将来、そして友達」で、その可能性について簡単にレビューしました。忘れてしまった、または読んでいない場合は、記憶を新たにすることをお勧めします ;) JavaDoc には他にどのような興味深いことが書かれていますか? デフォルトで利用可能な実装を作成できる特別なファクトリーがあること。 Executor
Executor
java.util.concurrent.Future
Future
java.util.concurrent.Executors
ExecutorService
ExecutorService
もう一度思い出してみましょう。Executor
スレッド作成の実装が隠蔽されている場合、スレッド内で特定のタスクを実行する (つまり、実行する) 必要があります。実行の進行状況を管理するための一連の機能を備えたExecutorService
特別なものがあります。そして、私たちはあなたが作ることができるExecutor
工場を持っています。さあ、自分たちでやってみましょう: Executors
ExecutorService
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
ご覧のとおり、Fixed Thread Pool
サイズ 2 の固定スレッド プール ( ) を指定しました。その後、タスクを 1 つずつプールに送信します。String
各タスクは、スレッド名( ) を含む文字列( currentThread().getName()
) を返します。最後にシャットダウンすることが重要ですExecutorService
。そうしないとプログラムが終了しなくなるからです。ファクトリにはExecutors
他のファクトリ メソッドもあります。たとえば、1 つのスレッドだけのプールを作成したり、スレッドが 1 分間アイドル状態になるとプールから削除されるnewSingleThreadExecutor
キャッシュ付きのプールを作成したりできます。newCachedThreadPool
実際、これらの背後には、タスクが配置され、そこからタスクが実行されるブロッキング キューがExecutorService
あります。ブロックキューの詳細については、ビデオ「ブロックキュー - コレクション #5 - 高度な Java」を参照してください。また、レビュー「同時実行パッケージのキューのブロック」と質問「ArrayBlockingQueue より LinkedBlockingQueue を優先するのはどのような場合ですか?」の回答も読むことができます。超単純化 - (ブロックキュー) は、次の 2 つのケースでスレッドをブロックします。 BlockingQueue
- スレッドが空のキューから要素を取得しようとしています
- スレッドは要素を満杯のキューに入れようとしています
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
または
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
ご覧のとおり、実装はファクトリ メソッド内で作成されますExecutorService
。基本的にはそれだけですThreadPoolExecutor
。作品に影響を与える属性のみが変化します。
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
スレッドプールエグゼキュータ
前に見たように、ファクトリ メソッド内ではThreadPoolExecutor
、 、 。機能は、最大スレッドと最小スレッドとしてどのような値が渡されるか、またどのキューが使用されるかによって影響を受けます。また、インターフェイスの任意の実装を使用できますjava.util.concurrent.BlockingQueue
。「ああ」と言えばThreadPoolExecutor
、動作中の興味深い機能に注目する価値があります。たとえば、ThreadPoolExecutor
スペースがない場合はタスクを送信できません。
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
このコードは次のようなエラーで失敗します。
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
つまり、task
送信できません。SynchronousQueue
実際には 1 つの要素で構成され、それ以上の要素を配置できないように設計されています。ご覧のとおり、queued tasks
ここには 0 があり、何もおかしな点はありません。これは具体的ですSynchronousQueue
。実際、これは 1 つの要素からなるキューであり、常に空です。(!) あるスレッドが要素をキューに入れると、別のスレッドがキューから要素を取得するまで待機します。したがって、 に置き換えることができnew LinkedBlockingQueue<>(1)
、エラーで示される内容が変わりますqueued tasks = 1
。なぜなら キューには 1 つの要素しかないため、2 番目の要素を追加することはできません。そして、私たちはこれに陥るでしょう。キューのテーマを続けると、このクラスにはThreadPoolExecutor
キューを処理するための追加メソッドがあることに注意してください。たとえば、このメソッドは、threadPoolExecutor.purge()
キャンセルされたすべてのタスクをキューから削除して、キュー内のスペースを解放します。キューに関連するもう 1 つの興味深い機能は、未承認のタスク ハンドラーです。
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
たとえば、ハンドラーは、Rejected
キューへのタスクの受け入れを拒否するたびに単語を出力するだけです。便利ですね。さらに、ThreadPoolExecutor
彼には興味深い後継者がいます -ScheduledThreadPoolExecutor
ですScheduledExecutorService
。タイマーでタスクを実行する機能を提供します。
ScheduledExecutorService
ExecutorService
タイプを使用すると、ScheduledExecutorService
スケジュールに従ってタスクを実行できます。例を見てみましょう:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
ここではすべてがシンプルです。タスクが送信され、「スケジュールされたタスク」を受け取りますjava.util.concurrent.ScheduledFuture
。次のようなケースでもスケジュールが役立つ場合があります。
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Runnable
ここでは、一定の遅延を伴って固定レート (Fixed Rate) で実行されるタスクを 送信します。この場合、2 秒ごとに 1 秒後にタスクの実行を開始します。同様のオプションがあります。
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
ただし、ここでは、タスクは、異なるタスクの実行の間に指定された間隔で実行されます。つまり、タスクはtask
1 秒で完了します。次に、完了するとすぐに 2 秒が経過し、新しいタスクが起動されます。このトピックに関する次の資料を読むことができます。
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
仕事盗みプール
上記のスレッド プールに加えて、もう 1 つあります。彼は少し特殊だと言えるでしょう。その名も「仕事盗みプール」。つまり、ワーク スティーリングは、アイドル状態のスレッドが他のスレッドからタスクを取得したり、一般キューからタスクを取得したりする作業アルゴリズムです。例を見てみましょう:public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
このコードを実行すると、ExecutorService
5 つのスレッドが作成されます。各スレッドはオブジェクトの位置で待機キューに参加しますlock
。モニターとそのロックについては、「スレッドで Java を台無しにすることはできません: パート II - 同期」ですでに説明しました。Executors.newCachedThreadPool
そして今、それを に置き換えますExecutors.newWorkStealingPool()
。何が変わるのでしょうか?タスクが 5 つのスレッドではなく、より少ないスレッドで実行されることがわかります。cachedThreadPool
タスクごとに独自のスレッドを作成したことを覚えていますか? wait
スレッドはブロックされましたが、次のタスクが実行される必要があり、それらのタスクのためにプールに新しいスレッドが作成されたためです。StealingPool
スレッドの場合、スレッドは永久にアイドル状態になることはなくwait
、隣接するタスクの実行を開始します。これは他のスレッド プールとどう違うのでしょうかWorkStealingPool
? なぜなら、彼の中には実際に魔法のようなものが住んでいるからですForkJoinPool
。
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
実はもう一つ違いがあります。ForkJoinPool
通常のスレッドとは対照的に、デフォルトで作成されるスレッドはデーモン スレッドですThreadPool
。一般に、デーモン スレッドについては覚えておく価値があります。たとえば、CompletableFuture
独自の を指定しない場合はデーモン スレッドも使用され、ThreadFactory
非デーモン スレッドが作成されます。思いがけない場所で、こんなサプライズが待っているかもしれません!)
プールのフォーク/結合
ForkJoinPool
このパートでは、 の「内部」に存在する同じフレームワーク (フォーク/ジョイン フレームワークとも呼ばれる) について説明しますWorkStealingPool
。一般に、Fork Join フレームワークは Java 1.7 で登場しました。Java 11 がすでに登場しているとしても、覚えておく価値はあります。最も一般的なタスクではありませんが、非常に興味深いものです。このトピックについては、インターネット上に優れたレビュー「Fork/Join Framework in Java 7」があります。 Fork/JoinPool
というようなコンセプトで作品を制作していますjava.util.concurrent.RecursiveTask
。アナログ版もあります - java.util.concurrent.RecursiveAction
。RecursiveAction は結果を返しません。したがって、RecursiveTask
に似ておりCallable
、RecursiveAction
にも似ていますRunnable
。名前を見ると、fork
とという 2 つの主要なメソッドがわかりますjoin
。このメソッドは、fork
別のスレッドでタスクを非同期に実行します。このメソッドでは、join
作業が完了するまで待つことができます。これを使用するにはいくつかの方法があります。 この画像は、Alexey Shipilev のレポート「フォーク/結合: 実装、使用、パフォーマンス」のスライドの一部です。これを明確にするために、JEE CONF での彼のレポート「フォーク結合の実装機能」を参照する価値があります。
要約する
さて、ここでレビューの次の部分を終了します。私たちはスレッドを実行するために最初に思いついたものを考え出しましたExecutor
。そこで私たちはそのアイデアを継続することを決意し、それを思いつきましたExecutorService
。を使用すると、と をExecutorService
使用してタスクを実行用に送信したり、サービスをオフにして管理したりできます。なぜなら 「実装が必要なので、ファクトリ メソッドを含むクラスを作成し、それを呼び出しました。」これにより、スレッド プールを作成できます。同時に、実行スケジュールを指定できるスレッド プールもありますが、これは の背後に隠されています。上記に書かれたことがあなたにとって興味深いだけでなく、理解できることを願っています)提案やコメントをいつでも喜んで受け取ります。#ヴィアチェスラフsubmit
invoke
ExecutorService
Executors
ThreadPoolExecutor
WorkStealingPool
ForkJoinPool
GO TO FULL VERSION