JavaRush /Java Blog /Random-JA /スレッドで Java を台無しにすることはできません: パート V - エグゼキュータ、スレッドプール、フォーク結...
Viacheslav
レベル 3

スレッドで Java を台無しにすることはできません: パート V - エグゼキュータ、スレッドプール、フォーク結合

Random-JA グループに公開済み

導入

そのため、Java にはスレッドがあることがわかりました。これについては、「スレッドで Java を台無しにすることはできません: パート I - スレッド」のレビューで読むことができます。 スレッドで Java を台無しにすることはできません: パート V - エグゼキュータ、スレッドプール、フォーク結合 - 1もう一度サンプルコードを見てみましょう。
public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
ご覧のとおり、タスクを起動するコードは非常に標準的ですが、新しく起動するたびにそれを繰り返す必要があります。1 つの解決策は、これを別のメソッド (たとえば、 ) に移動することですexecute(Runnable runnable)。しかし、Java 開発者はすでに私たちのことを心配しており、次のようなインターフェースを考え出してくれていますExecutor
public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
ご覧のとおり、コードはより簡潔になり、Runnableスレッドで実行するためのコードを簡単に記述できるようになりました。素晴らしいですね。しかし、これはほんの始まりにすぎません。 スレッドで Java を台無しにすることはできません: パート V - エグゼキュータ、スレッドプール、フォーク結合 - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

ご覧のとおり、インターフェイスにはExecutor子孫インターフェイスがありますExecutorService。このインターフェースの JavaDoc には、これは作業「a」を停止するメソッドを提供し、実行の進行状況を追跡できるようにするExecutorService特別な「a」の説明であると記載されています。以前、「スレッドで Java を台無しにすることはできません: パート IV - 呼び出し可能、将来、そして友達」で、その可能性について簡単にレビューしました。忘れてしまった、または読んでいない場合は、記憶を新たにすることをお勧めします ;) JavaDoc には他にどのような興味深いことが書かれていますか? デフォルトで利用可能な実装を作成できる特別なファクトリーがあること。 ExecutorExecutorjava.util.concurrent.FutureFuturejava.util.concurrent.ExecutorsExecutorService

ExecutorService

もう一度思い出してみましょう。Executorスレッド作成の実装が隠蔽されている場合、スレッド内で特定のタスクを実行する (つまり、実行する) 必要があります。実行の進行状況を管理するための一連の機能を備えたExecutorService特別なものがあります。そして、私たちはあなたが作ることができるExecutor工場を持っています。さあ、自分たちでやってみましょう: ExecutorsExecutorService
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
ご覧のとおり、Fixed Thread Poolサイズ 2 の固定スレッド プール ( ) を指定しました。その後、タスクを 1 つずつプールに送信します。String各タスクは、スレッド名( ) を含む文字列( currentThread().getName()) を返します。最後にシャットダウンすることが重要ですExecutorService。そうしないとプログラムが終了しなくなるからです。ファクトリにはExecutors他のファクトリ メソッドもあります。たとえば、1 つのスレッドだけのプールを作成したり、スレッドが 1 分間アイドル状態になるとプールから削除されるnewSingleThreadExecutorキャッシュ付きのプールを作成したりできます。newCachedThreadPool実際、これらの背後には、タスクが配置され、そこからタスクが実行されるブロッキング キューがExecutorServiceあります。ブロックキューの詳細については、ビデオ「ブロックキュー - コレクション #5 - 高度な Java」を参照してください。また、レビュー「同時実行パッケージのキューのブロック」と質問「ArrayBlockingQueue より LinkedBlockingQueue を優先するのはどのような場合ですか?」の回答も読むことができます。超単純化 - (ブロックキュー) は、次の 2 つのケースでスレッドをブロックします。 BlockingQueue
  • スレッドが空のキューから要素を取得しようとしています
  • スレッドは要素を満杯のキューに入れようとしています
ファクトリ メソッドの実装を見ると、それらがどのように構造化されているかがわかります。例えば:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
または
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
ご覧のとおり、実装はファクトリ メソッド内で作成されますExecutorService。基本的にはそれだけですThreadPoolExecutor。作品に影響を与える属性のみが変化します。 スレッドで Java を破壊することはできません: パート V - エグゼキュータ、スレッドプール、フォーク結合 - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

スレッドプールエグゼキュータ

前に見たように、ファクトリ メソッド内ではThreadPoolExecutor、 、 。機能は、最大スレッドと最小スレッドとしてどのような値が渡されるか、またどのキューが使用されるかによって影響を受けます。また、インターフェイスの任意の実装を使用できますjava.util.concurrent.BlockingQueue。「ああ」と言えばThreadPoolExecutor、動作中の興味深い機能に注目する価値があります。たとえば、ThreadPoolExecutorスペースがない場合はタスクを送信できません。
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
このコードは次のようなエラーで失敗します。
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
つまり、task送信できません。SynchronousQueue実際には 1 つの要素で構成され、それ以上の要素を配置できないように設計されています。ご覧のとおり、queued tasksここには 0 があり、何もおかしな点はありません。これは具体的ですSynchronousQueue。実際、これは 1 つの要素からなるキューであり、常に空です。(!) あるスレッドが要素をキューに入れると、別のスレッドがキューから要素を取得するまで待機します。したがって、 に置き換えることができnew LinkedBlockingQueue<>(1)、エラーで示される内容が変わりますqueued tasks = 1。なぜなら キューには 1 つの要素しかないため、2 番目の要素を追加することはできません。そして、私たちはこれに陥るでしょう。キューのテーマを続けると、このクラスにはThreadPoolExecutorキューを処理するための追加メソッドがあることに注意してください。たとえば、このメソッドは、threadPoolExecutor.purge()キャンセルされたすべてのタスクをキューから削除して、キュー内のスペースを解放します。キューに関連するもう 1 つの興味深い機能は、未承認のタスク ハンドラーです。
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
たとえば、ハンドラーは、Rejectedキューへのタスクの受け入れを拒否するたびに単語を出力するだけです。便利ですね。さらに、ThreadPoolExecutor彼には興味深い後継者がいます -ScheduledThreadPoolExecutorですScheduledExecutorService。タイマーでタスクを実行する機能を提供します。

ScheduledExecutorService

ExecutorServiceタイプを使用すると、ScheduledExecutorServiceスケジュールに従ってタスクを実行できます。例を見てみましょう:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
ここではすべてがシンプルです。タスクが送信され、「スケジュールされたタスク」を受け取りますjava.util.concurrent.ScheduledFuture。次のようなケースでもスケジュールが役立つ場合があります。
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Runnableここでは、一定の遅延を伴って固定レート (Fixed Rate) で実行されるタスクを 送信します。この場合、2 秒ごとに 1 秒後にタスクの実行を開始します。同様のオプションがあります。
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
ただし、ここでは、タスクは、異なるタスクの実行の間に指定された間隔で実行されます。つまり、タスクはtask1 秒で完了します。次に、完了するとすぐに 2 秒が経過し、新しいタスクが起動されます。このトピックに関する次の資料を読むことができます。 スレッドで Java を破壊することはできません: パート V - エグゼキュータ、スレッドプール、フォーク結合 - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

仕事盗みプール

上記のスレッド プールに加えて、もう 1 つあります。彼は少し特殊だと言えるでしょう。その名も「仕事盗みプール」。つまり、ワーク スティーリングは、アイドル状態のスレッドが他のスレッドからタスクを取得したり、一般キューからタスクを取得したりする作業アルゴリズムです。例を見てみましょう:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
このコードを実行すると、ExecutorService5 つのスレッドが作成されます。各スレッドはオブジェクトの位置で待機キューに参加しますlock。モニターとそのロックについては、「スレッドで Java を台無しにすることはできません: パート II - 同期」ですでに説明しました。Executors.newCachedThreadPoolそして今、それを に置き換えますExecutors.newWorkStealingPool()。何が変わるのでしょうか?タスクが 5 つのスレッドではなく、より少ないスレッドで実行されることがわかります。cachedThreadPoolタスクごとに独自のスレッドを作成したことを覚えていますか? waitスレッドはブロックされましたが、次のタスクが実行される必要があり、それらのタスクのためにプールに新しいスレッドが作成されたためです。StealingPoolスレッドの場合、スレッドは永久にアイドル状態になることはなくwait、隣接するタスクの実行を開始します。これは他のスレッド プールとどう違うのでしょうかWorkStealingPool? なぜなら、彼の中には実際に魔法のようなものが住んでいるからですForkJoinPool
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
実はもう一つ違いがあります。ForkJoinPool通常のスレッドとは対照的に、デフォルトで作成されるスレッドはデーモン スレッドですThreadPool。一般に、デーモン スレッドについては覚えておく価値があります。たとえば、CompletableFuture独自の を指定しない場合はデーモン スレッドも使用され、ThreadFactory非デーモン スレッドが作成されます。思いがけない場所で、こんなサプライズが待っているかもしれません!)

プールのフォーク/結合

ForkJoinPoolこのパートでは、 の「内部」に存在する同じフレームワーク (フォーク/ジョイン フレームワークとも呼ばれる) について説明しますWorkStealingPool。一般に、Fork Join フレームワークは Java 1.7 で登場しました。Java 11 がすでに登場しているとしても、覚えておく価値はあります。最も一般的なタスクではありませんが、非常に興味深いものです。このトピックについては、インターネット上に優れたレビュー「Fork/Join Framework in Java 7」があります。 Fork/JoinPoolというようなコンセプトで作品を制作していますjava.util.concurrent.RecursiveTask。アナログ版もあります - java.util.concurrent.RecursiveAction。RecursiveAction は結果を返しません。したがって、RecursiveTaskに似ておりCallableRecursiveActionにも似ていますRunnable。名前を見ると、forkとという 2 つの主要なメソッドがわかりますjoin。このメソッドは、fork別のスレッドでタスクを非同期に実行します。このメソッドでは、join作業が完了するまで待つことができます。これを使用するにはいくつかの方法があります。 この画像は、Alexey Shipilev のレポート「フォーク/結合: 実装、使用、パフォーマンススレッドで Java を破壊することはできません: パート V - エグゼキュータ、スレッドプール、フォーク結合 - 5のスライドの一部です。これを明確にするために、JEE CONF での彼のレポート「フォーク結合の実装機能」を参照する価値があります。

要約する

さて、ここでレビューの次の部分を終了します。私たちはスレッドを実行するために最初に思いついたものを考え出しましたExecutor。そこで私たちはそのアイデアを継続することを決意し、それを思いつきましたExecutorService。を使用すると、と をExecutorService使用してタスクを実行用に送信したり、サービスをオフにして管理したりできます。なぜなら 「実装が必要なので、ファクトリ メソッドを含むクラスを作成し、それを呼び出しました。」これにより、スレッド プールを作成できます。同時に、実行スケジュールを指定できるスレッド プールもありますが、これは の背後に隠されています。上記に書かれたことがあなたにとって興味深いだけでなく、理解できることを願っています)提案やコメントをいつでも喜んで受け取ります。#ヴィアチェスラフsubmitinvokeExecutorServiceExecutorsThreadPoolExecutorWorkStealingPoolForkJoinPool
コメント
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION