JavaRush /Java-Blog /Random-DE /Sie können Java nicht mit einem Thread verderben: Teil V ...
Viacheslav
Level 3

Sie können Java nicht mit einem Thread verderben: Teil V – Executor, ThreadPool, Fork Join

Veröffentlicht in der Gruppe Random-DE

Einführung

Wir wissen also, dass es Threads in Java gibt, worüber Sie in der Rezension „ You Can’t Spoil Java with a Thread: Part I – Threads “ nachlesen können . Sie können Java nicht mit einem Thread verderben: Teil V – Executor, ThreadPool, Fork Join – 1Schauen wir uns den Beispielcode noch einmal an:
public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Wie wir sehen können, ist der Code zum Starten der Aufgabe ziemlich standardmäßig, aber bei jedem neuen Start müssen wir ihn wiederholen. Eine Lösung besteht beispielsweise darin, es in eine separate Methode zu verschieben execute(Runnable runnable). Aber die Java-Entwickler haben sich schon um uns gekümmert und sich eine Schnittstelle ausgedacht Executor:
public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Wie Sie sehen, ist der Code prägnanter geworden und ermöglicht es uns, einfach Code zu schreiben, um ihn Runnablein einem Thread auszuführen. Großartig, nicht wahr? Aber das ist erst der Anfang: Sie können Java nicht mit einem Thread verderben: Teil V – Executor, ThreadPool, Fork Join – 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Wie Sie sehen können, Executorverfügt die Schnittstelle über eine abgeleitete Schnittstelle ExecutorService. Das JavaDoc dieser Schnittstelle besagt, dass ExecutorServicees sich um eine Beschreibung eines speziellen Executor„a“ handelt, das Methoden zum Stoppen der Arbeit Executor„a“ bereitstellt und es Ihnen ermöglicht, java.util.concurrent.Futureden Fortschritt der Ausführung zu verfolgen. Zuvor haben wir in „ Mit Thread kann man Java nicht verderben: Teil IV – Callable, Future and Friends “ kurz die Möglichkeiten besprochen Future. Wenn Sie es vergessen oder nicht gelesen haben, empfehle ich Ihnen, Ihr Gedächtnis aufzufrischen ;) Welche anderen interessanten Dinge sind in JavaDoc geschrieben? Dass wir über eine spezielle Factory verfügen java.util.concurrent.Executors, die es uns ermöglicht, Implementierungen zu erstellen, die standardmäßig verfügbar sind ExecutorService.

ExecutorService

Erinnern wir uns noch einmal. Wir müssen Executoreine bestimmte Aufgabe in einem Thread ausführen (d. h. ausführen), wobei uns die Implementierung der Erstellung eines Threads verborgen bleibt. Wir haben ExecutorServiceein spezielles Programm Executor, das über eine Reihe von Funktionen zur Verwaltung des Ausführungsfortschritts verfügt. Und wir haben eine Fabrik Executors, in der Sie erstellen können ExecutorService. Machen wir es jetzt selbst:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Wie wir sehen können, haben wir einen festen Thread-Pool ( Fixed Thread Pool) der Größe 2 angegeben. Anschließend senden wir Aufgaben nacheinander an den Pool. Jede Aufgabe gibt eine Zeichenfolge ( String) zurück, die den Threadnamen ( currentThread().getName()) enthält. Es ist wichtig, ganz am Ende herunterzufahren ExecutorService, da unser Programm sonst nicht beendet wird. ExecutorsEs gibt andere Fabrikmethoden in der Fabrik . Wir können beispielsweise einen Pool mit nur einem Thread erstellen – newSingleThreadExecutoroder einen Pool mit Caching newCachedThreadPool, bei dem Threads aus dem Pool entfernt werden, wenn sie eine Minute lang inaktiv sind. Tatsächlich ExecutorServicebefindet sich dahinter eine Blockierungswarteschlange , in die Aufgaben gestellt werden und von der aus diese Aufgaben ausgeführt werden. Weitere Informationen zu Blocking Queues finden Sie im Video „ Blocking Queue – Collections #5 – Advanced Java “. Sie können auch die Rezension „ Blockieren von Warteschlangen des gleichzeitigen Pakets “ und die Antwort auf die Frage „ Wann sollte man LinkedBlockingQueue gegenüber ArrayBlockingQueue bevorzugen? “ lesen. Super vereinfacht – BlockingQueue(Blockierungswarteschlange) blockiert einen Thread in zwei Fällen:
  • Ein Thread versucht, Elemente aus einer leeren Warteschlange abzurufen
  • Der Thread versucht, Elemente in eine volle Warteschlange zu stellen
Wenn wir uns die Implementierung von Factory-Methoden ansehen, können wir sehen, wie diese strukturiert sind. Zum Beispiel:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
oder
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Wie wir sehen können, werden Implementierungen innerhalb von Factory-Methoden erstellt ExecutorService. Und das ist es im Grunde ThreadPoolExecutor. Nur die Attribute, die sich auf die Arbeit auswirken, ändern sich. Sie können Java nicht mit einem Thread ruinieren: Teil V – Executor, ThreadPool, Fork Join – 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

Wie wir zuvor gesehen haben, sind Inside-Factory-Methoden ThreadPoolExecutor, . Die Funktionalität wird davon beeinflusst, welche Werte als maximale und minimale Threads übergeben werden und welche Warteschlange verwendet wird. Und jede Implementierung der Schnittstelle kann verwendet werden java.util.concurrent.BlockingQueue. Apropos ThreadPoolExecutor'ahs', es ist erwähnenswert, dass es während des Betriebs interessante Funktionen gibt. Beispielsweise können Sie keine Aufgaben an senden, ThreadPoolExecutorwenn dort kein Platz vorhanden ist:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Dieser Code schlägt mit einem Fehler wie dem folgenden fehl:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Das heißt, taskSie können nicht einreichen, weil SynchronousQueueEs ist so konzipiert, dass es tatsächlich aus einem Element besteht und es nicht möglich ist, dort mehr zu platzieren. Wie wir sehen können, queued tasksgibt es hier 0, und daran ist nichts Seltsames, denn Dies ist spezifisch SynchronousQueue– tatsächlich handelt es sich um eine Warteschlange mit einem Element, das immer leer ist. (!) Wenn ein Thread ein Element in die Warteschlange stellt, wartet er, bis ein anderer Thread das Element aus der Warteschlange nimmt. Daher können wir durch ersetzen new LinkedBlockingQueue<>(1)und was im Fehler angezeigt wird, wird sich ändern queued tasks = 1. Weil Wenn die Warteschlange nur aus einem Element besteht, können wir kein zweites hinzufügen. Und wir werden darauf hereinfallen. Um das Thema der Warteschlange fortzusetzen, ist es erwähnenswert, dass die Klasse ThreadPoolExecutorüber zusätzliche Methoden zur Bedienung der Warteschlange verfügt. Beispielsweise threadPoolExecutor.purge()entfernt die Methode alle abgebrochenen Aufgaben aus der Warteschlange, um Platz in der Warteschlange freizugeben. Ein weiteres interessantes Feature im Zusammenhang mit der Warteschlange ist der Handler für nicht akzeptierte Aufgaben:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Beispielsweise gibt der Handler einfach ein Wort Rejectedfür jede Weigerung aus, eine Aufgabe in die Warteschlange aufzunehmen. Praktisch, nicht wahr? Darüber hinaus ThreadPoolExecutorhat er einen interessanten Erben – ScheduledThreadPoolExecutornämlich ScheduledExecutorService. Es bietet die Möglichkeit, eine Aufgabe mit einem Timer auszuführen.

ScheduledExecutorService

ExecutorServiceMit diesem Typ ScheduledExecutorServicekönnen Sie Aufgaben nach einem Zeitplan ausführen. Schauen wir uns ein Beispiel an:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Hier ist alles einfach. Aufgaben werden gesendet, wir erhalten eine „geplante Aufgabe“ java.util.concurrent.ScheduledFuture. Der folgende Fall kann für den Zeitplan ebenfalls nützlich sein:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Hier senden wir Runnabledie auszuführende Aufgabe mit einer bestimmten Verzögerung zu einer festen Rate (Fixed Rate). Beginnen Sie in diesem Fall nach 1 Sekunde alle 2 Sekunden mit der Ausführung der Aufgabe. Es gibt eine ähnliche Option:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Aber hier werden Aufgaben in einem bestimmten Intervall ZWISCHEN der Ausführung verschiedener Aufgaben ausgeführt. Das heißt, die Aufgabe taskwird in 1 Sekunde erledigt. Sobald der Vorgang abgeschlossen ist, vergehen 2 Sekunden und dann wird eine neue Aufgabe gestartet. Zu diesem Thema können Sie folgende Materialien lesen: Sie können Java nicht mit einem Thread ruinieren: Teil V – Executor, ThreadPool, Fork Join – 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

Zusätzlich zu den oben genannten Thread-Pools gibt es noch einen weiteren. Man könnte sagen, er ist etwas Besonderes. Sein Name ist Work Stealing Pool. Kurz gesagt ist Work Stealing ein Arbeitsalgorithmus, bei dem inaktive Threads beginnen, Aufgaben von anderen Threads oder Aufgaben aus der allgemeinen Warteschlange zu übernehmen. Schauen wir uns ein Beispiel an:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Wenn wir diesen Code ausführen, ExecutorServicewerden 5 Threads erstellt, weil Jeder Thread wird in die Warteschlange am Standort des Objekts aufgenommen lock. Über Monitore und Sperren haben wir bereits in „ You Can’t Spoil Java with a Thread: Part II – Synchronization “ gesprochen. Executors.newCachedThreadPoolUnd jetzt werden wir es durch ersetzen Executors.newWorkStealingPool(). Was wird sich ändern? Wir werden sehen, dass unsere Aufgaben nicht in 5 Threads, sondern in weniger ausgeführt werden. Erinnern Sie sich, dass cachedThreadPoolSie für jede Aufgabe einen eigenen Thread erstellt haben? Weil waites den Thread blockierte, aber die nächsten Aufgaben ausgeführt werden wollten und dafür neue Threads im Pool erstellt wurden. Im Fall von StealingPoolThreads bleiben sie nicht ewig im Leerlauf wait, sondern beginnen mit der Ausführung benachbarter Aufgaben. Wie unterscheidet sich das von anderen Thread-Pools WorkStealingPool? Denn tatsächlich lebt in ihm etwas Magisches ForkJoinPool:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Es gibt tatsächlich noch einen weiteren Unterschied. Threads, die standardmäßig erstellt werden, ForkJoinPoolsind Daemon-Threads, im Gegensatz zu Threads, die über reguläre Threads erstellt werden ThreadPool. Im Allgemeinen lohnt es sich, sich an Daemon-Threads zu erinnern, weil... Beispielsweise CompletableFuturewerden auch Daemon-Threads verwendet, wenn Sie keine eigenen Threads angeben ThreadFactory, wodurch Nicht-Daemon-Threads erstellt werden. Solche Überraschungen können Sie an einem unerwarteten Ort erwarten!)

Fork/Join-Pool

In diesem Teil werden wir über dasselbe sprechen ForkJoinPool(auch Fork/Join-Framework genannt), das „unter der Haube“ von WorkStealingPool. Im Allgemeinen erschien das Fork Join Framework in Java 1.7. Und auch wenn Java 11 bereits auf dem Markt ist, lohnt es sich dennoch, daran zu denken. Nicht die häufigste Aufgabe, aber durchaus interessant. Zu diesem Thema gibt es im Internet eine gute Rezension: „ Fork/Join Framework in Java 7 “. Fork/JoinPooloperiert in seiner Arbeit mit einem solchen Konzept wie java.util.concurrent.RecursiveTask. Es gibt auch ein Analogon - java.util.concurrent.RecursiveAction. RecursiveActions geben kein Ergebnis zurück. Also RecursiveTaskähnlich Callableund RecursiveActionähnlich Runnable. Wenn wir uns den Namen ansehen, sehen wir zwei Schlüsselmethoden – forkund join. Die Methode forkführt eine Aufgabe asynchron in einem separaten Thread aus. Und mit dieser Methode joinkönnen Sie warten, bis die Arbeit abgeschlossen ist. Es gibt mehrere Möglichkeiten, es zu verwenden: Sie können Java nicht mit einem Thread ruinieren: Teil V – Executor, ThreadPool, Fork Join – 5Dieses Bild ist Teil einer Folie aus Alexey Shipilevs Bericht „ Fork/Join: Implementierung, Verwendung, Leistung “. Um es klarer zu machen, lohnt es sich, seinen Bericht auf der JEE CONF anzusehen: „ Fork-Join-Implementierungsfunktionen “.

Zusammenfassend

Hier sind wir also und beenden den nächsten Teil der Rezension. Wir haben herausgefunden, was wir uns zuerst zum Ausführen von Threads ausgedacht haben Executor. Dann beschlossen wir, die Idee weiterzuführen und kamen auf die Idee ExecutorService. ermöglicht es Ihnen, Aufgaben mit und ExecutorServicezur Ausführung zu senden und den Dienst zu verwalten, indem Sie ihn deaktivieren. Weil „Wir brauchen Implementierungen, wir haben eine Klasse mit Factory-Methoden geschrieben und sie genannt .“ Es ermöglicht Ihnen, Thread-Pools zu erstellen . Gleichzeitig gibt es Thread-Pools, in denen Sie auch einen Zeitplan für die Ausführung festlegen können, der jedoch dahinter verborgen ist . Ich hoffe, dass das, was oben geschrieben wurde, für Sie nicht nur interessant, sondern auch verständlich war. Ich freue mich immer über Anregungen und Kommentare. #WjatscheslawsubmitinvokeExecutorServiceExecutorsThreadPoolExecutorWorkStealingPoolForkJoinPool
Kommentare
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION