JavaRush /Java блогу /Random-KY /Сиз Javaны жип менен буза албайсыз: V бөлүк - Аткаруучу, ...
Viacheslav
Деңгээл

Сиз Javaны жип менен буза албайсыз: V бөлүк - Аткаруучу, ThreadPool, айры кошулуу

Группада жарыяланган

Киришүү

Ошентип, биз Java'да жиптер бар экенин билебиз, алар жөнүндө сиз " Жаваны жип менен буза албайсыз: I бөлүк - Жиптер " сынынан окуй аласыз . Сиз Javaны жип менен буза албайсыз: V бөлүк - Аткаруучу, ThreadPool, айры кошулуу - 1Келгиле, үлгү codeун кайра карап көрөлү:
public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Көрүнүп тургандай, тапшырманы ишке киргизүү codeу абдан стандарттуу, бирок ар бир жаңы ишке киргизүү үчүн биз аны кайталашыбыз керек. Бир чечим аны өзүнчө ыкмага жылдыруу болуп саналат, мисалы execute(Runnable runnable). Бирок Java иштеп чыгуучулары биз үчүн тынчсызданып, интерфейсти ойлоп табышты Executor:
public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Көрүнүп тургандай, code кыскараак болуп калды жана аны Runnableжипте иштетүү үчүн жөн гана code жазууга мүмкүнчүлүк берди. Сонун, туурабы? Бирок, бул башталышы гана: Сиз Java'ны жип менен буза албайсыз: V бөлүк - Аткаруучу, ThreadPool, айры кошулуу - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Көрүнүп тургандай, интерфейстин Executorтукум интерфейси бар ExecutorService. Бул интерфейстин JavaDoc бул ExecutorServiceатайын "а" сыпаттамасы деп айтылат, ал "а" Executorишин токтотуу ыкмаларын камсыз кылат жана аткаруунун жүрүшүнө көз салууга Executorмүмкүндүк берет . java.util.concurrent.FutureБуга чейин, " Жаваны жип менен буза албайсыз: IV-бөлүк - Чакырылган, Келечек жана Достор " бөлүмүндө биз мүмкүнчүлүктөрдү кыскача карап чыкканбыз Future. Эгер сиз аны унутуп калсаңыз же окуй элек болсоңуз, эс тутумуңузду жаңыртууну сунуштайм ;) JavaDocто дагы эмне кызыктуу? java.util.concurrent.ExecutorsБизде демейки боюнча жеткorктүү болгон ишке ашырууларды түзүүгө мүмкүндүк берген атайын завод бар ExecutorService.

ExecutorService

Дагы бир жолу эстейли. ExecutorЖипти түзүүнүн аткарылышы бизден жашырылганда, биз жипте белгилүү бир тапшырманы аткарышыбыз керек (б.а.). Бизде аткаруунун жүрүшүн башкаруу үчүн мүмкүнчүлүктөрдүн комплекси бар ExecutorServiceатайын бар. ExecutorА бизде завод бар Executors, ал түзүүгө мүмкүндүк берет ExecutorService. Келгиле, азыр өзүбүз кылалы:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Көрүнүп тургандай, биз Fixed Thread Pool2-өлчөмдөгү туруктуу жип бассейнин ( ) көрсөттүк. Андан кийин биз бассейнге тапшырмаларды бирден жөнөтөбүз. StringАр бир тапшырма жиптин атын ( ) камтыган сапты ( ) кайтарат currentThread().getName(). Эң аягында өчүрүү маанилүү ExecutorService, анткени антпесе биздин программа чыкпайт. Фабрикада Executorsбашка заводдук ыкмалар бар . Мисалы, биз бир эле жиптен турган бассейнди түзө алабыз - newSingleThreadExecutorже кэштелген бассейн newCachedThreadPool, анда жиптер 1 мүнөт бош турса, алар бассейнден алынып салынат. Чынында, булардын артында тапшырмалар коюлган жана бул тапшырмалар аткарыла турган бөгөттөөчү кезекExecutorService бар . Кезектерди бөгөттөө жөнүндө көбүрөөк маалыматты " Бөгөттөө кезек - Коллекциялар №5 - Өркүндөтүлгөн Java " видеосунда көрүүгө болот . Сиз ошондой эле " Бир убактагы пакеттин кезектерин бөгөттөө " сын-пикирди жана " ArrayBlockingQueueге караганда LinkedBlockingQueue'ге качан артыкчылык берүү керек? " деген суроого жоопту окуй аласыз. Супер жөнөкөйлөштүрүлгөн - (бөгөттөө кезеги) жипти эки учурда бөгөттөйт: BlockingQueue
  • жип бош кезектеги элементтерди алууга аракет кылып жатат
  • жип элементтерди толук кезекке коюуга аракет кылып жатат
Фабрикалык методдордун ишке ашырылышын карай турган болсок, алардын кандайча тузулгендугун керууге болот. Мисалы:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
же
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Көрүнүп тургандай, ишке ашыруулар фабриканын ичинде түзүлөт ExecutorService. Жана бул негизинен ThreadPoolExecutor. Жумушка таасир этүүчү атрибуттар гана өзгөрөт. Javaны жип менен буза албайсыз: V бөлүк - Аткаруучу, ThreadPool, Fork кошулуу - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

Мурда көргөнүбүздөй, заводдун ичиндеги ыкмалар ThreadPoolExecutor, . Функционалдуулукка максималдуу жана минималдуу жиптер катары кандай маанилер берилгени, ошондой эле кандай кезек колдонулганы таасир этет. Жана интерфейстин каалаган ишке ашырылышы колдонулушу мүмкүн java.util.concurrent.BlockingQueue. 'ahs кеп ThreadPoolExecutor, иш учурунда кызыктуу өзгөчөлүктөрүн белгилей кетүү керек. Мисалы, ThreadPoolExecutorанда орун жок болсо, тапшырмаларды жөнөтө албайсыз:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Бул code төмөнкүдөй ката менен ишке ашпай калат:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Башкача айтканда, taskсиз тапшыра албайсыз, анткени SynchronousQueueал чындыгында бир элементтен тургандай кылып иштелип чыккан жана ал жерге көбүрөөк коюуга жол бербейт. Көрүнүп тургандай, queued tasksбул жерде 0 бар, мында кызыктай эч нерсе жок, анткени бул конкреттүү SynchronousQueue- чындыгында, бул дайыма бош турган 1 элементтен турган кезек. (!) Бир жип элементти кезекке койгондо, башка жип кезектен элементти алганга чейин күтөт. Ошондуктан, биз менен алмаштыра алабыз new LinkedBlockingQueue<>(1)жана катада көрсөтүлгөн нерсе өзгөрөт queued tasks = 1. Анткени кезек 1 гана элемент, анда биз экинчисин кошо албайбыз. А биз мунун үстүнөн чыгабыз. Кезектин темасын улантып, класста ThreadPoolExecutorкезекти тейлөөнүн кошумча ыкмалары бар экендигин белгилей кетүү керек. Мисалы, ыкма threadPoolExecutor.purge()кезектеги орун бошотуу үчүн бардык жокко чыгарылган тапшырмаларды кезектен алып салат. Кезекке байланыштуу дагы бир кызыктуу өзгөчөлүк - бул кабыл алынбаган тапшырманы иштетүүчү:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Мисалы, иштетүүчү Rejectedкезектеги тапшырманы кабыл алуудан баш тартуу үчүн сөздү басып чыгарат. Ыңгайлуу, туурабы? Мындан тышкары, ThreadPoolExecutorанын кызыктуу мураскору бар - ScheduledThreadPoolExecutorким ScheduledExecutorService. Ал таймерде тапшырманы аткаруу мүмкүнчүлүгүн берет.

ScheduledExecutorService

ExecutorServiceтүрү ScheduledExecutorServiceсизге графикке ылайык тапшырмаларды аткарууга мүмкүндүк берет. Келгиле, бир мисал карап көрөлү:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Бул жерде баары жөнөкөй. Тапшырмалар жөнөтүлдү, биз "пландуу тапшырманы" алабыз java.util.concurrent.ScheduledFuture. Төмөнкү жагдай да график менен пайдалуу болушу мүмкүн:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Бул жерде биз Runnableбелгилүү бир кечигүү менен белгиленген тарифте (Fixed Rate) аткарыла турган тапшырманы жөнөтөбүз. Бул учурда, ар бир 2 секунддан кийин 1 секунддан кийин тапшырманы аткарууга киришиңиз. Окшош вариант бар:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Бирок бул жерде тапшырмалар ар кандай тапшырмаларды аткаруунун ортосунда берилген интервал менен аткарылат. Башкача айтканда, тапшырма task1 секундда аткарылат. Андан кийин, ал бүтөөрү менен 2 секунд өтөт, андан кийин жаңы тапшырма башталат. Бул тема боюнча төмөнкү материалдарды окуй аласыз: Javaны жип менен буза албайсыз: V бөлүк - Аткаруучу, ThreadPool, Fork кошулуу - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

Жогоруда айтылган жип көлмөлөрүнөн тышкары дагы бирөө бар. Ал бир аз өзгөчө деп айтса болот. Анын аты Work Stealing Pool. Кыскача айтканда, Work Stealing – бул иштебеген жиптер башка жиптерден тапшырмаларды же жалпы кезектеги тапшырмаларды ала баштаган жумуш алгоритми. Келгиле, бир мисал карап көрөлү:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Бул codeду иштетсек, ExecutorServiceал 5 жипти жаратат, анткени ар бир жип an objectтин жайгашкан жеринде күтүү кезегине кошулат lock. Биз буга чейин мониторлор жана кулпулар жөнүндө " Сиз Javaны жип менен буза албайсыз: II бөлүк - Синхрондоштуруу " бөлүмүндө талкуулаганбыз. Executors.newCachedThreadPoolЭми аны менен алмаштырабыз Executors.newWorkStealingPool(). Эмне өзгөрөт? Биздин тапшырмалар 5 жипте эмес, азыраак аткарылганын көрөбүз. cachedThreadPoolАр бир тапшырма үчүн өзүңүздүн жипиңизди жаратканыңызды унутпаңызбы ? Анткени waitал жипти жаап салган, бирок кийинки тапшырмалар аткарылгысы келген жана алар үчүн бассейнде жаңы жиптер түзүлгөн. Жиптер болсо StealingPool, алар түбөлүккө иштебейт wait, кошуна тапшырмаларды аткара башташат. Бул башка жип бассейндеринен эмнеси менен айырмаланат WorkStealingPool? Анткени анын ичинде чындап эле сыйкырдуу бир нерсе жашайт ForkJoinPool:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Чынында дагы бир айырма бар. Демейки боюнча түзүлгөн жиптер ForkJoinPoolкадимки аркылуу түзүлгөн жиптерден айырмаланып, демон жиптери болуп саналат ThreadPool. Жалпысынан алганда, демон жиптери жөнүндө эстен чыгарбоо керек, анткени... мисалы, CompletableFutureдемон жиптери да колдонулат, эгерде сиз өзүңүздүн ThreadFactory, демон эмес жиптерди түзө турган болсоңуз. Бул күтүлбөгөн жерден сизди күтө турган сюрприздердин түрлөрү!)

Айыр / кошулуу бассейни

Бул бөлүктө биз ошол эле ForkJoinPool(айрыкча/кошуу алкагы деп да аталат) "капоттун астында" жашайбыз WorkStealingPool. Жалпысынан, Fork Join Framework Java 1.7де пайда болгон. Java 11 короодо болсо дагы, аны эстен чыгарбоо керек. Эң таралган тапшырма эмес, бирок абдан кызыктуу. Интернетте бул тема боюнча жакшы пикир бар: “ Fork/Join Framework in Java 7 ”. Fork/JoinPoolсыяктуу түшүнүк менен өз ишинде иш алып барат java.util.concurrent.RecursiveTask. аналогу да бар - java.util.concurrent.RecursiveAction. RecursiveActions натыйжаны кайтарbyte. Ошентип , RecursiveTaskокшош Callableжана RecursiveActionокшош Runnable. Ооба, аты карап, биз эки негизги ыкмаларын көрүп - forkжана join. Метод forkтапшырманы өзүнчө жипте асинхрондуу иштетет. Ал эми ыкма joinиштин аягына чейин күтүүгө мүмкүндүк берет. Аны колдонуунун бир нече жолу бар: Бул сүрөт Алексей Шипилевдин “ Айырык/кошуу: ишке ашыруу, колдонуу, аткарууJavaны жип менен буза албайсыз: V бөлүк - Аткаруучу, ThreadPool, Fork кошулуу - 5 ” баяндамасынан слайддын бир бөлүгү . Аны айкыныраак кылуу үчүн, анын JEE CONFдеги баяндамасын көрүү керек: " Fork Join ишке ашыруу өзгөчөлүктөрү ."

Жыйынтык чыгаруу

Ошентип, биз карап чыгуунун кийинки бөлүгүн бүтүрөбүз. ExecutorБиз жиптерди аткаруу үчүн биринчи жолу эмнени ойлоп тапканыбызды түшүндүк . Анан идеяны улантууну чечтик жана аны ойлоп таптык ExecutorService. жана ExecutorServiceаркылуу тапшырмаларды аткарууга жөнөтүүгө , ошондой эле кызматты өчүрүү менен башкарууга мүмкүндүк берет. Анткени "бизге ишке ашыруулар керек, биз заводдук ыкмалар менен класс жазып, аны . Бул сизге жип бассейндерин түзүүгө мүмкүндүк берет . Ошол эле учурда, ошондой эле аткаруу үчүн расписание көрсөтүүгө мүмкүндүк берет жип бассейндер бар, бирок жашырат . Жогоруда жазылгандар сиз үчүн кызыктуу гана болбостон, түшүнүктүү болду деп ишенем) Мен ар дайым сунуштарды жана комментарийлерди алганыма кубанычтамын. #ВячеславsubmitinvokeExecutorServiceExecutorsThreadPoolExecutorWorkStealingPoolForkJoinPool
Комментарийлер
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION