Viacheslav

You Can't Spoil Java with Threads: Part V - Executor, ThreadPool, Fork Join


Introduction

So, we know that Java has threads; you can read about them in the review "You Can't Spoil Java with Threads: Part I - Threads". Let's look at the sample code again:
public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
As we can see, the code for launching a task is fairly standard, but we have to repeat it for every new launch. One solution is to move it into a separate method, for example execute(Runnable runnable). But the Java developers have already taken care of us and came up with the Executor interface:
public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
As you can see, the code has become more concise and lets us simply write the code that runs a Runnable in a thread. Nice, isn't it? But this is only the beginning.

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
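Because Executor only says "run this Runnable somehow", different implementations can choose very different threading strategies. The following is a minimal sketch of two of them; the class name ExecutorSketch and the helper directThreadName are made up for this illustration and are not part of the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

public class ExecutorSketch {

    // Runs the task right away in the calling thread; no new thread is created.
    static final Executor DIRECT = Runnable::run;

    // Starts a fresh thread for every task, like the lambda in the example above.
    static final Executor THREAD_PER_TASK = runnable -> new Thread(runnable).start();

    // Hypothetical helper: reports which thread executed a task given to DIRECT.
    static String directThreadName() {
        List<String> names = new ArrayList<>();
        DIRECT.execute(() -> names.add(Thread.currentThread().getName()));
        return names.get(0);
    }

    public static void main(String[] args) {
        System.out.println("Caller thread: " + Thread.currentThread().getName());
        System.out.println("Direct executor ran on: " + directThreadName());
        THREAD_PER_TASK.execute(() ->
                System.out.println("Thread-per-task ran on: " + Thread.currentThread().getName()));
    }
}
```

The calling code is identical in both cases: it only calls execute and does not care where the task actually runs.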

As you can see, the Executor interface has a descendant interface, ExecutorService. Its JavaDoc says that ExecutorService is a specialized Executor that provides methods for shutting down the Executor and lets you obtain a java.util.concurrent.Future to track the progress of execution. Earlier, in "You Can't Spoil Java with Threads: Part IV - Callable, Future and Friends", we looked at what Future can do. If you have forgotten or haven't read it, I recommend refreshing your memory ;) What else of interest does the JavaDoc say? That we have a dedicated factory, java.util.concurrent.Executors, which lets us create the ExecutorService implementations that are available by default.

ExecutorService

Let's recap. We have Executor to execute a task in a thread, with the details of creating the thread hidden from us. We have ExecutorService, a specialized Executor that adds the ability to manage the progress of execution. And we have the Executors factory, which lets us create an ExecutorService. Let's try it ourselves now:
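To make the two extra abilities of ExecutorService (tracking a task through a Future and stopping the service) more concrete, here is a small sketch. The class name ExecutorServiceLifecycle and the helper computeInPool are invented for this example, and the one-second timeout is an arbitrary choice.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ExecutorServiceLifecycle {

    // Hypothetical helper: submit one task, wait for its result, then stop the service.
    static int computeInPool() {
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = service.submit(() -> 6 * 7);
            // get() blocks until the result is ready (here: for at most 1 second).
            return future.get(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            // After shutdown() no new tasks are accepted; already submitted ones still run.
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(computeInPool()); // prints 42
    }
}
```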
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future<String> result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
As we can see, we specified a fixed thread pool (Fixed Thread Pool) of size 2 and then submitted tasks to it one at a time. Each task returns a String containing the thread name (currentThread().getName()). It is important to shut down the ExecutorService at the end, because otherwise our program will not exit. The Executors factory has other factory methods too. For example, we can create a pool of just one thread with newSingleThreadExecutor, or a caching pool with newCachedThreadPool, where threads are removed from the pool once they have been idle for 1 minute. In fact, behind these ExecutorServices there is a blocking queue into which tasks are placed and from which they are taken for execution. More information on blocking queues can be found in the video "Blocking queue - Collections #5 - Advanced Java". You can also read the review "Blocking queues of the concurrent package" and the answer to the question "When to prefer LinkedBlockingQueue over ArrayBlockingQueue?". Put very simply, a BlockingQueue blocks a thread in two cases:
  • a thread tries to take an element from an empty queue
  • a thread tries to put an element into a full queue
If we look at the implementation of the factory methods, we can see how they are built. For example:
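The two blocking cases can be observed with a small sketch. It uses the timed offer and poll variants instead of put and take so that the example finishes on its own rather than blocking forever; the class name BlockingQueueSketch, the helper names, and the 100 ms timeout are choices made for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueSketch {

    // Case 2: putting into a full queue. Returns false because no space appears in time.
    static boolean offerToFullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first"); // capacity is 1, so the queue is now full
        try {
            // Blocks for up to 100 ms waiting for free space, then gives up.
            return queue.offer("second", 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Case 1: taking from an empty queue. Returns null because nothing arrives in time.
    static String pollFromEmptyQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            // Blocks for up to 100 ms waiting for an element, then gives up.
            return queue.poll(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(offerToFullQueue());   // prints false
        System.out.println(pollFromEmptyQueue()); // prints null
    }
}
```

With put and take the thread would simply stay blocked until another thread frees a slot or adds an element.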
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
or
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
As we can see, an ExecutorService implementation is created inside the factory methods, and it is basically a ThreadPoolExecutor. Only the attributes that affect how it works differ.

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
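One way to see that only the attributes differ is to ask the created pools about their settings. The sketch below relies on an implementation detail, namely that newFixedThreadPool and newCachedThreadPool hand back a plain ThreadPoolExecutor, as the factory source quoted above suggests; the cast is an assumption and not something the Executors API promises. The class name FactoryInspection and the method describe are made up for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FactoryInspection {

    // Hypothetical helper: summarizes the settings of a pool and then shuts it down.
    static String describe(ExecutorService service) {
        // Assumption: the factory returned a plain ThreadPoolExecutor (implementation detail).
        ThreadPoolExecutor pool = (ThreadPoolExecutor) service;
        String description = "core=" + pool.getCorePoolSize()
                + ", max=" + pool.getMaximumPoolSize()
                + ", keepAliveSeconds=" + pool.getKeepAliveTime(TimeUnit.SECONDS)
                + ", queue=" + pool.getQueue().getClass().getSimpleName();
        pool.shutdown();
        return description;
    }

    public static void main(String[] args) {
        System.out.println("fixed:  " + describe(Executors.newFixedThreadPool(2)));
        System.out.println("cached: " + describe(Executors.newCachedThreadPool()));
    }
}
```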

ThreadPoolExecutor

As we saw earlier, a ThreadPoolExecutor is created inside the factory methods. Its behavior depends on the values passed as the maximum and minimum number of threads, and on which queue is used. Any implementation of the java.util.concurrent.BlockingQueue interface can be used. Speaking of ThreadPoolExecutor, it is worth noting some interesting features of how it operates. For example, you cannot submit a task to a ThreadPoolExecutor if there is no room for it:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
This code will fail with an error like:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
That is, the task cannot be submitted, because SynchronousQueue is designed so that it effectively holds no elements at all: an element can be handed over only if another thread is already waiting to take it. As we can see, queued tasks is 0 here, and there is nothing strange about that, because this is specific to SynchronousQueue: you can think of it as a 1-element queue that is always empty (!). When one thread puts an element into the queue, it waits until another thread takes the element from the queue. So we can replace it with new LinkedBlockingQueue<>(1). Then one task can wait in the queue, and once both the threads and that single queue slot are occupied, further submissions are rejected and the error reports queued tasks = 1. Because the queue holds only 1 element, we cannot add another one, and that is where we fail. Continuing the queue theme, it is worth noting that the ThreadPoolExecutor class has additional methods for servicing the queue. For example, threadPoolExecutor.purge() tries to remove all cancelled tasks from the queue in order to free up space in it. Another interesting queue-related feature is the handling of rejected tasks:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
In this example, the handler simply prints the word Rejected for every task the pool refuses to accept. Handy, isn't it? In addition, ThreadPoolExecutor has an interesting descendant, ScheduledThreadPoolExecutor, which is a ScheduledExecutorService. It provides the ability to run tasks on a timer.
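The rejection behavior is easier to reason about when it is made deterministic. The sketch below is one way to do that, under the assumption that every accepted task is held on a CountDownLatch until all submissions have been attempted; the class name RejectionSketch and the method countRejected are invented for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RejectionSketch {

    // Hypothetical helper: submits `tasks` tasks to a pool of at most `maxThreads`
    // threads backed by a SynchronousQueue and returns how many were rejected.
    static int countRejected(int maxThreads, int tasks) {
        AtomicInteger rejected = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, maxThreads,
                0L, TimeUnit.SECONDS, new SynchronousQueue<>(),
                (runnable, executor) -> rejected.incrementAndGet());
        // Every accepted task keeps its thread busy until the latch is released.
        Runnable task = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < tasks; i++) {
            pool.execute(task);
        }
        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        return rejected.get();
    }

    public static void main(String[] args) {
        // 2 tasks get a thread each, the remaining 3 have nowhere to go.
        System.out.println(countRejected(2, 5)); // prints 3
    }
}
```

Besides a custom handler, ThreadPoolExecutor ships with ready-made policies such as ThreadPoolExecutor.AbortPolicy (the default, which throws RejectedExecutionException) and ThreadPoolExecutor.CallerRunsPolicy (which runs the task in the submitting thread).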

ScheduledExecutorService

An ExecutorService of the ScheduledExecutorService kind lets you run tasks on a schedule. Let's look at an example:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Everything is simple here. The task is submitted, and we receive a "scheduled task", java.util.concurrent.ScheduledFuture. The following case may also be useful when scheduling:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Here we submit a Runnable task to be executed at a fixed rate (Fixed Rate) after a given initial delay. In this case, after 1 second, the task starts running every 2 seconds. There is a similar option:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
But here the tasks are executed with the given interval BETWEEN the executions of successive tasks. That is, the task is first executed after 1 second. Then, as soon as it completes, 2 seconds pass, and only then is the task launched again. You can read the following materials on this topic:

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
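The difference between scheduleAtFixedRate and scheduleWithFixedDelay shows up when the task itself takes noticeable time. The following is a rough sketch, assuming a task that lasts about 50 ms, a 100 ms period or delay, and an observation window of about 600 ms; all of these numbers, the class name SchedulingSketch, and the method countRuns are picked just for this illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SchedulingSketch {

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Hypothetical helper: counts how many times a ~50 ms task starts within ~600 ms.
    static int countRuns(boolean fixedRate) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger();
        Runnable task = () -> {
            runs.incrementAndGet();
            sleep(50); // simulate work
        };
        if (fixedRate) {
            // Next start is measured from the previous START: roughly every 100 ms.
            scheduler.scheduleAtFixedRate(task, 0, 100, TimeUnit.MILLISECONDS);
        } else {
            // Next start is measured from the previous END: roughly every 150 ms.
            scheduler.scheduleWithFixedDelay(task, 0, 100, TimeUnit.MILLISECONDS);
        }
        sleep(600);
        scheduler.shutdownNow(); // a periodic task never finishes by itself
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("fixed rate runs:  " + countRuns(true));
        System.out.println("fixed delay runs: " + countRuns(false));
    }
}
```

The exact counts depend on timing, but the fixed-rate variant would normally start the task more often, because the duration of the task is not added to the interval.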

WorkStealingPool

In addition to the thread pools mentioned above, there is one more. You could say it is a little special. Its name is Work Stealing Pool. In short, work stealing is an algorithm in which idle threads start taking tasks from other threads, or tasks from a shared queue. Let's look at an example:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		synchronized (lock) { // wait() may only be called while holding the monitor
			lock.wait(2000);
		}
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
If we run this code, the ExecutorService will create 5 threads, because each thread joins the wait queue on the monitor of the lock object. We already discussed monitors and locks in "You Can't Spoil Java with Threads: Part II - Synchronization". Now let's replace Executors.newCachedThreadPool with Executors.newWorkStealingPool(). What will change? We will see that our tasks run on no more threads than there are available processors, which may be fewer than 5. Remember that the cachedThreadPool created its own thread for each task? That is because wait blocked the thread, yet the next tasks still wanted to run, so new threads were created in the pool for them. A work-stealing pool keeps a bounded number of threads instead: the remaining tasks wait in queues, and a thread that runs out of work takes tasks from its neighbors rather than sitting idle. Why is WorkStealingPool so different from other thread pools? Because there is magic living inside it, ForkJoinPool:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
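To see the difference in the number of threads, we can record which threads actually executed the tasks. This is only a sketch: the class name PoolThreadCount and the method distinctThreads are made up, the tasks simply sleep for 200 ms, and the work-stealing pool is created with an explicit parallelism of 2 (via Executors.newWorkStealingPool(int)) so that the result does not depend on the number of processors.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PoolThreadCount {

    // Hypothetical helper: runs `tasks` sleeping tasks and counts the threads that ran them.
    static int distinctThreads(ExecutorService pool, int tasks) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                names.add(Thread.currentThread().getName());
                try {
                    Thread.sleep(200); // keep the thread busy for a while
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names.size();
    }

    public static void main(String[] args) {
        System.out.println("cached pool threads:        "
                + distinctThreads(Executors.newCachedThreadPool(), 5));
        System.out.println("work-stealing pool threads: "
                + distinctThreads(Executors.newWorkStealingPool(2), 5));
    }
}
```

The cached pool starts a new thread for each task that finds no idle thread, while the work-stealing pool stays within its parallelism level and lets the remaining tasks wait.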
Actually, there is one more difference. Threads created for a ForkJoinPool by default are daemon threads, unlike threads created through a regular ThreadPool. In general, daemon threads are worth keeping in mind, because, for example, CompletableFuture also uses daemon threads unless you supply your own executor whose ThreadFactory creates non-daemon threads. These are the kinds of surprises that can await you in unexpected places! :)
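The daemon flag is easy to check directly. Here is a minimal sketch; the class name DaemonCheck and the method runsOnDaemonThread are invented for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DaemonCheck {

    // Hypothetical helper: runs one task in the pool and reports
    // whether the worker thread that executed it is a daemon thread.
    static boolean runsOnDaemonThread(ExecutorService pool) {
        Callable<Boolean> task = () -> Thread.currentThread().isDaemon();
        try {
            return pool.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runsOnDaemonThread(Executors.newWorkStealingPool()));  // prints true
        System.out.println(runsOnDaemonThread(Executors.newFixedThreadPool(1)));  // prints false
    }
}
```

In practice this means that a program whose only remaining work runs in a work-stealing pool can exit before that work finishes, unless the main thread waits for the results.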

Fork / Join Pool

In this section we will talk about that same ForkJoinPool (also called the fork/join framework) that lives "under the hood" of WorkStealingPool. The Fork/Join Framework appeared in Java 1.7. And even though Java 11 is already here, it is still worth remembering. It is not the most common task, but it is quite interesting. There is a good review of this topic on the Internet: "Fork/Join Framework in Java 7". ForkJoinPool works with the concept of a java.util.concurrent.RecursiveTask. There is also an analogue, java.util.concurrent.RecursiveAction. A RecursiveAction does not return a result. So RecursiveTask is similar to Callable, and RecursiveAction is similar to Runnable. Looking at the name, we see two key methods: fork and join. The fork method starts the task asynchronously on a separate thread, and the join method lets you wait for the work to complete. There are several ways to use them. The illustration that accompanied this text is part of a slide from Alexey Shipilev's talk "Fork/Join: implementation, usage, performance". To make things clearer, it is worth watching the talk at JEE CONF: "Fork Join implementation features".

Summary

So, here we finish the next part of the review. We figured out why Executor was invented in the first place: to execute tasks in threads. Then the idea was taken further and ExecutorService appeared. ExecutorService lets you send tasks for execution using submit and invoke (invokeAll, invokeAny), and also manage the service by shutting it down. Because ExecutorService needs implementations, a class with factory methods was written and named Executors. It lets you create ThreadPoolExecutor thread pools. There are also thread pools that let you specify an execution schedule, while WorkStealingPool hides a ForkJoinPool. I hope what is written above was not only interesting to you but also understandable :) I am always glad to receive suggestions and comments. #Viacheslav
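As an illustration of fork and join, here is a sketch of a RecursiveTask that sums an array by splitting it in half until the pieces are small enough. The class names ForkJoinSum and SumTask, the method parallelSum, and the threshold of 1000 elements are all assumptions made for this example, not something taken from the talk mentioned above.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

public class ForkJoinSum {

    // Sums numbers[from, to) by recursively splitting the range in two.
    static class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 1_000; // below this size, sum sequentially
        private final long[] numbers;
        private final int from;
        private final int to;

        SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += numbers[i];
                }
                return sum;
            }
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(numbers, from, mid);
            SumTask right = new SumTask(numbers, mid, to);
            left.fork();                     // schedule the left half asynchronously
            long rightSum = right.compute(); // compute the right half in this thread
            return left.join() + rightSum;   // wait for the left half and combine
        }
    }

    // Hypothetical helper: runs the task in the common ForkJoinPool.
    static long parallelSum(long[] numbers) {
        return ForkJoinPool.commonPool().invoke(new SumTask(numbers, 0, numbers.length));
    }

    public static void main(String[] args) {
        long[] numbers = LongStream.rangeClosed(1, 100_000).toArray();
        System.out.println(parallelSum(numbers)); // prints 5000050000
    }
}
```

Forking one half and computing the other in the current thread keeps the current worker busy instead of making it wait for both subtasks.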