Метод newFixedThreadPool у класса Executors создаст нам executorService с фиксированным количеством потоков. По сравнению с методом newSingleThreadExecutor, мы указываем, какое количество потоков мы хотим видеть в пуле. Под капотом вызывается


new ThreadPoolExecutor( nThreads, nThreads,
                                      	0L, TimeUnit.MILLISECONDS,
                                      	new LinkedBlockingQueue());

В качестве параметров corePoolSize (какое количество потоков будет готово (запущено) при старте executor сервиса) и maximumPoolSize (максимальное количество потоков, которое может создать executor сервис) передается одно и то же число — количество потоков передаваемое в newFixedThreadPool(nThreads). Точно так же мы можем передать в параметры и собственную реализацию ThreadFactory.

Итак, давай разберемся, зачем нам нужен такой ExecutorService.

ExecutorService с фиксированным количеством (n) потоков обладает следующей логикой:

  • Максимум n потоков будут активны для обработки задач.
  • Если передано более n задач, они будут удерживаться в очереди до момента, пока потоки не освободятся.
  • Если в работе одного из потоков произойдет сбой и он завершит свою работу, будет создан новый поток на место сломанного.
  • Любой поток из пула активен до тех пор, пока пул не закрыт.

Как пример представь очередь в аэропорту, где все стоят в одной очереди, а непосредственно перед контролем расходятся на работающее количество пунктов. Если в одном из пунктов будет задержка, очередь будет идти только через второй, пока первый не освободится, а если один из пунктов вовсе сломается, то откроют другой пункт на замену этому и контроль продолжится через два пункта.

Сразу стоит отметить, что условия хоть и идеальные, где обещанные n потоков стабильно работают и всегда будет подмена для завершившихся с ошибкой потоков (чего уже добиться в реальной работе аэропорта невозможно из-за ограниченных ресурсов), все равно система имеет несколько неприятных особенностей, так как потоков больше не станет ни при каких условиях, даже в случае, когда очередь будет увеличиваться быстрее, чем потоки будут обрабатывать задачи.

Предлагаю разобраться на практике, как работает ExecutorService с фиксированным количеством потоков. Давай создадим класс, реализующий Runnable. Объекты этого класса и будут нашими задачами для ExecutorService.


public class Task implements Runnable {
    int taskNumber;
 
    public Task(int taskNumber) {
        this.taskNumber = taskNumber;
    }
 
    @Override
    public void run() {
try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Обработан запрос пользователя №" + taskNumber + " на потоке " + Thread.currentThread().getName());
    }
}
    

В методе run() мы блокируем поток на 2 секунды, имитируя нагрузку, и выводим номер текущей задачи и имя потока, на котором выполняем данную задачу.


ExecutorService executorService = Executors.newFixedThreadPool(3);
 
        for (int i = 0; i < 30; i++) {
            executorService.execute(new Task(i));
        }
        
        executorService.shutdown();
    

Для начала в main создадим ExecutorService и отправим 30 задач на выполнение.

Обработан запрос пользователя №1 на потоке pool-1-thread-2
Обработан запрос пользователя №0 на потоке pool-1-thread-1
Обработан запрос пользователя №2 на потоке pool-1-thread-3
Обработан запрос пользователя №5 на потоке pool-1-thread-3
Обработан запрос пользователя №3 на потоке pool-1-thread-2
Обработан запрос пользователя №4 на потоке pool-1-thread-1
Обработан запрос пользователя №8 на потоке pool-1-thread-1
Обработан запрос пользователя №6 на потоке pool-1-thread-3
Обработан запрос пользователя №7 на потоке pool-1-thread-2
Обработан запрос пользователя №10 на потоке pool-1-thread-3
Обработан запрос пользователя №9 на потоке pool-1-thread-1
Обработан запрос пользователя №11 на потоке pool-1-thread-2
Обработан запрос пользователя №12 на потоке pool-1-thread-3
Обработан запрос пользователя №14 на потоке pool-1-thread-2
Обработан запрос пользователя №13 на потоке pool-1-thread-1
Обработан запрос пользователя №15 на потоке pool-1-thread-3
Обработан запрос пользователя №16 на потоке pool-1-thread-2
Обработан запрос пользователя №17 на потоке pool-1-thread-1
Обработан запрос пользователя №18 на потоке pool-1-thread-3
Обработан запрос пользователя №19 на потоке pool-1-thread-2
Обработан запрос пользователя №20 на потоке pool-1-thread-1
Обработан запрос пользователя №21 на потоке pool-1-thread-3
Обработан запрос пользователя №22 на потоке pool-1-thread-2
Обработан запрос пользователя №23 на потоке pool-1-thread-1
Обработан запрос пользователя №25 на потоке pool-1-thread-2
Обработан запрос пользователя №24 на потоке pool-1-thread-3
Обработан запрос пользователя №26 на потоке pool-1-thread-1
Обработан запрос пользователя №27 на потоке pool-1-thread-2
Обработан запрос пользователя №28 на потоке pool-1-thread-3
Обработан запрос пользователя №29 на потоке pool-1-thread-1

В консоли мы видим, как задачи выполняются на разных потоках по мере их освобождения от предыдущей задачи.

Теперь мы увеличим количество задач до 100, а после отправки на выполнение 100 задач вызовем метод awaitTermination(11, SECONDS). В параметры передаем количество и временную единицу. Этот метод заблокирует основной поток на 11 секунд, после чего мы вызовем shutdownNow() и принудим завершить работу ExecutorService, не дожидаясь выполнения всех задач.


ExecutorService executorService = Executors.newFixedThreadPool(3);
 
        for (int i = 0; i < 100; i++) {
            executorService.execute(new Task(i));
        }
 
        executorService.awaitTermination(11, SECONDS);
 
        executorService.shutdownNow();
        System.out.println(executorService);
    

В конце выведем данные о состоянии executorService.

В консоли мы видим следующее:

Обработан запрос пользователя №0 на потоке pool-1-thread-1
Обработан запрос пользователя №2 на потоке pool-1-thread-3
Обработан запрос пользователя №1 на потоке pool-1-thread-2
Обработан запрос пользователя №4 на потоке pool-1-thread-3
Обработан запрос пользователя №5 на потоке pool-1-thread-2
Обработан запрос пользователя №3 на потоке pool-1-thread-1
Обработан запрос пользователя №6 на потоке pool-1-thread-3
Обработан запрос пользователя №7 на потоке pool-1-thread-2
Обработан запрос пользователя №8 на потоке pool-1-thread-1
Обработан запрос пользователя №9 на потоке pool-1-thread-3
Обработан запрос пользователя №11 на потоке pool-1-thread-1
Обработан запрос пользователя №10 на потоке pool-1-thread-2
Обработан запрос пользователя №13 на потоке pool-1-thread-1
Обработан запрос пользователя №14 на потоке pool-1-thread-2
Обработан запрос пользователя №12 на потоке pool-1-thread-3
java.util.concurrent.ThreadPoolExecutor@452b3a41[Shutting down, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 15]
Обработан запрос пользователя №17 на потоке pool-1-thread-3
Обработан запрос пользователя №15 на потоке pool-1-thread-1
Обработан запрос пользователя №16 на потоке pool-1-thread-2

Далее следуют 3 InterruptedException, которые выкидывают методы sleep из 3 активных задач.

Мы видим, что на момент завершения у нас выполнилось 15 задач, но еще остается 3 активных потока в пуле, которые не закончили выполнение своей задачи. У этих 3-х потоков вызывается interrupt(), а это значит, что выполнение задачи завершится, но в нашем случае метод sleep выкидывает нам InterruptedException. Также мы видим, что после вызова метода shutdownNow() очистилась очередь из задач.

Таким образом, стоит отметить, что принцип работы нужно учитывать при использовании ExecutorService с фиксированным количеством потоков в пуле. Использовать этот вид стоит для задач с заранее известной постоянной нагрузкой.

Есть еще один интересный вопрос: если нужно использовать экзекутор на 1 поток, какой метод вызвать — newSingleThreadExecutor() или newFixedThreadPool(1)?

В поведении оба экзекутора будут эквивалентными. Разница будет лишь в том, что метод newSingleThreadExecutor() вернет такой экзекутор, который в дальнейшем никак нельзя перенастроить на использование дополнительных потоков.