JavaRush /Blog Java /Random-FR /Vous ne pouvez pas gâcher Java avec un thread : partie V ...
Viacheslav
Niveau 3

Vous ne pouvez pas gâcher Java avec un thread : partie V - Exécuteur, ThreadPool, Fork Join

Publié dans le groupe Random-FR

Introduction

Ainsi, nous savons qu'il existe des threads en Java, que vous pouvez lire dans la revue « Vous ne pouvez pas gâcher Java avec un thread : partie I - Threads ». Vous ne pouvez pas gâcher Java avec un thread : Partie V – Executor, ThreadPool, Fork Join – 1Regardons à nouveau l'exemple de code :
public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Comme on peut le constater, le code de lancement de la tâche est assez standard, mais à chaque nouveau lancement nous devrons le répéter. Une solution consiste à le déplacer vers une méthode distincte, par exemple execute(Runnable runnable). Mais les développeurs Java se sont déjà inquiétés de nous et ont imaginé une interface Executor:
public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Comme vous pouvez le constater, le code est devenu plus concis et nous a permis d'écrire simplement du code pour l'exécuter Runnabledans un thread. Super, n'est-ce pas ? Mais ce n'est que le début: Vous ne pouvez pas gâcher Java avec un thread : Partie V - Executor, ThreadPool, Fork Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Comme vous pouvez le voir, l'interface Executora une interface descendante ExecutorService. Le JavaDoc de cette interface indique qu'il ExecutorServices'agit d'une description d'un Executor« a » spécial qui fournit des méthodes pour arrêter le travail Executor« a » et vous permet de java.util.concurrent.Futuresuivre la progression de l'exécution. Auparavant, dans « Vous ne pouvez pas gâcher Java avec Thread : Partie IV - Callable, Future and Friends », nous avons brièvement passé en revue les possibilités Future. Si vous l'avez oublié ou ne l'avez pas lu, je vous conseille de vous rafraîchir la mémoire ;) Quelles autres choses intéressantes sont écrites dans JavaDoc ? Que nous disposons d'une usine spéciale java.util.concurrent.Executorsqui nous permet de créer des implémentations disponibles par défaut ExecutorService.

ExécuteurService

Souvenons-nous encore. Nous devons Executorexécuter (c'est-à-dire exécuter) une certaine tâche dans un thread, lorsque l'implémentation de la création d'un thread nous est cachée. Nous en avons ExecutorServiceun spécial Executorqui dispose d'un ensemble de capacités pour gérer la progression de l'exécution. Et nous avons une usine Executorsqui vous permet de créer ExecutorService. Faisons-le nous-mêmes maintenant :
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Comme nous pouvons le voir, nous avons spécifié un pool de threads fixe ( Fixed Thread Pool) de taille 2. Après quoi nous envoyons les tâches au pool une par une. Chaque tâche renvoie une chaîne ( String) contenant le nom du thread ( currentThread().getName()). Il est important de s'arrêter à la toute fin ExecutorService, sinon notre programme ne se fermera pas. ExecutorsIl existe d'autres méthodes d'usine en usine . Par exemple, nous pouvons créer un pool d'un seul thread - newSingleThreadExecutorou un pool avec mise en cache newCachedThreadPool, où les threads seront supprimés du pool s'ils sont inactifs pendant 1 minute. En fait, derrière ceux-ci ExecutorServicese trouve une file d’attente de blocage dans laquelle les tâches sont placées et à partir de laquelle ces tâches sont exécutées. Plus d'informations sur le blocage des files d'attente peuvent être vues dans la vidéo " File d'attente de blocage - Collections #5 - Java avancé ". Vous pouvez également lire la revue « Blocage des files d'attente du package concurrent » et la réponse à la question « Quand préférer LinkedBlockingQueue à ArrayBlockingQueue ? » Super simplifié - BlockingQueue(file d'attente de blocage) bloque un thread, dans deux cas :
  • un thread essaie d'obtenir des éléments d'une file d'attente vide
  • le thread essaie de mettre des éléments dans une file d'attente complète
Si nous regardons la mise en œuvre des méthodes d’usine, nous pouvons voir comment elles sont structurées. Par exemple:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
ou
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Comme nous pouvons le voir, les implémentations sont créées à l'intérieur des méthodes d'usine ExecutorService. Et c'est essentiellement tout ThreadPoolExecutor. Seuls les attributs qui affectent le travail changent. Vous ne pouvez pas ruiner Java avec un thread : Partie V - Executor, ThreadPool, Fork Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExécuteur

Comme nous l'avons vu précédemment, à l'intérieur des méthodes d'usine ThreadPoolExecutor, . La fonctionnalité est affectée par les valeurs transmises comme threads maximum et minimum, ainsi que par la file d'attente utilisée. Et n'importe quelle implémentation de l'interface peut être utilisée java.util.concurrent.BlockingQueue. En parlant de ThreadPoolExecutor'ahs, il convient de noter des fonctionnalités intéressantes pendant le fonctionnement. Par exemple, vous ne pouvez pas envoyer de tâches ThreadPoolExecutors'il n'y a pas d'espace à cet endroit :
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Ce code échouera avec une erreur du type :
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Autrement dit, taskvous ne pouvez pas soumettre, car SynchronousQueueil est conçu de telle manière qu'il se compose en réalité d'un seul élément et ne permet pas d'y mettre plus. Comme nous pouvons le voir, queued tasksil y a 0 ici, et il n'y a rien d'étrange à cela, car c'est spécifique SynchronousQueue- en fait, c'est une file d'attente de 1 élément, qui est toujours vide. (!) Lorsqu'un thread place un élément dans la file d'attente, il attendra qu'un autre thread prenne l'élément de la file d'attente. Par conséquent, nous pouvons remplacer par new LinkedBlockingQueue<>(1)et ce qui sera indiqué dans l’erreur changera queued tasks = 1. Parce que la file d'attente ne contient qu'un seul élément, nous ne pouvons donc pas ajouter le second. Et nous tomberons là-dessus. Poursuivant le thème de la file d'attente, il convient de noter que la classe ThreadPoolExecutordispose de méthodes supplémentaires pour gérer la file d'attente. Par exemple, la méthode threadPoolExecutor.purge()supprimera toutes les tâches annulées de la file d'attente pour libérer de l'espace dans la file d'attente. Une autre fonctionnalité intéressante liée à la file d'attente est le gestionnaire de tâches non acceptées :
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Par exemple, le gestionnaire imprime simplement un mot Rejectedpour chaque refus d'accepter une tâche dans la file d'attente. Pratique, n'est-ce pas ? De plus, ThreadPoolExecutoril a un héritier intéressant - ScheduledThreadPoolExecutorqui est ScheduledExecutorService. Il offre la possibilité d’effectuer une tâche avec une minuterie.

Service d'exécution planifié

ExecutorServicetype ScheduledExecutorServicevous permet d’exécuter des tâches selon un calendrier. Regardons un exemple :
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Tout est simple ici. Les tâches sont envoyées, nous recevons une « tâche planifiée » java.util.concurrent.ScheduledFuture. Le cas suivant peut également être utile avec le planning :
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Ici, nous envoyons Runnablela tâche à exécuter à un taux fixe (Fixed Rate) avec un certain délai. Dans ce cas, après 1 seconde toutes les 2 secondes, commencez à exécuter la tâche. Il existe une option similaire :
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Mais ici les tâches sont exécutées avec un intervalle donné ENTRE l'exécution des différentes tâches. Autrement dit, la tâche tasksera terminée en 1 seconde. Ensuite, dès qu'elle est terminée, 2 secondes s'écouleront, puis une nouvelle tâche sera lancée. Vous pouvez lire les documents suivants sur ce sujet : Vous ne pouvez pas ruiner Java avec un thread : Partie V - Executor, ThreadPool, Fork Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

TravailVolerPool

En plus des pools de threads mentionnés ci-dessus, il en existe un autre. On pourrait dire qu'il est un peu spécial. Son nom est Work Stealing Pool. En bref, Work Stealing est un algorithme de travail dans lequel les threads inactifs commencent à prendre des tâches d'autres threads ou des tâches de la file d'attente générale. Regardons un exemple :
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Si nous exécutons ce code, ExecutorServicecela créera 5 threads, car chaque thread rejoindra la file d'attente à l'emplacement de l'objet lock. Nous avons déjà discuté des moniteurs et des verrous dans « Vous ne pouvez pas ruiner Java avec un fil : partie II - synchronisation ». Et maintenant nous allons le remplacer Executors.newCachedThreadPoolpar Executors.newWorkStealingPool(). Qu'est-ce qui va changer ? Nous verrons que nos tâches sont effectuées non pas en 5 threads, mais en moins. Vous souvenez-vous que cachedThreadPoolvous avez créé votre propre fil de discussion pour chaque tâche ? Parce que waitcela bloquait le thread, mais les tâches suivantes voulaient être exécutées et de nouveaux threads étaient créés dans le pool pour elles. Dans le cas des StealingPoolthreads, ils ne resteront pas inactifs éternellement wait; ils commenceront à exécuter des tâches voisines. En quoi est-ce si différent des autres pools de threads WorkStealingPool? Parce qu'il y a vraiment quelque chose de magique qui vit en lui ForkJoinPool:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Il y a en fait une autre différence. Les threads créés par ForkJoinPooldéfaut sont des threads démons, par opposition aux threads créés via Regular ThreadPool. En général, il convient de se souvenir des threads démons, car... par exemple, CompletableFuturedes threads démons sont également utilisés, si vous ne spécifiez pas les vôtres ThreadFactory, ce qui créera des threads non démons. C'est le genre de surprises qui peuvent vous attendre dans un endroit inattendu !)

Fork/Rejoindre le pool

Dans cette partie, nous parlerons du même framework ForkJoinPool(également appelé framework fork/join) qui vit « sous le capot » de WorkStealingPool. En général, le Fork Join Framework est apparu dans Java 1.7. Et même si Java 11 est déjà dans la cour, cela vaut quand même la peine de le rappeler. Ce n'est pas la tâche la plus courante, mais plutôt intéressante. Il existe une bonne critique sur ce sujet sur Internet : « Fork/Join Framework in Java 7 ». Fork/JoinPoolopère dans son travail avec un concept tel que java.util.concurrent.RecursiveTask. Il existe également un analogue - java.util.concurrent.RecursiveAction. Les RecursiveActions ne renvoient pas de résultat. Donc RecursiveTasksimilaire à Callable, et RecursiveActionsimilaire à Runnable. Eh bien, en regardant le nom, nous voyons deux méthodes clés - forket join. La méthode forkexécute une tâche de manière asynchrone dans un thread distinct. Et la méthode joinpermet d'attendre la fin des travaux. Il existe plusieurs façons de l'utiliser : Vous ne pouvez pas ruiner Java avec un thread : Partie V - Executor, ThreadPool, Fork Join - 5Cette image fait partie d'une diapositive du rapport d'Alexey Shipilev « Fork/Join : mise en œuvre, utilisation, performances ». Pour que ce soit plus clair, cela vaut la peine de regarder son rapport à la JEE CONF : " Fork Join Implementation Features ".

Résumer

Nous voilà donc en train de terminer la prochaine partie de la revue. Nous avons compris ce que nous avions trouvé en premier Executorpour exécuter les threads. Ensuite, nous avons décidé de poursuivre l'idée et avons eu l'idée ExecutorService. ExecutorServicevous permet d'envoyer des tâches pour exécution à l'aide submitde et invoke, ainsi que de gérer le service en le désactivant. Parce que ExecutorService« Nous avons besoin d'implémentations, nous avons écrit une classe avec des méthodes d'usine et l'avons appelée Executors. Il vous permet de créer des pools de threads ThreadPoolExecutor. Dans le même temps, il existe des pools de threads qui vous permettent également de spécifier un calendrier d'exécution, mais WorkStealingPoolqui se cachent ForkJoinPool. J'espère que ce qui a été écrit ci-dessus vous a non seulement intéressé, mais aussi compréhensible). Je suis toujours heureux de recevoir des suggestions et des commentaires. #Viacheslav
Commentaires
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION