Киришүү
Ошентип, биз Java'да жиптер бар экенин билебиз, алар жөнүндө сиз " Жаваны жип менен буза албайсыз: I бөлүк - Жиптер " сынынан окуй аласыз . Келгиле, үлгү codeун кайра карап көрөлү:public static void main(String []args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
Көрүнүп тургандай, тапшырманы ишке киргизүү codeу абдан стандарттуу, бирок ар бир жаңы ишке киргизүү үчүн биз аны кайталашыбыз керек. Бир чечим аны өзүнчө ыкмага жылдыруу болуп саналат, мисалы execute(Runnable runnable)
. Бирок Java иштеп чыгуучулары биз үчүн тынчсызданып, интерфейсти ойлоп табышты Executor
:
public static void main(String []args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Көрүнүп тургандай, code кыскараак болуп калды жана аны Runnable
жипте иштетүү үчүн жөн гана code жазууга мүмкүнчүлүк берди. Сонун, туурабы? Бирок, бул башталышы гана:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
тукум интерфейси бар ExecutorService
. Бул интерфейстин JavaDoc бул ExecutorService
атайын "а" сыпаттамасы деп айтылат, ал "а" Executor
ишин токтотуу ыкмаларын камсыз кылат жана аткаруунун жүрүшүнө көз салууга Executor
мүмкүндүк берет . java.util.concurrent.Future
Буга чейин, " Жаваны жип менен буза албайсыз: IV-бөлүк - Чакырылган, Келечек жана Достор " бөлүмүндө биз мүмкүнчүлүктөрдү кыскача карап чыкканбыз Future
. Эгер сиз аны унутуп калсаңыз же окуй элек болсоңуз, эс тутумуңузду жаңыртууну сунуштайм ;) JavaDocто дагы эмне кызыктуу? java.util.concurrent.Executors
Бизде демейки боюнча жеткorктүү болгон ишке ашырууларды түзүүгө мүмкүндүк берген атайын завод бар ExecutorService
.
ExecutorService
Дагы бир жолу эстейли.Executor
Жипти түзүүнүн аткарылышы бизден жашырылганда, биз жипте белгилүү бир тапшырманы аткарышыбыз керек (б.а.). Бизде аткаруунун жүрүшүн башкаруу үчүн мүмкүнчүлүктөрдүн комплекси бар ExecutorService
атайын бар. Executor
А бизде завод бар Executors
, ал түзүүгө мүмкүндүк берет ExecutorService
. Келгиле, азыр өзүбүз кылалы:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
Көрүнүп тургандай, биз Fixed Thread Pool
2-өлчөмдөгү туруктуу жип бассейнин ( ) көрсөттүк. Андан кийин биз бассейнге тапшырмаларды бирден жөнөтөбүз. String
Ар бир тапшырма жиптин атын ( ) камтыган сапты ( ) кайтарат currentThread().getName()
. Эң аягында өчүрүү маанилүү ExecutorService
, анткени антпесе биздин программа чыкпайт. Фабрикада Executors
башка заводдук ыкмалар бар . Мисалы, биз бир эле жиптен турган бассейнди түзө алабыз - newSingleThreadExecutor
же кэштелген бассейн newCachedThreadPool
, анда жиптер 1 мүнөт бош турса, алар бассейнден алынып салынат. Чынында, булардын артында тапшырмалар коюлган жана бул тапшырмалар аткарыла турган бөгөттөөчү кезекExecutorService
бар . Кезектерди бөгөттөө жөнүндө көбүрөөк маалыматты " Бөгөттөө кезек - Коллекциялар №5 - Өркүндөтүлгөн Java " видеосунда көрүүгө болот . Сиз ошондой эле " Бир убактагы пакеттин кезектерин бөгөттөө " сын-пикирди жана " ArrayBlockingQueueге караганда LinkedBlockingQueue'ге качан артыкчылык берүү керек? " деген суроого жоопту окуй аласыз. Супер жөнөкөйлөштүрүлгөн - (бөгөттөө кезеги) жипти эки учурда бөгөттөйт: BlockingQueue
- жип бош кезектеги элементтерди алууга аракет кылып жатат
- жип элементтерди толук кезекке коюуга аракет кылып жатат
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
же
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Көрүнүп тургандай, ишке ашыруулар фабриканын ичинде түзүлөт ExecutorService
. Жана бул негизинен ThreadPoolExecutor
. Жумушка таасир этүүчү атрибуттар гана өзгөрөт.
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
Мурда көргөнүбүздөй, заводдун ичиндеги ыкмаларThreadPoolExecutor
, . Функционалдуулукка максималдуу жана минималдуу жиптер катары кандай маанилер берилгени, ошондой эле кандай кезек колдонулганы таасир этет. Жана интерфейстин каалаган ишке ашырылышы колдонулушу мүмкүн java.util.concurrent.BlockingQueue
. 'ahs кеп ThreadPoolExecutor
, иш учурунда кызыктуу өзгөчөлүктөрүн белгилей кетүү керек. Мисалы, ThreadPoolExecutor
анда орун жок болсо, тапшырмаларды жөнөтө албайсыз:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Бул code төмөнкүдөй ката менен ишке ашпай калат:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Башкача айтканда, task
сиз тапшыра албайсыз, анткени SynchronousQueue
ал чындыгында бир элементтен тургандай кылып иштелип чыккан жана ал жерге көбүрөөк коюуга жол бербейт. Көрүнүп тургандай, queued tasks
бул жерде 0 бар, мында кызыктай эч нерсе жок, анткени бул конкреттүү SynchronousQueue
- чындыгында, бул дайыма бош турган 1 элементтен турган кезек. (!) Бир жип элементти кезекке койгондо, башка жип кезектен элементти алганга чейин күтөт. Ошондуктан, биз менен алмаштыра алабыз new LinkedBlockingQueue<>(1)
жана катада көрсөтүлгөн нерсе өзгөрөт queued tasks = 1
. Анткени кезек 1 гана элемент, анда биз экинчисин кошо албайбыз. А биз мунун үстүнөн чыгабыз. Кезектин темасын улантып, класста ThreadPoolExecutor
кезекти тейлөөнүн кошумча ыкмалары бар экендигин белгилей кетүү керек. Мисалы, ыкма threadPoolExecutor.purge()
кезектеги орун бошотуу үчүн бардык жокко чыгарылган тапшырмаларды кезектен алып салат. Кезекке байланыштуу дагы бир кызыктуу өзгөчөлүк - бул кабыл алынбаган тапшырманы иштетүүчү:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Мисалы, иштетүүчү Rejected
кезектеги тапшырманы кабыл алуудан баш тартуу үчүн сөздү басып чыгарат. Ыңгайлуу, туурабы? Мындан тышкары, ThreadPoolExecutor
анын кызыктуу мураскору бар - ScheduledThreadPoolExecutor
ким ScheduledExecutorService
. Ал таймерде тапшырманы аткаруу мүмкүнчүлүгүн берет.
ScheduledExecutorService
ExecutorService
түрү ScheduledExecutorService
сизге графикке ылайык тапшырмаларды аткарууга мүмкүндүк берет. Келгиле, бир мисал карап көрөлү:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Бул жерде баары жөнөкөй. Тапшырмалар жөнөтүлдү, биз "пландуу тапшырманы" алабыз java.util.concurrent.ScheduledFuture
. Төмөнкү жагдай да график менен пайдалуу болушу мүмкүн:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Бул жерде биз Runnable
белгилүү бир кечигүү менен белгиленген тарифте (Fixed Rate) аткарыла турган тапшырманы жөнөтөбүз. Бул учурда, ар бир 2 секунддан кийин 1 секунддан кийин тапшырманы аткарууга киришиңиз. Окшош вариант бар:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Бирок бул жерде тапшырмалар ар кандай тапшырмаларды аткаруунун ортосунда берилген интервал менен аткарылат. Башкача айтканда, тапшырма task
1 секундда аткарылат. Андан кийин, ал бүтөөрү менен 2 секунд өтөт, андан кийин жаңы тапшырма башталат. Бул тема боюнча төмөнкү материалдарды окуй аласыз:
- Жип бассейндерине киришүү
- Thread Pools менен таанышуу
- Java Multithreading Steeplechase: аткаруучуларда тапшырмаларды жокко чыгаруу
- Фондук тапшырмалар үчүн туура Java аткаруучуларды тандоо
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
Жогоруда айтылган жип көлмөлөрүнөн тышкары дагы бирөө бар. Ал бир аз өзгөчө деп айтса болот. Анын аты Work Stealing Pool. Кыскача айтканда, Work Stealing – бул иштебеген жиптер башка жиптерден тапшырмаларды же жалпы кезектеги тапшырмаларды ала баштаган жумуш алгоритми. Келгиле, бир мисал карап көрөлү:public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Бул codeду иштетсек, ExecutorService
ал 5 жипти жаратат, анткени ар бир жип an objectтин жайгашкан жеринде күтүү кезегине кошулат lock
. Биз буга чейин мониторлор жана кулпулар жөнүндө " Сиз Javaны жип менен буза албайсыз: II бөлүк - Синхрондоштуруу " бөлүмүндө талкуулаганбыз. Executors.newCachedThreadPool
Эми аны менен алмаштырабыз Executors.newWorkStealingPool()
. Эмне өзгөрөт? Биздин тапшырмалар 5 жипте эмес, азыраак аткарылганын көрөбүз. cachedThreadPool
Ар бир тапшырма үчүн өзүңүздүн жипиңизди жаратканыңызды унутпаңызбы ? Анткени wait
ал жипти жаап салган, бирок кийинки тапшырмалар аткарылгысы келген жана алар үчүн бассейнде жаңы жиптер түзүлгөн. Жиптер болсо StealingPool
, алар түбөлүккө иштебейт wait
, кошуна тапшырмаларды аткара башташат. Бул башка жип бассейндеринен эмнеси менен айырмаланат WorkStealingPool
? Анткени анын ичинде чындап эле сыйкырдуу бир нерсе жашайт ForkJoinPool
:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Чынында дагы бир айырма бар. Демейки боюнча түзүлгөн жиптер ForkJoinPool
кадимки аркылуу түзүлгөн жиптерден айырмаланып, демон жиптери болуп саналат ThreadPool
. Жалпысынан алганда, демон жиптери жөнүндө эстен чыгарбоо керек, анткени... мисалы, CompletableFuture
демон жиптери да колдонулат, эгерде сиз өзүңүздүн ThreadFactory
, демон эмес жиптерди түзө турган болсоңуз. Бул күтүлбөгөн жерден сизди күтө турган сюрприздердин түрлөрү!)
Айыр / кошулуу бассейни
Бул бөлүктө биз ошол элеForkJoinPool
(айрыкча/кошуу алкагы деп да аталат) "капоттун астында" жашайбыз WorkStealingPool
. Жалпысынан, Fork Join Framework Java 1.7де пайда болгон. Java 11 короодо болсо дагы, аны эстен чыгарбоо керек. Эң таралган тапшырма эмес, бирок абдан кызыктуу. Интернетте бул тема боюнча жакшы пикир бар: “ Fork/Join Framework in Java 7 ”. Fork/JoinPool
сыяктуу түшүнүк менен өз ишинде иш алып барат java.util.concurrent.RecursiveTask
. аналогу да бар - java.util.concurrent.RecursiveAction
. RecursiveActions натыйжаны кайтарbyte. Ошентип , RecursiveTask
окшош Callable
жана RecursiveAction
окшош Runnable
. Ооба, аты карап, биз эки негизги ыкмаларын көрүп - fork
жана join
. Метод fork
тапшырманы өзүнчө жипте асинхрондуу иштетет. Ал эми ыкма join
иштин аягына чейин күтүүгө мүмкүндүк берет. Аны колдонуунун бир нече жолу бар: Бул сүрөт Алексей Шипилевдин “ Айырык/кошуу: ишке ашыруу, колдонуу, аткаруу ” баяндамасынан слайддын бир бөлүгү . Аны айкыныраак кылуу үчүн, анын JEE CONFдеги баяндамасын көрүү керек: " Fork Join ишке ашыруу өзгөчөлүктөрү ."
Жыйынтык чыгаруу
Ошентип, биз карап чыгуунун кийинки бөлүгүн бүтүрөбүз.Executor
Биз жиптерди аткаруу үчүн биринчи жолу эмнени ойлоп тапканыбызды түшүндүк . Анан идеяны улантууну чечтик жана аны ойлоп таптык ExecutorService
. жана ExecutorService
аркылуу тапшырмаларды аткарууга жөнөтүүгө , ошондой эле кызматты өчүрүү менен башкарууга мүмкүндүк берет. Анткени "бизге ишке ашыруулар керек, биз заводдук ыкмалар менен класс жазып, аны . Бул сизге жип бассейндерин түзүүгө мүмкүндүк берет . Ошол эле учурда, ошондой эле аткаруу үчүн расписание көрсөтүүгө мүмкүндүк берет жип бассейндер бар, бирок жашырат . Жогоруда жазылгандар сиз үчүн кызыктуу гана болбостон, түшүнүктүү болду деп ишенем) Мен ар дайым сунуштарды жана комментарийлерди алганыма кубанычтамын. #Вячеславsubmit
invoke
ExecutorService
Executors
ThreadPoolExecutor
WorkStealingPool
ForkJoinPool
GO TO FULL VERSION