Wstęp
Wiemy więc, że w Javie istnieją wątki, o czym przeczytasz w recenzji „ Jvy wątkiem nie zepsujesz: część I – wątki ”. Spójrzmy jeszcze raz na przykładowy kod:public static void main(String []args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
Jak widzimy, kod do uruchomienia zadania jest dość standardowy, ale przy każdym nowym uruchomieniu będziemy musieli go powtarzać. Jednym z rozwiązań jest przeniesienie go do osobnej metody, np execute(Runnable runnable)
. . Ale programiści Java już się o nas martwili i wymyślili interfejs Executor
:
public static void main(String []args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Jak widać, kod stał się bardziej zwięzły i pozwolił nam po prostu napisać kod uruchamiający go Runnable
w wątku. Świetnie, prawda? Ale to dopiero początek:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
ma interfejs potomny ExecutorService
. Dokument JavaDoc tego interfejsu mówi, że ExecutorService
jest to opis specjalnego Executor
„a”, który udostępnia metody zatrzymywania pracy Executor
„a” i umożliwia śledzenie java.util.concurrent.Future
postępu wykonywania. Wcześniej w artykule „ Nie można zepsuć Javy za pomocą wątku: część IV – Callable, Future and Friends ” pokrótce omówiliśmy możliwości Future
. Jeśli zapomniałeś lub nie przeczytałeś, radzę odświeżyć pamięć ;) Co jeszcze ciekawego jest w JavaDoc? Że mamy specjalną fabrykę java.util.concurrent.Executors
, która pozwala nam tworzyć implementacje, które są domyślnie dostępne ExecutorService
.
Usługa wykonawcy
Przypomnijmy jeszcze raz. MusimyExecutor
wykonać (tj. wykonać) określone zadanie w wątku, gdy realizacja utworzenia wątku jest przed nami ukryta. Mamy ExecutorService
specjalny Executor
, który posiada zestaw możliwości zarządzania postępem realizacji. I mamy fabrykę Executors
, która pozwala tworzyć ExecutorService
. Zróbmy to teraz sami:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
Jak widzimy, określiliśmy stałą pulę wątków ( Fixed Thread Pool
) o rozmiarze 2. Po czym po kolei wysyłamy zadania do puli. Każde zadanie zwraca ciąg ( String
) zawierający nazwę wątku ( currentThread().getName()
). Ważne jest, aby zamknąć program na samym końcu ExecutorService
, ponieważ w przeciwnym razie nasz program nie zakończy działania. W fabryce Executors
istnieją inne metody fabryczne . Przykładowo możemy stworzyć pulę zawierającą tylko jeden wątek - newSingleThreadExecutor
lub pulę z buforowaniem newCachedThreadPool
, w której wątki będą usuwane z puli jeśli będą bezczynne przez 1 minutę. W rzeczywistości za nimi ExecutorService
znajduje się kolejka blokująca , w której umieszczane są zadania i z której te zadania są wykonywane. Więcej informacji na temat blokowania kolejek można zobaczyć w filmie „ Blokowanie kolejki – Kolekcje #5 – Zaawansowana Java ”. Możesz także przeczytać recenzję „ Blokowanie kolejek pakietu współbieżnego ” i odpowiedź na pytanie „ Kiedy preferować LinkedBlockingQueue zamiast ArrayBlockingQueue? ” Super uproszczone - BlockingQueue
(kolejka blokująca) blokuje wątek w dwóch przypadkach:
- wątek próbuje pobrać elementy z pustej kolejki
- wątek próbuje umieścić elementy w pełnej kolejce
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
Lub
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Jak widać implementacje tworzone są wewnątrz metod fabrycznych ExecutorService
. I to w zasadzie tyle ThreadPoolExecutor
. Zmieniają się tylko atrybuty mające wpływ na pracę.
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
Jak widzieliśmy wcześniej, wewnątrz metod fabrycznychThreadPoolExecutor
. Na funkcjonalność wpływa to, jakie wartości są przekazywane jako maksymalna i minimalna liczba wątków, a także używana kolejka. Można zastosować dowolną implementację interfejsu java.util.concurrent.BlockingQueue
. Skoro już o tym ThreadPoolExecutor
mowa, warto zwrócić uwagę na ciekawe funkcje podczas pracy. Na przykład nie możesz wysyłać zadań do, ThreadPoolExecutor
jeśli nie ma tam miejsca:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Ten kod zakończy się niepowodzeniem z powodu błędu takiego jak:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Oznacza to, że task
nie możesz się poddać, ponieważ SynchronousQueue
jest zaprojektowany w taki sposób, że właściwie składa się z jednego elementu i nie pozwala na umieszczenie w nim więcej. Jak widać queued tasks
jest tu 0 i nie ma w tym nic dziwnego, bo to jest specyficzne SynchronousQueue
- tak naprawdę jest to kolejka 1 elementu, która jest zawsze pusta. (!) Gdy jeden wątek umieści element w kolejce, będzie czekał, aż inny wątek pobierze element z kolejki. Dlatego możemy zastąpić new LinkedBlockingQueue<>(1)
i to, co zostanie wskazane w błędzie, ulegnie zmianie queued tasks = 1
. Ponieważ kolejka ma tylko 1 element, wówczas nie możemy dodać drugiego. I na tym spadniemy. Kontynuując temat kolejki warto zauważyć, że klasa ThreadPoolExecutor
posiada dodatkowe metody obsługi kolejki. Na przykład metoda threadPoolExecutor.purge()
usunie wszystkie anulowane zadania z kolejki, aby zwolnić miejsce w kolejce. Kolejną interesującą funkcją związaną z kolejką jest procedura obsługi nieakceptowanych zadań:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Na przykład procedura obsługi po prostu wypisuje słowo Rejected
w przypadku każdej odmowy przyjęcia zadania do kolejki. Wygodne, prawda? Poza tym ThreadPoolExecutor
ma ciekawego spadkobiercę – ScheduledThreadPoolExecutor
kim jest ScheduledExecutorService
. Zapewnia możliwość wykonania zadania na timerze.
Usługa ScheduledExecutor
ExecutorService
typu ScheduledExecutorService
pozwalają na uruchamianie zadań według harmonogramu. Spójrzmy na przykład:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Tutaj wszystko jest proste. Zadania są wysyłane, my otrzymujemy „zaplanowane zadanie” java.util.concurrent.ScheduledFuture
. Poniższy przypadek może być również przydatny z harmonogramem:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Tutaj wysyłamy Runnable
zadanie do wykonania ze stałą szybkością (Fixed Rate) z pewnym opóźnieniem. W takim przypadku po 1 sekundzie co 2 sekundy rozpocznij wykonywanie zadania. Istnieje podobna opcja:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Ale tutaj zadania są wykonywane z określonym odstępem POMIĘDZY wykonaniem różnych zadań. Oznacza to, że zadanie task
zostanie wykonane w ciągu 1 sekundy. Następnie, gdy tylko zostanie ukończone, upłyną 2 sekundy, po czym zostanie uruchomione nowe zadanie. Możesz przeczytać następujące materiały na ten temat:
- Wprowadzenie do pul wątków
- Wprowadzenie do pul wątków
- Wielowątkowy bieg z przeszkodami w Javie: anulowanie zadań w executorach
- Wybieranie odpowiednich modułów wykonawczych Java do zadań w tle
https://zone.com/articles/diving-into-java-8s-newworkstealingpools
Praca KradzieżBasen
Oprócz wspomnianych powyżej pul wątków istnieje jeszcze jedna. Można powiedzieć, że jest trochę wyjątkowy. Nazywa się Pula Kradzieży Pracy. Krótko mówiąc, Work Stealing to algorytm pracy, w którym bezczynne wątki zaczynają pobierać zadania z innych wątków lub zadania z kolejki ogólnej. Spójrzmy na przykład:public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Jeśli uruchomimy ten kod, ExecutorService
utworzy on 5 wątków, ponieważ każdy wątek dołączy do kolejki oczekiwania w lokalizacji obiektu lock
. O monitorach i blokadach pisaliśmy już w artykule „ Nie można zepsuć Javy wątkiem: część II – Synchronizacja ”. A teraz zastąpimy Executors.newCachedThreadPool
go Executors.newWorkStealingPool()
. Co się zmieni? Zobaczymy, że nasze zadania są wykonywane nie w 5 wątkach, ale w mniejszej liczbie. Pamiętasz, że cachedThreadPool
dla każdego zadania utworzyłeś własny wątek? Bo wait
zablokował wątek, ale kolejne zadania chciały zostać wykonane i w puli zostały utworzone dla nich nowe wątki. W przypadku StealingPool
wątków nie będą one wiecznie bezczynne wait
, zaczną wykonywać sąsiednie zadania. Czym to się tak różni od innych pul wątków WorkStealingPool
? Bo naprawdę żyje w nim coś magicznego ForkJoinPool
:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Właściwie jest jeszcze jedna różnica. Wątki tworzone domyślnie ForkJoinPool
są wątkami demona, w przeciwieństwie do wątków tworzonych za pomocą zwykłych wątków ThreadPool
. Generalnie warto pamiętać o wątkach demonicznych, bo... na przykład CompletableFuture
wątki demoniczne są również używane, jeśli nie określisz własnych ThreadFactory
, co spowoduje utworzenie wątków innych niż demony. Takie niespodzianki mogą na Ciebie czekać w nieoczekiwanym miejscu!)
Rozwidlić/dołączyć do puli
W tej części porozmawiamy o tym samym frameworkuForkJoinPool
(zwanym także frameworkiem fork/join), który żyje „pod maską” WorkStealingPool
. Ogólnie rzecz biorąc, Fork Join Framework pojawił się w Javie 1.7. I nawet jeśli Java 11 jest już na podwórku, nadal warto o niej pamiętać. Nie jest to najczęstsze zadanie, ale całkiem interesujące. W Internecie można znaleźć dobrą recenzję na ten temat: „ Fork/Join Framework in Java 7 ”. Fork/JoinPool
w swojej twórczości posługuje się taką koncepcją jak java.util.concurrent.RecursiveTask
. Istnieje również analog - java.util.concurrent.RecursiveAction
. RecursiveActions nie zwracają wyniku. Zatem RecursiveTask
podobny do Callable
i RecursiveAction
podobny do Runnable
. Cóż, patrząc na nazwę, widzimy dwie kluczowe metody - fork
i join
. Metoda fork
uruchamia zadanie asynchronicznie w osobnym wątku. Metoda join
pozwala poczekać na zakończenie pracy. Można go wykorzystać na kilka sposobów: To zdjęcie jest częścią slajdu z raportu Aleksieja Shipilewa „ Fork/Join: implementacja, użycie, wydajność ”. Aby było to jaśniejsze, warto obejrzeć jego relację na JEE CONF: „ Funkcje implementacyjne Fork Join ”.
Zreasumowanie
I tak oto kończymy kolejną część recenzji. Ustaliliśmy, co wpadliśmyExecutor
na pomysł wykonywania wątków. Potem postanowiliśmy kontynuować ten pomysł i wpadliśmy na to ExecutorService
. ExecutorService
umożliwia wysyłanie zadań do realizacji za pomocą submit
i invoke
, a także zarządzanie usługą poprzez jej wyłączenie. Ponieważ ExecutorService
„potrzebujemy implementacji, napisaliśmy klasę z metodami fabrycznymi i nazwaliśmy ją Executors
. Umożliwia tworzenie pul wątków ThreadPoolExecutor
. Jednocześnie istnieją pule wątków, które również pozwalają określić harmonogram wykonania, ale WorkStealingPool
jest to ukryte za ForkJoinPool
. Mam nadzieję, że to, co napisano powyżej było dla Ciebie nie tylko interesujące, ale i zrozumiałe) Zawsze chętnie otrzymuję sugestie i uwagi. #Wiaczesław
GO TO FULL VERSION