JavaRush /Blogue Java /Random-PT /Você não pode estragar o Java com um thread: Parte V - Ex...
Viacheslav
Nível 3

Você não pode estragar o Java com um thread: Parte V - Executor, ThreadPool, Fork Join

Publicado no grupo Random-PT

Introdução

Portanto, sabemos que existem threads em Java, sobre os quais você pode ler na revisão “ Você não pode estragar Java com um thread: Parte I - Threads ”. Você não pode estragar Java com um Thread: Parte V - Executor, ThreadPool, Fork Join - 1Vejamos o código de exemplo novamente:
public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Como podemos ver, o código para iniciar a tarefa é bastante padronizado, mas para cada novo lançamento teremos que repeti-lo. Uma solução é movê-lo para um método separado, por exemplo execute(Runnable runnable). Mas os desenvolvedores Java já se preocuparam conosco e criaram uma interface Executor:
public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Como você pode ver, o código se tornou mais conciso e nos permitiu simplesmente escrever código para executá-lo Runnableem um thread. Ótimo, não é? Mas isso é apenas o começo: Você não pode estragar o Java com um thread: Parte V - Executor, ThreadPool, Fork Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Como você pode ver, a interface Executorpossui uma interface descendente ExecutorService. O JavaDoc desta interface diz que ExecutorServiceé uma descrição de um Executor'a' especial que fornece métodos para interromper o trabalho Executor'a' e permite acompanhar java.util.concurrent.Futureo progresso da execução. Anteriormente, em “ You Can’t Spoil Java with Thread: Part IV – Callable, Future and Friends ”, revisamos brevemente as possibilidades Future. Se você esqueceu ou não leu, aconselho a refrescar a memória ;) O que mais há de interessante no JavaDoc? Que temos uma fábrica especial java.util.concurrent.Executorsque nos permite criar implementações que estão disponíveis por padrão ExecutorService.

ExecutorService

Vamos lembrar novamente. Temos Executorque executar (ou seja, executar) uma determinada tarefa em um thread, quando a implementação da criação de um thread está oculta para nós. Temos ExecutorServiceum especial Executorque possui um conjunto de capacidades para gerenciar o andamento da execução. E temos uma fábrica Executorsque permite criar arquivos ExecutorService. Vamos fazer isso nós mesmos agora:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Como podemos ver, especificamos um pool de threads fixo ( Fixed Thread Pool) de tamanho 2. Após o qual enviamos tarefas para o pool uma por uma. Cada tarefa retorna uma string ( String) contendo o nome do thread ( currentThread().getName()). É importante desligar bem no final ExecutorService, caso contrário nosso programa não será encerrado. ExecutorsExistem outros métodos de fábrica na fábrica . Por exemplo, podemos criar um pool de apenas um thread - newSingleThreadExecutorou um pool com cache newCachedThreadPool, onde os threads serão removidos do pool se ficarem inativos por 1 minuto. Na verdade, por trás deles ExecutorServiceexiste uma fila de bloqueio na qual as tarefas são colocadas e a partir da qual essas tarefas são executadas. Mais informações sobre bloqueio de filas podem ser vistas no vídeo " Bloqueio de filas - Coleções #5 - Java Avançado ". Você também pode ler a revisão “ Bloqueando filas do pacote simultâneo ” e a resposta à pergunta “ Quando preferir LinkedBlockingQueue em vez de ArrayBlockingQueue? ” Super simplificado - BlockingQueue(fila de bloqueio) bloqueia um thread, em dois casos:
  • um thread está tentando obter elementos de uma fila vazia
  • o thread está tentando colocar elementos em uma fila completa
Se observarmos a implementação dos métodos de fábrica, podemos ver como eles estão estruturados. Por exemplo:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
ou
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Como podemos ver, as implementações são criadas dentro de métodos de fábrica ExecutorService. E é basicamente isso ThreadPoolExecutor. Somente os atributos que afetam o trabalho mudam. Você não pode estragar o Java com um thread: Parte V - Executor, ThreadPool, Fork Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

Como vimos anteriormente, dentro dos métodos de fábrica ThreadPoolExecutor,. A funcionalidade é afetada por quais valores são passados ​​como threads máximo e mínimo, bem como pela fila usada. E qualquer implementação da interface pode ser usada java.util.concurrent.BlockingQueue. Falando em ThreadPoolExecutor'ahs, vale destacar características interessantes durante a operação. Por exemplo, você não pode enviar tarefas para ThreadPoolExecutorse não houver espaço lá:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Este código falhará com um erro como:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Ou seja, taskvocê não pode se submeter, porque SynchronousQueueele é projetado de tal forma que na verdade consiste em um elemento e não permite colocar mais nele. Como podemos ver, queued taskshá 0 aqui, e não há nada de estranho nisso, porque isso é específico SynchronousQueue- na verdade, é uma fila de 1 elemento, que está sempre vazia. (!) Quando um thread coloca um elemento na fila, ele esperará até que outro thread retire o elemento da fila. Portanto, podemos substituir por new LinkedBlockingQueue<>(1)e o que for indicado no erro mudará queued tasks = 1. Porque a fila tem apenas 1 elemento, então não podemos adicionar o segundo. E vamos cair nisso. Continuando com o tema da fila, é importante notar que a classe ThreadPoolExecutorpossui métodos adicionais para atender a fila. Por exemplo, o método threadPoolExecutor.purge()removerá todas as tarefas canceladas da fila para liberar espaço na fila. Outro recurso interessante relacionado à fila é o manipulador de tarefas não aceitas:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Por exemplo, o manipulador simplesmente imprime uma palavra Rejectedpara cada recusa em aceitar uma tarefa na fila. Conveniente, não é? Além disso, ThreadPoolExecutorele tem um herdeiro interessante – ScheduledThreadPoolExecutorque é ScheduledExecutorService. Ele fornece a capacidade de executar uma tarefa em um cronômetro.

ScheduledExecutorService

ExecutorServicetipo ScheduledExecutorServicepermitem que você execute tarefas de acordo com uma programação. Vejamos um exemplo:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Tudo é simples aqui. As tarefas são enviadas, recebemos uma “tarefa agendada” java.util.concurrent.ScheduledFuture. O seguinte caso também pode ser útil com o cronograma:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Aqui enviamos Runnablea tarefa para ser executada a uma taxa fixa com um certo atraso. Neste caso, após 1 segundo a cada 2 segundos, comece a executar a tarefa. Existe uma opção semelhante:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Mas aqui as tarefas são executadas com um determinado intervalo ENTRE a execução de diferentes tarefas. Ou seja, a tarefa taskserá concluída em 1 segundo. A seguir, assim que for concluído, passarão 2 segundos e então uma nova tarefa será iniciada. Você pode ler os seguintes materiais sobre este tópico: Você não pode estragar o Java com um thread: Parte V - Executor, ThreadPool, Fork Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

Além dos pools de threads mencionados acima, existe mais um. Você poderia dizer que ele é um pouco especial. Seu nome é Work Stealing Pool. Resumindo, Work Stealing é um algoritmo de trabalho no qual threads ociosos começam a pegar tarefas de outros threads ou tarefas da fila geral. Vejamos um exemplo:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Se executarmos este código, ExecutorServiceele criará 5 threads, porque cada thread entrará na fila de espera no local do objeto lock. Já discutimos sobre monitores e bloqueios em “ Você não pode estragar o Java com um thread: Parte II – Sincronização ”. E agora vamos substituí-lo Executors.newCachedThreadPoolpor Executors.newWorkStealingPool(). O que vai mudar? Veremos que nossas tarefas são realizadas não em 5 threads, mas em menos. Lembra que cachedThreadPoolvocê criou seu próprio tópico para cada tarefa? Porque waitbloqueou o thread, mas as próximas tarefas queriam ser executadas e novos threads foram criados no pool para elas. No caso de StealingPoolthreads, eles não ficarão ociosos para sempre wait, eles começarão a executar tarefas vizinhas. Como isso é tão diferente de outros pools de threads WorkStealingPool? Porque na verdade há algo mágico vivendo dentro dele ForkJoinPool:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Na verdade, há mais uma diferença. Os threads criados por ForkJoinPoolpadrão são threads daemon, em oposição aos threads criados por meio de ThreadPool. Em geral, vale a pena lembrar dos threads daemon, porque... por exemplo, CompletableFuturethreads daemon também são usados, se você não especificar o seu próprio ThreadFactory, o que criará threads não-daemon. Este é o tipo de surpresas que podem esperar por você em um lugar inesperado!)

Bifurcar/juntar-se ao pool

Nesta parte falaremos sobre o mesmo ForkJoinPool(também chamado de framework fork/join) que vive “nos bastidores” do WorkStealingPool. Em geral, o Fork Join Framework apareceu no Java 1.7. E mesmo que o Java 11 já esteja disponível, ainda vale a pena lembrar. Não é a tarefa mais comum, mas bastante interessante. Há uma boa revisão sobre esse assunto na Internet: “ Fork/Join Framework in Java 7 ”. Fork/JoinPoolopera em seu trabalho com um conceito como java.util.concurrent.RecursiveTask. Há também um analógico - java.util.concurrent.RecursiveAction. RecursiveActions não retornam resultado. Assim , RecursiveTasksemelhante a e semelhante a . Bem, olhando para o nome, vemos dois métodos principais - e . O método executa uma tarefa de forma assíncrona em um thread separado. E o método permite aguardar a conclusão do trabalho. Existem várias maneiras de usá-lo: Esta imagem faz parte de um slide do relatório de Alexey Shipilev “ Fork/Join: implementação, uso, desempenho ”. Para deixar mais claro, vale a pena assistir ao seu relatório no JEE CONF: “ Fork Join Implementation Features ”. CallableRecursiveActionRunnableforkjoinforkjoinVocê não pode estragar o Java com um thread: Parte V - Executor, ThreadPool, Fork Join - 5

Resumindo

Então, aqui estamos, finalizando a próxima parte da revisão. Descobrimos o que inventamos primeiro Executorpara executar threads. Então decidimos continuar a ideia e tivemos a ideia ExecutorService. ExecutorServicepermite enviar tarefas para execução usando submite invoke, bem como gerenciar o serviço desativando-o. Porque ExecutorService'precisamos de implementações, escrevemos uma classe com métodos de fábrica e a chamamos Executors. Ele permite que você crie pools de threads ThreadPoolExecutor. Ao mesmo tempo, existem pools de threads que também permitem especificar um cronograma de execução, mas WorkStealingPoolocultam ForkJoinPool. Espero que o que foi escrito acima não tenha sido apenas interessante para você, mas também compreensível). Fico sempre feliz em receber sugestões e comentários. #Viacheslav
Comentários
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION