JavaRush /Java Blog /Random-ID /Anda Tidak Dapat Merusak Java dengan Thread: Bagian V - E...
Viacheslav
Level 3

Anda Tidak Dapat Merusak Java dengan Thread: Bagian V - Executor, ThreadPool, Fork Join

Dipublikasikan di grup Random-ID

Perkenalan

Jadi, kita tahu bahwa ada thread di Java, yang bisa Anda baca di ulasan “ Anda Tidak Dapat Memanjakan Java dengan Thread: Bagian I - Threads ”. Anda tidak dapat merusak Java dengan Thread: Part V - Executor, ThreadPool, Fork Join - 1Mari kita lihat kembali contoh kodenya:
public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Seperti yang bisa kita lihat, kode untuk meluncurkan tugas ini cukup standar, tetapi untuk setiap peluncuran baru kita harus mengulanginya. Salah satu solusinya adalah dengan memindahkannya ke metode terpisah, misalnya execute(Runnable runnable). Namun pengembang Java telah mengkhawatirkan kami dan menghasilkan antarmuka Executor:
public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Seperti yang Anda lihat, kodenya menjadi lebih ringkas dan memungkinkan kami menulis kode untuk menjalankannya Runnabledi thread. Hebat, bukan? Tapi ini baru permulaan: Anda tidak dapat merusak Java dengan utas: Bagian V - Pelaksana, ThreadPool, Fork Join - 2

https://docs.Oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Seperti yang Anda lihat, antarmuka Executormemiliki antarmuka turunan ExecutorService. JavaDoc dari antarmuka ini mengatakan bahwa ExecutorServiceini adalah deskripsi Executor'a' khusus yang menyediakan metode untuk menghentikan pekerjaan Executor'a' dan memungkinkan Anda melacak java.util.concurrent.Futurekemajuan eksekusi. Sebelumnya, di “ Anda Tidak Dapat Memanjakan Java dengan Thread: Part IV - Callable, Future and Friends, ” kami meninjau secara singkat kemungkinannya Future. Jika Anda lupa atau belum membacanya, saya sarankan Anda menyegarkan ingatan Anda ;) Hal menarik apa lagi yang tertulis di JavaDoc? Bahwa kami memiliki pabrik khusus java.util.concurrent.Executorsyang memungkinkan kami membuat implementasi yang tersedia secara default ExecutorService.

Layanan Pelaksana

Mari kita ingat lagi. Kita harus Executormenjalankan (yaitu mengeksekusi) tugas tertentu di sebuah thread, ketika implementasi pembuatan thread disembunyikan dari kita. Kami memiliki ExecutorServiceyang khusus Executoryang memiliki serangkaian kemampuan untuk mengelola kemajuan eksekusi. Dan kami memiliki pabrik Executorsyang memungkinkan Anda membuat ExecutorService. Mari kita lakukan sendiri sekarang:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Seperti yang bisa kita lihat, kita telah menentukan kumpulan thread tetap ( Fixed Thread Pool) dengan ukuran 2. Setelah itu kita mengirimkan tugas ke kumpulan tersebut satu per satu. Setiap tugas mengembalikan string ( String) yang berisi nama thread ( currentThread().getName()). Penting untuk mematikannya di bagian paling akhir ExecutorService, karena jika tidak, program kita tidak akan keluar. ExecutorsAda metode pabrik lainnya di pabrik . Misalnya, kita dapat membuat kumpulan hanya satu thread - newSingleThreadExecutoratau kumpulan dengan caching newCachedThreadPool, di mana thread akan dihapus dari kumpulan jika tidak digunakan selama 1 menit. Faktanya, di balik ini ExecutorServiceterdapat antrian pemblokiran tempat tugas ditempatkan dan dari mana tugas tersebut dijalankan. Informasi lebih lanjut mengenai pemblokiran antrian dapat dilihat pada video " Pemblokiran antrian - Koleksi #5 - Java Tingkat Lanjut ". Anda juga dapat membaca ulasan “ Memblokir antrian paket bersamaan ” dan jawaban atas pertanyaan “ Kapan memilih LinkedBlockingQueue daripada ArrayBlockingQueue? ” Sangat disederhanakan - BlockingQueue(memblokir antrian) memblokir thread, dalam dua kasus:
  • sebuah utas mencoba mengambil elemen dari antrian kosong
  • utas mencoba memasukkan elemen ke dalam antrian penuh
Jika kita melihat penerapan metode pabrik, kita dapat melihat bagaimana strukturnya. Misalnya:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
atau
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Seperti yang bisa kita lihat, implementasi dibuat di dalam metode pabrik ExecutorService. Dan pada dasarnya itu saja ThreadPoolExecutor. Hanya atribut yang mempengaruhi pekerjaan yang berubah. Anda tidak dapat merusak Java dengan utas: Bagian V - Pelaksana, ThreadPool, Fork Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

Pelaksana ThreadPool

Seperti yang kita lihat sebelumnya, metode di dalam pabrik ThreadPoolExecutor, . Fungsionalitasnya dipengaruhi oleh nilai apa yang diteruskan sebagai thread maksimum dan minimum, serta antrian apa yang digunakan. Dan implementasi antarmuka apa pun dapat digunakan java.util.concurrent.BlockingQueue. Berbicara tentang ThreadPoolExecutor'ahs, perlu diperhatikan fitur-fitur menarik selama pengoperasian. Misalnya, Anda tidak dapat mengirim tugas ThreadPoolExecutorjika tidak ada ruang di sana:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Kode ini akan gagal dengan kesalahan seperti:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Artinya, taskAnda tidak bisa tunduk, karena SynchronousQueueitu dirancang sedemikian rupa sehingga sebenarnya terdiri dari satu elemen dan tidak memungkinkan Anda untuk menempatkan lebih banyak di sana. Seperti yang bisa kita lihat, queued tasksada 0 di sini, dan tidak ada yang aneh dengan itu, karena ini spesifik SynchronousQueue- sebenarnya, ini adalah antrian 1 elemen, yang selalu kosong. (!) Ketika satu thread memasukkan elemen ke dalam antrian, thread tersebut akan menunggu hingga thread lain mengambil elemen tersebut dari antrian. Oleh karena itu, kita dapat menggantinya dengan new LinkedBlockingQueue<>(1)dan apa yang ditunjukkan dalam kesalahan akan berubah queued tasks = 1. Karena antriannya hanya 1 elemen, maka kita tidak bisa menambahkan yang kedua. Dan kita akan terjebak dalam hal ini. Melanjutkan tema antrian, perlu dicatat bahwa kelas ThreadPoolExecutormemiliki metode tambahan untuk melayani antrian. Misalnya, metode ini threadPoolExecutor.purge()akan menghapus semua tugas yang dibatalkan dari antrean untuk mengosongkan ruang dalam antrean. Fitur menarik lainnya yang terkait dengan antrian adalah penangan tugas yang tidak diterima:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Misalnya, pawang hanya mencetak sebuah kata Rejecteduntuk setiap penolakan menerima tugas ke dalam antrian. Nyaman, bukan? Selain itu, ThreadPoolExecutoria memiliki ahli waris yang menarik - ScheduledThreadPoolExecutoryaitu ScheduledExecutorService. Ini memberikan kemampuan untuk melakukan tugas pada pengatur waktu.

Layanan Pelaksana Terjadwal

ExecutorServiceketik ScheduledExecutorServicememungkinkan Anda menjalankan tugas sesuai jadwal. Mari kita lihat sebuah contoh:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Semuanya sederhana di sini. Tugas dikirim, kami menerima "tugas terjadwal" java.util.concurrent.ScheduledFuture. Kasus berikut mungkin juga berguna dengan jadwal:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Disini kami mengirimkan Runnabletugas untuk dieksekusi pada tingkat bunga tetap (Fixed Rate) dengan penundaan tertentu. Dalam hal ini, setelah 1 detik setiap 2 detik, mulailah menjalankan tugas. Ada opsi serupa:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Namun di sini tugas dijalankan dengan interval tertentu ANTARA pelaksanaan tugas yang berbeda. Artinya, tugas taskakan selesai dalam 1 detik. Selanjutnya, segera setelah selesai, 2 detik akan berlalu, dan tugas baru akan diluncurkan. Anda dapat membaca materi berikut tentang topik ini: Anda tidak dapat merusak Java dengan utas: Bagian V - Pelaksana, ThreadPool, Fork Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

Kolam Pencurian Kerja

Selain kumpulan thread yang disebutkan di atas, ada satu lagi. Bisa dibilang dia sedikit istimewa. Namanya adalah Work Stealing Pool. Singkatnya, Work Stealing adalah algoritma kerja di mana thread yang menganggur mulai mengambil tugas dari thread lain atau tugas dari antrian umum. Mari kita lihat sebuah contoh:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Jika kita menjalankan kode ini, ExecutorServicemaka akan dibuat 5 thread, karena setiap thread akan bergabung dalam antrian tunggu di lokasi objek lock. Kita telah membahas tentang monitor dan kuncinya di “ Anda Tidak Dapat Memanjakan Java dengan Thread: Bagian II - Sinkronisasi .” Dan sekarang kita akan menggantinya Executors.newCachedThreadPooldengan Executors.newWorkStealingPool(). Apa yang akan berubah? Kita akan melihat bahwa tugas kita dilakukan bukan di 5 thread, tetapi dalam jumlah yang lebih sedikit. Ingat bahwa cachedThreadPoolAnda membuat thread sendiri untuk setiap tugas? Karena waititu memblokir utas, tetapi tugas berikutnya ingin dijalankan dan utas baru dibuat di kumpulan untuk mereka. Dalam kasus StealingPoolutas, mereka tidak akan menganggur selamanya wait, mereka akan mulai menjalankan tugas-tugas tetangga. Apa bedanya dengan kumpulan thread lainnya WorkStealingPool? Karena sebenarnya ada sesuatu yang ajaib yang hidup di dalam dirinya ForkJoinPool:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Sebenarnya ada satu perbedaan lagi. Thread yang dibuat secara ForkJoinPooldefault adalah thread daemon, bukan thread yang dibuat melalui ThreadPool. Secara umum, perlu diingat tentang utas daemon, karena... misalnya, CompletableFutureutas daemon juga digunakan, jika Anda tidak menentukan sendiri ThreadFactory, yang akan membuat utas non-daemon. Kejutan seperti inilah yang mungkin menanti Anda di tempat yang tidak terduga!)

Garpu/Gabung Kolam

Pada bagian ini kita akan berbicara tentang kerangka kerja yang sama ForkJoinPool(juga disebut kerangka fork/join) yang ada “di balik terpal” WorkStealingPool. Secara umum, Fork Join Framework muncul di Java 1.7. Dan meskipun Java 11 sudah ada, hal itu masih patut diingat. Bukan tugas yang paling umum, tapi cukup menarik. Ada ulasan bagus tentang topik ini di Internet: “ Fork/Join Framework in Java 7 ”. Fork/JoinPoolberoperasi dalam karyanya dengan konsep seperti java.util.concurrent.RecursiveTask. Ada juga analognya - java.util.concurrent.RecursiveAction. RecursiveActions tidak memberikan hasil. Jadi RecursiveTaskmirip dengan Callable, dan RecursiveActionmirip dengan Runnable. Melihat namanya, kita melihat dua metode utama - forkdan join. Metode ini forkmenjalankan tugas secara asinkron di thread terpisah. Dan metode ini joinmemungkinkan Anda menunggu hingga pekerjaan selesai. Ada beberapa cara untuk menggunakannya: Anda tidak dapat merusak Java dengan utas: Bagian V - Pelaksana, ThreadPool, Fork Join - 5Gambar ini adalah bagian dari slide dari laporan Alexei Shipilev “ Fork/Join: implementasi, penggunaan, kinerja .” Untuk membuatnya lebih jelas, ada baiknya melihat laporannya di JEE CONF: “ Fitur implementasi Fork Join .”

Meringkas

Jadi, inilah kita, menyelesaikan ulasan bagian selanjutnya. Kami menemukan apa yang pertama kali kami pikirkan Executoruntuk mengeksekusi thread. Kemudian kami memutuskan untuk melanjutkan ide tersebut dan mewujudkannya ExecutorService. ExecutorServicememungkinkan Anda mengirim tugas untuk dieksekusi menggunakan submitdan invoke, serta mengelola layanan dengan mematikannya. Karena ExecutorService'kami memerlukan implementasi, kami menulis kelas dengan metode pabrik dan menamainya Executors. Ini memungkinkan Anda membuat kumpulan thread ThreadPoolExecutor. Pada saat yang sama, ada kumpulan thread yang juga memungkinkan Anda menentukan jadwal eksekusi, tetapi WorkStealingPooltersembunyi di balik ForkJoinPool. Saya harap apa yang ditulis di atas tidak hanya menarik bagi Anda, tetapi juga dapat dimengerti) Saya selalu senang menerima saran dan komentar. #Viacheslav
Komentar
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION