Fixed Thread Executor

Модуль 2. Java Core
14 уровень , 2 лекция
Открыта

Метод newFixedThreadPool у класса Executors создаст нам executorService с фиксированным количеством потоков. По сравнению с методом newSingleThreadExecutor, мы указываем, какое количество потоков мы хотим видеть в пуле. Под капотом вызывается


new ThreadPoolExecutor( nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());

В качестве параметров corePoolSize (какое количество потоков будет готово (запущено) при старте executor сервиса) и maximumPoolSize (максимальное количество потоков, которое может создать executor сервис) передается одно и то же число — количество потоков передаваемое в newFixedThreadPool(nThreads). Точно так же мы можем передать в параметры и собственную реализацию ThreadFactory.

Итак, давай разберемся, зачем нам нужен такой ExecutorService.

ExecutorService с фиксированным количеством (n) потоков обладает следующей логикой:

  • Максимум n потоков будут активны для обработки задач.
  • Если передано более n задач, они будут удерживаться в очереди до момента, пока потоки не освободятся.
  • Если в работе одного из потоков произойдет сбой и он завершит свою работу, будет создан новый поток на место сломанного.
  • Любой поток из пула активен до тех пор, пока пул не закрыт.

Как пример представь очередь в аэропорту, где все стоят в одной очереди, а непосредственно перед контролем расходятся на работающее количество пунктов. Если в одном из пунктов будет задержка, очередь будет идти только через второй, пока первый не освободится, а если один из пунктов вовсе сломается, то откроют другой пункт на замену этому и контроль продолжится через два пункта.

Сразу стоит отметить, что условия хоть и идеальные, где обещанные n потоков стабильно работают и всегда будет подмена для завершившихся с ошибкой потоков (чего уже добиться в реальной работе аэропорта невозможно из-за ограниченных ресурсов), все равно система имеет несколько неприятных особенностей, так как потоков больше не станет ни при каких условиях, даже в случае, когда очередь будет увеличиваться быстрее, чем потоки будут обрабатывать задачи.

Предлагаю разобраться на практике, как работает ExecutorService с фиксированным количеством потоков. Давай создадим класс, реализующий Runnable. Объекты этого класса и будут нашими задачами для ExecutorService.


public class Task implements Runnable {
    int taskNumber;
 
    public Task(int taskNumber) {
        this.taskNumber = taskNumber;
    }
 
    @Override
    public void run() {
try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Обработан запрос пользователя №" + taskNumber + " на потоке " + Thread.currentThread().getName());
    }
}
    

В методе run() мы блокируем поток на 2 секунды, имитируя нагрузку, и выводим номер текущей задачи и имя потока, на котором выполняем данную задачу.


ExecutorService executorService = Executors.newFixedThreadPool(3);
 
        for (int i = 0; i < 30; i++) {
            executorService.execute(new Task(i));
        }
        
        executorService.shutdown();
    

Для начала в main создадим ExecutorService и отправим 30 задач на выполнение.

Обработан запрос пользователя №1 на потоке pool-1-thread-2
Обработан запрос пользователя №0 на потоке pool-1-thread-1
Обработан запрос пользователя №2 на потоке pool-1-thread-3
Обработан запрос пользователя №5 на потоке pool-1-thread-3
Обработан запрос пользователя №3 на потоке pool-1-thread-2
Обработан запрос пользователя №4 на потоке pool-1-thread-1
Обработан запрос пользователя №8 на потоке pool-1-thread-1
Обработан запрос пользователя №6 на потоке pool-1-thread-3
Обработан запрос пользователя №7 на потоке pool-1-thread-2
Обработан запрос пользователя №10 на потоке pool-1-thread-3
Обработан запрос пользователя №9 на потоке pool-1-thread-1
Обработан запрос пользователя №11 на потоке pool-1-thread-2
Обработан запрос пользователя №12 на потоке pool-1-thread-3
Обработан запрос пользователя №14 на потоке pool-1-thread-2
Обработан запрос пользователя №13 на потоке pool-1-thread-1
Обработан запрос пользователя №15 на потоке pool-1-thread-3
Обработан запрос пользователя №16 на потоке pool-1-thread-2
Обработан запрос пользователя №17 на потоке pool-1-thread-1
Обработан запрос пользователя №18 на потоке pool-1-thread-3
Обработан запрос пользователя №19 на потоке pool-1-thread-2
Обработан запрос пользователя №20 на потоке pool-1-thread-1
Обработан запрос пользователя №21 на потоке pool-1-thread-3
Обработан запрос пользователя №22 на потоке pool-1-thread-2
Обработан запрос пользователя №23 на потоке pool-1-thread-1
Обработан запрос пользователя №25 на потоке pool-1-thread-2
Обработан запрос пользователя №24 на потоке pool-1-thread-3
Обработан запрос пользователя №26 на потоке pool-1-thread-1
Обработан запрос пользователя №27 на потоке pool-1-thread-2
Обработан запрос пользователя №28 на потоке pool-1-thread-3
Обработан запрос пользователя №29 на потоке pool-1-thread-1

В консоли мы видим, как задачи выполняются на разных потоках по мере их освобождения от предыдущей задачи.

Теперь мы увеличим количество задач до 100, а после отправки на выполнение 100 задач вызовем метод awaitTermination(11, SECONDS). В параметры передаем количество и временную единицу. Этот метод заблокирует основной поток на 11 секунд, после чего мы вызовем shutdownNow() и принудим завершить работу ExecutorService, не дожидаясь выполнения всех задач.


ExecutorService executorService = Executors.newFixedThreadPool(3);
 
        for (int i = 0; i < 100; i++) {
            executorService.execute(new Task(i));
        }
 
        executorService.awaitTermination(11, SECONDS);
 
        executorService.shutdownNow();
        System.out.println(executorService);
    

В конце выведем данные о состоянии executorService.

В консоли мы видим следующее:

Обработан запрос пользователя №0 на потоке pool-1-thread-1
Обработан запрос пользователя №2 на потоке pool-1-thread-3
Обработан запрос пользователя №1 на потоке pool-1-thread-2
Обработан запрос пользователя №4 на потоке pool-1-thread-3
Обработан запрос пользователя №5 на потоке pool-1-thread-2
Обработан запрос пользователя №3 на потоке pool-1-thread-1
Обработан запрос пользователя №6 на потоке pool-1-thread-3
Обработан запрос пользователя №7 на потоке pool-1-thread-2
Обработан запрос пользователя №8 на потоке pool-1-thread-1
Обработан запрос пользователя №9 на потоке pool-1-thread-3
Обработан запрос пользователя №11 на потоке pool-1-thread-1
Обработан запрос пользователя №10 на потоке pool-1-thread-2
Обработан запрос пользователя №13 на потоке pool-1-thread-1
Обработан запрос пользователя №14 на потоке pool-1-thread-2
Обработан запрос пользователя №12 на потоке pool-1-thread-3
java.util.concurrent.ThreadPoolExecutor@452b3a41[Shutting down, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 15]
Обработан запрос пользователя №17 на потоке pool-1-thread-3
Обработан запрос пользователя №15 на потоке pool-1-thread-1
Обработан запрос пользователя №16 на потоке pool-1-thread-2

Далее следуют 3 InterruptedException, которые выкидывают методы sleep из 3 активных задач.

Мы видим, что на момент завершения у нас выполнилось 15 задач, но еще остается 3 активных потока в пуле, которые не закончили выполнение своей задачи. У этих 3-х потоков вызывается interrupt(), а это значит, что выполнение задачи завершится, но в нашем случае метод sleep выкидывает нам InterruptedException. Также мы видим, что после вызова метода shutdownNow() очистилась очередь из задач.

Таким образом, стоит отметить, что принцип работы нужно учитывать при использовании ExecutorService с фиксированным количеством потоков в пуле. Использовать этот вид стоит для задач с заранее известной постоянной нагрузкой.

Есть еще один интересный вопрос: если нужно использовать экзекутор на 1 поток, какой метод вызвать — newSingleThreadExecutor() или newFixedThreadPool(1)?

В поведении оба экзекутора будут эквивалентными. Разница будет лишь в том, что метод newSingleThreadExecutor() вернет такой экзекутор, который в дальнейшем никак нельзя перенастроить на использование дополнительных потоков.

Комментарии (6)
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ
Руслан Уровень 108 Expert
22 мая 2022
Странный валидатор в задаче task2811. В тз написано: "В цикле отправь на исполнение в пул 20 задач Runnable". Исполнял таски с помощью execute, но валидатор не принимал решение пока не поменял на submit, который нужен для Callable<T>. Но в чем смысл? Мы ничего не ждем в ответ, нам просто нужно запустить исполнение задач, для чего там submit?
Igor Уровень 1 Expert
21 июня 2022
Тоже пытался через execute и не понимал почему не проходит валидатор...
Джама Уровень 108 Expert
27 июня 2022
submit перегруженный метод и один из принимаемых параметров как раз Runnable.
Дмитрий Уровень 108 Expert
15 сентября 2022
выходит, лучше всегда использовать сабмит?
Михаил Уровень 102 Expert
16 сентября 2022
Нет, потому что когда другой разработчик откроет код и увидит там sumbit он будет ожидать какой - то возврат
Руслан Уровень 46
29 октября 2025
Согласен ответ вводит в заблуждение метод submit действительно возвращает значение