Для чего может понадобиться ExecutorService на 1 поток?

С помощью метода Executors.newSingleThreadExecutor можно создать ExecutorService с пулом, включающим в себя единственный поток. Логика работы такого пула следующая:

  • Сервис выполняет за раз только одну задачу.
  • Если мы отправляем N задач на исполнение, все N задач одна за другой будет выполняться одним потоком.
  • Если поток будет прерван, то создастся новый поток для выполнения остальных задач.

Давайте представим ситуацию, в которой наша программа требует следующий функционал:

Нам необходимо в течении 30 секунд обрабатывать запросы от пользователей, но не более одного запроса в единицу времени.

Создаем класс задачи обработки запроса от пользователей:


class Task implements Runnable {
   private final int taskNumber;

   public Task(int taskNumber) {
       this.taskNumber = taskNumber;
   }

   @Override
   public void run() {
       try {
           Thread.sleep(1000);
       } catch (InterruptedException ignored) {
       }
       System.out.printf("Обработан запрос #%d в потоке id=%d\n", taskNumber, Thread.currentThread().getId());
   }
}
    

Класс моделирует поведение обработки пришедшего запроса и выводит его номер.

Далее в методе main мы создаем ExecutorService на 1 поток, в котором последовательно обрабатываем пришедшие запросы. Так как в условии было указано “в течение 30 секунд”, то добавляем ожидание завершения 30 секунд, после чего принудительно останавливаем ExecutorService.


public static void main(String[] args) throws InterruptedException {
   ExecutorService executorService = Executors.newSingleThreadExecutor();

   for (int i = 0; i < 1_000; i++) {
       executorService.execute(new Task(i));
   }
   executorService.awaitTermination(30, TimeUnit.SECONDS);
   executorService.shutdownNow();
}
    

При запуске мы видим в консоли выводимые сообщения об обработке:

Обработан запрос #0 в потоке id=16
Обработан запрос #1 в потоке id=16
Обработан запрос #2 в потоке id=16
….
Обработан запрос #29 в потоке id=16

После обработки запросов в течении 30 секунд у executorService будет вызван метод shutdownNow(), который останавливает текущую задачу (которая выполняется) и отменяет все ожидающие выполнения задачи. После этого программа успешно завершает свое выполнение.

Но не все всегда так идеально, ведь в работе программы запросто может возникнуть ситуация, где одна из задач, приходящая в наш единственный поток в пуле, некорректно отработает и даже завершит наш поток. Мы можем смоделировать такую ситуацию для того, чтоб разобраться, как же отработает executorService с единственным потоком в таком случае.

Для этого на этапе выполнения одной из задач завершаем наш поток с помощью небезопасного и устаревшего метода Thread.currentThread().stop(). Мы это делаем специально, чтоб смоделировать ситуацию завершения работы потока на одной из задач.

Меняем наш метод run в классе Task:


@Override
public void run() {
   try {
       Thread.sleep(1000);
   } catch (InterruptedException ignored) {
   }

   if (taskNumber == 5) {
       Thread.currentThread().stop();
   }

   System.out.printf("Обработан запрос #%d в потоке id=%d\n", taskNumber, Thread.currentThread().getId());
}
    

Прерывать мы будем на задаче #5.

Посмотрим, как выглядит вывод с прерыванием потока в конце выполнения задачи #5:

Обработан запрос #0 в потоке id=16
Обработан запрос #1 в потоке id=16
Обработан запрос #2 в потоке id=16
Обработан запрос #3 в потоке id=16
Обработан запрос #4 в потоке id=16
Обработан запрос #6 в потоке id=17
Обработан запрос #7 в потоке id=17

Обработан запрос #29 в потоке id=17

Мы видим, что после прерывания потока в конце 5 задачи задачи начинают выполняться в потоке с id = 17, хотя до этой задачи выполнялись в потоке с id = 16. И поскольку в нашем пуле всего один поток, это может означать лишь одно — executorService заменил остановленный поток новым и задачи продолжили выполняться.

Таким образом использование newSingleThreadExecutor для работы с пулом с единственным обрабатывающим задачи потоком необходим для последовательных задач, которые подразумевают одновременное выполнение только одной задачи, но при этом требуют продолжения обработки задач из очереди, несмотря на исход завершения задачи (случай, где в одной из задач поток может быть убит).

ThreadFactory

Говоря о создании и пересоздании потоков, мы не можем не упомянуть о ThreadFactory.

ThreadFactory — это объект, который создает новые потоки по требованию.

Мы можем создать свою фабрику создания потоков и передать ее экземпляр в метод Executors.newSingleThreadExecutor(ThreadFactory threadFactory).


ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "MyThread");
            }
        });
                    
Переопределили метод создания нового потока, передав в конструктор имя.

ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "MyThread");
                thread.setPriority(Thread.MAX_PRIORITY);
                return thread;
            }
        });
                    
Поменяли имя и приоритет для создаваемого потока.

Таким образом выяснили, что есть 2 перегруженных метода Executors.newSingleThreadExecutor. Один — без параметров, второй — с параметром типа ThreadFactory.

С помощью фабрики создания потоков ThreadFactory можно делать разные настройки создаваемым потокам, например, – выставить приоритеты, использовать подклассы потоков, добавить поток UncaughtExceptionHandler и так далее.