introduzione
Quindi, sappiamo che ci sono thread in Java, di cui puoi leggere nella recensione " Non puoi rovinare Java con un thread: Parte I - Thread ". Diamo un'occhiata ancora al codice di esempio:public static void main(String []args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
Come possiamo vedere, il codice per l'avvio dell'attività è abbastanza standard, ma per ogni nuovo avvio dovremo ripeterlo. Una soluzione è spostarlo in un metodo separato, ad esempio execute(Runnable runnable)
. Ma gli sviluppatori Java si sono già preoccupati per noi e hanno ideato un'interfaccia Executor
:
public static void main(String []args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Come puoi vedere, il codice è diventato più conciso e ci ha permesso di scrivere semplicemente codice per eseguirlo Runnable
in un thread. Fantastico, non è vero? Ma questo è solo l'inizio:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
ha un'interfaccia discendente ExecutorService
. Il JavaDoc di questa interfaccia dice che ExecutorService
si tratta di una descrizione di uno speciale Executor
"a" che fornisce metodi per interrompere il lavoro Executor
"a" e consente di monitorare java.util.concurrent.Future
l'avanzamento dell'esecuzione. In precedenza, in " Non puoi rovinare Java con Thread: Parte IV - Callable, Future and Friends ", abbiamo brevemente esaminato le possibilità Future
. Se l'hai dimenticato o non l'hai letto, ti consiglio di rinfrescarti la memoria ;) Quali altre cose interessanti sono scritte in JavaDoc? Che disponiamo di una fabbrica speciale java.util.concurrent.Executors
che ci consente di creare implementazioni disponibili per impostazione predefinita ExecutorService
.
ExecutorService
Ricordiamolo ancora. DobbiamoExecutor
eseguire (cioè eseguire) un determinato compito in un thread, quando l'implementazione della creazione di un thread ci è nascosta. Ne abbiamo ExecutorService
uno speciale Executor
che ha una serie di funzionalità per gestire l'avanzamento dell'esecuzione. E abbiamo una fabbrica Executors
che ti consente di creare file ExecutorService
. Facciamolo da soli ora:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
Come possiamo vedere, abbiamo specificato un pool di thread fisso ( Fixed Thread Pool
) di dimensione 2. Dopodiché inviamo le attività al pool una per una. Ogni attività restituisce una stringa ( String
) contenente il nome del thread ( currentThread().getName()
). È importante chiudere alla fine ExecutorService
, perché altrimenti il nostro programma non uscirà. Executors
Esistono altri metodi di fabbrica in fabbrica . Ad esempio, possiamo creare un pool di un solo thread newSingleThreadExecutor
o un pool con caching newCachedThreadPool
, in cui i thread verranno rimossi dal pool se rimangono inattivi per 1 minuto. Dietro a questi, infatti, ExecutorService
c'è una coda di blocco in cui vengono inseriti i compiti e da cui questi compiti vengono eseguiti. Maggiori informazioni sul blocco delle code possono essere visualizzate nel video " Blocco coda - Raccolte #5 - Java avanzato ". Puoi anche leggere la recensione “ Blocco delle code del pacchetto simultaneo ” e la risposta alla domanda “ Quando preferire LinkedBlockingQueue rispetto ad ArrayBlockingQueue? ” Super semplificato: BlockingQueue
(coda di blocco) blocca un thread, in due casi:
- un thread sta tentando di ottenere elementi da una coda vuota
- il thread sta tentando di inserire gli elementi in una coda completa
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
O
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Come possiamo vedere, le implementazioni vengono create all'interno dei metodi factory ExecutorService
. E questo è fondamentalmente tutto ThreadPoolExecutor
. Cambiano solo gli attributi che influenzano il lavoro.
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
Come abbiamo visto in precedenza, all'interno dei metodi factoryThreadPoolExecutor
, . La funzionalità è influenzata da quali valori vengono passati come thread massimo e minimo, nonché da quale coda viene utilizzata. Ed è possibile utilizzare qualsiasi implementazione dell'interfaccia java.util.concurrent.BlockingQueue
. A proposito di ThreadPoolExecutor
"ah", vale la pena notare caratteristiche interessanti durante il funzionamento. Ad esempio, non puoi inviare attività a ThreadPoolExecutor
se non c'è spazio lì:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Questo codice fallirà con un errore come:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Cioè, task
non puoi sottometterti, perché SynchronousQueue
è progettato in modo tale da consistere effettivamente di un elemento e non consente di inserirne altri. Come possiamo vedere, queued tasks
qui c'è 0 e non c'è niente di strano in questo, perché questo è specifico SynchronousQueue
: in effetti è una coda di 1 elemento, che è sempre vuota. (!) Quando un thread inserisce un elemento nella coda, attenderà finché un altro thread non prenderà l'elemento dalla coda. Pertanto possiamo sostituire con new LinkedBlockingQueue<>(1)
e ciò che verrà indicato nell'errore cambierà queued tasks = 1
. Perché la coda è composta da solo 1 elemento, quindi non possiamo aggiungere il secondo. E ci cadremo su questo. Continuando il tema della coda, vale la pena notare che la classe ThreadPoolExecutor
dispone di metodi aggiuntivi per la manutenzione della coda. Ad esempio, il metodo threadPoolExecutor.purge()
rimuoverà tutte le attività annullate dalla coda per liberare spazio nella coda. Un'altra caratteristica interessante relativa alla coda è il gestore delle attività non accettate:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Ad esempio, il gestore stampa semplicemente una parola Rejected
per ogni rifiuto di accettare un'attività in coda. Comodo, no? Inoltre, ThreadPoolExecutor
ha un erede interessante: ScheduledThreadPoolExecutor
chi è ScheduledExecutorService
. Fornisce la possibilità di eseguire un'attività su un timer.
ScheduledExecutorService
ExecutorService
il tipo ScheduledExecutorService
consente di eseguire attività in base a una pianificazione. Diamo un'occhiata ad un esempio:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Tutto è semplice qui. Le attività vengono inviate, riceviamo una "attività pianificata" java.util.concurrent.ScheduledFuture
. Con la pianificazione può essere utile anche il seguente caso:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Qui inviamo Runnable
l'attività da eseguire a una tariffa fissa (Fixed Rate) con un certo ritardo. In questo caso, dopo 1 secondo ogni 2 secondi, inizia a eseguire l'attività. C'è un'opzione simile:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Ma qui i compiti vengono eseguiti con un dato intervallo TRA l'esecuzione di compiti diversi. Cioè, l'attività task
verrà completata in 1 secondo. Successivamente, non appena sarà completato, passeranno 2 secondi e quindi verrà avviata una nuova attività. Puoi leggere i seguenti materiali su questo argomento:
- Un'introduzione ai pool di thread
- Introduzione ai pool di thread
- Java Multithreading Steeplechase: annullamento delle attività negli esecutori
- Scegliere gli esecutori Java corretti per le attività in background
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
Lavoro rubatoPiscina
Oltre ai pool di thread menzionati sopra, ce n'è uno in più. Si potrebbe dire che è un po' speciale. Il suo nome è Work Stealing Pool. In breve, il Work Stealing è un algoritmo di lavoro in cui i thread inattivi iniziano a prendere attività da altri thread o attività dalla coda generale. Diamo un'occhiata ad un esempio:public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Se eseguiamo questo codice, ExecutorService
creerà 5 thread, perché ogni thread si unirà alla coda di attesa nella posizione dell'oggetto lock
. Abbiamo già discusso dei monitor e dei relativi blocchi in " Non puoi rovinare Java con un thread: Parte II - sincronizzazione ". E ora lo sostituiremo Executors.newCachedThreadPool
con Executors.newWorkStealingPool()
. Cosa cambierà? Vedremo che i nostri compiti non vengono eseguiti in 5 thread, ma in meno. Ricordi che cachedThreadPool
hai creato il tuo thread per ogni attività? Perché wait
ha bloccato il thread, ma le attività successive volevano essere eseguite e per esse sono stati creati nuovi thread nel pool. Nel caso dei StealingPool
thread, non rimarranno inattivi per sempre wait
; inizieranno ad eseguire attività vicine. In che modo è così diverso dagli altri pool di thread WorkStealingPool
? Perché c'è davvero qualcosa di magico che vive dentro di lui ForkJoinPool
:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
In realtà c'è un'altra differenza. I thread creati per ForkJoinPool
impostazione predefinita sono thread daemon, al contrario dei thread creati tramite normali file ThreadPool
. In generale, vale la pena ricordare i thread dei demoni, perché... ad esempio, CompletableFuture
vengono utilizzati anche i thread daemon, se non si specifica il proprio ThreadFactory
, che creerà thread non daemon. Queste sono il tipo di sorprese che possono aspettarti in un posto inaspettato!)
Fork/Unisciti al pool
In questa parte parleremo dello stessoForkJoinPool
framework (detto anche fork/join) che vive “sotto il cofano” di WorkStealingPool
. In generale, il Fork Join Framework è apparso in Java 1.7. E anche se Java 11 è già in cantiere, vale comunque la pena ricordarlo. Non è il compito più comune, ma piuttosto interessante. C'è una buona recensione su questo argomento su Internet: “ Fork/Join Framework in Java 7 ”. Fork/JoinPool
opera nel suo lavoro con un concetto come java.util.concurrent.RecursiveTask
. C'è anche un analogo - java.util.concurrent.RecursiveAction
. RecursiveActions non restituisce un risultato. Quindi RecursiveTask
simile a Callable
e RecursiveAction
simile a Runnable
. Bene, guardando il nome, vediamo due metodi chiave: fork
e join
. Il metodo fork
esegue un'attività in modo asincrono in un thread separato. E il metodo join
ti consente di attendere il completamento del lavoro. Esistono diversi modi per utilizzarlo: questa immagine fa parte di una diapositiva del rapporto di Alexey Shipilev " Fork/Join: implementazione, utilizzo, prestazioni ". Per renderlo più chiaro, vale la pena guardare il suo rapporto al JEE CONF: “ Caratteristiche di implementazione del Fork Join ”.
Riassumendo
Quindi, eccoci qui a finire la parte successiva della recensione. Abbiamo capito cosa abbiamo inventato per la prima voltaExecutor
per l'esecuzione dei thread. Poi abbiamo deciso di portare avanti l'idea e ci è venuta in mente ExecutorService
. ExecutorService
consente di inviare attività per l'esecuzione utilizzando submit
e invoke
, nonché di gestire il servizio disattivandolo. Perché ExecutorService
'abbiamo bisogno di implementazioni, abbiamo scritto una classe con metodi factory e l'abbiamo chiamata Executors
. Ti consente di creare pool di thread ThreadPoolExecutor
. Allo stesso tempo, esistono pool di thread che consentono anche di specificare una pianificazione per l'esecuzione, ma è WorkStealingPool
nascosta dietro ForkJoinPool
. Spero che quanto scritto sopra sia stato per voi non solo interessante, ma anche comprensibile) Sono sempre felice di ricevere suggerimenti e commenti. #Viacheslav
GO TO FULL VERSION