Для чого потрібен інтерфейс Executor

До Java 5 для керування потоками потрібно було писати весь код самостійно у застосунку. Окрім цього, створення об'єкта new Thread — це ресурсомістка операція, і створювати щоразу для “легковагих” завдань новий потік нераціонально. А оскільки з цією проблемою стикалися абсолютно всі розробники багатопотокових застосунків, цей функціонал вирішили винести до фреймворку Executor у Java.

У чому полягає основна ідея? Все просто: замість того, щоб створювати новий потік під кожне нове завдання, потоки зберігаються у певному “сховищі”, і коли настає час для нового завдання, замість створення нового береться потік, який вже існує.

Основні інтерфейси цього фреймворка — це Executor, ExecutorService та ScheduledExecutorService, і кожний з них розширює функціональність попереднього.

Інтерфейс Executor — базовий інтерфейс, який оголошує один метод void execute(Runnable command) — запуск завдання, яке описане в об'єкті типу Runnable.

Інтерфейс ExecutorService вже цікавіший. Він містить методи для керуванням завершенням роботи, а також методи, що можуть повертати якийсь результат. Давай детальніше розглянемо його методи:

Метод Опис
void shutdown(); Виклик методу ініціює зупинку ExecutorService. Усі завдання, що вже надіслано на обробку, будуть завершені, а нові завдання не будуть прийматися.
List<Runnable> shutdownNow();

Виклик методу ініціює зупинку ExecutorService. Усі завдання, що вже надіслано на обробку, отримають команду Thread.interrupt. Завдання в черзі повертаються списком як результат виклику методу.

Метод не очікує на завершення всіх завдань, що є “в роботі” на момент виклику методу.

Увага: виклик методу може спричинити витік ресурсів.

boolean isShutdown(); Перевіряє, чи зупинено ExecutorService.
boolean isTerminated(); Повертає true, якщо всі завдання завершено після зупинки ExecutorService. Поки не викличеться метод shutdown() або shutdownNow(), завжди буде повертатися false.
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

Блокує потік, з якого його запущено, після виклику методу shutdown(), поки не виконається одна з умов:

  • завершиться виконання усіх запланованих завдань;
  • мине часовий інтервал, вказаний за параметри методу;
  • нинішній потік припинить свою роботу.

Повертає true, якщо всі завдання завершено, і false, якщо таймаут настав раніше.

<T> Future<T> submit(Callable<T> task);

Додає ExecutorService завдання, що викликається (Callable),і повертає об'єкт, що імплементує інтерфейс Future.

<T> — тип результату завдання, що передалося.

<T> Future<T> submit(Runnable task, T result);

Додає ExecutorService виконуване (Runnable) завдання й повертає об'єкт, що імплементує інтерфейс Future.

Параметр T result — те, що поверне виклик методу get() в отриманого об'єкта Future.

Future<?> submit(Runnable task);

Додає ExecutorService виконуване (Runnable) завдання й повертає об'єкт, що імплементує інтерфейс Future.

Якщо викликати метод get() в отриманого об'єкта Future, отримаємо null.

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

Передає ExecutorService список викликаних (Callable) завдань. Повертає список Future, з яких можна отримати результат роботи. Цей список повертається після завершення виконання всіх завдань, що передалися.

Якщо під час роботи методу модифікувати колекцію tasks, результат методу не визначений.

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

Передає ExecutorService список викликаних (Callable) завдань. Повертає список Future, з яких можна отримати результат роботи. Цей список повертається після завершення виконання всіх завдань, що передалися або після того, як мине час, що передали в параметрах методу (що прийде раніше).

Якщо таймаут прийде раніше, невиконані завдання скасовують своє виконання.

Примітка: скасоване завдання може й не зупинити свою роботу, а виконуватися далі (побачимо цей побічний ефект у прикладі).

Якщо під час роботи методу модифікувати колекцію tasks, то результат методу не визначений.

<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

Передає ExecutorService список викликаних (Callable) завдань. Повертає результат роботи одного з завдань, яке виконалося без викидання винятку (якщо таке знайшлося).

Якщо під час роботи методу модифікувати колекцію tasks, то результат методу не визначений.

<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

Передає ExecutorService список викликаних (Callable) завдань. Повертає результат роботи одного з завдань, яке виконалося без викидання винятку (якщо таке знайшлося) до закінчення часу, що встановлений у параметрах методу.

Якщо під час роботи методу модифікувати колекцію tasks, то результат методу не визначений.

Давай розглянемо приклад роботи з ExecutorService.

import java.util.List;
import java.util.concurrent.*;

public class ExecutorServiceTest {
   public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
//Створюємо ExecutorService на 2 потоки
       java.util.concurrent.ExecutorService executorService = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
//Створюємо 5 завдань
       MyRunnable task1 = new MyRunnable();
       MyRunnable task2 = new MyRunnable();
       MyRunnable task3 = new MyRunnable();
       MyRunnable task4 = new MyRunnable();
       MyRunnable task5 = new MyRunnable();

       final List<MyRunnable> tasks = List.of(task1, task2, task3, task4, task5);
//Надсилаємо на обробку список, що містить 5 завдань, які створено раніше
       final List<Future<Void>> futures = executorService.invokeAll(tasks, 6, TimeUnit.SECONDS);
       System.out.println("got futures");

//Зупиняємо ExecutorService
       executorService.shutdown();

       try {
           TimeUnit.SECONDS.sleep(3);
       } catch (InterruptedException e) {
           e.printStackTrace();
       }

       System.out.println(executorService.isShutdown());
       System.out.println(executorService.isTerminated());
   }

   public static class MyRunnable implements Callable<Void> {

       @Override
       public Void call() {
// Додаємо 2 затримки часу. При зупинці ExecutorService побачимо, яка з них спрацює при спробі зупинити виконання завдання
           try {
               TimeUnit.SECONDS.sleep(3);
           } catch (InterruptedException e) {
               System.out.println("sleep 1: " + e.getMessage());
           }
           try {
               TimeUnit.SECONDS.sleep(2);
           } catch (InterruptedException e) {
               System.out.println("sleep 2: " + e.getMessage());
           }
           System.out.println("done");
           return null;
       }
   }
}

Виведення програми:

done
done
got futures
sleep 1: sleep interrupted
sleep 1: sleep interrupted
done
done
true
true

Кожне завдання виконується по 5 секунд. Оскільки ми створили пул на два потоки, перші два рядки у виведенні логічні й зрозумілі.

Через 6 секунд після початку роботи програми спрацьовує таймаут методу invokeAll і повертається результат як список Future. Це видно в рядку виведення “got futures”.

Після завершення роботи перших двох завдань почали працювати ще два. Але оскільки час, який задано в методі invokeAll, сплив, ці два завдання не встигли завершитися, і їм надіслано команду "cancel". Саме тому у виведенні видно два рядки “sleep 1: sleep interrupted”.

А далі можна спостерігати ще два надписи “done”. Це і є прикладом того побічного ефекту, про який ми згадували при описі методу invokeAll.

Останнє, п'яте завдання навіть не запускалося, тому у виведенні програми ми нічого про нього не побачимо.

Останні два рядки — це виведення результату виклику методу isShutdown та isTerminated.

Ще в цьому прикладі цікаво у дебазі подивитися на стан завдань після завершення таймауту (брейкпоінт на рядку “executorService.shutdown();”):

Бачимо, що два завдання завершилися: “Completed normally”, а три завдання “Cancelled”.

ScheduledExecutorService

На кінець розповіді про екзек'ютори розглянемо ще ScheduledExecutorService.

У нього є 4 методи:

Метод Опис
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit); Планує одноразове виконання переданого виконуваного (Runnable) завдання через час, який задано як параметр.
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); Планує одноразове виконання переданого викликаного (Callable) завдання через час, який задано як параметр.
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); Планує одноразове виконання переданого виконуваного завдання, який перший раз виконається через час initialDelay, і кожний наступний запуск будет починатися через period.
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); Планує періодичне виконання переданого завдання, яке в перший раз виконається через час initialDelay, і кожний наступний запуск буде починатися через delay (між завершенням попереднього виконання та стартом поточного).