JavaRush /Blog Java /Random-ES /No se puede estropear Java con un hilo: Parte V: Ejecutor...
Viacheslav
Nivel 3

No se puede estropear Java con un hilo: Parte V: Ejecutor, ThreadPool, unión a la bifurcación

Publicado en el grupo Random-ES

Introducción

Entonces, sabemos que hay subprocesos en Java, sobre los cuales puede leer en la revisión " No se puede estropear Java con un subproceso: Parte I - Subprocesos ". No puedes estropear Java con un hilo: Parte V - Ejecutor, ThreadPool, Fork Join - 1Miremos el código de muestra nuevamente:
public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Como vemos, el código para ejecutar la tarea es bastante estándar, pero para cada nuevo lanzamiento tendremos que repetirlo. Una solución es moverlo a un método separado, por ejemplo execute(Runnable runnable). Pero los desarrolladores de Java ya se preocuparon por nosotros y crearon una interfaz Executor:
public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Como puede ver, el código se ha vuelto más conciso y nos permite simplemente escribir código para ejecutarlo Runnableen un hilo. Genial, ¿no? Pero esto es solo el principio: No puedes estropear Java con un hilo: Parte V - Ejecutor, ThreadPool, Fork Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Como puede ver, la interfaz Executortiene una interfaz descendiente ExecutorService. El JavaDoc de esta interfaz dice que ExecutorServicees una descripción de una Executor'a' especial que proporciona métodos para detener el trabajo Executor'a' y le permite realizar un java.util.concurrent.Futureseguimiento del progreso de la ejecución. Anteriormente, en " No se puede estropear Java con subprocesos: Parte IV: invocables, futuros y amigos ", revisamos brevemente las posibilidades Future. Si lo olvidaste o no lo has leído, te aconsejo que refresques tu memoria ;) ¿Qué otras cosas interesantes están escritas en JavaDoc? Que tenemos una fábrica especial java.util.concurrent.Executorsque nos permite crear implementaciones que están disponibles por defecto ExecutorService.

Servicio Ejecutor

Recordemos de nuevo. Tenemos Executorque ejecutar (es decir, ejecutar) una determinada tarea en un hilo, cuando la implementación de la creación de un hilo está oculta para nosotros. Tenemos ExecutorServiceuno especial Executorque tiene un conjunto de capacidades para gestionar el progreso de la ejecución. Y tenemos una fábrica Executorsque te permite crear ExecutorService. Hagámoslo nosotros mismos ahora:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Como podemos ver, hemos especificado un grupo de subprocesos fijo ( Fixed Thread Pool) de tamaño 2. Después de lo cual enviamos tareas al grupo una por una. Cada tarea devuelve una cadena ( String) que contiene el nombre del hilo ( currentThread().getName()). Es importante cerrar al final ExecutorService, porque de lo contrario nuestro programa no saldrá. ExecutorsHay otros métodos de fábrica en la fábrica . Por ejemplo, podemos crear un grupo de un solo subproceso, newSingleThreadExecutoro un grupo con almacenamiento en caché newCachedThreadPool, donde los subprocesos se eliminarán del grupo si están inactivos durante 1 minuto. De hecho, detrás de estos ExecutorServicehay una cola de bloqueo en la que se colocan las tareas y desde la que se ejecutan. Puede ver más información sobre el bloqueo de colas en el vídeo " Cola de bloqueo - Colecciones n.° 5 - Java avanzado ". También puede leer la reseña " Bloqueo de colas del paquete concurrente " y la respuesta a la pregunta "¿ Cuándo preferir LinkedBlockingQueue a ArrayBlockingQueue? " Súper simplificado: BlockingQueue(cola de bloqueo) bloquea un hilo, en dos casos:
  • un hilo está intentando obtener elementos de una cola vacía
  • el hilo está intentando poner elementos en una cola llena
Si observamos la implementación de los métodos de fábrica, podemos ver cómo están estructurados. Por ejemplo:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
o
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Como podemos ver, las implementaciones se crean dentro de los métodos de fábrica ExecutorService. Y eso es básicamente todo ThreadPoolExecutor. Sólo cambian los atributos que afectan el trabajo. No puedes arruinar Java con un hilo: Parte V - Ejecutor, ThreadPool, Fork Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolEjecutor

Como vimos anteriormente, dentro de los métodos de fábrica ThreadPoolExecutor. La funcionalidad se ve afectada por los valores que se pasan como subprocesos máximo y mínimo, así como por la cola que se utiliza. Y se puede utilizar cualquier implementación de la interfaz java.util.concurrent.BlockingQueue. Hablando de ThreadPoolExecutor'ahs, vale la pena señalar características interesantes durante la operación. Por ejemplo, no puedes enviar tareas a ThreadPoolExecutorsi no hay espacio allí:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Este código fallará con un error como:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Es decir, taskno puedes presentarte, porque SynchronousQueueestá diseñado de tal manera que en realidad consta de un elemento y no permite colocar más allí. Como podemos ver, queued tasksaquí hay 0, y no hay nada extraño en esto, porque esto es específico SynchronousQueue; de hecho, es una cola de 1 elemento, que siempre está vacía. (!) Cuando un hilo pone un elemento en la cola, esperará hasta que otro hilo tome el elemento de la cola. Por tanto, podemos sustituir por new LinkedBlockingQueue<>(1)y cambiará lo que se indicará en el error queued tasks = 1. Porque la cola tiene solo 1 elemento, entonces no podemos agregar el segundo. Y en esto caeremos. Continuando con el tema de la cola, vale la pena señalar que la clase ThreadPoolExecutortiene métodos adicionales para dar servicio a la cola. Por ejemplo, el método threadPoolExecutor.purge()eliminará todas las tareas canceladas de la cola para liberar espacio en la cola. Otra característica interesante relacionada con la cola es el controlador de tareas no aceptadas:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Por ejemplo, el controlador simplemente imprime una palabra Rejectedpor cada negativa a aceptar una tarea en la cola. Conveniente, ¿no? Además, ThreadPoolExecutortiene un heredero interesante ScheduledThreadPoolExecutor: ScheduledExecutorService. Proporciona la posibilidad de realizar una tarea con un temporizador.

Servicio de ejecución programado

ExecutorServiceEl tipo ScheduledExecutorServicele permite ejecutar tareas según un cronograma. Veamos un ejemplo:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Aquí todo es sencillo. Se envían tareas, recibimos una “tarea programada” java.util.concurrent.ScheduledFuture. El siguiente caso también puede resultar útil con el cronograma:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Aquí enviamos Runnablela tarea a ejecutar a una tarifa fija (Tasa Fija) con un cierto retraso. En este caso, después de 1 segundo cada 2 segundos, comience a ejecutar la tarea. Hay una opción similar:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Pero aquí las tareas se ejecutan con un intervalo determinado ENTRE la ejecución de diferentes tareas. Es decir, la tarea taskse completará en 1 segundo. A continuación, tan pronto como se complete, pasarán 2 segundos y luego se lanzará una nueva tarea. Puede leer los siguientes materiales sobre este tema: No puedes arruinar Java con un hilo: Parte V - Ejecutor, ThreadPool, Fork Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

TrabajoRobarPiscina

Además de los grupos de subprocesos mencionados anteriormente, hay uno más. Se podría decir que es un poco especial. Su nombre es Work Stealing Pool. En resumen, Work Stealing es un algoritmo de trabajo en el que los subprocesos inactivos comienzan a tomar tareas de otros subprocesos o tareas de la cola general. Veamos un ejemplo:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Si ejecutamos este código, ExecutorServicecreará 5 subprocesos, porque Cada hilo se unirá a la cola de espera en la ubicación del objeto lock. Ya hemos hablado sobre los monitores y sus bloqueos en " No se puede arruinar Java con un hilo: Parte II - sincronización ". Y ahora lo reemplazaremos Executors.newCachedThreadPoolcon Executors.newWorkStealingPool(). ¿Qué cambiará? Veremos que nuestras tareas se realizan no en 5 hilos, sino en menos. ¿ Recuerdas que cachedThreadPoolcreaste tu propio hilo para cada tarea? Porque waitbloqueó el hilo, pero las siguientes tareas querían ejecutarse y se crearon nuevos hilos en el grupo para ellas. En el caso de StealingPoollos subprocesos, no permanecerán inactivos para siempre wait; comenzarán a ejecutar tareas vecinas. ¿ En qué se diferencia esto de otros grupos de subprocesos WorkStealingPool? Porque realmente hay algo mágico viviendo dentro de él ForkJoinPool:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
En realidad, hay una diferencia más. Los subprocesos que se crean de ForkJoinPoolforma predeterminada son subprocesos de demonio, a diferencia de los subprocesos creados mediante ThreadPool. En general, vale la pena recordar los hilos de demonio, porque... por ejemplo, CompletableFuturetambién se utilizan subprocesos de demonio, si no especifica el suyo propio ThreadFactory, lo que creará subprocesos que no son de demonio. ¡Este es el tipo de sorpresas que te pueden esperar en un lugar inesperado!)

Bifurcar/Unirse al grupo

En esta parte hablaremos sobre el mismo ForkJoinPool(también llamado marco de bifurcación/join) que se encuentra "debajo del capó" de WorkStealingPool. En general, Fork Join Framework apareció en Java 1.7. E incluso si Java 11 ya está disponible, vale la pena recordarlo. No es la tarea más común, pero sí bastante interesante. Hay una buena reseña sobre este tema en Internet: “ Fork/Join Framework en Java 7 ”. Fork/JoinPoolopera en su trabajo con un concepto como java.util.concurrent.RecursiveTask. También hay un análogo - java.util.concurrent.RecursiveAction. Las acciones recursivas no devuelven ningún resultado. Por tanto RecursiveTasksimilar a Callabley RecursiveActionsimilar a Runnable. Bueno, al observar el nombre, vemos dos métodos clave: forky join. El método forkejecuta una tarea de forma asincrónica en un hilo separado. Y el método joinle permite esperar a que se complete el trabajo. Hay varias formas de usarlo: No puedes arruinar Java con un hilo: Parte V - Ejecutor, ThreadPool, Fork Join - 5Esta imagen es parte de una diapositiva del informe de Alexey Shipilev " Fork/Join: implementación, uso, rendimiento ". Para que quede más claro, vale la pena ver su informe en JEE CONF: " Características de implementación de Fork Join ".

resumiendo

Así que aquí estamos, terminando la siguiente parte de la revisión. Descubrimos lo que se nos ocurrió por primera vez Executorpara ejecutar subprocesos. Luego decidimos continuar con la idea y se nos ocurrió ExecutorService. ExecutorServicele permite enviar tareas para su ejecución usando submity invoke, así como administrar el servicio apagándolo. Porque ExecutorService'Necesitamos implementaciones, escribimos una clase con métodos de fábrica y la llamamos Executors. Le permite crear grupos de subprocesos ThreadPoolExecutor. Al mismo tiempo, existen grupos de subprocesos que también le permiten especificar un cronograma de ejecución, pero está WorkStealingPooloculto detrás ForkJoinPool. Espero que lo escrito anteriormente no solo haya sido interesante para usted, sino también comprensible). Siempre estaré feliz de recibir sugerencias y comentarios. #viacheslav
Comentarios
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION