Introducción
Entonces, sabemos que hay subprocesos en Java, sobre los cuales puede leer en la revisión " No se puede estropear Java con un subproceso: Parte I - Subprocesos ". Miremos el código de muestra nuevamente:public static void main(String []args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
Como vemos, el código para ejecutar la tarea es bastante estándar, pero para cada nuevo lanzamiento tendremos que repetirlo. Una solución es moverlo a un método separado, por ejemplo execute(Runnable runnable)
. Pero los desarrolladores de Java ya se preocuparon por nosotros y crearon una interfaz Executor
:
public static void main(String []args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Como puede ver, el código se ha vuelto más conciso y nos permite simplemente escribir código para ejecutarlo Runnable
en un hilo. Genial, ¿no? Pero esto es solo el principio:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
tiene una interfaz descendiente ExecutorService
. El JavaDoc de esta interfaz dice que ExecutorService
es una descripción de una Executor
'a' especial que proporciona métodos para detener el trabajo Executor
'a' y le permite realizar un java.util.concurrent.Future
seguimiento del progreso de la ejecución. Anteriormente, en " No se puede estropear Java con subprocesos: Parte IV: invocables, futuros y amigos ", revisamos brevemente las posibilidades Future
. Si lo olvidaste o no lo has leído, te aconsejo que refresques tu memoria ;) ¿Qué otras cosas interesantes están escritas en JavaDoc? Que tenemos una fábrica especial java.util.concurrent.Executors
que nos permite crear implementaciones que están disponibles por defecto ExecutorService
.
Servicio Ejecutor
Recordemos de nuevo. TenemosExecutor
que ejecutar (es decir, ejecutar) una determinada tarea en un hilo, cuando la implementación de la creación de un hilo está oculta para nosotros. Tenemos ExecutorService
uno especial Executor
que tiene un conjunto de capacidades para gestionar el progreso de la ejecución. Y tenemos una fábrica Executors
que te permite crear ExecutorService
. Hagámoslo nosotros mismos ahora:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
Como podemos ver, hemos especificado un grupo de subprocesos fijo ( Fixed Thread Pool
) de tamaño 2. Después de lo cual enviamos tareas al grupo una por una. Cada tarea devuelve una cadena ( String
) que contiene el nombre del hilo ( currentThread().getName()
). Es importante cerrar al final ExecutorService
, porque de lo contrario nuestro programa no saldrá. Executors
Hay otros métodos de fábrica en la fábrica . Por ejemplo, podemos crear un grupo de un solo subproceso, newSingleThreadExecutor
o un grupo con almacenamiento en caché newCachedThreadPool
, donde los subprocesos se eliminarán del grupo si están inactivos durante 1 minuto. De hecho, detrás de estos ExecutorService
hay una cola de bloqueo en la que se colocan las tareas y desde la que se ejecutan. Puede ver más información sobre el bloqueo de colas en el vídeo " Cola de bloqueo - Colecciones n.° 5 - Java avanzado ". También puede leer la reseña " Bloqueo de colas del paquete concurrente " y la respuesta a la pregunta "¿ Cuándo preferir LinkedBlockingQueue a ArrayBlockingQueue? " Súper simplificado: BlockingQueue
(cola de bloqueo) bloquea un hilo, en dos casos:
- un hilo está intentando obtener elementos de una cola vacía
- el hilo está intentando poner elementos en una cola llena
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
o
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Como podemos ver, las implementaciones se crean dentro de los métodos de fábrica ExecutorService
. Y eso es básicamente todo ThreadPoolExecutor
. Sólo cambian los atributos que afectan el trabajo.
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolEjecutor
Como vimos anteriormente, dentro de los métodos de fábricaThreadPoolExecutor
. La funcionalidad se ve afectada por los valores que se pasan como subprocesos máximo y mínimo, así como por la cola que se utiliza. Y se puede utilizar cualquier implementación de la interfaz java.util.concurrent.BlockingQueue
. Hablando de ThreadPoolExecutor
'ahs, vale la pena señalar características interesantes durante la operación. Por ejemplo, no puedes enviar tareas a ThreadPoolExecutor
si no hay espacio allí:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Este código fallará con un error como:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Es decir, task
no puedes presentarte, porque SynchronousQueue
está diseñado de tal manera que en realidad consta de un elemento y no permite colocar más allí. Como podemos ver, queued tasks
aquí hay 0, y no hay nada extraño en esto, porque esto es específico SynchronousQueue
; de hecho, es una cola de 1 elemento, que siempre está vacía. (!) Cuando un hilo pone un elemento en la cola, esperará hasta que otro hilo tome el elemento de la cola. Por tanto, podemos sustituir por new LinkedBlockingQueue<>(1)
y cambiará lo que se indicará en el error queued tasks = 1
. Porque la cola tiene solo 1 elemento, entonces no podemos agregar el segundo. Y en esto caeremos. Continuando con el tema de la cola, vale la pena señalar que la clase ThreadPoolExecutor
tiene métodos adicionales para dar servicio a la cola. Por ejemplo, el método threadPoolExecutor.purge()
eliminará todas las tareas canceladas de la cola para liberar espacio en la cola. Otra característica interesante relacionada con la cola es el controlador de tareas no aceptadas:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Por ejemplo, el controlador simplemente imprime una palabra Rejected
por cada negativa a aceptar una tarea en la cola. Conveniente, ¿no? Además, ThreadPoolExecutor
tiene un heredero interesante ScheduledThreadPoolExecutor
: ScheduledExecutorService
. Proporciona la posibilidad de realizar una tarea con un temporizador.
Servicio de ejecución programado
ExecutorService
El tipo ScheduledExecutorService
le permite ejecutar tareas según un cronograma. Veamos un ejemplo:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Aquí todo es sencillo. Se envían tareas, recibimos una “tarea programada” java.util.concurrent.ScheduledFuture
. El siguiente caso también puede resultar útil con el cronograma:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Aquí enviamos Runnable
la tarea a ejecutar a una tarifa fija (Tasa Fija) con un cierto retraso. En este caso, después de 1 segundo cada 2 segundos, comience a ejecutar la tarea. Hay una opción similar:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Pero aquí las tareas se ejecutan con un intervalo determinado ENTRE la ejecución de diferentes tareas. Es decir, la tarea task
se completará en 1 segundo. A continuación, tan pronto como se complete, pasarán 2 segundos y luego se lanzará una nueva tarea. Puede leer los siguientes materiales sobre este tema:
- Una introducción a los grupos de subprocesos
- Introducción a los grupos de subprocesos
- Carrera de obstáculos multiproceso de Java: cancelación de tareas en ejecutores
- Elegir ejecutores de Java correctos para tareas en segundo plano
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
TrabajoRobarPiscina
Además de los grupos de subprocesos mencionados anteriormente, hay uno más. Se podría decir que es un poco especial. Su nombre es Work Stealing Pool. En resumen, Work Stealing es un algoritmo de trabajo en el que los subprocesos inactivos comienzan a tomar tareas de otros subprocesos o tareas de la cola general. Veamos un ejemplo:public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Si ejecutamos este código, ExecutorService
creará 5 subprocesos, porque Cada hilo se unirá a la cola de espera en la ubicación del objeto lock
. Ya hemos hablado sobre los monitores y sus bloqueos en " No se puede arruinar Java con un hilo: Parte II - sincronización ". Y ahora lo reemplazaremos Executors.newCachedThreadPool
con Executors.newWorkStealingPool()
. ¿Qué cambiará? Veremos que nuestras tareas se realizan no en 5 hilos, sino en menos. ¿ Recuerdas que cachedThreadPool
creaste tu propio hilo para cada tarea? Porque wait
bloqueó el hilo, pero las siguientes tareas querían ejecutarse y se crearon nuevos hilos en el grupo para ellas. En el caso de StealingPool
los subprocesos, no permanecerán inactivos para siempre wait
; comenzarán a ejecutar tareas vecinas. ¿ En qué se diferencia esto de otros grupos de subprocesos WorkStealingPool
? Porque realmente hay algo mágico viviendo dentro de él ForkJoinPool
:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
En realidad, hay una diferencia más. Los subprocesos que se crean de ForkJoinPool
forma predeterminada son subprocesos de demonio, a diferencia de los subprocesos creados mediante ThreadPool
. En general, vale la pena recordar los hilos de demonio, porque... por ejemplo, CompletableFuture
también se utilizan subprocesos de demonio, si no especifica el suyo propio ThreadFactory
, lo que creará subprocesos que no son de demonio. ¡Este es el tipo de sorpresas que te pueden esperar en un lugar inesperado!)
Bifurcar/Unirse al grupo
En esta parte hablaremos sobre el mismoForkJoinPool
(también llamado marco de bifurcación/join) que se encuentra "debajo del capó" de WorkStealingPool
. En general, Fork Join Framework apareció en Java 1.7. E incluso si Java 11 ya está disponible, vale la pena recordarlo. No es la tarea más común, pero sí bastante interesante. Hay una buena reseña sobre este tema en Internet: “ Fork/Join Framework en Java 7 ”. Fork/JoinPool
opera en su trabajo con un concepto como java.util.concurrent.RecursiveTask
. También hay un análogo - java.util.concurrent.RecursiveAction
. Las acciones recursivas no devuelven ningún resultado. Por tanto RecursiveTask
similar a Callable
y RecursiveAction
similar a Runnable
. Bueno, al observar el nombre, vemos dos métodos clave: fork
y join
. El método fork
ejecuta una tarea de forma asincrónica en un hilo separado. Y el método join
le permite esperar a que se complete el trabajo. Hay varias formas de usarlo: Esta imagen es parte de una diapositiva del informe de Alexey Shipilev " Fork/Join: implementación, uso, rendimiento ". Para que quede más claro, vale la pena ver su informe en JEE CONF: " Características de implementación de Fork Join ".
resumiendo
Así que aquí estamos, terminando la siguiente parte de la revisión. Descubrimos lo que se nos ocurrió por primera vezExecutor
para ejecutar subprocesos. Luego decidimos continuar con la idea y se nos ocurrió ExecutorService
. ExecutorService
le permite enviar tareas para su ejecución usando submit
y invoke
, así como administrar el servicio apagándolo. Porque ExecutorService
'Necesitamos implementaciones, escribimos una clase con métodos de fábrica y la llamamos Executors
. Le permite crear grupos de subprocesos ThreadPoolExecutor
. Al mismo tiempo, existen grupos de subprocesos que también le permiten especificar un cronograma de ejecución, pero está WorkStealingPool
oculto detrás ForkJoinPool
. Espero que lo escrito anteriormente no solo haya sido interesante para usted, sino también comprensible). Siempre estaré feliz de recibir sugerencias y comentarios. #viacheslav
GO TO FULL VERSION