Giriş
Beləliklə, bilirik ki, Java-da mövzular var, onlar haqqında " Bir Mövzu ilə Java-nı korlaya bilməzsiniz: I hissə - Mövzular " icmalında oxuya bilərsiniz . Nümunə koduna yenidən baxaq:public static void main(String []args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
Gördüyümüz kimi, tapşırığı işə salmaq üçün kod olduqca standartdır, lakin hər yeni işə salınması üçün onu təkrarlamalı olacağıq. Bir həll onu ayrıca bir metoda köçürməkdir, məsələn execute(Runnable runnable)
. Lakin Java tərtibatçıları artıq bizdən narahat olublar və interfeys təklif ediblər Executor
:
public static void main(String []args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Gördüyünüz kimi, kod daha yığcam hala gəldi və bizə sadəcə onu Runnable
bir ipdə işlətmək üçün kod yazmağa imkan verdi. Əla, elə deyilmi? Ancaq bu yalnız başlanğıcdır:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
nəsil interfeysə malikdir ExecutorService
. Bu interfeysin JavaDoc-u deyir ki, bu, 'a' işini dayandırmaq üçün üsulları təmin edən və icranın gedişatını izləməyə imkan verən ExecutorService
xüsusi 'a'nın təsviridir . Əvvəllər “ Yava-nı mövzu ilə korlaya bilməzsiniz: IV hissə - Zəng edilə bilən, Gələcək və Dostlar ” bölməsində biz imkanları qısaca nəzərdən keçirdik . Əgər unutmusunuzsa və ya oxumamısınızsa, yaddaşınızı təzələməyi məsləhət görürəm ;) JavaDoc-da başqa hansı maraqlı şeylər yazılıb? Defolt olaraq mövcud olan tətbiqlər yaratmağa imkan verən xüsusi bir fabrikimiz var . Executor
Executor
java.util.concurrent.Future
Future
java.util.concurrent.Executors
ExecutorService
İcraçı Xidmət
Bir daha xatırlayaq.Executor
Bir ip yaratmağın həyata keçirilməsi bizdən gizli olduqda, bir ipdə müəyyən bir tapşırığı yerinə yetirməliyik (yəni icra etməliyik) . İcra prosesini idarə etmək üçün bir sıra imkanlara malik olan ExecutorService
xüsusi birimiz var . Və yaratmağa imkan verən Executor
bir fabrikimiz var . İndi özümüz edək: Executors
ExecutorService
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
Gördüyümüz kimi, biz Fixed Thread Pool
2 ölçülü sabit iplik hovuzunu ( ) təyin etdik. Bundan sonra biz tapşırıqları bir-bir hovuza göndəririk. String
Hər tapşırıq mövzunun adını ( ) ehtiva edən sətir ( ) qaytarır currentThread().getName()
. Ən sonunda bağlamaq vacibdir ExecutorService
, çünki əks halda proqramımız çıxmayacaq. Zavodda Executors
başqa zavod üsulları da var . Məsələn, biz yalnız bir başlıqdan ibarət hovuz yarada bilərik - newSingleThreadExecutor
ya da keşləmə ilə hovuz yarada bilərik newCachedThreadPool
ki, burada mövzular 1 dəqiqə boş qaldıqda hovuzdan silinəcək. Əslində, bunların arxasında tapşırıqların yerləşdirildiyi və bu tapşırıqların yerinə yetirildiyi bloklama növbəsiExecutorService
var . Növbələrin bloklanması haqqında daha çox məlumatı " Bloklama növbəsi - Kolleksiyalar #5 - Qabaqcıl Java " videosunda görmək olar . Siz həmçinin “ Paketin növbələrinin bloklanması ” icmalı və “ ArrayBlockingQueue-dan nə vaxt LinkedBlockingQueue-ə üstünlük verilməlidir? ” sualının cavabını oxuya bilərsiniz. Super sadələşdirilmiş - (bloklama növbəsi) iki halda mövzunu bloklayır: BlockingQueue
- mövzu boş növbədən elementlər almağa çalışır
- mövzu elementləri tam növbəyə qoymağa çalışır
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
və ya
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Gördüyümüz kimi, tətbiqlər zavod üsulları daxilində yaradılır ExecutorService
. Və əsasən budur ThreadPoolExecutor
. Yalnız işə təsir edən atributlar dəyişir.
https://en.wikipedia.org/wiki/Thread_pool#/media/Fayl:Thread_pool.svg
ThreadPoolExecutor
Daha əvvəl gördüyümüz kimi, zavod daxilində üsullarThreadPoolExecutor
, . Funksionallıq maksimum və minimum iplər kimi hansı dəyərlərin ötürülməsindən, həmçinin hansı növbənin istifadə olunduğundan təsirlənir. Və interfeysin hər hansı bir tətbiqi istifadə edilə bilər java.util.concurrent.BlockingQueue
. 'ahs'dan danışarkən ThreadPoolExecutor
, əməliyyat zamanı maraqlı xüsusiyyətləri qeyd etmək lazımdır. Məsələn, ThreadPoolExecutor
orada yer yoxdursa, tapşırıqları göndərə bilməzsiniz:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Bu kod kimi bir xəta ilə uğursuz olacaq:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Yəni task
təqdim edə bilməzsiniz, çünki SynchronousQueue
elə qurulub ki, əslində bir elementdən ibarətdir və ora daha çox yerləşdirməyə imkan vermir. Gördüyümüz kimi queued tasks
burada 0 var və bunda qəribə heç nə yoxdur, çünki bu spesifikdir SynchronousQueue
- əslində bu, həmişə boş olan 1 elementdən ibarət növbədir. (!) Bir ip elementi növbəyə qoyduqda, başqa bir ip elementi növbədən götürənə qədər gözləyəcək. Buna görə də, ilə əvəz edə bilərik new LinkedBlockingQueue<>(1)
və səhvdə nə göstəriləcəksə dəyişəcək queued tasks = 1
. Çünki növbə yalnız 1 elementdir, onda ikincisini əlavə edə bilmərik. Və bunun üzərinə düşəcəyik. Növbənin mövzusunu davam etdirərək qeyd etmək lazımdır ki, sinifdə ThreadPoolExecutor
növbəyə xidmət göstərmək üçün əlavə üsullar var. Məsələn, threadPoolExecutor.purge()
növbədə yer boşaltmaq üçün metod bütün ləğv edilmiş tapşırıqları növbədən siləcək. Növbə ilə əlaqəli başqa bir maraqlı xüsusiyyət qəbul edilməmiş tapşırıq idarəedicisidir:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Məsələn, işləyici Rejected
növbəyə tapşırığı qəbul etməkdən hər imtina üçün sadəcə bir söz çap edir. Rahatdır, elə deyilmi? Bundan əlavə, ThreadPoolExecutor
onun maraqlı bir varisi var - ScheduledThreadPoolExecutor
kim ScheduledExecutorService
. Taymerdə tapşırığı yerinə yetirmək imkanı verir.
Scheduled ExecutorService
ExecutorService
tip ScheduledExecutorService
sizə tapşırıqları cədvələ uyğun icra etməyə imkan verir. Bir misala baxaq:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Burada hər şey sadədir. Tapşırıqlar göndərilir, biz “planlaşdırılmış tapşırıq” alırıq java.util.concurrent.ScheduledFuture
. Aşağıdakı vəziyyət də cədvəllə faydalı ola bilər:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Runnable
Burada biz tapşırığı müəyyən gecikmə ilə sabit tariflə (Fixed Rate) yerinə yetirmək üçün göndəririk . Bu halda, hər 2 saniyədən bir 1 saniyədən sonra tapşırığı yerinə yetirməyə başlayın. Bənzər bir seçim var:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Lakin burada tapşırıqlar müxtəlif tapşırıqların icrası ARASINDA verilmiş intervalla yerinə yetirilir. Yəni tapşırıq task
1 saniyəyə tamamlanacaq. Sonra, tamamlanan kimi 2 saniyə keçəcək və sonra yeni bir tapşırıq başlayacaq. Bu mövzuda aşağıdakı materialları oxuya bilərsiniz:
- İp hovuzlarına giriş
- Thread Pools-a giriş
- Java Multithreading Steeplechase: İcraçılarda tapşırıqların ləğvi
- Fon tapşırıqları üçün düzgün Java icraçılarının seçilməsi
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealing Pool
Yuxarıda göstərilən iplik hovuzlarına əlavə olaraq, daha biri var. Onun bir az xüsusi olduğunu deyə bilərsiniz. Onun adı Work Stealing Pool-dur. Qısacası, Work Stealing boş iplərin digər iplərdən və ya ümumi növbədən tapşırıqları almağa başladığı bir iş alqoritmidir. Bir misala baxaq:public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Bu kodu işlətsək, ExecutorService
5 mövzu yaradacaq, çünki hər mövzu obyektin yerində gözləmə növbəsinə qoşulacaq lock
. Biz artıq monitorlar və kilidlər haqqında “ Java-nı mövzu ilə korlaya bilməzsiniz: II hissə - Sinxronizasiya ” bölməsində müzakirə etdik. İndi biz onu Executors.newCachedThreadPool
ilə əvəz edəcəyik Executors.newWorkStealingPool()
. Nə dəyişəcək? Görəcəyik ki, tapşırıqlarımız 5 mövzuda deyil, daha az şəkildə yerinə yetirilir. cachedThreadPool
Hər bir tapşırıq üçün öz başlığınızı yaratdığınızı xatırlayın ? Çünki wait
mövzunu blokladı, lakin növbəti tapşırıqlar yerinə yetirilmək istədi və onlar üçün hovuzda yeni mövzular yaradıldı. Mövzular vəziyyətində StealingPool
, onlar əbədi olaraq boş qalmayacaqlar wait
, qonşu vəzifələri yerinə yetirməyə başlayacaqlar. Bu digər iplik hovuzlarından nə ilə fərqlənir WorkStealingPool
? Çünki onun daxilində həqiqətən sehrli bir şey yaşayır ForkJoinPool
:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Əslində daha bir fərq var. Defolt olaraq yaradılan mövzular ForkJoinPool
, müntəzəm vasitəsilə yaradılan mövzulardan fərqli olaraq demon mövzulardır ThreadPool
. Ümumiyyətlə, demon ipləri haqqında xatırlamağa dəyər, çünki... məsələn, CompletableFuture
demon mövzuları da istifadə olunur, əgər siz öz şəxsi qeyd etməsəniz ThreadFactory
, bu daemon olmayan iplər yaradacaq. Bunlar sizi gözlənilməz yerdə gözləyə biləcək sürprizlərdir!)
Çəngəl/Hovuzu Qoşun
ForkJoinPool
Bu hissədə "başlıq altında" yaşayan eyni (həmçinin çəngəl/qoşulma çərçivəsi adlanır) haqqında danışacağıq WorkStealingPool
. Ümumiyyətlə, Fork Join Framework Java 1.7-də ortaya çıxdı. Java 11 artıq həyətdə olsa belə, yenə də xatırlamağa dəyər. Ən ümumi tapşırıq deyil, lakin olduqca maraqlıdır. İnternetdə bu mövzuda yaxşı bir rəy var: “ Java 7-də Fork/Join Framework ”. Fork/JoinPool
kimi anlayışla işində fəaliyyət göstərir java.util.concurrent.RecursiveTask
. Bir analoq da var - java.util.concurrent.RecursiveAction
. RecursiveActions nəticə qaytarmır. Beləliklə , RecursiveTask
oxşar Callable
və RecursiveAction
oxşardır Runnable
. Yaxşı, ada baxaraq, iki əsas metodu görürük - fork
və join
. Metod fork
bir tapşırığı ayrı bir başlıqda asinxron şəkildə icra edir. Və üsul join
işin tamamlanmasını gözləməyə imkan verir. Ondan istifadə etməyin bir neçə yolu var: Bu şəkil Aleksey Şipilevin “ Çalmaq/Qoşulmaq: həyata keçirmə, istifadə, performans ” hesabatından slaydın bir hissəsidir . Bunu daha aydın etmək üçün onun JEE CONF-dakı hesabatına baxmağa dəyər: “ Çəngəllə qoşulun tətbiq xüsusiyyətləri .”
Xülasə
Beləliklə, nəzərdən keçirmənin növbəti hissəsini bitiririk. Mövzuları icra etmək üçün ilk dəfə nə tapdığımızı anladıqExecutor
. Sonra ideyanı davam etdirmək qərarına gəldik və bu fikrə gəldik ExecutorService
. və ExecutorService
istifadə edərək icra üçün tapşırıqlar göndərməyə , həmçinin xidməti söndürməklə idarə etməyə imkan verir. Çünki 'Bizə tətbiqlərə ehtiyacımız var, biz fabrik üsulları ilə bir sinif yazdıq və onu adlandırdıq . Bu, ip hovuzları yaratmağa imkan verir . Eyni zamanda, icra üçün bir cədvəl təyin etməyə imkan verən mövzu hovuzları var, lakin o, arxasında gizlənir . Ümid edirəm ki, yuxarıda yazılanlar təkcə sizin üçün maraqlı deyil, həm də başa düşülən oldu) Mən həmişə təklif və şərhləri almaqdan məmnunam. #Viaçeslavsubmit
invoke
ExecutorService
Executors
ThreadPoolExecutor
WorkStealingPool
ForkJoinPool
GO TO FULL VERSION