Для чего нужен интерфейс Executor

До Java 5 для управления потоками нужно было писать весь код самому в приложении. Кроме этого, создание объекта new Thread — это ресурсоемкая операция, и создавать для “легковесных” задач каждый раз новый поток было нерационально. А так как с этой проблемой сталкивались абсолютно все разработчики многопоточных приложений, этот функционал решили вынести как фреймворк Executor в Java.

Какая главная идея? Все просто: вместо того, чтоб создавать новый поток под каждую новую задачу, потоки хранятся в неком “хранилище”, и когда поступает новая задача, берется уже существующий поток, а не создается новый.

Основные интерфейсы этого фреймворка — это Executor, ExecutorService и ScheduledExecutorService, каждый из которых расширяет функциональность предыдущего.

Интерфейс Executor — базовый интерфейс, который объявляет один метод void execute(Runnable command) — запуск задачи, которая описана в объекте типа Runnable.

Интерфейс ExecutorService уже более интересный. Он содержит методы для управления завершением работы, а также методы, которые могут возвращать какой-то результат. Посмотрим подробнее на его методы:

Метод Описание
void shutdown(); Вызов метода инициирует остановку ExecutorService. Все задачи, которые уже были отправлены на обработку, будут завершены, новые задачи приниматься не будут.
List<Runnable> shutdownNow();

Вызов метода инициирует остановку ExecutorService. Все задачи, которые уже были отправлены на обработку, получат команду Thread.interrupt. Задачи, находящиеся в очереди, возвращаются в виде списка как результат вызова метода.

Метод не ожидает завершения всех задач, которые находятся “в работе” на момент вызова метода.

Внимание: вызов метода может вызывать утечку ресурсов.

boolean isShutdown(); Проверяет, остановлен ли ExecutorService.
boolean isTerminated(); Возвращает true, если все задачи были завершены после остановки ExecutorService. Пока не будет вызван метод shutdown() или shutdownNow(), всегда будет возвращен false.
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

Блокирует поток, из которого запущен, после вызова метода shutdown(), пока не случится одно из условий:

  • закончится выполнение всех запланированных задач;
  • пройдет временной интервал, указанный в качестве параметров метода;
  • текущий поток прекратит свою работу.

Возвращает true, если все задачи завершились, и false, если раньше наступил таймаут.

<T> Future<T> submit(Callable<T> task);

Добавляет ExecutorService вызываемую (Callable) задачу и возвращает объект, имплементирующий интерфейс Future.

<T> — тип результата переданной задачи.

<T> Future<T> submit(Runnable task, T result);

Добавляет ExecutorService выполняемую (Runnable) задачу и возвращает объект, имплементирующий интерфейс Future.

Параметр T result — то, что вернет вызов метода get() у полученного объект Future.

Future<?> submit(Runnable task);

Добавляет ExecutorService выполняемую (Runnable) задачу и возвращает объект, имплементирующий интерфейс Future.

Если вызвать метод get() у полученного объекта Future, то получим null.

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

Передает ExecutorService список вызываемых (Callable) задач. Возвращает список Future, из которых можно получить результат работы. Этот список возвращается после завершения выполнения всех переданных задач.

Если во время работы метода модифицировать коллекцию tasks, то результат метода неопределен.

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

Передает ExecutorService список вызываемых (Callable) задач. Возвращает список Future, из которых можно получить результат работы. Этот список возвращается после завершения выполнения всех переданных задач или после того, как пройдет время, переданное в параметрах метода (что наступит раньше).

Если таймаут наступит раньше, невыполненные задачи отменяют свое выполнение.

Примечание: отмененная задача может не остановить свою работу, а продолжить выполняться (увидим этот сайд-эффект в примере).

Если во время работы метода модифицировать коллекцию tasks, то результат метода неопределен.

<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

Передает ExecutorService список вызываемых (Callable) задач. Возвращает результат работы одной из задач, которая завершила выполнение без выбрасывания исключения (если такая нашлась).

Если во время работы метода модифицировать коллекцию tasks, то результат метода неопределен.

<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

Передает ExecutorService список вызываемых (Callable) задач. Возвращает результат работы одной из задач, которая завершила выполнение без выбрасывания исключения (если такая нашлась) до истечения времени, заданного в параметрах метода.

Если во время работы метода модифицировать коллекцию tasks, то результат метода неопределен.

Давай рассмотрим небольшой пример работы с ExecutorService.


import java.util.List;
import java.util.concurrent.*;

public class ExecutorServiceTest {
   public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
//Создаем ExecutorService на 2 потока
       java.util.concurrent.ExecutorService executorService = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
//Создаем 5 задач
       MyRunnable task1 = new MyRunnable();
       MyRunnable task2 = new MyRunnable();
       MyRunnable task3 = new MyRunnable();
       MyRunnable task4 = new MyRunnable();
       MyRunnable task5 = new MyRunnable();

       final List<MyRunnable> tasks = List.of(task1, task2, task3, task4, task5);
//Отправляем на обработку список, который содержит 5 ранее созданных задач
       final List<Future<Void>> futures = executorService.invokeAll(tasks, 6, TimeUnit.SECONDS);
       System.out.println("got futures");

//Останавливаем ExecutorService
       executorService.shutdown();

       try {
           TimeUnit.SECONDS.sleep(3);
       } catch (InterruptedException e) {
           e.printStackTrace();
       }

       System.out.println(executorService.isShutdown());
       System.out.println(executorService.isTerminated());
   }

   public static class MyRunnable implements Callable<Void> {

       @Override
       public Void call() {
// Добавляем 2 задержки времени. При остановке ExecutorService увидим какая из них отрабатывает при попытке остановить выполнение задачи
           try {
               TimeUnit.SECONDS.sleep(3);
           } catch (InterruptedException e) {
               System.out.println("sleep 1: " + e.getMessage());
           }
           try {
               TimeUnit.SECONDS.sleep(2);
           } catch (InterruptedException e) {
               System.out.println("sleep 2: " + e.getMessage());
           }
           System.out.println("done");
           return null;
       }
   }
}

Вывод программы:

done
done
got futures
sleep 1: sleep interrupted
sleep 1: sleep interrupted
done
done
true
true

Каждая задача выполняется по 5 секунд. Так как мы создали пул на два потока, первые две строки в выводе логичны и понятны.

Через 6 секунд после начала работы программы срабатывает таймаут метода invokeAll и возвращается результат в виде списка Future. Это видно по строке вывода “got futures”.

После завершения работы первых двух задач начали работать еще две. Но так как время, заданное в методе invokeAll вышло, эти две задачи не успели завершиться, и им была отправлена команда "cancel". Именно поэтому в выводе видно две строки “sleep 1: sleep interrupted”.

А дальше можно наблюдать еще две надписи “done”. Это и есть демонстрация сайд-эффекта, о котором я писал при описании метода invokeAll.

Последняя, пятая задача даже не запускалась, поэтому мы в выводе программы ничего о ней не видим.

Последние две строчки — это вывод результата вызова методов isShutdown и isTerminated.

Еще в этом примере интересно в дебаге посмотреть состояние задач после завершения таймаута (брейкпоинт на строке “executorService.shutdown();”):

Видим, что две задачи завершились: “Completed normally”, и три задачи “Cancelled”.

ScheduledExecutorService

Для завершения рассказа об экзекюторах, рассмотрим еще ScheduledExecutorService.

У него есть 4 метода:

Метод Описание
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit); Планирует однократное выполнение переданной выполняемой (Runnable) задачи через время, заданное в качестве параметра.
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); Планирует однократное выполнение переданной вызываемой (Callable) задачи через время, заданное в качестве параметра.
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); Планирует периодическое выполнение переданной задачи, которая первый раз выполнится через время initialDelay, и каждый следующий запуск будет начинаться через period.
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); Планирует периодическое выполнение переданной задачи, которая первый раз выполнится через время initialDelay, и каждый следующий запуск будет начинаться через delay (между завершением предыдущего выполнения и стартом текущего).