Introdução
Portanto, sabemos que existem threads em Java, sobre os quais você pode ler na revisão “ Você não pode estragar Java com um thread: Parte I - Threads ”. Vejamos o código de exemplo novamente:public static void main(String []args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
Como podemos ver, o código para iniciar a tarefa é bastante padronizado, mas para cada novo lançamento teremos que repeti-lo. Uma solução é movê-lo para um método separado, por exemplo execute(Runnable runnable)
. Mas os desenvolvedores Java já se preocuparam conosco e criaram uma interface Executor
:
public static void main(String []args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Como você pode ver, o código se tornou mais conciso e nos permitiu simplesmente escrever código para executá-lo Runnable
em um thread. Ótimo, não é? Mas isso é apenas o começo:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
possui uma interface descendente ExecutorService
. O JavaDoc desta interface diz que ExecutorService
é uma descrição de um Executor
'a' especial que fornece métodos para interromper o trabalho Executor
'a' e permite acompanhar java.util.concurrent.Future
o progresso da execução. Anteriormente, em “ You Can’t Spoil Java with Thread: Part IV – Callable, Future and Friends ”, revisamos brevemente as possibilidades Future
. Se você esqueceu ou não leu, aconselho a refrescar a memória ;) O que mais há de interessante no JavaDoc? Que temos uma fábrica especial java.util.concurrent.Executors
que nos permite criar implementações que estão disponíveis por padrão ExecutorService
.
ExecutorService
Vamos lembrar novamente. TemosExecutor
que executar (ou seja, executar) uma determinada tarefa em um thread, quando a implementação da criação de um thread está oculta para nós. Temos ExecutorService
um especial Executor
que possui um conjunto de capacidades para gerenciar o andamento da execução. E temos uma fábrica Executors
que permite criar arquivos ExecutorService
. Vamos fazer isso nós mesmos agora:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
Como podemos ver, especificamos um pool de threads fixo ( Fixed Thread Pool
) de tamanho 2. Após o qual enviamos tarefas para o pool uma por uma. Cada tarefa retorna uma string ( String
) contendo o nome do thread ( currentThread().getName()
). É importante desligar bem no final ExecutorService
, caso contrário nosso programa não será encerrado. Executors
Existem outros métodos de fábrica na fábrica . Por exemplo, podemos criar um pool de apenas um thread - newSingleThreadExecutor
ou um pool com cache newCachedThreadPool
, onde os threads serão removidos do pool se ficarem inativos por 1 minuto. Na verdade, por trás deles ExecutorService
existe uma fila de bloqueio na qual as tarefas são colocadas e a partir da qual essas tarefas são executadas. Mais informações sobre bloqueio de filas podem ser vistas no vídeo " Bloqueio de filas - Coleções #5 - Java Avançado ". Você também pode ler a revisão “ Bloqueando filas do pacote simultâneo ” e a resposta à pergunta “ Quando preferir LinkedBlockingQueue em vez de ArrayBlockingQueue? ” Super simplificado - BlockingQueue
(fila de bloqueio) bloqueia um thread, em dois casos:
- um thread está tentando obter elementos de uma fila vazia
- o thread está tentando colocar elementos em uma fila completa
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
ou
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Como podemos ver, as implementações são criadas dentro de métodos de fábrica ExecutorService
. E é basicamente isso ThreadPoolExecutor
. Somente os atributos que afetam o trabalho mudam.
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
Como vimos anteriormente, dentro dos métodos de fábricaThreadPoolExecutor
,. A funcionalidade é afetada por quais valores são passados como threads máximo e mínimo, bem como pela fila usada. E qualquer implementação da interface pode ser usada java.util.concurrent.BlockingQueue
. Falando em ThreadPoolExecutor
'ahs, vale destacar características interessantes durante a operação. Por exemplo, você não pode enviar tarefas para ThreadPoolExecutor
se não houver espaço lá:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Este código falhará com um erro como:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Ou seja, task
você não pode se submeter, porque SynchronousQueue
ele é projetado de tal forma que na verdade consiste em um elemento e não permite colocar mais nele. Como podemos ver, queued tasks
há 0 aqui, e não há nada de estranho nisso, porque isso é específico SynchronousQueue
- na verdade, é uma fila de 1 elemento, que está sempre vazia. (!) Quando um thread coloca um elemento na fila, ele esperará até que outro thread retire o elemento da fila. Portanto, podemos substituir por new LinkedBlockingQueue<>(1)
e o que for indicado no erro mudará queued tasks = 1
. Porque a fila tem apenas 1 elemento, então não podemos adicionar o segundo. E vamos cair nisso. Continuando com o tema da fila, é importante notar que a classe ThreadPoolExecutor
possui métodos adicionais para atender a fila. Por exemplo, o método threadPoolExecutor.purge()
removerá todas as tarefas canceladas da fila para liberar espaço na fila. Outro recurso interessante relacionado à fila é o manipulador de tarefas não aceitas:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Por exemplo, o manipulador simplesmente imprime uma palavra Rejected
para cada recusa em aceitar uma tarefa na fila. Conveniente, não é? Além disso, ThreadPoolExecutor
ele tem um herdeiro interessante – ScheduledThreadPoolExecutor
que é ScheduledExecutorService
. Ele fornece a capacidade de executar uma tarefa em um cronômetro.
ScheduledExecutorService
ExecutorService
tipo ScheduledExecutorService
permitem que você execute tarefas de acordo com uma programação. Vejamos um exemplo:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Tudo é simples aqui. As tarefas são enviadas, recebemos uma “tarefa agendada” java.util.concurrent.ScheduledFuture
. O seguinte caso também pode ser útil com o cronograma:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Aqui enviamos Runnable
a tarefa para ser executada a uma taxa fixa com um certo atraso. Neste caso, após 1 segundo a cada 2 segundos, comece a executar a tarefa. Existe uma opção semelhante:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Mas aqui as tarefas são executadas com um determinado intervalo ENTRE a execução de diferentes tarefas. Ou seja, a tarefa task
será concluída em 1 segundo. A seguir, assim que for concluído, passarão 2 segundos e então uma nova tarefa será iniciada. Você pode ler os seguintes materiais sobre este tópico:
- Uma introdução aos pools de threads
- Introdução aos pools de threads
- Java Multithreading Steeplechase: Cancelando tarefas em executores
- Escolhendo executores Java corretos para tarefas em segundo plano
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
Além dos pools de threads mencionados acima, existe mais um. Você poderia dizer que ele é um pouco especial. Seu nome é Work Stealing Pool. Resumindo, Work Stealing é um algoritmo de trabalho no qual threads ociosos começam a pegar tarefas de outros threads ou tarefas da fila geral. Vejamos um exemplo:public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Se executarmos este código, ExecutorService
ele criará 5 threads, porque cada thread entrará na fila de espera no local do objeto lock
. Já discutimos sobre monitores e bloqueios em “ Você não pode estragar o Java com um thread: Parte II – Sincronização ”. E agora vamos substituí-lo Executors.newCachedThreadPool
por Executors.newWorkStealingPool()
. O que vai mudar? Veremos que nossas tarefas são realizadas não em 5 threads, mas em menos. Lembra que cachedThreadPool
você criou seu próprio tópico para cada tarefa? Porque wait
bloqueou o thread, mas as próximas tarefas queriam ser executadas e novos threads foram criados no pool para elas. No caso de StealingPool
threads, eles não ficarão ociosos para sempre wait
, eles começarão a executar tarefas vizinhas. Como isso é tão diferente de outros pools de threads WorkStealingPool
? Porque na verdade há algo mágico vivendo dentro dele ForkJoinPool
:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Na verdade, há mais uma diferença. Os threads criados por ForkJoinPool
padrão são threads daemon, em oposição aos threads criados por meio de ThreadPool
. Em geral, vale a pena lembrar dos threads daemon, porque... por exemplo, CompletableFuture
threads daemon também são usados, se você não especificar o seu próprio ThreadFactory
, o que criará threads não-daemon. Este é o tipo de surpresas que podem esperar por você em um lugar inesperado!)
Bifurcar/juntar-se ao pool
Nesta parte falaremos sobre o mesmoForkJoinPool
(também chamado de framework fork/join) que vive “nos bastidores” do WorkStealingPool
. Em geral, o Fork Join Framework apareceu no Java 1.7. E mesmo que o Java 11 já esteja disponível, ainda vale a pena lembrar. Não é a tarefa mais comum, mas bastante interessante. Há uma boa revisão sobre esse assunto na Internet: “ Fork/Join Framework in Java 7 ”. Fork/JoinPool
opera em seu trabalho com um conceito como java.util.concurrent.RecursiveTask
. Há também um analógico - java.util.concurrent.RecursiveAction
. RecursiveActions não retornam resultado. Assim , RecursiveTask
semelhante a e semelhante a . Bem, olhando para o nome, vemos dois métodos principais - e . O método executa uma tarefa de forma assíncrona em um thread separado. E o método permite aguardar a conclusão do trabalho. Existem várias maneiras de usá-lo: Esta imagem faz parte de um slide do relatório de Alexey Shipilev “ Fork/Join: implementação, uso, desempenho ”. Para deixar mais claro, vale a pena assistir ao seu relatório no JEE CONF: “ Fork Join Implementation Features ”. Callable
RecursiveAction
Runnable
fork
join
fork
join
Resumindo
Então, aqui estamos, finalizando a próxima parte da revisão. Descobrimos o que inventamos primeiroExecutor
para executar threads. Então decidimos continuar a ideia e tivemos a ideia ExecutorService
. ExecutorService
permite enviar tarefas para execução usando submit
e invoke
, bem como gerenciar o serviço desativando-o. Porque ExecutorService
'precisamos de implementações, escrevemos uma classe com métodos de fábrica e a chamamos Executors
. Ele permite que você crie pools de threads ThreadPoolExecutor
. Ao mesmo tempo, existem pools de threads que também permitem especificar um cronograma de execução, mas WorkStealingPool
ocultam ForkJoinPool
. Espero que o que foi escrito acima não tenha sido apenas interessante para você, mas também compreensível). Fico sempre feliz em receber sugestões e comentários. #Viacheslav
GO TO FULL VERSION