JavaRush /Java Blog /Random-IT /Non puoi rovinare Java con un thread: Parte V - Esecutore...
Viacheslav
Livello 3

Non puoi rovinare Java con un thread: Parte V - Esecutore, ThreadPool, Fork Join

Pubblicato nel gruppo Random-IT

introduzione

Quindi, sappiamo che ci sono thread in Java, di cui puoi leggere nella recensione " Non puoi rovinare Java con un thread: Parte I - Thread ". Non puoi rovinare Java con un thread: Parte V - Esecutore, ThreadPool, Fork Join - 1Diamo un'occhiata ancora al codice di esempio:
public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Come possiamo vedere, il codice per l'avvio dell'attività è abbastanza standard, ma per ogni nuovo avvio dovremo ripeterlo. Una soluzione è spostarlo in un metodo separato, ad esempio execute(Runnable runnable). Ma gli sviluppatori Java si sono già preoccupati per noi e hanno ideato un'interfaccia Executor:
public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Come puoi vedere, il codice è diventato più conciso e ci ha permesso di scrivere semplicemente codice per eseguirlo Runnablein un thread. Fantastico, non è vero? Ma questo è solo l'inizio: Non puoi rovinare Java con un thread: Parte V - Executor, ThreadPool, Fork Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Come puoi vedere, l'interfaccia Executorha un'interfaccia discendente ExecutorService. Il JavaDoc di questa interfaccia dice che ExecutorServicesi tratta di una descrizione di uno speciale Executor"a" che fornisce metodi per interrompere il lavoro Executor"a" e consente di monitorare java.util.concurrent.Futurel'avanzamento dell'esecuzione. In precedenza, in " Non puoi rovinare Java con Thread: Parte IV - Callable, Future and Friends ", abbiamo brevemente esaminato le possibilità Future. Se l'hai dimenticato o non l'hai letto, ti consiglio di rinfrescarti la memoria ;) Quali altre cose interessanti sono scritte in JavaDoc? Che disponiamo di una fabbrica speciale java.util.concurrent.Executorsche ci consente di creare implementazioni disponibili per impostazione predefinita ExecutorService.

ExecutorService

Ricordiamolo ancora. Dobbiamo Executoreseguire (cioè eseguire) un determinato compito in un thread, quando l'implementazione della creazione di un thread ci è nascosta. Ne abbiamo ExecutorServiceuno speciale Executorche ha una serie di funzionalità per gestire l'avanzamento dell'esecuzione. E abbiamo una fabbrica Executorsche ti consente di creare file ExecutorService. Facciamolo da soli ora:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Come possiamo vedere, abbiamo specificato un pool di thread fisso ( Fixed Thread Pool) di dimensione 2. Dopodiché inviamo le attività al pool una per una. Ogni attività restituisce una stringa ( String) contenente il nome del thread ( currentThread().getName()). È importante chiudere alla fine ExecutorService, perché altrimenti il ​​nostro programma non uscirà. ExecutorsEsistono altri metodi di fabbrica in fabbrica . Ad esempio, possiamo creare un pool di un solo thread newSingleThreadExecutoro un pool con caching newCachedThreadPool, in cui i thread verranno rimossi dal pool se rimangono inattivi per 1 minuto. Dietro a questi, infatti, ExecutorServicec'è una coda di blocco in cui vengono inseriti i compiti e da cui questi compiti vengono eseguiti. Maggiori informazioni sul blocco delle code possono essere visualizzate nel video " Blocco coda - Raccolte #5 - Java avanzato ". Puoi anche leggere la recensione “ Blocco delle code del pacchetto simultaneo ” e la risposta alla domanda “ Quando preferire LinkedBlockingQueue rispetto ad ArrayBlockingQueue? ” Super semplificato: BlockingQueue(coda di blocco) blocca un thread, in due casi:
  • un thread sta tentando di ottenere elementi da una coda vuota
  • il thread sta tentando di inserire gli elementi in una coda completa
Se osserviamo l’implementazione dei metodi di fabbrica, possiamo vedere come sono strutturati. Per esempio:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
O
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Come possiamo vedere, le implementazioni vengono create all'interno dei metodi factory ExecutorService. E questo è fondamentalmente tutto ThreadPoolExecutor. Cambiano solo gli attributi che influenzano il lavoro. Non puoi rovinare Java con un thread: Parte V - Executor, ThreadPool, Fork Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

Come abbiamo visto in precedenza, all'interno dei metodi factory ThreadPoolExecutor, . La funzionalità è influenzata da quali valori vengono passati come thread massimo e minimo, nonché da quale coda viene utilizzata. Ed è possibile utilizzare qualsiasi implementazione dell'interfaccia java.util.concurrent.BlockingQueue. A proposito di ThreadPoolExecutor"ah", vale la pena notare caratteristiche interessanti durante il funzionamento. Ad esempio, non puoi inviare attività a ThreadPoolExecutorse non c'è spazio lì:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Questo codice fallirà con un errore come:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Cioè, tasknon puoi sottometterti, perché SynchronousQueueè progettato in modo tale da consistere effettivamente di un elemento e non consente di inserirne altri. Come possiamo vedere, queued tasksqui c'è 0 e non c'è niente di strano in questo, perché questo è specifico SynchronousQueue: in effetti è una coda di 1 elemento, che è sempre vuota. (!) Quando un thread inserisce un elemento nella coda, attenderà finché un altro thread non prenderà l'elemento dalla coda. Pertanto possiamo sostituire con new LinkedBlockingQueue<>(1)e ciò che verrà indicato nell'errore cambierà queued tasks = 1. Perché la coda è composta da solo 1 elemento, quindi non possiamo aggiungere il secondo. E ci cadremo su questo. Continuando il tema della coda, vale la pena notare che la classe ThreadPoolExecutordispone di metodi aggiuntivi per la manutenzione della coda. Ad esempio, il metodo threadPoolExecutor.purge()rimuoverà tutte le attività annullate dalla coda per liberare spazio nella coda. Un'altra caratteristica interessante relativa alla coda è il gestore delle attività non accettate:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Ad esempio, il gestore stampa semplicemente una parola Rejectedper ogni rifiuto di accettare un'attività in coda. Comodo, no? Inoltre, ThreadPoolExecutorha un erede interessante: ScheduledThreadPoolExecutorchi è ScheduledExecutorService. Fornisce la possibilità di eseguire un'attività su un timer.

ScheduledExecutorService

ExecutorServiceil tipo ScheduledExecutorServiceconsente di eseguire attività in base a una pianificazione. Diamo un'occhiata ad un esempio:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Tutto è semplice qui. Le attività vengono inviate, riceviamo una "attività pianificata" java.util.concurrent.ScheduledFuture. Con la pianificazione può essere utile anche il seguente caso:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Qui inviamo Runnablel'attività da eseguire a una tariffa fissa (Fixed Rate) con un certo ritardo. In questo caso, dopo 1 secondo ogni 2 secondi, inizia a eseguire l'attività. C'è un'opzione simile:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Ma qui i compiti vengono eseguiti con un dato intervallo TRA l'esecuzione di compiti diversi. Cioè, l'attività taskverrà completata in 1 secondo. Successivamente, non appena sarà completato, passeranno 2 secondi e quindi verrà avviata una nuova attività. Puoi leggere i seguenti materiali su questo argomento: Non puoi rovinare Java con un thread: Parte V - Executor, ThreadPool, Fork Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

Lavoro rubatoPiscina

Oltre ai pool di thread menzionati sopra, ce n'è uno in più. Si potrebbe dire che è un po' speciale. Il suo nome è Work Stealing Pool. In breve, il Work Stealing è un algoritmo di lavoro in cui i thread inattivi iniziano a prendere attività da altri thread o attività dalla coda generale. Diamo un'occhiata ad un esempio:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Se eseguiamo questo codice, ExecutorServicecreerà 5 thread, perché ogni thread si unirà alla coda di attesa nella posizione dell'oggetto lock. Abbiamo già discusso dei monitor e dei relativi blocchi in " Non puoi rovinare Java con un thread: Parte II - sincronizzazione ". E ora lo sostituiremo Executors.newCachedThreadPoolcon Executors.newWorkStealingPool(). Cosa cambierà? Vedremo che i nostri compiti non vengono eseguiti in 5 thread, ma in meno. Ricordi che cachedThreadPoolhai creato il tuo thread per ogni attività? Perché waitha bloccato il thread, ma le attività successive volevano essere eseguite e per esse sono stati creati nuovi thread nel pool. Nel caso dei StealingPoolthread, non rimarranno inattivi per sempre wait; inizieranno ad eseguire attività vicine. In che modo è così diverso dagli altri pool di thread WorkStealingPool? Perché c'è davvero qualcosa di magico che vive dentro di lui ForkJoinPool:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
In realtà c'è un'altra differenza. I thread creati per ForkJoinPoolimpostazione predefinita sono thread daemon, al contrario dei thread creati tramite normali file ThreadPool. In generale, vale la pena ricordare i thread dei demoni, perché... ad esempio, CompletableFuturevengono utilizzati anche i thread daemon, se non si specifica il proprio ThreadFactory, che creerà thread non daemon. Queste sono il tipo di sorprese che possono aspettarti in un posto inaspettato!)

Fork/Unisciti al pool

In questa parte parleremo dello stesso ForkJoinPoolframework (detto anche fork/join) che vive “sotto il cofano” di WorkStealingPool. In generale, il Fork Join Framework è apparso in Java 1.7. E anche se Java 11 è già in cantiere, vale comunque la pena ricordarlo. Non è il compito più comune, ma piuttosto interessante. C'è una buona recensione su questo argomento su Internet: “ Fork/Join Framework in Java 7 ”. Fork/JoinPoolopera nel suo lavoro con un concetto come java.util.concurrent.RecursiveTask. C'è anche un analogo - java.util.concurrent.RecursiveAction. RecursiveActions non restituisce un risultato. Quindi RecursiveTasksimile a Callablee RecursiveActionsimile a Runnable. Bene, guardando il nome, vediamo due metodi chiave: forke join. Il metodo forkesegue un'attività in modo asincrono in un thread separato. E il metodo jointi consente di attendere il completamento del lavoro. Esistono diversi modi per utilizzarlo: Non puoi rovinare Java con un thread: Parte V - Executor, ThreadPool, Fork Join - 5questa immagine fa parte di una diapositiva del rapporto di Alexey Shipilev " Fork/Join: implementazione, utilizzo, prestazioni ". Per renderlo più chiaro, vale la pena guardare il suo rapporto al JEE CONF: “ Caratteristiche di implementazione del Fork Join ”.

Riassumendo

Quindi, eccoci qui a finire la parte successiva della recensione. Abbiamo capito cosa abbiamo inventato per la prima volta Executorper l'esecuzione dei thread. Poi abbiamo deciso di portare avanti l'idea e ci è venuta in mente ExecutorService. ExecutorServiceconsente di inviare attività per l'esecuzione utilizzando submite invoke, nonché di gestire il servizio disattivandolo. Perché ExecutorService'abbiamo bisogno di implementazioni, abbiamo scritto una classe con metodi factory e l'abbiamo chiamata Executors. Ti consente di creare pool di thread ThreadPoolExecutor. Allo stesso tempo, esistono pool di thread che consentono anche di specificare una pianificazione per l'esecuzione, ma è WorkStealingPoolnascosta dietro ForkJoinPool. Spero che quanto scritto sopra sia stato per voi non solo interessante, ma anche comprensibile) Sono sempre felice di ricevere suggerimenti e commenti. #Viacheslav
Commenti
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION