JavaRush /Java Blog /Random-TL /Hindi Mo Masisira ang Java gamit ang isang Thread: Part V...

Hindi Mo Masisira ang Java gamit ang isang Thread: Part V - Executor, ThreadPool, Fork Join

Nai-publish sa grupo

Panimula

Kaya, alam namin na may mga thread sa Java, na mababasa mo sa pagsusuri na " You Can't Spoil Java with a Thread: Part I - Threads ". Hindi mo masisira ang Java gamit ang isang Thread: Part V - Executor, ThreadPool, Fork Join - 1Tingnan natin muli ang sample code:
public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Tulad ng nakikita natin, ang code para sa paglulunsad ng gawain ay medyo pamantayan, ngunit para sa bawat bagong paglulunsad ay kailangan nating ulitin ito. Ang isang solusyon ay ilipat ito sa isang hiwalay na paraan, halimbawa execute(Runnable runnable). Ngunit ang mga developer ng Java ay nag-aalala na tungkol sa amin at nakabuo ng isang interface Executor:
public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Gaya ng nakikita mo, ang code ay naging mas maigsi at pinahintulutan kaming magsulat lamang ng code upang patakbuhin ito Runnablesa isang thread. Mahusay, hindi ba? Ngunit ito ay simula pa lamang: Hindi mo masisira ang Java gamit ang isang thread: Part V - Executor, ThreadPool, Fork Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Gaya ng nakikita mo, ang interface Executoray may descendant na interface ExecutorService. Ang JavaDoc ng interface na ito ay nagsasabi na ExecutorServiceito ay isang paglalarawan ng isang espesyal na Executor'a' na nagbibigay ng mga pamamaraan para sa paghinto ng trabahong Executor'a' at nagbibigay-daan sa iyong java.util.concurrent.Futuremasubaybayan ang pag-usad ng pagpapatupad. Dati, sa " You Can't Spoil Java with Thread: Part IV - Callable, Future and Friends, " saglit naming sinuri ang mga posibilidad Future. Kung nakalimutan mo o hindi mo pa ito nabasa, ipinapayo ko sa iyo na i-refresh ang iyong memorya ;) Ano pa ang kawili-wili sa JavaDoc? Na mayroon kaming isang espesyal na pabrika java.util.concurrent.Executorsna nagpapahintulot sa amin na lumikha ng mga pagpapatupad na magagamit bilang default ExecutorService.

ExecutorService

Alalahanin natin muli. Kailangan nating Executormagsagawa (i.e. magsagawa) ng isang tiyak na gawain sa isang thread, kapag ang pagpapatupad ng paglikha ng isang thread ay nakatago mula sa amin. Mayroon kaming ExecutorServiceisang espesyal na isa Executorna may isang hanay ng mga kakayahan para sa pamamahala ng progreso ng pagpapatupad. At mayroon kaming pabrika Executorsna nagpapahintulot sa iyo na lumikha ExecutorService. Gawin natin ito sa ating sarili ngayon:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Tulad ng nakikita natin, tinukoy namin ang isang nakapirming thread pool ( Fixed Thread Pool) na may sukat na 2. Pagkatapos nito ay nagpapadala kami ng mga gawain sa pool nang paisa-isa. Ang bawat gawain ay nagbabalik ng string ( String) na naglalaman ng pangalan ng thread ( currentThread().getName()). Mahalagang i-shutdown sa pinakadulo ExecutorService, dahil kung hindi ay hindi lalabas ang aming programa. ExecutorsMayroong iba pang mga pamamaraan ng pabrika sa pabrika . Halimbawa, maaari tayong gumawa ng pool ng isang thread lang - newSingleThreadExecutoro pool na may caching newCachedThreadPool, kung saan aalisin ang mga thread sa pool kung idle ang mga ito sa loob ng 1 minuto. Sa katunayan, sa likod ng mga ito ExecutorServiceay may nakaharang na pila kung saan inilalagay ang mga gawain at kung saan isinasagawa ang mga gawaing ito. Higit pang impormasyon tungkol sa pag-block ng mga pila ay makikita sa video na " Blocking queue - Collections #5 - Advanced Java ". Maaari mo ring basahin ang pagsusuri na " Pag-block ng mga pila ng kasabay na pakete " at ang sagot sa tanong na " Kailan mas gusto ang LinkedBlockingQueue kaysa sa ArrayBlockingQueue ? Super pinasimple - BlockingQueue(pagba-block ng pila) hinaharangan ang isang thread, sa dalawang kaso:
  • sinusubukan ng isang thread na kumuha ng mga elemento mula sa isang walang laman na pila
  • sinusubukan ng thread na ilagay ang mga elemento sa isang buong pila
Kung titingnan natin ang pagpapatupad ng mga pamamaraan ng pabrika, makikita natin kung paano nakaayos ang mga ito. Halimbawa:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
o
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Tulad ng nakikita natin, ang mga pagpapatupad ay nilikha sa loob ng mga pamamaraan ng pabrika ExecutorService. At iyon talaga ThreadPoolExecutor. Ang mga katangian lamang na nakakaapekto sa trabaho ang nagbabago. Hindi mo masisira ang Java gamit ang isang thread: Part V - Executor, ThreadPool, Fork Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

Tulad ng nakita natin dati, sa loob ng mga pamamaraan ng pabrika ThreadPoolExecutor, . Ang pag-andar ay apektado ng kung anong mga halaga ang ipinasa bilang maximum at minimum na mga thread, pati na rin kung anong pila ang ginagamit. At anumang pagpapatupad ng interface ay maaaring gamitin java.util.concurrent.BlockingQueue. Sa pagsasalita ng ThreadPoolExecutor'ahs, ito ay nagkakahalaga ng pagpuna sa mga kagiliw-giliw na tampok sa panahon ng operasyon. Halimbawa, hindi ka makakapagpadala ng mga gawain ThreadPoolExecutorkung walang espasyo doon:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Mabibigo ang code na ito sa isang error tulad ng:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Ibig sabihin, taskhindi ka maaaring magsumite, dahil SynchronousQueueito ay dinisenyo sa paraang ito ay aktwal na binubuo ng isang elemento at hindi pinapayagan kang maglagay ng higit pa doon. Tulad ng nakikita natin, queued tasksmayroong 0 dito, at walang kakaiba dito, dahil ito ay tiyak SynchronousQueue- sa katunayan, ito ay isang pila ng 1 elemento, na palaging walang laman. (!) Kapag naglagay ang isang thread ng elemento sa pila, maghihintay ito hanggang sa kunin ng isa pang thread ang elemento mula sa pila. Samakatuwid, maaari naming palitan ng new LinkedBlockingQueue<>(1)at kung ano ang ipahiwatig sa error ay magbabago queued tasks = 1. kasi ang pila ay 1 elemento lamang, pagkatapos ay hindi natin maidaragdag ang pangalawa. At babagsak tayo dito. Sa pagpapatuloy ng tema ng queue, ito ay nagkakahalaga ng noting na ang klase ThreadPoolExecutoray may karagdagang mga pamamaraan para sa servicing ang queue. Halimbawa, threadPoolExecutor.purge()aalisin ng pamamaraan ang lahat ng nakanselang gawain mula sa pila upang magbakante ng espasyo sa pila. Ang isa pang kawili-wiling tampok na nauugnay sa pila ay ang hindi tinatanggap na tagapangasiwa ng gawain:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Halimbawa, ang handler ay nagpi-print lamang ng isang salita Rejectedpara sa bawat pagtanggi na tanggapin ang isang gawain sa pila. Maginhawa, hindi ba? Bilang karagdagan, ThreadPoolExecutormayroon siyang isang kawili-wiling tagapagmana - ScheduledThreadPoolExecutorna si ScheduledExecutorService. Nagbibigay ito ng kakayahang magsagawa ng isang gawain sa isang timer.

ScheduledExecutorService

ExecutorServiceAng uri ay ScheduledExecutorServicenagpapahintulot sa iyo na magpatakbo ng mga gawain ayon sa isang iskedyul. Tingnan natin ang isang halimbawa:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Simple lang ang lahat dito. Ipinapadala ang mga gawain, nakatanggap kami ng "naka-iskedyul na gawain" java.util.concurrent.ScheduledFuture. Ang sumusunod na kaso ay maaari ding maging kapaki-pakinabang sa iskedyul:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Dito ipinapadala namin Runnableang gawain na isasagawa sa isang nakapirming rate na may tiyak na pagkaantala. Sa kasong ito, pagkatapos ng 1 segundo bawat 2 segundo, simulan ang pagpapatupad ng gawain. Mayroong katulad na opsyon:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Ngunit narito ang mga gawain ay isinasagawa na may isang ibinigay na pagitan sa PAGITAN ng pagpapatupad ng iba't ibang mga gawain. Ibig sabihin, taskmatatapos ang gawain sa loob ng 1 segundo. Susunod, sa sandaling makumpleto ito, lilipas ang 2 segundo, at pagkatapos ay ilulunsad ang isang bagong gawain. Maaari mong basahin ang mga sumusunod na materyales sa paksang ito: Hindi mo masisira ang Java gamit ang isang thread: Part V - Executor, ThreadPool, Fork Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

Bilang karagdagan sa mga thread pool na nabanggit sa itaas, mayroong isa pa. Masasabi mong medyo espesyal siya. Ang pangalan nito ay Work Stealing Pool. Sa madaling salita, ang Work Stealing ay isang work algorithm kung saan ang mga idle thread ay nagsisimulang kumuha ng mga gawain mula sa iba pang mga thread o mga gawain mula sa pangkalahatang queue. Tingnan natin ang isang halimbawa:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Kung patakbuhin natin ang code na ito, ExecutorServicelilikha ito ng 5 thread, dahil sasali ang bawat thread sa wait queue sa lokasyon ng object lock. Napag-usapan na natin ang tungkol sa mga monitor at lock dito sa " You Can't Spoil Java with a Thread: Part II - Synchronization ." At ngayon ay papalitan natin ito Executors.newCachedThreadPoolng Executors.newWorkStealingPool(). Ano ang magbabago? Makikita natin na ang ating mga gawain ay ginagampanan hindi sa 5 mga thread, ngunit sa mas kaunti. Tandaan na cachedThreadPoolgumawa ka ng sarili mong thread para sa bawat gawain? Dahil waitna-block nito ang thread, ngunit ang mga susunod na gawain ay gustong isagawa at ang mga bagong thread ay ginawa sa pool para sa kanila. Sa kaso ng StealingPoolmga thread, hindi sila magtatagal magpakailanman sa wait, magsisimula silang magsagawa ng mga kalapit na gawain. Paano ito naiiba sa ibang mga thread pool WorkStealingPool? Dahil may mahiwagang nabubuhay sa loob niya ForkJoinPool:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
May isa pang pagkakaiba talaga. Ang mga thread na nilikha bilang ForkJoinPooldefault ay mga daemon thread, kumpara sa mga thread na ginawa sa pamamagitan ng regular na ThreadPool. Sa pangkalahatan, ito ay nagkakahalaga ng pag-alala tungkol sa mga thread ng daemon, dahil... halimbawa, CompletableFutureginagamit din ang mga daemon thread, kung hindi mo tinukoy ang sarili mong ThreadFactory, na lilikha ng mga non-daemon na thread. Ito ang mga uri ng mga sorpresa na maaaring maghintay sa iyo sa isang hindi inaasahang lugar!)

Fork/Join Pool

Sa bahaging ito ay pag-uusapan natin ang tungkol sa parehong ForkJoinPool(tinatawag ding fork/join framework) na nabubuhay "sa ilalim ng talukbong" ng WorkStealingPool. Sa pangkalahatan, lumitaw ang Fork Join Framework sa Java 1.7. At kahit na ang Java 11 ay nasa bakuran na, sulit pa rin itong alalahanin. Hindi ang pinakakaraniwang gawain, ngunit medyo kawili-wili. Mayroong magandang pagsusuri sa paksang ito sa Internet: " Fork/Join Framework sa Java 7 ". Fork/JoinPoolgumagana sa kanyang trabaho na may ganitong konsepto bilang java.util.concurrent.RecursiveTask. Mayroon ding analogue - java.util.concurrent.RecursiveAction. Ang RecursiveActions ay hindi nagbabalik ng resulta. Kaya RecursiveTaskkatulad ng Callable, at RecursiveActionkatulad ng Runnable. Buweno, sa pagtingin sa pangalan, nakikita natin ang dalawang pangunahing pamamaraan - forkat join. Ang pamamaraan forkay nagpapatakbo ng isang gawain nang asynchronous sa isang hiwalay na thread. At joinpinapayagan ka ng pamamaraan na maghintay para makumpleto ang trabaho. Mayroong ilang mga paraan upang magamit ito: Hindi mo masisira ang Java gamit ang isang thread: Part V - Executor, ThreadPool, Fork Join - 5Ang larawang ito ay bahagi ng isang slide mula sa ulat ni Alexey Shipilev na " Fork/Join: pagpapatupad, paggamit, pagganap ." Upang gawing mas malinaw, sulit na panoorin ang kanyang ulat sa JEE CONF: " Mga tampok sa pagpapatupad ng Fork Join ."

Pagbubuod

Kaya, narito kami, tinatapos ang susunod na bahagi ng pagsusuri. Naisip namin kung ano ang una naming naisip Executorpara sa pagpapatupad ng mga thread. Pagkatapos ay nagpasya kaming ipagpatuloy ang ideya at naisip ito ExecutorService. ExecutorServiceay nagbibigay-daan sa iyo na magpadala ng mga gawain para sa pagpapatupad gamit submitang at invoke, pati na rin pamahalaan ang serbisyo sa pamamagitan ng pag-off nito. kasi ExecutorService'kailangan namin ng mga pagpapatupad, nagsulat kami ng isang klase na may mga pamamaraan ng pabrika at tinawag ito Executors. Pinapayagan ka nitong lumikha ng mga pool ng thread ThreadPoolExecutor. Kasabay nito, may mga thread pool na nagbibigay-daan din sa iyong tumukoy ng iskedyul para sa pagpapatupad, ngunit WorkStealingPoolnakatago ito sa likod ForkJoinPool. Umaasa ako na ang isinulat sa itaas ay hindi lamang kawili-wili sa iyo, ngunit naiintindihan din) Ako ay palaging masaya na makatanggap ng mga mungkahi at komento. #Viacheslav
Mga komento
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION