JavaRush /Java Blog /Random-TK /Java-ny sapak bilen zaýalap bilmersiňiz: V bölüm - ýerine...
Viacheslav
Dereje

Java-ny sapak bilen zaýalap bilmersiňiz: V bölüm - ýerine ýetiriji, ThreadPool, Fork goşulmak

Toparda çap edildi

Giriş

Şeýlelik bilen, Java-da sapaklaryň bardygyny bilýäris, " Java-ny bir mowzuk bilen zaýalap bilmersiňiz: I bölüm - sapaklar " synynda okap bilersiňiz. Java-ny sapak bilen zaýalap bilmersiňiz: V bölüm - ýerine ýetiriji, ThreadPool, Fork Join - 1Mysal kodyna ýene bir gezek seredeliň:
public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Görşümiz ýaly, meseläni başlamak üçin kod gaty adaty, ýöne her täze çykaryş üçin ony gaýtalamaly bolarys. Bir çözgüt, meselem, aýratyn usula geçirmek execute(Runnable runnable). Emma Java döredijiler eýýäm biz hakda alada edip, interfeýs tapdylar Executor:
public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Görşüňiz ýaly, kod has gysga boldy we bize diňe Runnablebir sapakda işlemek üçin kod ýazmaga mümkinçilik berdi. Gowy, şeýlemi? Emma bu başlangyç: Java-ny sapak bilen zaýalap bilmersiňiz: V bölüm - ýerine ýetiriji, ThreadPool, Fork Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Görşüňiz ýaly, interfeýsiň Executornesil interfeýsi bar ExecutorService. Bu interfeýsiň JavaDoc, bu "a" işini togtatmagyň usullaryny üpjün edýän we ýerine ýetirişiň gidişini yzarlamaga mümkinçilik berýän ExecutorServiceýörite "a" -yň beýanydygyny aýdýar. Ozal “ Java-ny sapak bilen zaýalap bilmersiňiz: IV bölüm - Çagyryp bolýan, geljek we dostlar ” atly bölümde bu mümkinçilikleri gysgaça gözden geçirdik . Unutan bolsaňyz ýa-da okamadyk bolsaňyz, ýadyňyzy täzelemegi maslahat berýärin;) JavaDoc-da başga haýsy gyzykly zatlar ýazylýar? Dymmaklyk bilen elýeterli bolan amallary döretmäge mümkinçilik berýän ýörite zawodymyz bar . ExecutorExecutorjava.util.concurrent.FutureFuturejava.util.concurrent.ExecutorsExecutorService

ExecutorService

Againene bir gezek ýatlalyň. ExecutorBir sapak döretmegiň ýerine ýetirilişi bizden gizlenende, belli bir meseläni sapakda ýerine ýetirmeli (ýagny ýerine ýetirmeli) . Executionerine ýetirişiň gidişini dolandyrmak üçin birnäçe mümkinçiliklere eýe bolan ExecutorServiceaýratyn birimiz bar . We döretmäge mümkinçilik berýän Executorzawodymyz bar . Geliň indi özümiz edeliň: ExecutorsExecutorService
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Görşümiz ýaly, 2 ululykdaky kesgitlenen sapak basseýnini kesgitledik, Fixed Thread Poolşondan soň howuza meseleler ýeke-ýekeden iberýäris. Her bir mesele, Stringsapagyň adyny () öz içine alýan setiri ( currentThread().getName()) gaýtaryp berýär. Iň soňunda ýapmak möhümdir ExecutorService, sebäbi bolmasa programmamyz çykmaz. Zawodda Executorsbeýleki zawod usullary bar . Mysal üçin, diňe bir sapakdan howuz döredip bileris - newSingleThreadExecutorýa-da keşli howuz döredip bileris newCachedThreadPool, bu ýerde sapaklar 1 minut işlemese howuzdan aýrylar. Aslynda, bularyň aňyrsynda haýsy meseleleriň goýulýandygyny we bu meseleleriň ýerine ýetirilişini bloklaýan nobatExecutorService bar . Bloklary blokirlemek barada has giňişleýin maglumaty " Blokirleme nobaty - 5-nji ýygyndy - Advanced Java " wideosynda görüp bilersiňiz . Şeýle hem, “ Bilelikdäki bukjanyň nobatlaryny blokirlemek ” synyny we “ LinkedBlockingQueue-ni ArrayBlockingQueue-den haçan saýlamaly? ” Diýen soraga jogap tapyp bilersiňiz. Örän ýönekeýleşdirilen - (nobaty blokirlemek) bir sapagy iki ýagdaýda bloklaýar: BlockingQueue
  • bir sapak boş nobatdan elementleri almaga synanyşýar
  • sapak elementleri doly nobata goýmaga synanyşýar
Zawod usullarynyň ýerine ýetirilişine göz aýlasak, olaryň nähili gurlandygyny görüp bileris. Mysal üçin:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
ýa-da
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Görşümiz ýaly, zawod usullarynyň içinde durmuşa geçirilýär ExecutorService. Bu esasan ThreadPoolExecutor. Diňe işiň üýtgemegine täsir edýän häsiýetler. Java-ny sapak bilen zaýalap bilmersiňiz: V bölüm - ýerine ýetiriji, ThreadPool, Fork Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

Öň görşümiz ýaly zawodyň içinde ThreadPoolExecutor,. Funksiýa haýsy bahalaryň iň ýokary we iň pes sapaklar hökmünde geçýändigine, şeýle hem haýsy nobatyň ulanylýandygyna täsir edýär. Interfeýsiň islendik ýerine ýetirilmegi ulanylyp bilner java.util.concurrent.BlockingQueue. Ahs hakda aýdanymyzda ThreadPoolExecutor, iş wagtynda gyzykly aýratynlyklary bellemelidiris. ThreadPoolExecutorMysal üçin, boş ýer ýok bolsa, wezipeleri iberip bilmersiňiz :
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Bu kod aşakdaky ýaly ýalňyşlyk bilen şowsuz bolar:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Submitagny, tasktabşyryp bilmersiňiz, sebäbi SynchronousQueueaslynda bir elementden ybarat bolup, ol ýerde has köp zat goýmaga mümkinçilik bermeýän görnüşde döredildi. Görşümiz ýaly, queued tasksbu ýerde 0 bar we bu ýerde geň zat ýok, sebäbi bu aýratyn SynchronousQueue- aslynda elmydama boş bolan 1 elementiň nobaty. (!) Bir sapak bir elementi nobata goýsa, başga bir sapak elementi nobatdan alýança garaşar. Şonuň üçin çalşyp bileris new LinkedBlockingQueue<>(1)we ýalňyşlykda görkeziljek zatlar üýtgär queued tasks = 1. Sebäbi nobat diňe 1 element, soň ikinjisini goşup bilmeris. Munuň üstünde durarys. ThreadPoolExecutorNobatyň mowzugyny dowam etdirip, synpyň nobata hyzmat etmek üçin goşmaça usullarynyň bardygyny bellemelidiris . Mysal üçin, usul threadPoolExecutor.purge()nobatda boş ýer boşatmak üçin ähli ýatyrylan meseleleri nobatdan aýyrar. Nobat bilen baglanyşykly başga bir gyzykly aýratynlyk, kabul edilmedik iş dolandyryjysy:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Mysal üçin, işleýji Rejectedher bir meseläni nobata kabul etmekden ýüz öwürmek üçin diňe bir söz çap edýär. Amatly, şeýlemi? Mundan başga-da, ThreadPoolExecutoronuň gyzykly mirasdüşeri bar - ScheduledThreadPoolExecutorkim ScheduledExecutorService. Taymerde bir işi ýerine ýetirmek ukybyny üpjün edýär.

Meýilleşdirilen ýerine ýetiriji hyzmat

ExecutorServicegörnüşi, ScheduledExecutorServicetertip boýunça işleri ýerine ýetirmäge mümkinçilik berýär. Bir mysala seredeliň:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Bu ýerde hemme zat ýönekeý. Wezipeler iberilýär, biz “meýilleşdirilen ýumuş” alýarys java.util.concurrent.ScheduledFuture. Aşakdaky ýagdaý meýilnama bilen peýdaly bolup biler:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
RunnableBu ýerde belli bir gijikdirme bilen kesgitlenen nyrhda (kesgitlenen nyrh) ýerine ýetirilmeli meseläni iberýäris . Bu ýagdaýda, her 2 sekuntda 1 sekuntdan soň, ýumuşy ýerine ýetirip başlaň. Şuňa meňzeş wariant bar:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Emma bu ýerde meseleler dürli meseleleriň ýerine ýetirilmeginiň arasynda belli bir aralyk bilen ýerine ýetirilýär. .Agny, wezipe task1 sekuntda tamamlanar. Ondan soň, tamamlan badyna 2 sekunt geçer we täze wezipe başlar. Bu mowzuk boýunça aşakdaky materiallary okap bilersiňiz: Java-ny sapak bilen zaýalap bilmersiňiz: V bölüm - ýerine ýetiriji, ThreadPool, Fork Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

Aboveokarda agzalan sapak howuzlaryndan başga-da ýene biri bar. Onuň birneme üýtgeşikdigini aýdyp bilersiňiz. Onuň ady Iş ogurlamak howzy. Gysgaça aýdylanda, “Work ogurlamak” iş algoritmidir, onda boş sapaklar beýleki sapaklardan ýa-da umumy nobatdan meseleler alyp başlaýar. Bir mysala seredeliň:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Bu kody işletsek, ExecutorService5 sapak döreder, sebäbi her sapak obýektiň ýerleşýän ýerinde garaşmak nobatyna goşular lock. Monitorlar we gulplar barada eýýäm " Java-ny sapak bilen zaýalap bilmersiňiz: II bölüm - Sinhronizasiýa " -da. Executors.newCachedThreadPoolIndi bolsa onuň ýerine çalyşarys Executors.newWorkStealingPool(). Näme üýtgär? Wezipelerimiziň 5 sapakda däl-de, has az ýerine ýetirilendigini göreris. cachedThreadPoolHer bir iş üçin öz sapagyňyzy döredendigiňizi ýadyňyzdamy ? Sebäbi waitbu sapagy petikledi, ýöne indiki meseleler ýerine ýetirilmegini isledi we howuzda täze sapaklar döredildi. Threadüplükler bar bolsa StealingPool, olar hemişelik işlemezler wait, goňşy işleri ýerine ýetirip başlarlar. Bu beýleki sapak howuzlaryndan nähili tapawutlanýar WorkStealingPool? Sebäbi içinde hakykatdanam jadyly bir zat bar ForkJoinPool:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Aslynda ýene bir tapawut bar. Adaty tertipde döredilen sapaklar ForkJoinPool, yzygiderli döredilen sapaklardan tapawutlylykda daemon sapaklarydyr ThreadPool. Umuman, daemon sapaklary hakda ýatda saklamaly, sebäbi ... meselem, daemon däl sapaklary döredýän CompletableFutureözüňizi görkezmeseňiz, daemon sapaklary hem ulanylýar . ThreadFactoryBular garaşylmadyk ýerde garaşyp biljek garaşylmadyk görnüşler!)

Fork / howuza goşulyň

ForkJoinPoolBu bölümde , “kapotyň aşagynda” ýaşaýan şol bir (fork / goşulma çarçuwasy hem diýilýär) hakda gürleşeris WorkStealingPool. Umuman, “Fork Join Framework” Java 1.7-de peýda boldy. Java 11 eýýäm howluda bolsa-da, ýatda saklamaly. Iň ýaýran mesele däl, ýöne gaty gyzykly. Internetde bu mowzukda gowy syn bar: " Java 7-de Fork / Join Framework ". Fork/JoinPoolýaly düşünje bilen işleýär java.util.concurrent.RecursiveTask. Analog hem bar - java.util.concurrent.RecursiveAction. “RecursiveActions” netije bermeýär. Şeýlelik RecursiveTaskbilen meňzeş Callablewe RecursiveActionmeňzeş Runnable. Adyna seretseň, iki esasy usuly görýäris - forkwe join. Usul forkbir meseläni aýratyn sapakda birkemsiz ýerine ýetirýär. Bu usul, joinişiň tamamlanmagyna garaşmaga mümkinçilik berýär. Ony ulanmagyň birnäçe usuly bar: Bu surat Alekseý Şipilewiň “ Fork / Join: ýerine ýetiriş, ulanmak, ýerine ýetirişJava-ny sapak bilen zaýalap bilmersiňiz: V bölüm - ýerine ýetiriji, ThreadPool, Fork Join - 5 ” hasabatynyň slaýdynyň bir bölegidir . Has düşnükli etmek üçin JEE CONF-daky hasabatyna tomaşa etmeli: " Fork ýerine ýetiriş aýratynlyklaryna goşulyň ."

Jemleme

Şeýlelikde, synyň indiki bölümini tamamlaýarys. ExecutorSaplary ýerine ýetirmek üçin ilki bilen näme tapanymyzy bildik . Soňra pikiri dowam etdirmek kararyna geldik we bu pikiri orta atdyk ExecutorService. ulanyp ExecutorServiceýerine ýetirmek üçin meseleler ibermäge , şeýle hem hyzmaty öçürip dolandyrmaga mümkinçilik berýär. Sebäbi 'durmuşa geçirilmegi zerur, zawod usullary bilen synp ýazdyk we oňa jaň etdik . Sapak howuzlaryny döretmäge mümkinçilik berýär . Şol bir wagtyň özünde, ýerine ýetiriş tertibini kesgitlemäge mümkinçilik berýän sapak howuzlary hem bar, ýöne aňyrsynda gizlenýär . Aboveokarda ýazylanlar diňe bir sizi gyzyklandyrman, eýsem düşnükli hem bolar diýip umyt edýärin) Teklipleri we teswirleri almakdan elmydama begenýärin. # WiaçeslawsubmitinvokeExecutorServiceExecutorsThreadPoolExecutorWorkStealingPoolForkJoinPool
Teswirler
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION