Introduction
So, we know that Java has threads, which you can read about in the review "You Can't Spoil Java with Threads: Part I - Threads". Let's look at the sample code again:
public static void main(String[] args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
As we can see, the code for launching a task is fairly standard, yet we have to repeat it for every new launch. One solution is to move it into a separate method, for example execute(Runnable runnable). But the Java developers have already taken care of us and created the Executor interface:
public static void main(String []args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
As you can see, the code has become more concise: the thread-starting logic lives in one place, and we only write the code that hands a Runnable over to be run in a thread. Nice, isn't it? But this is only the beginning:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
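Because Executor only promises that a task will be run, not how, implementations are free to make very different choices. The sketch below contrasts the thread-per-task executor from the example above with a direct executor that runs the task in the calling thread, a variant the Executor JavaDoc also describes. The class, field and method names (ExecutorFlavors, THREAD_PER_TASK, DIRECT, threadUsedBy) are made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class ExecutorFlavors {
    // Starts a new thread for every task, like the lambda in the example above.
    static final Executor THREAD_PER_TASK = runnable -> new Thread(runnable).start();

    // Runs the task synchronously in the thread that calls execute().
    static final Executor DIRECT = Runnable::run;

    // Returns the name of the thread in which the executor actually ran the task.
    static String threadUsedBy(Executor executor) {
        CompletableFuture<String> threadName = new CompletableFuture<>();
        executor.execute(() -> threadName.complete(Thread.currentThread().getName()));
        return threadName.join(); // waits until the task has reported its thread
    }

    public static void main(String[] args) {
        System.out.println("caller:          " + Thread.currentThread().getName());
        System.out.println("thread-per-task: " + threadUsedBy(THREAD_PER_TASK));
        System.out.println("direct:          " + threadUsedBy(DIRECT));
    }
}
```

The calling code is identical in both cases; only the Executor behind it decides which thread does the work.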
Executor has a sub-interface, ExecutorService. The JavaDoc for this interface says that ExecutorService is a specialized Executor that provides methods for shutting the Executor down and methods that return a java.util.concurrent.Future for tracking the progress of execution. Earlier, in "You Can't Spoil Java with Threads: Part IV - Callable, Future and Friends", we looked at what Future can do. If you have forgotten it or haven't read it, I suggest refreshing your memory ;) What else of interest does the JavaDoc say? That there is a special factory, java.util.concurrent.Executors, which lets us create the ExecutorService implementations that are available out of the box.
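To make the "shut down" and "track progress" parts of that description concrete, here is a minimal sketch of the ExecutorService lifecycle. It assumes a single-thread executor and a 5-second timeout picked only for the demo; the class name ShutdownLifecycle and the method run are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

class ShutdownLifecycle {
    // Submits one task, shuts the service down, and reports what was observed.
    static String run() throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newSingleThreadExecutor();
        Future<String> result = service.submit(() -> "done");

        service.shutdown(); // stop accepting tasks; already submitted ones still run
        boolean rejectedAfterShutdown = false;
        try {
            service.submit(() -> "too late");
        } catch (RejectedExecutionException e) {
            rejectedAfterShutdown = true; // new tasks are refused after shutdown()
        }

        // Wait (with an arbitrary demo timeout) until the submitted task has finished.
        boolean terminated = service.awaitTermination(5, TimeUnit.SECONDS);
        return result.get() + ", rejectedAfterShutdown=" + rejectedAfterShutdown
                + ", terminated=" + terminated;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

Note that shutdown() does not cancel work that was already submitted; it only closes the door for new tasks.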
ExecutorService
Let's recap. We have Executor for executing a task in a thread, with the details of creating the thread hidden from us. We have ExecutorService, a specialized Executor that adds ways to manage the progress of execution. And we have the Executors factory, which lets us create an ExecutorService. Let's try it ourselves:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future<String> result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
As we can see, we asked for a fixed thread pool (Fixed Thread Pool) of size 2 and then submitted tasks to it one at a time. Each task returns a string (String) containing the name of the thread (currentThread().getName()). It is important to shut the ExecutorService down at the end, because otherwise our program will not exit. The Executors factory has other factory methods as well. For example, we can create a pool of just one thread with newSingleThreadExecutor, or a caching pool with newCachedThreadPool, where threads are removed from the pool after being idle for 1 minute. In fact, behind these ExecutorService implementations there is a blocking queue into which tasks are placed and from which they are taken for execution. More about blocking queues can be found in the video "Blocking queue - Collections #5 - Advanced Java". You can also read the review "Blocking queues of the concurrent package" and the answer to the question "When to prefer LinkedBlockingQueue over ArrayBlockingQueue?". Put very simply, a BlockingQueue blocks a thread in two cases:
- the thread tries to take an element from an empty queue
- the thread tries to put an element into a full queue
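The two blocking cases above can be observed without hanging the program by using the timed variants poll and offer, which wait for a while and then give up. This is only a sketch: the capacity of 1, the 100 ms timeouts and the class name BlockingQueueDemo are assumptions chosen for the demo.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockingQueueDemo {
    // Returns what was observed for an empty queue, a full queue, and a normal take.
    static String run() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1); // room for one element

        // Case 1: the queue is empty, so the timed poll waits and then gives up with null.
        // take() would keep waiting here until another thread adds an element.
        String fromEmpty = queue.poll(100, TimeUnit.MILLISECONDS);

        queue.put("first"); // there is room, so put() returns immediately

        // Case 2: the queue is full, so the timed offer waits and then gives up with false.
        // put() would keep waiting here until another thread removes an element.
        boolean acceptedWhenFull = queue.offer("second", 100, TimeUnit.MILLISECONDS);

        String taken = queue.take(); // an element is available, so no waiting
        return fromEmpty + ", " + acceptedWhenFull + ", " + taken;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // null, false, first
    }
}
```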
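If we look at how the factory methods are implemented, we can see how these pools are put together. For example:
public static ExecutorService newFixedThreadPool(int nThreads) {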
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
or
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
As we can see, the factory methods create an ExecutorService implementation inside, and it is basically a ThreadPoolExecutor. Only the arguments that affect how it works change.
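One way to see those differing arguments at run time is to cast the returned service back to ThreadPoolExecutor and read its settings. Treat this as a sketch: the cast relies on the factory implementations quoted above rather than on anything the ExecutorService contract promises (newSingleThreadExecutor, for instance, returns a wrapper for which this cast fails), and PoolSettings and describeAndShutdown are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class PoolSettings {
    // Describes a pool's settings, assuming the service really is a ThreadPoolExecutor,
    // and shuts it down so the demo does not leave pools behind.
    static String describeAndShutdown(ExecutorService service) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) service;
        String description = "core=" + pool.getCorePoolSize()
                + ", max=" + pool.getMaximumPoolSize()
                + ", queue=" + pool.getQueue().getClass().getSimpleName();
        pool.shutdown();
        return description;
    }

    public static void main(String[] args) {
        // core=2, max=2, queue=LinkedBlockingQueue
        System.out.println(describeAndShutdown(Executors.newFixedThreadPool(2)));
        // core=0, max=2147483647, queue=SynchronousQueue
        System.out.println(describeAndShutdown(Executors.newCachedThreadPool()));
    }
}
```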
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
As we saw earlier, a ThreadPoolExecutor is usually created inside the factory methods. Its behavior depends on the values passed as the core (minimum) and maximum number of threads, and on which queue is used. Any implementation of the java.util.concurrent.BlockingQueue interface can be used. Speaking of ThreadPoolExecutor, it has some interesting behavior worth noting. For example, you cannot submit a task to a ThreadPoolExecutor if there is no room for it:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
This code will fail with an error like:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
That is, the task cannot be submitted, because SynchronousQueue has no capacity of its own and cannot hold tasks that are waiting. As we can see, queued tasks is 0 here, and there is nothing strange about that, because this is specific to SynchronousQueue: it is often described as a queue of 1 element that is always empty (!). When one thread puts an element into the queue, it waits until another thread takes that element. So we can replace it with new LinkedBlockingQueue<>(1), and a rejection would then report queued tasks = 1: since the queue holds only 1 element, we cannot add a second one, and that is where we fail. Continuing the queue theme, it is worth noting that the ThreadPoolExecutor class has additional methods for servicing the queue. For example, threadPoolExecutor.purge() removes all cancelled tasks from the queue to free up space in it. Another interesting queue-related feature is the handler for rejected tasks:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
In this example the handler simply prints the word Rejected for every task that the pool refuses to accept. Convenient, isn't it? In addition, ThreadPoolExecutor has an interesting subclass, ScheduledThreadPoolExecutor, which is a ScheduledExecutorService. It provides the ability to run tasks on a timer.
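Returning to the bounded-queue case from this section, the sketch below shows a rejection with queued tasks = 1 in a deterministic way. To avoid depending on timing it assumes a pool with exactly one thread (core and maximum both 1) instead of the 0 and 2 used earlier, keeps that thread busy with a latch, and leaves the default rejection policy in place, which throws RejectedExecutionException. The names BoundedQueueRejection and queuedWhenRejected are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedQueueRejection {
    // Returns how many tasks were queued when the third task was rejected,
    // or -1 if no task was rejected.
    static int queuedWhenRejected() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blockedTask = () -> {
            try {
                release.await(); // keeps the worker busy until we release it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int queued = -1;
        try {
            pool.execute(blockedTask); // handed to the single worker thread
            pool.execute(blockedTask); // waits in the queue (capacity 1)
            pool.execute(blockedTask); // no free thread and no free slot: rejected
        } catch (RejectedExecutionException e) {
            queued = pool.getQueue().size();
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return queued;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("queued tasks = " + queuedWhenRejected()); // queued tasks = 1
    }
}
```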
ScheduledExecutorService
An ExecutorService of type ScheduledExecutorService lets you run tasks on a schedule. Let's look at an example:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Everything is simple here. The task is submitted, and we get back a "scheduled task", java.util.concurrent.ScheduledFuture. The following case can also be useful with scheduling:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Here we submit a Runnable task to be executed at a fixed rate (Fixed Rate) after an initial delay. In this case, the task first starts after 1 second and then starts again every 2 seconds. There is a similar option:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
But here the tasks are executed with the given interval BETWEEN the executions of successive tasks. That is, the first run of task starts after 1 second. Then, once it completes, 2 seconds pass, and only then is the next run started. You can read the following materials on this topic:
- Introduction to thread pools
- Introduction to Thread Pools
- Java Multithreading Steeplechase: Cancelling Tasks in Executors
- Choosing the right Java executor for background tasks
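A periodic task keeps running until it is cancelled or the scheduler is shut down, so in practice the ScheduledFuture returned by the scheduling call is worth keeping. A minimal sketch, assuming short demo intervals (50 ms and 100 ms) and a latch to wait for three runs; FixedRateDemo and runThreeTimesThenCancel are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class FixedRateDemo {
    // Lets a periodic task run three times, then cancels it.
    // Returns true if three runs happened and the task ended up cancelled.
    static boolean runThreeTimesThenCancel() throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        CountDownLatch threeRuns = new CountDownLatch(3);

        // First run after 50 ms, then a new run is started every 100 ms.
        ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(
                threeRuns::countDown, 50, 100, TimeUnit.MILLISECONDS);

        boolean completed = threeRuns.await(5, TimeUnit.SECONDS);
        handle.cancel(false); // stop future runs without interrupting a running one
        scheduler.shutdown();
        return completed && handle.isCancelled();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runThreeTimesThenCancel());
    }
}
```

Swapping scheduleAtFixedRate for scheduleWithFixedDelay keeps the same structure; only the way the interval is measured changes.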
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
In addition to the thread pools mentioned above, there is one more. You could say it is a bit special. It is called the Work Stealing Pool. In short, work stealing is an algorithm in which idle threads start taking tasks from other threads' queues or from a shared queue. Let's look at an example:
public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
// wait() may only be called while holding the lock's monitor
synchronized (lock) {
lock.wait(2000);
}
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
If we run this code, the ExecutorService will create 5 threads, because each thread ends up waiting on the monitor of the lock object. We already discussed monitors and locks in "You Can't Spoil Java with Threads: Part II - Synchronization". Now let's replace Executors.newCachedThreadPool with Executors.newWorkStealingPool(). What will change? On a machine with fewer than 5 available processors we will see that our tasks are executed not in 5 threads but in fewer. Remember that cachedThreadPool creates a thread of its own for every task? That is because wait blocked the thread, but the next tasks wanted to run, so new threads were created in the pool for them. A work stealing pool does not add a thread for every blocked task: its number of worker threads is bounded by the parallelism level (by default the number of available processors), the remaining tasks wait in queues, and a worker that runs out of work takes tasks from its neighbors. Why is WorkStealingPool so different from the other thread pools? Because the magical ForkJoinPool lives inside it:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Actually, there is one more difference. The threads that ForkJoinPool creates by default are daemon threads, unlike the threads created by a regular ThreadPoolExecutor, so the JVM will not wait for them once main returns. In general, daemon threads are worth keeping in mind, because, for example, CompletableFuture also uses daemon threads by default unless you supply your own executor whose ThreadFactory creates non-daemon threads. These are the kinds of surprises that can await you in unexpected places!)
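The daemon difference is easy to check by asking a worker thread directly. The sketch below submits the same Callable to a work stealing pool and to a cached pool created with the default thread factory; DaemonCheck and runsOnDaemonThread are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DaemonCheck {
    // Runs one task in the given pool and reports whether its worker thread is a daemon.
    static boolean runsOnDaemonThread(ExecutorService service)
            throws InterruptedException, ExecutionException {
        Callable<Boolean> isDaemon = () -> Thread.currentThread().isDaemon();
        try {
            return service.submit(isDaemon).get();
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("work stealing pool: "
                + runsOnDaemonThread(Executors.newWorkStealingPool())); // true
        System.out.println("cached thread pool: "
                + runsOnDaemonThread(Executors.newCachedThreadPool())); // false
    }
}
```

Calling get() here is also what keeps main alive long enough to see the result of the work stealing pool; without waiting, the program could exit before its daemon workers finish.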
Fork/Join Pool
In this part we will talk about ForkJoinPool itself (also called the fork/join framework), which lives "under the hood" of WorkStealingPool. The Fork/Join Framework appeared in Java 1.7. And even though Java 11 is already here, it is still worth remembering. It is not the most common tool, but it is quite interesting. There is a good review of the topic on the Internet: "Fork/Join Framework in Java 7". ForkJoinPool works with tasks such as java.util.concurrent.RecursiveTask. There is also an analog, java.util.concurrent.RecursiveAction. A RecursiveAction does not return a result. So RecursiveTask is similar to Callable, and RecursiveAction is similar to Runnable. Looking at the name, we see two key methods: fork and join. The fork method schedules the task for asynchronous execution, typically on another worker thread of the pool. The join method lets you wait for the work to complete and get its result. There are several ways to use them; they are shown on a slide from Aleksey Shipilev's talk "Fork/Join: implementation, use, performance". To make it clearer, it is worth watching his talk at JEE CONF: "Fork Join implementation features".
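One common way to combine fork and join is to split a task in two, fork one half, compute the other half in the current thread, and then join. The sketch below sums a range of numbers this way. It is only an illustration: the RangeSum class, its sum helper and the threshold of 1,000 are assumptions made for the example, not tuned values or code from the talks mentioned above.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class RangeSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // illustrative cut-off, not a tuned value
    private final long from; // inclusive
    private final long to;   // exclusive

    RangeSum(long from, long to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: sum the range directly.
            long sum = 0;
            for (long i = from; i < to; i++) {
                sum += i;
            }
            return sum;
        }
        long middle = from + (to - from) / 2;
        RangeSum left = new RangeSum(from, middle);
        RangeSum right = new RangeSum(middle, to);
        left.fork();                     // schedule the left half asynchronously
        long rightSum = right.compute(); // work on the right half in this thread
        return left.join() + rightSum;   // wait for the left half and combine
    }

    // Sums the numbers in [from, to) using the common ForkJoinPool.
    static long sum(long from, long to) {
        return ForkJoinPool.commonPool().invoke(new RangeSum(from, to));
    }

    public static void main(String[] args) {
        System.out.println(sum(0, 1_000_001)); // 500000500000
    }
}
```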
Summary
So, this is where we finish the next part of the review. We figured out why Executor was invented first, for executing tasks in threads. Then that idea was developed further into ExecutorService. ExecutorService lets you submit tasks for execution using submit and the invoke methods (invokeAll, invokeAny), and also manage the service by shutting it down. Because ExecutorService needs implementations, a class with factory methods was written and named Executors. It lets you create ThreadPoolExecutor thread pools. In addition, there are thread pools that also let you specify an execution schedule, while WorkStealingPool hides a ForkJoinPool. I hope that what was written above was not only interesting to you, but also understandable ) I am always glad to receive suggestions and comments. #Viacheslav