Kirish
Shunday qilib, biz bilamizki, Java-da mavzular bor, ular haqida " Siz Java-ni mavzu bilan buzolmaydi: I qism - mavzular " sharhida o'qishingiz mumkin . Keling, namuna kodini yana ko'rib chiqaylik:public static void main(String []args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
Ko'rib turganimizdek, vazifani ishga tushirish uchun kod juda standart, ammo har bir yangi ishga tushirish uchun biz uni takrorlashimiz kerak. Bitta yechim uni alohida usulga o'tkazishdir, masalan execute(Runnable runnable)
. Ammo Java dasturchilari allaqachon bizdan xavotirda va interfeysni o'ylab topishgan Executor
:
public static void main(String []args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Ko'rib turganingizdek, kod yanada ixchamroq bo'ldi va uni Runnable
ish zarrachasida ishlatish uchun kod yozishga imkon berdi. Ajoyib, shunday emasmi? Ammo bu faqat boshlanishi:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
avlod interfeysiga ega ExecutorService
. Ushbu interfeysning JavaDoc-da aytilishicha, bu "a" ishni to'xtatish usullarini taqdim etadigan va bajarilish jarayonini kuzatish imkonini beruvchi ExecutorService
maxsus "a" tavsifi . Ilgari, " Siz Java-ni mavzu bilan buzolmaysiz: IV qism - Qo'ng'iroq qilish mumkin, kelajak va do'stlar " da biz imkoniyatlarni qisqacha ko'rib chiqdik . Agar siz uni unutgan bo'lsangiz yoki o'qimagan bo'lsangiz, xotirangizni yangilashni maslahat beraman;) JavaDoc-da yana qanday qiziqarli narsalar yozilgan? Bizda sukut bo'yicha mavjud bo'lgan ilovalarni yaratishga imkon beruvchi maxsus zavod mavjud . Executor
Executor
java.util.concurrent.Future
Future
java.util.concurrent.Executors
ExecutorService
Ijrochi xizmati
Yana eslaylik. Ip yaratishni amalga oshirish bizdan yashirin bo'lsa, bizExecutor
ipda ma'lum bir vazifani bajarishimiz (ya'ni, bajarishimiz) kerak. Bizda ijro jarayonini boshqarish uchun bir qator imkoniyatlarga ega bo'lgan ExecutorService
maxsus dastur mavjud. Va bizda yaratish imkonini beruvchi Executor
zavod mavjud . Keling, buni o'zimiz qilaylik: Executors
ExecutorService
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
Ko'rib turganimizdek, biz Fixed Thread Pool
2 o'lchamdagi o'zgarmas ipli hovuzni ( ) belgilab oldik. Shundan so'ng biz hovuzga vazifalarni birma-bir jo'natamiz. String
Har bir vazifa ip nomini ( ) o'z ichiga olgan qatorni ( ) qaytaradi currentThread().getName()
. Eng oxirida o'chirish muhim ExecutorService
, chunki aks holda bizning dasturimiz chiqmaydi. Zavodda Executors
boshqa zavod usullari mavjud . Misol uchun, biz faqat bitta ipdan iborat hovuz yaratishimiz mumkin - newSingleThreadExecutor
yoki keshlash bilan hovuz newCachedThreadPool
, bu erda iplar 1 daqiqa davomida bo'sh turgan bo'lsa, ular hovuzdan olib tashlanadi. Aslida, bularning orqasida vazifalar qo'yiladigan va bu vazifalar bajariladigan blokirovka navbatiExecutorService
mavjud . Navbatlarni bloklash haqida ko'proq ma'lumotni " Bloklash navbati - To'plamlar №5 - Kengaytirilgan Java " videosida ko'rish mumkin . Shuningdek, siz " Bir vaqtning o'zida paketning navbatlarini bloklash " sharhini va " Qachon ArrayBlockingQueue o'rniga LinkedBlockingQueue-ni afzal ko'rish kerak? " Degan savolga javobni o'qishingiz mumkin. Super soddalashtirilgan - (navbatni bloklash) ikki holatda ipni bloklaydi: BlockingQueue
- ip bo'sh navbatdan elementlarni olishga harakat qilmoqda
- ip elementlarni to'liq navbatga qo'yishga harakat qilmoqda
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
yoki
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Ko'rib turganimizdek, ilovalar zavod usullari ichida yaratilgan ExecutorService
. Va bu asosan ThreadPoolExecutor
. Faqat ishga ta'sir qiluvchi atributlar o'zgaradi.
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
Biz ilgari ko'rganimizdek, zavod usullari ichidaThreadPoolExecutor
, . Funktsionallikka qanday qiymatlar maksimal va minimal iplar sifatida uzatilishi, shuningdek, qanday navbat ishlatilganligi ta'sir qiladi. Va interfeysning har qanday amalga oshirilishi mumkin java.util.concurrent.BlockingQueue
. "Ahs" haqida gapirganda ThreadPoolExecutor
, ish paytida qiziqarli xususiyatlarni ta'kidlash kerak. Masalan, ThreadPoolExecutor
agar bo'sh joy bo'lmasa, siz topshiriqlarni yubora olmaysiz:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Ushbu kod xato bilan muvaffaqiyatsiz bo'ladi:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Ya'ni, task
siz topshira olmaysiz, chunki SynchronousQueue
u aslida bitta elementdan iborat bo'lgan tarzda yaratilgan va u erda ko'proq qo'yishga imkon bermaydi. Ko'rib turganimizdek, queued tasks
bu erda 0 bor va bu erda g'alati narsa yo'q, chunki bu o'ziga xosdir SynchronousQueue
- aslida bu har doim bo'sh bo'lgan 1 ta elementdan iborat navbat. (!) Bitta ip elementni navbatga qo‘yganda, u navbatdan elementni boshqa ip olishini kutadi. Shuning uchun, biz bilan almashtirishimiz mumkin new LinkedBlockingQueue<>(1)
va xatoda ko'rsatilgan narsa o'zgaradi queued tasks = 1
. Chunki navbat faqat 1 element bo'lsa, ikkinchisini qo'sha olmaymiz. Va biz bunga tushamiz. ThreadPoolExecutor
Navbat mavzusini davom ettirib, sinfda navbatga xizmat ko'rsatishning qo'shimcha usullari mavjudligini ta'kidlash kerak . Misol uchun, usul threadPoolExecutor.purge()
navbatdagi joyni bo'shatish uchun barcha bekor qilingan vazifalarni navbatdan olib tashlaydi. Navbat bilan bog'liq yana bir qiziqarli xususiyat - bu qabul qilinmagan vazifani qayta ishlash:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Rejected
Misol uchun, ishlov beruvchi navbatdagi topshiriqni qabul qilishdan bosh tortgan har bir so'z uchun oddiygina so'zni chop etadi . Qulay, shunday emasmi? Bundan tashqari, ThreadPoolExecutor
uning qiziqarli merosxo'ri bor - ScheduledThreadPoolExecutor
kim ScheduledExecutorService
. Taymerda vazifani bajarish imkoniyatini beradi.
ScheduledExecutorService
ExecutorService
turi ScheduledExecutorService
sizga jadvalga muvofiq vazifalarni bajarishga imkon beradi. Keling, bir misolni ko'rib chiqaylik:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Bu erda hamma narsa oddiy. Vazifalar yuboriladi, biz "rejalashtirilgan vazifani" olamiz java.util.concurrent.ScheduledFuture
. Jadvalda quyidagi holat ham foydali bo'lishi mumkin:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Runnable
Bu erda biz belgilangan stavkada (Fixed Rate) ma'lum bir kechikish bilan bajarilishi kerak bo'lgan vazifani yuboramiz . Bunday holda, har 2 soniyada 1 soniyadan so'ng, vazifani bajarishni boshlang. Shunga o'xshash variant mavjud:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Lekin bu yerda topshiriqlar turli topshiriqlarning bajarilishi orasiga berilgan interval bilan bajariladi. Ya'ni vazifa task
1 soniyada bajariladi. Keyinchalik, u tugashi bilan 2 soniya o'tadi va keyin yangi vazifa boshlanadi. Ushbu mavzu bo'yicha quyidagi materiallarni o'qishingiz mumkin:
- Ip hovuzlariga kirish
- Thread Pools bilan tanishtirish
- Java Multithreading Cheeplechase: Ijrochilardagi vazifalarni bekor qilish
- Fon vazifalari uchun to'g'ri Java ijrochilarini tanlash
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealing Pool
Yuqorida aytib o'tilgan ipli hovuzlarga qo'shimcha ravishda yana bittasi bor. Siz uni bir oz o'ziga xos deb aytishingiz mumkin. Uning nomi Work Stealing Pool. Muxtasar qilib aytganda, Work Stealing - bu ish algoritmi bo'lib, unda bo'sh ish zarralari boshqa ish zarralari yoki umumiy navbatdagi vazifalarni olishni boshlaydi. Keling, bir misolni ko'rib chiqaylik:public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Agar biz ushbu kodni ishga tushirsak, ExecutorService
u 5 ta ip hosil qiladi, chunki har bir ip ob'ekt joylashgan joyda kutish navbatiga qo'shiladi lock
. Biz allaqachon monitorlar va undagi qulflar haqida " Siz Java-ni mavzu bilan buzolmaysiz: II qism - Sinxronizatsiya " da muhokama qildik. Executors.newCachedThreadPool
Va endi biz uni bilan almashtiramiz Executors.newWorkStealingPool()
. Nima o'zgaradi? Bizning vazifalarimiz 5 ta ipda emas, balki kamroq bajarilganligini ko'ramiz. cachedThreadPool
Har bir vazifa uchun o'zingizning mavzuingizni yaratganingizni eslaysizmi ? Chunki wait
u ipni to'sib qo'ydi, lekin keyingi vazifalar bajarilishini xohladi va ular uchun hovuzda yangi iplar yaratildi. Iplar bo'lsa StealingPool
, ular abadiy ishlamaydilar wait
, ular qo'shni vazifalarni bajarishni boshlaydilar. Bu boshqa ip hovuzlaridan qanday farq qiladi WorkStealingPool
? Chunki uning ichida haqiqatan ham sehrli narsa yashaydi ForkJoinPool
:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Aslida yana bitta farq bor. Oddiy orqali yaratilgan mavzulardan ForkJoinPool
farqli o'laroq, sukut bo'yicha yaratilgan mavzular demon iplaridir ThreadPool
. Umuman olganda, demon iplari haqida eslash kerak, chunki ... masalan, CompletableFuture
demon iplari ham ishlatiladi, agar siz o'zingizni belgilamasangiz ThreadFactory
, bu demon bo'lmagan iplarni yaratadi. Bu sizni kutilmagan joyda kutishi mumkin bo'lgan kutilmagan hodisalar!)
Vilka / Hovuzga qo'shilish
ForkJoinPool
Ushbu qismda biz "kaput ostida" yashaydigan bir xil (shuningdek, vilkalar/qo'shilish ramkasi deb ataladi) haqida gapiramiz WorkStealingPool
. Umuman olganda, Fork Join Framework Java 1.7 da paydo bo'ldi. Va Java 11 allaqachon hovlida bo'lsa ham, buni eslash kerak. Eng keng tarqalgan vazifa emas, lekin juda qiziq. Internetda ushbu mavzu bo'yicha yaxshi sharh mavjud: " Java 7 da Fork/Join Framework ". Fork/JoinPool
kabi tushuncha bilan ishida faoliyat yuritadi java.util.concurrent.RecursiveTask
. Analog ham mavjud - java.util.concurrent.RecursiveAction
. RecursiveActions natijani qaytarmaydi. Shunday qilib , RecursiveTask
shunga o'xshash Callable
va RecursiveAction
shunga o'xshash Runnable
. Xo'sh, nomga qarab, biz ikkita asosiy usulni ko'ramiz - fork
va join
. Usul fork
alohida ish zarrachasida asinxron vazifani bajaradi. Va usul join
sizga ish tugashini kutish imkonini beradi. Undan foydalanishning bir necha yo'li mavjud: Bu rasm Aleksey Shipilevning " Qo'shish/qo'shilish: amalga oshirish, foydalanish, ishlash " hisobotidagi slaydning bir qismidir . Buni aniqroq qilish uchun uning JEE CONF-dagi hisobotini ko'rib chiqishga arziydi: " Fork qo'shilishni amalga oshirish xususiyatlari ."
Xulosa qilish
Shunday qilib, biz sharhning keyingi qismini yakunlaymiz. Biz iplarni bajarish uchun birinchi marta nimani o'ylab topdikExecutor
. Keyin g'oyani davom ettirishga qaror qildik va o'ylab topdik ExecutorService
. va ExecutorService
yordamida vazifalarni bajarish uchun yuborish , shuningdek, xizmatni o'chirish orqali boshqarish imkonini beradi. Chunki "Bizga ilovalar kerak, biz zavod usullari bilan sinf yozdik va uni . Bu sizga ip hovuzlarini yaratishga imkon beradi . Shu bilan birga, ip hovuzlari mavjud, ular ham bajarish uchun jadvalni belgilashga imkon beradi, lekin yashiradi . Umid qilamanki, yuqorida yozilganlar siz uchun nafaqat qiziqarli, balki tushunarli bo'ldi) Men har doim taklif va sharhlarni olishdan xursandman. #Viacheslavsubmit
invoke
ExecutorService
Executors
ThreadPoolExecutor
WorkStealingPool
ForkJoinPool
GO TO FULL VERSION