JavaRush /Blog Java /Random-PL /Nie możesz zepsuć Javy wątkiem: część V - Executor, Threa...
Viacheslav
Poziom 3

Nie możesz zepsuć Javy wątkiem: część V - Executor, ThreadPool, Fork Join

Opublikowano w grupie Random-PL

Wstęp

Wiemy więc, że w Javie istnieją wątki, o czym przeczytasz w recenzji „ Jvy wątkiem nie zepsujesz: część I – wątki ”. Nie możesz zepsuć Javy wątkiem: Część V - Executor, ThreadPool, Fork Join - 1Spójrzmy jeszcze raz na przykładowy kod:
public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Jak widzimy, kod do uruchomienia zadania jest dość standardowy, ale przy każdym nowym uruchomieniu będziemy musieli go powtarzać. Jednym z rozwiązań jest przeniesienie go do osobnej metody, np execute(Runnable runnable). . Ale programiści Java już się o nas martwili i wymyślili interfejs Executor:
public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Jak widać, kod stał się bardziej zwięzły i pozwolił nam po prostu napisać kod uruchamiający go Runnablew wątku. Świetnie, prawda? Ale to dopiero początek: Nie możesz zepsuć Javy wątkiem: Część V - Executor, ThreadPool, Fork Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Jak widać, interfejs Executorma interfejs potomny ExecutorService. Dokument JavaDoc tego interfejsu mówi, że ExecutorServicejest to opis specjalnego Executor„a”, który udostępnia metody zatrzymywania pracy Executor„a” i umożliwia śledzenie java.util.concurrent.Futurepostępu wykonywania. Wcześniej w artykule „ Nie można zepsuć Javy za pomocą wątku: część IV – Callable, Future and Friends ” pokrótce omówiliśmy możliwości Future. Jeśli zapomniałeś lub nie przeczytałeś, radzę odświeżyć pamięć ;) Co jeszcze ciekawego jest w JavaDoc? Że mamy specjalną fabrykę java.util.concurrent.Executors, która pozwala nam tworzyć implementacje, które są domyślnie dostępne ExecutorService.

Usługa wykonawcy

Przypomnijmy jeszcze raz. Musimy Executorwykonać (tj. wykonać) określone zadanie w wątku, gdy realizacja utworzenia wątku jest przed nami ukryta. Mamy ExecutorServicespecjalny Executor, który posiada zestaw możliwości zarządzania postępem realizacji. I mamy fabrykę Executors, która pozwala tworzyć ExecutorService. Zróbmy to teraz sami:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Jak widzimy, określiliśmy stałą pulę wątków ( Fixed Thread Pool) o rozmiarze 2. Po czym po kolei wysyłamy zadania do puli. Każde zadanie zwraca ciąg ( String) zawierający nazwę wątku ( currentThread().getName()). Ważne jest, aby zamknąć program na samym końcu ExecutorService, ponieważ w przeciwnym razie nasz program nie zakończy działania. W fabryce Executorsistnieją inne metody fabryczne . Przykładowo możemy stworzyć pulę zawierającą tylko jeden wątek - newSingleThreadExecutorlub pulę z buforowaniem newCachedThreadPool, w której wątki będą usuwane z puli jeśli będą bezczynne przez 1 minutę. W rzeczywistości za nimi ExecutorServiceznajduje się kolejka blokująca , w której umieszczane są zadania i z której te zadania są wykonywane. Więcej informacji na temat blokowania kolejek można zobaczyć w filmie „ Blokowanie kolejki – Kolekcje #5 – Zaawansowana Java ”. Możesz także przeczytać recenzję „ Blokowanie kolejek pakietu współbieżnego ” i odpowiedź na pytanie „ Kiedy preferować LinkedBlockingQueue zamiast ArrayBlockingQueue? ” Super uproszczone - BlockingQueue(kolejka blokująca) blokuje wątek w dwóch przypadkach:
  • wątek próbuje pobrać elementy z pustej kolejki
  • wątek próbuje umieścić elementy w pełnej kolejce
Jeśli przyjrzymy się implementacji metod fabrycznych, możemy zobaczyć, jaka jest ich struktura. Na przykład:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
Lub
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Jak widać implementacje tworzone są wewnątrz metod fabrycznych ExecutorService. I to w zasadzie tyle ThreadPoolExecutor. Zmieniają się tylko atrybuty mające wpływ na pracę. Nie możesz zrujnować Javy za pomocą wątku: Część V - Executor, ThreadPool, Fork Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

Jak widzieliśmy wcześniej, wewnątrz metod fabrycznych ThreadPoolExecutor. Na funkcjonalność wpływa to, jakie wartości są przekazywane jako maksymalna i minimalna liczba wątków, a także używana kolejka. Można zastosować dowolną implementację interfejsu java.util.concurrent.BlockingQueue. Skoro już o tym ThreadPoolExecutormowa, warto zwrócić uwagę na ciekawe funkcje podczas pracy. Na przykład nie możesz wysyłać zadań do, ThreadPoolExecutorjeśli nie ma tam miejsca:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Ten kod zakończy się niepowodzeniem z powodu błędu takiego jak:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Oznacza to, że tasknie możesz się poddać, ponieważ SynchronousQueuejest zaprojektowany w taki sposób, że właściwie składa się z jednego elementu i nie pozwala na umieszczenie w nim więcej. Jak widać queued tasksjest tu 0 i nie ma w tym nic dziwnego, bo to jest specyficzne SynchronousQueue- tak naprawdę jest to kolejka 1 elementu, która jest zawsze pusta. (!) Gdy jeden wątek umieści element w kolejce, będzie czekał, aż inny wątek pobierze element z kolejki. Dlatego możemy zastąpić new LinkedBlockingQueue<>(1)i to, co zostanie wskazane w błędzie, ulegnie zmianie queued tasks = 1. Ponieważ kolejka ma tylko 1 element, wówczas nie możemy dodać drugiego. I na tym spadniemy. Kontynuując temat kolejki warto zauważyć, że klasa ThreadPoolExecutorposiada dodatkowe metody obsługi kolejki. Na przykład metoda threadPoolExecutor.purge()usunie wszystkie anulowane zadania z kolejki, aby zwolnić miejsce w kolejce. Kolejną interesującą funkcją związaną z kolejką jest procedura obsługi nieakceptowanych zadań:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Na przykład procedura obsługi po prostu wypisuje słowo Rejectedw przypadku każdej odmowy przyjęcia zadania do kolejki. Wygodne, prawda? Poza tym ThreadPoolExecutorma ciekawego spadkobiercę – ScheduledThreadPoolExecutorkim jest ScheduledExecutorService. Zapewnia możliwość wykonania zadania na timerze.

Usługa ScheduledExecutor

ExecutorServicetypu ScheduledExecutorServicepozwalają na uruchamianie zadań według harmonogramu. Spójrzmy na przykład:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Tutaj wszystko jest proste. Zadania są wysyłane, my otrzymujemy „zaplanowane zadanie” java.util.concurrent.ScheduledFuture. Poniższy przypadek może być również przydatny z harmonogramem:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Tutaj wysyłamy Runnablezadanie do wykonania ze stałą szybkością (Fixed Rate) z pewnym opóźnieniem. W takim przypadku po 1 sekundzie co 2 sekundy rozpocznij wykonywanie zadania. Istnieje podobna opcja:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Ale tutaj zadania są wykonywane z określonym odstępem POMIĘDZY wykonaniem różnych zadań. Oznacza to, że zadanie taskzostanie wykonane w ciągu 1 sekundy. Następnie, gdy tylko zostanie ukończone, upłyną 2 sekundy, po czym zostanie uruchomione nowe zadanie. Możesz przeczytać następujące materiały na ten temat: Nie możesz zrujnować Javy za pomocą wątku: Część V - Executor, ThreadPool, Fork Join - 4

https://zone.com/articles/diving-into-java-8s-newworkstealingpools

Praca KradzieżBasen

Oprócz wspomnianych powyżej pul wątków istnieje jeszcze jedna. Można powiedzieć, że jest trochę wyjątkowy. Nazywa się Pula Kradzieży Pracy. Krótko mówiąc, Work Stealing to algorytm pracy, w którym bezczynne wątki zaczynają pobierać zadania z innych wątków lub zadania z kolejki ogólnej. Spójrzmy na przykład:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Jeśli uruchomimy ten kod, ExecutorServiceutworzy on 5 wątków, ponieważ każdy wątek dołączy do kolejki oczekiwania w lokalizacji obiektu lock. O monitorach i blokadach pisaliśmy już w artykule „ Nie można zepsuć Javy wątkiem: część II – Synchronizacja ”. A teraz zastąpimy Executors.newCachedThreadPoolgo Executors.newWorkStealingPool(). Co się zmieni? Zobaczymy, że nasze zadania są wykonywane nie w 5 wątkach, ale w mniejszej liczbie. Pamiętasz, że cachedThreadPooldla każdego zadania utworzyłeś własny wątek? Bo waitzablokował wątek, ale kolejne zadania chciały zostać wykonane i w puli zostały utworzone dla nich nowe wątki. W przypadku StealingPoolwątków nie będą one wiecznie bezczynne wait, zaczną wykonywać sąsiednie zadania. Czym to się tak różni od innych pul wątków WorkStealingPool? Bo naprawdę żyje w nim coś magicznego ForkJoinPool:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Właściwie jest jeszcze jedna różnica. Wątki tworzone domyślnie ForkJoinPoolsą wątkami demona, w przeciwieństwie do wątków tworzonych za pomocą zwykłych wątków ThreadPool. Generalnie warto pamiętać o wątkach demonicznych, bo... na przykład CompletableFuturewątki demoniczne są również używane, jeśli nie określisz własnych ThreadFactory, co spowoduje utworzenie wątków innych niż demony. Takie niespodzianki mogą na Ciebie czekać w nieoczekiwanym miejscu!)

Rozwidlić/dołączyć do puli

W tej części porozmawiamy o tym samym frameworku ForkJoinPool(zwanym także frameworkiem fork/join), który żyje „pod maską” WorkStealingPool. Ogólnie rzecz biorąc, Fork Join Framework pojawił się w Javie 1.7. I nawet jeśli Java 11 jest już na podwórku, nadal warto o niej pamiętać. Nie jest to najczęstsze zadanie, ale całkiem interesujące. W Internecie można znaleźć dobrą recenzję na ten temat: „ Fork/Join Framework in Java 7 ”. Fork/JoinPoolw swojej twórczości posługuje się taką koncepcją jak java.util.concurrent.RecursiveTask. Istnieje również analog - java.util.concurrent.RecursiveAction. RecursiveActions nie zwracają wyniku. Zatem RecursiveTaskpodobny do Callablei RecursiveActionpodobny do Runnable. Cóż, patrząc na nazwę, widzimy dwie kluczowe metody - forki join. Metoda forkuruchamia zadanie asynchronicznie w osobnym wątku. Metoda joinpozwala poczekać na zakończenie pracy. Można go wykorzystać na kilka sposobów: Nie możesz zrujnować Javy za pomocą wątku: Część V - Executor, ThreadPool, Fork Join - 5To zdjęcie jest częścią slajdu z raportu Aleksieja Shipilewa „ Fork/Join: implementacja, użycie, wydajność ”. Aby było to jaśniejsze, warto obejrzeć jego relację na JEE CONF: „ Funkcje implementacyjne Fork Join ”.

Zreasumowanie

I tak oto kończymy kolejną część recenzji. Ustaliliśmy, co wpadliśmy Executorna pomysł wykonywania wątków. Potem postanowiliśmy kontynuować ten pomysł i wpadliśmy na to ExecutorService. ExecutorServiceumożliwia wysyłanie zadań do realizacji za pomocą submiti invoke, a także zarządzanie usługą poprzez jej wyłączenie. Ponieważ ExecutorService„potrzebujemy implementacji, napisaliśmy klasę z metodami fabrycznymi i nazwaliśmy ją Executors. Umożliwia tworzenie pul wątków ThreadPoolExecutor. Jednocześnie istnieją pule wątków, które również pozwalają określić harmonogram wykonania, ale WorkStealingPooljest to ukryte za ForkJoinPool. Mam nadzieję, że to, co napisano powyżej było dla Ciebie nie tylko interesujące, ale i zrozumiałe) Zawsze chętnie otrzymuję sugestie i uwagi. #Wiaczesław
Komentarze
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION