Introduction
Ainsi, nous savons qu'il existe des threads en Java, que vous pouvez lire dans la revue « Vous ne pouvez pas gâcher Java avec un thread : partie I - Threads ». Regardons à nouveau l'exemple de code :public static void main(String []args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
Comme on peut le constater, le code de lancement de la tâche est assez standard, mais à chaque nouveau lancement nous devrons le répéter. Une solution consiste à le déplacer vers une méthode distincte, par exemple execute(Runnable runnable)
. Mais les développeurs Java se sont déjà inquiétés de nous et ont imaginé une interface Executor
:
public static void main(String []args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Comme vous pouvez le constater, le code est devenu plus concis et nous a permis d'écrire simplement du code pour l'exécuter Runnable
dans un thread. Super, n'est-ce pas ? Mais ce n'est que le début:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
a une interface descendante ExecutorService
. Le JavaDoc de cette interface indique qu'il ExecutorService
s'agit d'une description d'un Executor
« a » spécial qui fournit des méthodes pour arrêter le travail Executor
« a » et vous permet de java.util.concurrent.Future
suivre la progression de l'exécution. Auparavant, dans « Vous ne pouvez pas gâcher Java avec Thread : Partie IV - Callable, Future and Friends », nous avons brièvement passé en revue les possibilités Future
. Si vous l'avez oublié ou ne l'avez pas lu, je vous conseille de vous rafraîchir la mémoire ;) Quelles autres choses intéressantes sont écrites dans JavaDoc ? Que nous disposons d'une usine spéciale java.util.concurrent.Executors
qui nous permet de créer des implémentations disponibles par défaut ExecutorService
.
ExécuteurService
Souvenons-nous encore. Nous devonsExecutor
exécuter (c'est-à-dire exécuter) une certaine tâche dans un thread, lorsque l'implémentation de la création d'un thread nous est cachée. Nous en avons ExecutorService
un spécial Executor
qui dispose d'un ensemble de capacités pour gérer la progression de l'exécution. Et nous avons une usine Executors
qui vous permet de créer ExecutorService
. Faisons-le nous-mêmes maintenant :
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
Comme nous pouvons le voir, nous avons spécifié un pool de threads fixe ( Fixed Thread Pool
) de taille 2. Après quoi nous envoyons les tâches au pool une par une. Chaque tâche renvoie une chaîne ( String
) contenant le nom du thread ( currentThread().getName()
). Il est important de s'arrêter à la toute fin ExecutorService
, sinon notre programme ne se fermera pas. Executors
Il existe d'autres méthodes d'usine en usine . Par exemple, nous pouvons créer un pool d'un seul thread - newSingleThreadExecutor
ou un pool avec mise en cache newCachedThreadPool
, où les threads seront supprimés du pool s'ils sont inactifs pendant 1 minute. En fait, derrière ceux-ci ExecutorService
se trouve une file d’attente de blocage dans laquelle les tâches sont placées et à partir de laquelle ces tâches sont exécutées. Plus d'informations sur le blocage des files d'attente peuvent être vues dans la vidéo " File d'attente de blocage - Collections #5 - Java avancé ". Vous pouvez également lire la revue « Blocage des files d'attente du package concurrent » et la réponse à la question « Quand préférer LinkedBlockingQueue à ArrayBlockingQueue ? » Super simplifié - BlockingQueue
(file d'attente de blocage) bloque un thread, dans deux cas :
- un thread essaie d'obtenir des éléments d'une file d'attente vide
- le thread essaie de mettre des éléments dans une file d'attente complète
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
ou
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Comme nous pouvons le voir, les implémentations sont créées à l'intérieur des méthodes d'usine ExecutorService
. Et c'est essentiellement tout ThreadPoolExecutor
. Seuls les attributs qui affectent le travail changent.
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExécuteur
Comme nous l'avons vu précédemment, à l'intérieur des méthodes d'usineThreadPoolExecutor
, . La fonctionnalité est affectée par les valeurs transmises comme threads maximum et minimum, ainsi que par la file d'attente utilisée. Et n'importe quelle implémentation de l'interface peut être utilisée java.util.concurrent.BlockingQueue
. En parlant de ThreadPoolExecutor
'ahs, il convient de noter des fonctionnalités intéressantes pendant le fonctionnement. Par exemple, vous ne pouvez pas envoyer de tâches ThreadPoolExecutor
s'il n'y a pas d'espace à cet endroit :
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Ce code échouera avec une erreur du type :
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Autrement dit, task
vous ne pouvez pas soumettre, car SynchronousQueue
il est conçu de telle manière qu'il se compose en réalité d'un seul élément et ne permet pas d'y mettre plus. Comme nous pouvons le voir, queued tasks
il y a 0 ici, et il n'y a rien d'étrange à cela, car c'est spécifique SynchronousQueue
- en fait, c'est une file d'attente de 1 élément, qui est toujours vide. (!) Lorsqu'un thread place un élément dans la file d'attente, il attendra qu'un autre thread prenne l'élément de la file d'attente. Par conséquent, nous pouvons remplacer par new LinkedBlockingQueue<>(1)
et ce qui sera indiqué dans l’erreur changera queued tasks = 1
. Parce que la file d'attente ne contient qu'un seul élément, nous ne pouvons donc pas ajouter le second. Et nous tomberons là-dessus. Poursuivant le thème de la file d'attente, il convient de noter que la classe ThreadPoolExecutor
dispose de méthodes supplémentaires pour gérer la file d'attente. Par exemple, la méthode threadPoolExecutor.purge()
supprimera toutes les tâches annulées de la file d'attente pour libérer de l'espace dans la file d'attente. Une autre fonctionnalité intéressante liée à la file d'attente est le gestionnaire de tâches non acceptées :
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Par exemple, le gestionnaire imprime simplement un mot Rejected
pour chaque refus d'accepter une tâche dans la file d'attente. Pratique, n'est-ce pas ? De plus, ThreadPoolExecutor
il a un héritier intéressant - ScheduledThreadPoolExecutor
qui est ScheduledExecutorService
. Il offre la possibilité d’effectuer une tâche avec une minuterie.
Service d'exécution planifié
ExecutorService
type ScheduledExecutorService
vous permet d’exécuter des tâches selon un calendrier. Regardons un exemple :
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Tout est simple ici. Les tâches sont envoyées, nous recevons une « tâche planifiée » java.util.concurrent.ScheduledFuture
. Le cas suivant peut également être utile avec le planning :
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Ici, nous envoyons Runnable
la tâche à exécuter à un taux fixe (Fixed Rate) avec un certain délai. Dans ce cas, après 1 seconde toutes les 2 secondes, commencez à exécuter la tâche. Il existe une option similaire :
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Mais ici les tâches sont exécutées avec un intervalle donné ENTRE l'exécution des différentes tâches. Autrement dit, la tâche task
sera terminée en 1 seconde. Ensuite, dès qu'elle est terminée, 2 secondes s'écouleront, puis une nouvelle tâche sera lancée. Vous pouvez lire les documents suivants sur ce sujet :
- Une introduction aux pools de threads
- Introduction aux pools de threads
- Java Multithreading Steeplechase : annulation de tâches dans les exécuteurs
- Choisir les bons exécuteurs Java pour les tâches en arrière-plan
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
TravailVolerPool
En plus des pools de threads mentionnés ci-dessus, il en existe un autre. On pourrait dire qu'il est un peu spécial. Son nom est Work Stealing Pool. En bref, Work Stealing est un algorithme de travail dans lequel les threads inactifs commencent à prendre des tâches d'autres threads ou des tâches de la file d'attente générale. Regardons un exemple :public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Si nous exécutons ce code, ExecutorService
cela créera 5 threads, car chaque thread rejoindra la file d'attente à l'emplacement de l'objet lock
. Nous avons déjà discuté des moniteurs et des verrous dans « Vous ne pouvez pas ruiner Java avec un fil : partie II - synchronisation ». Et maintenant nous allons le remplacer Executors.newCachedThreadPool
par Executors.newWorkStealingPool()
. Qu'est-ce qui va changer ? Nous verrons que nos tâches sont effectuées non pas en 5 threads, mais en moins. Vous souvenez-vous que cachedThreadPool
vous avez créé votre propre fil de discussion pour chaque tâche ? Parce que wait
cela bloquait le thread, mais les tâches suivantes voulaient être exécutées et de nouveaux threads étaient créés dans le pool pour elles. Dans le cas des StealingPool
threads, ils ne resteront pas inactifs éternellement wait
; ils commenceront à exécuter des tâches voisines. En quoi est-ce si différent des autres pools de threads WorkStealingPool
? Parce qu'il y a vraiment quelque chose de magique qui vit en lui ForkJoinPool
:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Il y a en fait une autre différence. Les threads créés par ForkJoinPool
défaut sont des threads démons, par opposition aux threads créés via Regular ThreadPool
. En général, il convient de se souvenir des threads démons, car... par exemple, CompletableFuture
des threads démons sont également utilisés, si vous ne spécifiez pas les vôtres ThreadFactory
, ce qui créera des threads non démons. C'est le genre de surprises qui peuvent vous attendre dans un endroit inattendu !)
Fork/Rejoindre le pool
Dans cette partie, nous parlerons du même frameworkForkJoinPool
(également appelé framework fork/join) qui vit « sous le capot » de WorkStealingPool
. En général, le Fork Join Framework est apparu dans Java 1.7. Et même si Java 11 est déjà dans la cour, cela vaut quand même la peine de le rappeler. Ce n'est pas la tâche la plus courante, mais plutôt intéressante. Il existe une bonne critique sur ce sujet sur Internet : « Fork/Join Framework in Java 7 ». Fork/JoinPool
opère dans son travail avec un concept tel que java.util.concurrent.RecursiveTask
. Il existe également un analogue - java.util.concurrent.RecursiveAction
. Les RecursiveActions ne renvoient pas de résultat. Donc RecursiveTask
similaire à Callable
, et RecursiveAction
similaire à Runnable
. Eh bien, en regardant le nom, nous voyons deux méthodes clés - fork
et join
. La méthode fork
exécute une tâche de manière asynchrone dans un thread distinct. Et la méthode join
permet d'attendre la fin des travaux. Il existe plusieurs façons de l'utiliser : Cette image fait partie d'une diapositive du rapport d'Alexey Shipilev « Fork/Join : mise en œuvre, utilisation, performances ». Pour que ce soit plus clair, cela vaut la peine de regarder son rapport à la JEE CONF : " Fork Join Implementation Features ".
Résumer
Nous voilà donc en train de terminer la prochaine partie de la revue. Nous avons compris ce que nous avions trouvé en premierExecutor
pour exécuter les threads. Ensuite, nous avons décidé de poursuivre l'idée et avons eu l'idée ExecutorService
. ExecutorService
vous permet d'envoyer des tâches pour exécution à l'aide submit
de et invoke
, ainsi que de gérer le service en le désactivant. Parce que ExecutorService
« Nous avons besoin d'implémentations, nous avons écrit une classe avec des méthodes d'usine et l'avons appelée Executors
. Il vous permet de créer des pools de threads ThreadPoolExecutor
. Dans le même temps, il existe des pools de threads qui vous permettent également de spécifier un calendrier d'exécution, mais WorkStealingPool
qui se cachent ForkJoinPool
. J'espère que ce qui a été écrit ci-dessus vous a non seulement intéressé, mais aussi compréhensible). Je suis toujours heureux de recevoir des suggestions et des commentaires. #Viacheslav
GO TO FULL VERSION