Einführung
Wir wissen also, dass es Threads in Java gibt, worüber Sie in der Rezension „ You Can’t Spoil Java with a Thread: Part I – Threads “ nachlesen können . Schauen wir uns den Beispielcode noch einmal an:public static void main(String []args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
Wie wir sehen können, ist der Code zum Starten der Aufgabe ziemlich standardmäßig, aber bei jedem neuen Start müssen wir ihn wiederholen. Eine Lösung besteht beispielsweise darin, es in eine separate Methode zu verschieben execute(Runnable runnable)
. Aber die Java-Entwickler haben sich schon um uns gekümmert und sich eine Schnittstelle ausgedacht Executor
:
public static void main(String []args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Wie Sie sehen, ist der Code prägnanter geworden und ermöglicht es uns, einfach Code zu schreiben, um ihn Runnable
in einem Thread auszuführen. Großartig, nicht wahr? Aber das ist erst der Anfang:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
verfügt die Schnittstelle über eine abgeleitete Schnittstelle ExecutorService
. Das JavaDoc dieser Schnittstelle besagt, dass ExecutorService
es sich um eine Beschreibung eines speziellen Executor
„a“ handelt, das Methoden zum Stoppen der Arbeit Executor
„a“ bereitstellt und es Ihnen ermöglicht, java.util.concurrent.Future
den Fortschritt der Ausführung zu verfolgen. Zuvor haben wir in „ Mit Thread kann man Java nicht verderben: Teil IV – Callable, Future and Friends “ kurz die Möglichkeiten besprochen Future
. Wenn Sie es vergessen oder nicht gelesen haben, empfehle ich Ihnen, Ihr Gedächtnis aufzufrischen ;) Welche anderen interessanten Dinge sind in JavaDoc geschrieben? Dass wir über eine spezielle Factory verfügen java.util.concurrent.Executors
, die es uns ermöglicht, Implementierungen zu erstellen, die standardmäßig verfügbar sind ExecutorService
.
ExecutorService
Erinnern wir uns noch einmal. Wir müssenExecutor
eine bestimmte Aufgabe in einem Thread ausführen (d. h. ausführen), wobei uns die Implementierung der Erstellung eines Threads verborgen bleibt. Wir haben ExecutorService
ein spezielles Programm Executor
, das über eine Reihe von Funktionen zur Verwaltung des Ausführungsfortschritts verfügt. Und wir haben eine Fabrik Executors
, in der Sie erstellen können ExecutorService
. Machen wir es jetzt selbst:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
Wie wir sehen können, haben wir einen festen Thread-Pool ( Fixed Thread Pool
) der Größe 2 angegeben. Anschließend senden wir Aufgaben nacheinander an den Pool. Jede Aufgabe gibt eine Zeichenfolge ( String
) zurück, die den Threadnamen ( currentThread().getName()
) enthält. Es ist wichtig, ganz am Ende herunterzufahren ExecutorService
, da unser Programm sonst nicht beendet wird. Executors
Es gibt andere Fabrikmethoden in der Fabrik . Wir können beispielsweise einen Pool mit nur einem Thread erstellen – newSingleThreadExecutor
oder einen Pool mit Caching newCachedThreadPool
, bei dem Threads aus dem Pool entfernt werden, wenn sie eine Minute lang inaktiv sind. Tatsächlich ExecutorService
befindet sich dahinter eine Blockierungswarteschlange , in die Aufgaben gestellt werden und von der aus diese Aufgaben ausgeführt werden. Weitere Informationen zu Blocking Queues finden Sie im Video „ Blocking Queue – Collections #5 – Advanced Java “. Sie können auch die Rezension „ Blockieren von Warteschlangen des gleichzeitigen Pakets “ und die Antwort auf die Frage „ Wann sollte man LinkedBlockingQueue gegenüber ArrayBlockingQueue bevorzugen? “ lesen. Super vereinfacht – BlockingQueue
(Blockierungswarteschlange) blockiert einen Thread in zwei Fällen:
- Ein Thread versucht, Elemente aus einer leeren Warteschlange abzurufen
- Der Thread versucht, Elemente in eine volle Warteschlange zu stellen
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
oder
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Wie wir sehen können, werden Implementierungen innerhalb von Factory-Methoden erstellt ExecutorService
. Und das ist es im Grunde ThreadPoolExecutor
. Nur die Attribute, die sich auf die Arbeit auswirken, ändern sich.
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
Wie wir zuvor gesehen haben, sind Inside-Factory-MethodenThreadPoolExecutor
, . Die Funktionalität wird davon beeinflusst, welche Werte als maximale und minimale Threads übergeben werden und welche Warteschlange verwendet wird. Und jede Implementierung der Schnittstelle kann verwendet werden java.util.concurrent.BlockingQueue
. Apropos ThreadPoolExecutor
'ahs', es ist erwähnenswert, dass es während des Betriebs interessante Funktionen gibt. Beispielsweise können Sie keine Aufgaben an senden, ThreadPoolExecutor
wenn dort kein Platz vorhanden ist:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Dieser Code schlägt mit einem Fehler wie dem folgenden fehl:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Das heißt, task
Sie können nicht einreichen, weil SynchronousQueue
Es ist so konzipiert, dass es tatsächlich aus einem Element besteht und es nicht möglich ist, dort mehr zu platzieren. Wie wir sehen können, queued tasks
gibt es hier 0, und daran ist nichts Seltsames, denn Dies ist spezifisch SynchronousQueue
– tatsächlich handelt es sich um eine Warteschlange mit einem Element, das immer leer ist. (!) Wenn ein Thread ein Element in die Warteschlange stellt, wartet er, bis ein anderer Thread das Element aus der Warteschlange nimmt. Daher können wir durch ersetzen new LinkedBlockingQueue<>(1)
und was im Fehler angezeigt wird, wird sich ändern queued tasks = 1
. Weil Wenn die Warteschlange nur aus einem Element besteht, können wir kein zweites hinzufügen. Und wir werden darauf hereinfallen. Um das Thema der Warteschlange fortzusetzen, ist es erwähnenswert, dass die Klasse ThreadPoolExecutor
über zusätzliche Methoden zur Bedienung der Warteschlange verfügt. Beispielsweise threadPoolExecutor.purge()
entfernt die Methode alle abgebrochenen Aufgaben aus der Warteschlange, um Platz in der Warteschlange freizugeben. Ein weiteres interessantes Feature im Zusammenhang mit der Warteschlange ist der Handler für nicht akzeptierte Aufgaben:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Beispielsweise gibt der Handler einfach ein Wort Rejected
für jede Weigerung aus, eine Aufgabe in die Warteschlange aufzunehmen. Praktisch, nicht wahr? Darüber hinaus ThreadPoolExecutor
hat er einen interessanten Erben – ScheduledThreadPoolExecutor
nämlich ScheduledExecutorService
. Es bietet die Möglichkeit, eine Aufgabe mit einem Timer auszuführen.
ScheduledExecutorService
ExecutorService
Mit diesem Typ ScheduledExecutorService
können Sie Aufgaben nach einem Zeitplan ausführen. Schauen wir uns ein Beispiel an:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Hier ist alles einfach. Aufgaben werden gesendet, wir erhalten eine „geplante Aufgabe“ java.util.concurrent.ScheduledFuture
. Der folgende Fall kann für den Zeitplan ebenfalls nützlich sein:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Hier senden wir Runnable
die auszuführende Aufgabe mit einer bestimmten Verzögerung zu einer festen Rate (Fixed Rate). Beginnen Sie in diesem Fall nach 1 Sekunde alle 2 Sekunden mit der Ausführung der Aufgabe. Es gibt eine ähnliche Option:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Aber hier werden Aufgaben in einem bestimmten Intervall ZWISCHEN der Ausführung verschiedener Aufgaben ausgeführt. Das heißt, die Aufgabe task
wird in 1 Sekunde erledigt. Sobald der Vorgang abgeschlossen ist, vergehen 2 Sekunden und dann wird eine neue Aufgabe gestartet. Zu diesem Thema können Sie folgende Materialien lesen:
- Eine Einführung in Thread-Pools
- Einführung in Thread-Pools
- Java Multithreading Steeplechase: Abbrechen von Aufgaben in Executors
- Auswahl der richtigen Java-Executors für Hintergrundaufgaben
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
Zusätzlich zu den oben genannten Thread-Pools gibt es noch einen weiteren. Man könnte sagen, er ist etwas Besonderes. Sein Name ist Work Stealing Pool. Kurz gesagt ist Work Stealing ein Arbeitsalgorithmus, bei dem inaktive Threads beginnen, Aufgaben von anderen Threads oder Aufgaben aus der allgemeinen Warteschlange zu übernehmen. Schauen wir uns ein Beispiel an:public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Wenn wir diesen Code ausführen, ExecutorService
werden 5 Threads erstellt, weil Jeder Thread wird in die Warteschlange am Standort des Objekts aufgenommen lock
. Über Monitore und Sperren haben wir bereits in „ You Can’t Spoil Java with a Thread: Part II – Synchronization “ gesprochen. Executors.newCachedThreadPool
Und jetzt werden wir es durch ersetzen Executors.newWorkStealingPool()
. Was wird sich ändern? Wir werden sehen, dass unsere Aufgaben nicht in 5 Threads, sondern in weniger ausgeführt werden. Erinnern Sie sich, dass cachedThreadPool
Sie für jede Aufgabe einen eigenen Thread erstellt haben? Weil wait
es den Thread blockierte, aber die nächsten Aufgaben ausgeführt werden wollten und dafür neue Threads im Pool erstellt wurden. Im Fall von StealingPool
Threads bleiben sie nicht ewig im Leerlauf wait
, sondern beginnen mit der Ausführung benachbarter Aufgaben. Wie unterscheidet sich das von anderen Thread-Pools WorkStealingPool
? Denn tatsächlich lebt in ihm etwas Magisches ForkJoinPool
:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Es gibt tatsächlich noch einen weiteren Unterschied. Threads, die standardmäßig erstellt werden, ForkJoinPool
sind Daemon-Threads, im Gegensatz zu Threads, die über reguläre Threads erstellt werden ThreadPool
. Im Allgemeinen lohnt es sich, sich an Daemon-Threads zu erinnern, weil... Beispielsweise CompletableFuture
werden auch Daemon-Threads verwendet, wenn Sie keine eigenen Threads angeben ThreadFactory
, wodurch Nicht-Daemon-Threads erstellt werden. Solche Überraschungen können Sie an einem unerwarteten Ort erwarten!)
Fork/Join-Pool
In diesem Teil werden wir über dasselbe sprechenForkJoinPool
(auch Fork/Join-Framework genannt), das „unter der Haube“ von WorkStealingPool
. Im Allgemeinen erschien das Fork Join Framework in Java 1.7. Und auch wenn Java 11 bereits auf dem Markt ist, lohnt es sich dennoch, daran zu denken. Nicht die häufigste Aufgabe, aber durchaus interessant. Zu diesem Thema gibt es im Internet eine gute Rezension: „ Fork/Join Framework in Java 7 “. Fork/JoinPool
operiert in seiner Arbeit mit einem solchen Konzept wie java.util.concurrent.RecursiveTask
. Es gibt auch ein Analogon - java.util.concurrent.RecursiveAction
. RecursiveActions geben kein Ergebnis zurück. Also RecursiveTask
ähnlich Callable
und RecursiveAction
ähnlich Runnable
. Wenn wir uns den Namen ansehen, sehen wir zwei Schlüsselmethoden – fork
und join
. Die Methode fork
führt eine Aufgabe asynchron in einem separaten Thread aus. Und mit dieser Methode join
können Sie warten, bis die Arbeit abgeschlossen ist. Es gibt mehrere Möglichkeiten, es zu verwenden: Dieses Bild ist Teil einer Folie aus Alexey Shipilevs Bericht „ Fork/Join: Implementierung, Verwendung, Leistung “. Um es klarer zu machen, lohnt es sich, seinen Bericht auf der JEE CONF anzusehen: „ Fork-Join-Implementierungsfunktionen “.
Zusammenfassend
Hier sind wir also und beenden den nächsten Teil der Rezension. Wir haben herausgefunden, was wir uns zuerst zum Ausführen von Threads ausgedacht habenExecutor
. Dann beschlossen wir, die Idee weiterzuführen und kamen auf die Idee ExecutorService
. ermöglicht es Ihnen, Aufgaben mit und ExecutorService
zur Ausführung zu senden und den Dienst zu verwalten, indem Sie ihn deaktivieren. Weil „Wir brauchen Implementierungen, wir haben eine Klasse mit Factory-Methoden geschrieben und sie genannt .“ Es ermöglicht Ihnen, Thread-Pools zu erstellen . Gleichzeitig gibt es Thread-Pools, in denen Sie auch einen Zeitplan für die Ausführung festlegen können, der jedoch dahinter verborgen ist . Ich hoffe, dass das, was oben geschrieben wurde, für Sie nicht nur interessant, sondern auch verständlich war. Ich freue mich immer über Anregungen und Kommentare. #Wjatscheslawsubmit
invoke
ExecutorService
Executors
ThreadPoolExecutor
WorkStealingPool
ForkJoinPool
GO TO FULL VERSION