JavaRush /Java блогы /Random-KK /Сіз Java-ны жіппен бүлдіре алмайсыз: V бөлім - Орындаушы,...
Viacheslav
Деңгей

Сіз Java-ны жіппен бүлдіре алмайсыз: V бөлім - Орындаушы, Threadpool, шанышқымен қосылу

Топта жарияланған

Кіріспе

Сонымен, біз Java-да ағындар бар екенін білеміз, олар туралы « Жаваны жіппен бүлдіре алмайсыз: I бөлім - тақырыптар » шолуынан оқуға болады . Сіз Java тілін жіппен бүлдіре алмайсыз: V бөлім - Орындаушы, ThreadPool, шанышқымен қосылу - 1Үлгі codeты қайтадан қарастырайық:
public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Көріп отырғанымыздай, тапсырманы іске қосу codeы өте стандартты, бірақ әрбір жаңа іске қосу үшін оны қайталауға тура келеді. Бір шешім оны бөлек әдіске жылжыту болып табылады, мысалы execute(Runnable runnable). Бірақ Java әзірлеушілері біз үшін алаңдап, интерфейсті ойлап тапты Executor:
public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Көріп отырғаныңыздай, code қысқарақ болды және оны Runnableағында іске қосу үшін жай ғана code жазуға мүмкіндік берді. Тамаша, солай емес пе? Бірақ бұл тек бастамасы: Сіз Java тілін жіппен бүлдіре алмайсыз: V бөлім - Орындаушы, ThreadPool, шанышқымен қосылу - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Көріп отырғаныңыздай, интерфейстің Executorұрпақ интерфейсі бар ExecutorService. Бұл интерфейстің JavaDoc нұсқасы «a » жұмысын тоқтату әдістерін қамтамасыз ететін және орындау барысын бақылауға мүмкіндік беретін ExecutorServiceарнайы «a» сипаттамасы екенін айтады. Бұрын « Жаваны жіппен бүлдіре алмайсыз: IV бөлім – шақыруға болатын, болашақ және достар » бөлімінде біз мүмкіндіктерді қысқаша қарастырдық . Егер сіз оны ұмытып қалсаңыз немесе оқымаған болсаңыз, жадыңызды жаңартуға кеңес беремін;) JavaDoc-та тағы қандай қызықты нәрселер жазылған? Бізде әдепкі бойынша қол жетімді іске асыруды жасауға мүмкіндік беретін арнайы зауыт бар . ExecutorExecutorjava.util.concurrent.FutureFuturejava.util.concurrent.ExecutorsExecutorService

Орындаушы қызметі

Тағы да еске түсірейік. Біз Executorағынды құруды жүзеге асыру бізден жасырылған кезде, ағындағы белгілі бір тапсырманы орындауға (яғни орындауға) тура келеді. Бізде орындалу барысын басқаруға арналған мүмкіндіктер жиынтығы бар ExecutorServiceарнайы бар. Ал бізде жасауға мүмкіндік беретін Executorзауыт бар . Енді өзіміз жасайық: ExecutorsExecutorService
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Көріп отырғанымыздай, біз Fixed Thread Pool2 өлшемді бекітілген жіп пулын ( ) көрсеттік. Содан кейін біз пулға тапсырмаларды бір-бірден жібереміз. Әрбір тапсырма Stringағын атауын ( currentThread().getName()) қамтитын жолды ( ) қайтарады. Ең соңында өшіру маңызды ExecutorService, өйткені әйтпесе бағдарламамыз шықпайды. Зауытта Executorsбасқа да зауыттық әдістер бар . Мысалы, біз тек бір ағынның пулын newSingleThreadExecutorнемесе кэштеу бар пулды жасай аламыз newCachedThreadPool, онда ағындар 1 minutes бойы жұмыс істемей тұрса, олар пулдан жойылады. Шындығында, бұлардың артында тапсырмалар орналастырылатын және осы тапсырмалар орындалатын блоктау кезегіExecutorService бар . Кезектерді бұғаттау туралы қосымша ақпаратты " Блоктау кезегі - №5 жинақтар - кеңейтілген Java " бейнесінен көруге болады . Сондай-ақ, « Бір мезгілде пакеттің кезектерін блоктау » шолуын және « ArrayBlockingQueue орнына LinkedBlockingQueue-ді қашан таңдау керек? » Деген сұраққа жауапты оқуға болады. Өте жеңілдетілген - (блоктау кезегі) екі жағдайда ағынды блоктайды: BlockingQueue
  • ағын бос кезектен элементтерді алуға тырысады
  • жіп элементтерді толық кезекке қоюға тырысады
Зауыттық әдістерді енгізуді қарастыратын болсақ, олардың құрылымдық құрылымын көруге болады. Мысалы:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
немесе
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Көріп отырғанымыздай, іске асыру зауыттық әдістер ішінде жасалады ExecutorService. Және бұл негізінен ThreadPoolExecutor. Жұмысқа әсер ететін атрибуттар ғана өзгереді. Сіз Java-ны жіппен бұза алмайсыз: V бөлім - Орындаушы, ThreadPool, Fork Join - 3

https://kk.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

Бұрын көргеніміздей, зауыттық әдістер ThreadPoolExecutor, . Функционалдылыққа қандай мәндердің максималды және минималды ағындар ретінде берілетіні, сондай-ақ қандай кезек қолданылатыны әсер етеді. Және интерфейстің кез келген іске асырылуы мүмкін java.util.concurrent.BlockingQueue. 'ahs туралы айтқанда ThreadPoolExecutor, жұмыс кезінде қызықты ерекшеліктерді атап өткен жөн. Мысалы, ThreadPoolExecutorбос орын болмаса, тапсырмаларды жібере алмайсыз:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Бұл code келесідей қатемен сәтсіз болады:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Яғни, taskсіз жібере алмайсыз, өйткені SynchronousQueueол шын мәнінде бір элементтен тұратындай етіп жасалған және ол жерге көбірек қоюға мүмкіндік бермейді. Көріп отырғанымыздай, queued tasksмұнда 0 бар және бұл жерде таңқаларлық ештеңе жоқ, өйткені бұл нақты SynchronousQueue- шын мәнінде, бұл әрқашан бос болатын 1 элементтің кезегі. (!) Бір ағын элементті кезекке қойғанда, ол басқа ағын элементті кезектен алғанша күтеді. Сондықтан, біз ауыстыра аламыз new LinkedBlockingQueue<>(1)және қатеде көрсетілетін нәрсе өзгереді queued tasks = 1. Өйткені кезек тек 1 элемент болса, екінші элементті қоса алмаймыз. Ал біз бұған құлаймыз. Кезек тақырыбын жалғастыра отырып, сыныпта ThreadPoolExecutorкезекке қызмет көрсетудің қосымша әдістері бар екенін атап өткен жөн. Мысалы, әдіс threadPoolExecutor.purge()кезектегі орынды босату үшін барлық тоқтатылған тапсырмаларды кезектен жояды. Кезекке қатысты тағы бір қызықты мүмкіндік - қабылданбаған тапсырма өңдеушісі:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Мысалы, өңдеуші Rejectedкезекке тапсырманы қабылдаудан әрбір бас тарту үшін жай ғана сөзді басып шығарады. Ыңғайлы, солай емес пе? Сонымен қатар, ThreadPoolExecutorоның қызықты мұрагері бар - ScheduledThreadPoolExecutorол кім ScheduledExecutorService. Ол таймерде тапсырманы орындау мүмкіндігін береді.

ScheduledExecutorService

ExecutorServiceтүрі ScheduledExecutorServiceтапсырмаларды кестеге сәйкес орындауға мүмкіндік береді. Мысал қарастырайық:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Мұнда бәрі қарапайым. Тапсырмалар жіберілді, біз «жоспарланған тапсырма» аламыз java.util.concurrent.ScheduledFuture. Келесі жағдай кестеде де пайдалы болуы мүмкін:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Мұнда біз Runnableбелгілі бір кідіріспен белгіленген мөлшерлемемен (Fixed Rate) орындалатын тапсырманы жібереміз. Бұл жағдайда әрбір 2 секунд сайын 1 секундтан кейін тапсырманы орындауға кірісіңіз. Ұқсас опция бар:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Бірақ мұнда тапсырмалар әртүрлі тапсырмаларды орындау арасында берілген интервалмен орындалады. Яғни, тапсырма task1 секундта орындалады. Әрі қарай, ол аяқталғаннан кейін 2 секунд өтеді, содан кейін жаңа тапсырма іске қосылады. Осы тақырып бойынша келесі материалдарды оқи аласыз: Сіз Java-ны жіппен бұза алмайсыз: V бөлім - Орындаушы, ThreadPool, Fork Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

Жоғарыда аталған жіп пулдарынан басқа, тағы біреуі бар. Сіз оны ерекше деп айта аласыз. Оның аты - Work Stealing Pool. Қысқаша айтқанда, Work Stealing - жұмыс істемейтін ағындар басқа ағындардан немесе жалпы кезектен тапсырмаларды ала бастайтын жұмыс алгоритмі. Мысал қарастырайық:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Бұл codeты іске қоссақ, ExecutorServiceол 5 ағынды жасайды, өйткені әрбір ағын нысан орнында күту кезегіне қосылады lock. Мониторлар мен құлыптар туралы біз « Сіз Java-ны тақырыппен бүлдіре алмайсыз: II бөлім - Синхрондау » бөлімінде талқыладық . Executors.newCachedThreadPoolАл енді біз оны ауыстырамыз Executors.newWorkStealingPool(). Не өзгереді? Біздің тапсырмалар 5 жіпте емес, азырақ орындалатынын көреміз. cachedThreadPoolӘрбір тапсырма үшін өзіңіздің ағыныңызды жасағаныңызды есте сақтаңыз ба ? Өйткені waitол ағынды блоктады, бірақ келесі тапсырмалар орындалғысы келді және олар үшін пулда жаңа ағындар жасалды. Жіптер жағдайында StealingPoolолар мәңгі жұмыс істемейді wait, олар көрші тапсырмаларды орындауға кіріседі. Бұл басқа жіп пулдарынан қалай ерекшеленеді WorkStealingPool? Өйткені оның ішінде шынымен де сиқырлы нәрсе бар ForkJoinPool:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Шындығында тағы бір айырмашылық бар. Әдепкі бойынша жасалған ағындар ForkJoinPoolтұрақты арқылы жасалған ағындарға қарағанда демон ағындары болып табылады ThreadPool. Жалпы, демон ағындары туралы еске түсірген жөн, өйткені... мысалы, демондық емес ағындарды жасайтын CompletableFutureөзіңізді көрсетпесеңіз, демон ағындары да пайдаланылады . ThreadFactoryБұл сізді күтпеген жерде күтетін тосын сыйлар!)

Шанышқы/қосылу бассейні

ForkJoinPoolБұл бөлімде біз «сорғыштың астында» өмір сүретін бірдей (сонымен бірге шанышқы/қосылу шеңбері деп аталады) туралы сөйлесетін боламыз WorkStealingPool. Жалпы, Fork Join Framework Java 1.7-де пайда болды. Java 11 аулада болса да, оны әлі де есте ұстаған жөн. Ең кең таралған тапсырма емес, бірақ өте қызықты. Интернетте бұл тақырып бойынша жақсы шолу бар: “ Java 7 жүйесінде Fork/Join Framework ”. Fork/JoinPoolсияқты тұжырымдамамен өз жұмысында әрекет етеді java.util.concurrent.RecursiveTask. Сондай-ақ аналогы бар - java.util.concurrent.RecursiveAction. RecursiveActions нәтижені қайтармайды. Осылайша RecursiveTaskұқсас Callableжәне RecursiveActionұқсас Runnable. Атауға қарап, біз екі негізгі әдісті көреміз - forkжәне join. Әдіс forkтапсырманы асинхронды түрде бөлек ағында іске қосады. Ал әдіс joinжұмыстың аяқталуын күтуге мүмкіндік береді. Оны қолданудың бірнеше жолы бар: Бұл сурет Алексей Шипилевтің « Айналмалы/қосылу: іске асыру, пайдалану, өнімділікСіз Java-ны жіппен бұза алмайсыз: V бөлім - Орындаушы, ThreadPool, Fork Join - 5 » баяндамасының слайдының бөлігі . Түсінікті болу үшін оның JEE CONF-тегі есебін қарау керек: « Fork Join іске асыру мүмкіндіктері ».

Қорытындылау

Сонымен, біз шолудың келесі бөлігін аяқтаймыз. ExecutorБіз ағындарды орындау үшін алғаш рет не ойлап тапқанымызды анықтадық . Содан кейін біз идеяны жалғастыруды ұйғардық және оған келдік ExecutorService. және ExecutorServiceарқылы тапсырмаларды орындауға жіберуге , сондай-ақ қызметті өшіру арқылы басқаруға мүмкіндік береді. Өйткені «бізге іске асыру қажет, біз зауыттық әдістермен класс жазып, оны . Ол сізге жіп пулдарын жасауға мүмкіндік береді . Сонымен қатар орындау кестесін көрсетуге мүмкіндік беретін ағын пулдары бар, бірақ жасырады . Жоғарыда жазылғандар сізге қызықты ғана емес, сонымен қатар түсінікті болды деп үміттенемін) Мен әрқашан ұсыныстар мен пікірлерді алуға қуаныштымын. #ВячеславsubmitinvokeExecutorServiceExecutorsThreadPoolExecutorWorkStealingPoolForkJoinPool
Пікірлер
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION