Для чого може знадобитися ExecutorService на 1 потік?

За допомогою методу Executors.newSingleThreadExecutor можна створити ExecutorService з пулом, що містить єдиний потік. Логіка роботи такого пулу наступна:

  • Сервіс виконує за раз лише одне завдання.
  • Якщо ми відправляємо N завдань на виконання, всі N завдання одне за одним будуть виконуватись одним потоком.
  • Якщо потік перерветься, то створиться новий потік для виконання інших завдань.

Давай уявимо ситуацію, в якій наша програма вимагає такого функціоналу:

Нам необхідно протягом 30 секунд обробляти запити від користувачів, але не більше одного запиту в одиницю часу.

Створюємо клас задачі обробки запиту від користувачів:


class Task implements Runnable {
   private final int taskNumber;

   public Task(int taskNumber) {
       this.taskNumber = taskNumber;
   }

   @Override
   public void run() {
       try {
           Thread.sleep(1000);
       } catch (InterruptedException ignored) {
       }
       System.out.printf("Оброблено запит #%d у потоці id=%d\n", taskNumber, Thread.currentThread().getId());
   }
}
    

Клас моделює поведінку обробки запиту, що прийшов, і виводить його номер.

Далі в методі main ми створюємо ExecutorService на 1 потік, в якому послідовно обробляємо запити, що прийшли. Оскільки в умові було зазначено "протягом 30 секунд", то додаємо очікування завершення 30 секунд, після чого примусово зупиняємо ExecutorService.


public static void main(String[] args) throws InterruptedException {
   ExecutorService executorService = Executors.newSingleThreadExecutor();

   for (int i = 0; i < 1_000; i++) {
       executorService.execute(new Task(i));
   }
   executorService.awaitTermination(30, TimeUnit.SECONDS);
   executorService.shutdownNow();
}
    

Під час запуску ми бачимо в консолі повідомлення про обробку:

Оброблено запит #0 в потоці id=16
Оброблено запит #1 у потоці id=16
Оброблено запит #2 у потоці id=16
….
Оброблено запит #29 у потоці id=16

Після обробки запитів протягом 30 секунд у executorService викличеться метод shutdownNow(), який зупиняє поточне завдання, що зараз виконується, та скасовує всі завдання, що очікують на виконання. Після цього програма успішно завершує своє виконання.

Але не все завжди так ідеально, адже в роботі програми може виникнути ситуація, де одне із завдань, що надходить до нашого єдиного потоку в пулі, некоректно відпрацює і навіть завершить наш потік. Ми можемо змоделювати таку ситуацію, щоб розібратися, як відпрацює executorService з єдиним потоком у такому випадку.

Для цього на етапі виконання одного із завдань завершуємо наш потік за допомогою небезпечного та застарілого методу Thread.currentThread().stop(). Ми це робимо спеціально, щоб змоделювати ситуацію завершення роботи потоку на одному із завдань.

Змінюємо наш метод run у класі Task:


@Override
public void run() {
   try {
       Thread.sleep(1000);
   } catch (InterruptedException ignored) {
   }

   if (taskNumber == 5) {
       Thread.currentThread().stop();
   }

   System.out.printf("Оброблено запит #%d у потоці id=%d\n", taskNumber, Thread.currentThread().getId());
}
    

Переривати ми будемо на завданні #5.

Подивимося, який вигляд має виведення з перериванням потоку в кінці виконання завдання #5:

Оброблено запит #0 в потоці id=16
Оброблено запит #1 у потоці id=16
Оброблено запит #2 у потоці id=16
Оброблено запит #3 у потоці id=16
Оброблено запит #4 у потоці id=16
Оброблено запит #6 у потоці id=17
Оброблено запит #7 у потоці id=17

Оброблено запит #29 у потоці id=17

Ми бачимо, що після переривання потоку в кінці 5 завдання, завдання виконуються в потоці з id = 17, хоча до цього вони виконувалися в потоці з id = 16. Оскільки в нашому пулі лише один потік, це може означати лише те, що executorService замінив зупинений потік новим і завдання надалі виконуються.

Таким чином використання newSingleThreadExecutor для роботи з пулом з єдиним потоком, що обробляє завдання, необхідне для послідовних завдань, в яких передбачено одночасне виконання лише одного завдання. Але водночас вимагається продовження обробки завдань з черги, незалежно від результату завершення завдання (випадок, де в одному із завдань потік може бути вбито).

ThreadFactory

Коли мова заходить про створення й перестворення потоків, варто згадати про ThreadFactory.

ThreadFactory — це об'єкт, який за вимогою створює нові потоки.

Ми можемо створити свою фабрику створення потоків і передати її екземпляр до методу Executors.newSingleThreadExecutor(ThreadFactory threadFactory).


ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "MyThread");
            }
        });
                    
Перевизначили метод створення нового потоку, передавши до конструктора ім'я.

ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "MyThread");
                thread.setPriority(Thread.MAX_PRIORITY);
                return thread;
            }
        });
                    
Змінили ім'я та пріоритет для потоку, що створюється.

Таким чином ми з'ясували, що є 2 перевантажених методи Executors.newSingleThreadExecutor. Один — без параметрів, другий — з параметром типу ThreadFactory.

За допомогою фабрики створення потоків ThreadFactory можна робити різні налаштування для потоків, що створюються. Наприклад, встановити пріоритети, використовувати підкласи потоків, додати потік UncaughtExceptionHandler тощо.