JavaRush /Blog Java /Random-MS /Anda Tidak Boleh Merosakkan Java dengan Thread: Bahagian ...

Anda Tidak Boleh Merosakkan Java dengan Thread: Bahagian V - Executor, ThreadPool, Fork Join

Diterbitkan dalam kumpulan

pengenalan

Jadi, kami tahu bahawa terdapat benang dalam Java, yang boleh anda baca dalam ulasan " Anda Tidak Boleh Merosakkan Java dengan Benang: Bahagian I - Benang ". Anda tidak boleh merosakkan Java dengan Thread: Bahagian V - Executor, ThreadPool, Fork Join - 1Mari lihat kod sampel sekali lagi:
public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Seperti yang kita lihat, kod untuk melancarkan tugas adalah agak standard, tetapi untuk setiap pelancaran baharu kita perlu mengulanginya. Satu penyelesaian adalah untuk memindahkannya ke kaedah yang berasingan, contohnya execute(Runnable runnable). Tetapi pembangun Java sudah bimbang tentang kami dan menghasilkan antara muka Executor:
public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Seperti yang anda lihat, kod itu telah menjadi lebih ringkas dan membolehkan kami menulis kod untuk menjalankannya Runnabledalam urutan. Hebat, bukan? Tetapi ini hanya permulaan: Anda tidak boleh merosakkan Java dengan benang: Bahagian V - Executor, ThreadPool, Fork Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Seperti yang anda lihat, antara muka Executormempunyai antara muka turunan ExecutorService. JavaDoc antara muka ini mengatakan bahawa ExecutorServiceia adalah perihalan Executor'a' khas yang menyediakan kaedah untuk menghentikan kerja Executor'a' dan membolehkan anda menjejaki java.util.concurrent.Futurekemajuan pelaksanaan. Sebelum ini, dalam " You Can't Spoil Java with Thread: Part IV - Callable, Future and Friends, " kami menyemak secara ringkas kemungkinan Future. Jika anda terlupa atau belum membacanya, saya nasihatkan anda untuk menyegarkan ingatan anda ;) Apa lagi yang menarik dalam JavaDoc? Bahawa kami mempunyai kilang khas java.util.concurrent.Executorsyang membolehkan kami membuat pelaksanaan yang tersedia secara lalai ExecutorService.

Perkhidmatan Pelaksana

Mari kita ingat lagi. Kami perlu Executormelaksanakan (iaitu melaksanakan) tugas tertentu dalam utas, apabila pelaksanaan mencipta utas disembunyikan daripada kami. Kami mempunyai ExecutorServiceyang istimewa Executoryang mempunyai satu set keupayaan untuk menguruskan kemajuan pelaksanaan. Dan kami mempunyai kilang Executorsyang membolehkan anda mencipta ExecutorService. Mari lakukan sendiri sekarang:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Seperti yang kita dapat lihat, kami telah menetapkan kumpulan benang tetap ( Fixed Thread Pool) bersaiz 2. Selepas itu kami menghantar tugasan ke kumpulan satu demi satu. Setiap tugasan mengembalikan rentetan ( String) yang mengandungi nama benang ( currentThread().getName()). Adalah penting untuk menutup pada akhir ExecutorService, kerana jika tidak, program kami tidak akan keluar. ExecutorsTerdapat kaedah kilang lain di kilang . Sebagai contoh, kita boleh membuat kumpulan hanya satu utas - newSingleThreadExecutoratau kumpulan dengan caching newCachedThreadPool, yang mana utas akan dialih keluar daripada kumpulan jika ia melahu selama 1 minit. Sebenarnya, di belakang ini ExecutorServiceterdapat barisan menyekat di mana tugas diletakkan dan dari mana tugasan ini dilaksanakan. Maklumat lanjut tentang menyekat baris gilir boleh dilihat dalam video " Menyekat baris gilir - Koleksi #5 - Java Lanjutan ". Anda juga boleh membaca ulasan " Menyekat baris gilir pakej serentak " dan jawapan kepada soalan " Bilakah lebih suka LinkedBlockingQueue daripada ArrayBlockingQueue? " Sangat mudah - BlockingQueue(menyekat baris gilir) menyekat urutan, dalam dua kes:
  • benang cuba mendapatkan elemen daripada baris gilir kosong
  • benang cuba meletakkan elemen ke dalam baris gilir penuh
Jika kita melihat kepada pelaksanaan kaedah kilang, kita boleh melihat bagaimana ia distrukturkan. Sebagai contoh:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
atau
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Seperti yang kita lihat, pelaksanaan dibuat di dalam kaedah kilang ExecutorService. Dan itu pada asasnya ThreadPoolExecutor. Hanya atribut yang mempengaruhi kerja berubah. Anda tidak boleh merosakkan Java dengan benang: Bahagian V - Executor, ThreadPool, Fork Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/Fail:Thread_pool.svg

ThreadPoolExecutor

Seperti yang kita lihat sebelum ini, kaedah di dalam kilang ThreadPoolExecutor, . Fungsi dipengaruhi oleh nilai yang diluluskan sebagai benang maksimum dan minimum, serta baris gilir yang digunakan. Dan sebarang pelaksanaan antara muka boleh digunakan java.util.concurrent.BlockingQueue. Bercakap tentang ThreadPoolExecutor'ahs, perlu diperhatikan ciri-ciri menarik semasa operasi. Sebagai contoh, anda tidak boleh menghantar tugas ThreadPoolExecutorjika tiada ruang di sana:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Kod ini akan gagal dengan ralat seperti:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Iaitu, taskanda tidak boleh menyerahkan, kerana SynchronousQueueia direka sedemikian rupa sehingga ia sebenarnya terdiri daripada satu elemen dan tidak membenarkan anda meletakkan lebih banyak di sana. Seperti yang kita dapat lihat, queued tasksterdapat 0 di sini, dan tidak ada yang pelik dalam hal ini, kerana ini khusus SynchronousQueue- sebenarnya, ia adalah baris gilir 1 elemen, yang sentiasa kosong. (!) Apabila satu utas meletakkan elemen ke dalam baris gilir, ia akan menunggu sehingga utas lain mengambil elemen itu daripada baris gilir. Oleh itu, kita boleh menggantikan dengan new LinkedBlockingQueue<>(1)dan apa yang akan ditunjukkan dalam ralat akan berubah queued tasks = 1. Kerana baris gilir hanya 1 elemen, maka kita tidak boleh menambah yang kedua. Dan kita akan jatuh pada ini. Meneruskan tema baris gilir, perlu diperhatikan bahawa kelas ThreadPoolExecutormempunyai kaedah tambahan untuk memberi perkhidmatan kepada baris gilir. Sebagai contoh, kaedah threadPoolExecutor.purge()akan mengalih keluar semua tugas yang dibatalkan daripada baris gilir untuk mengosongkan ruang dalam baris gilir. Satu lagi ciri menarik yang berkaitan dengan baris gilir ialah pengendali tugas yang tidak diterima:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Sebagai contoh, pengendali hanya mencetak perkataan Rejecteduntuk setiap keengganan untuk menerima tugas ke dalam baris gilir. Mudah, bukan? Di samping itu, ThreadPoolExecutordia mempunyai waris yang menarik - ScheduledThreadPoolExecutoriaitu ScheduledExecutorService. Ia menyediakan keupayaan untuk melaksanakan tugas pada pemasa.

ScheduledExecutorService

ExecutorServicejenis ScheduledExecutorServicemembolehkan anda menjalankan tugas mengikut jadual. Mari lihat contoh:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Semuanya mudah di sini. Tugasan dihantar, kami menerima "tugas berjadual" java.util.concurrent.ScheduledFuture. Kes berikut juga mungkin berguna dengan jadual:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Di sini kami menghantar Runnabletugas untuk dilaksanakan pada kadar tetap (Kadar Tetap) dengan kelewatan tertentu. Dalam kes ini, selepas 1 saat setiap 2 saat, mula melaksanakan tugas. Terdapat pilihan yang serupa:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Tetapi di sini tugas dilaksanakan dengan selang waktu tertentu ANTARA pelaksanaan tugas yang berbeza. Maksudnya, tugasan taskakan selesai dalam masa 1 saat. Seterusnya, sebaik sahaja ia selesai, 2 saat akan berlalu, dan kemudian tugas baru akan dilancarkan. Anda boleh membaca bahan berikut mengenai topik ini: Anda tidak boleh merosakkan Java dengan benang: Bahagian V - Executor, ThreadPool, Fork Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

Sebagai tambahan kepada kumpulan benang yang disebutkan di atas, terdapat satu lagi. Boleh kata dia istimewa sikit. Namanya Kolam Curi Kerja. Ringkasnya, Work Stealing ialah algoritma kerja di mana benang terbiar mula mengambil tugasan daripada benang lain atau tugasan daripada baris gilir umum. Mari lihat contoh:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Jika kita menjalankan kod ini, ExecutorServiceia akan mencipta 5 utas, kerana setiap utas akan menyertai baris gilir menunggu di lokasi objek lock. Kami telah membincangkan tentang monitor dan kunci padanya dalam " Anda Tidak Boleh Merosakkan Java dengan Benang: Bahagian II - Penyegerakan ." Dan sekarang kita akan menggantikannya Executors.newCachedThreadPooldengan Executors.newWorkStealingPool(). Apa yang akan berubah? Kami akan melihat bahawa tugas kami dilakukan bukan dalam 5 utas, tetapi dalam lebih sedikit. Ingat bahawa cachedThreadPoolanda membuat urutan anda sendiri untuk setiap tugas? Kerana waitia menyekat urutan, tetapi tugas seterusnya mahu dilaksanakan dan urutan baharu telah dibuat dalam kumpulan untuk mereka. Dalam kes StealingPoolbenang, mereka tidak akan melahu selama-lamanya dalam wait, mereka akan mula melaksanakan tugas jiran. Bagaimanakah ini berbeza daripada kumpulan benang lain WorkStealingPool? Kerana sebenarnya ada sesuatu yang ajaib hidup dalam dirinya ForkJoinPool:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Sebenarnya ada satu lagi perbezaan. Benang yang dicipta secara ForkJoinPoollalai ialah benang daemon, berbanding dengan benang yang dibuat melalui ThreadPool. Secara umum, perlu diingati tentang benang daemon, kerana... contohnya, CompletableFuturebenang daemon juga digunakan, jika anda tidak menentukan sendiri ThreadFactory, yang akan mencipta benang bukan daemon. Ini adalah jenis kejutan yang boleh menanti anda di tempat yang tidak dijangka!)

Fork/Join Pool

Dalam bahagian ini kita akan bercakap tentang yang sama ForkJoinPool(juga dipanggil rangka kerja garpu/sambung) yang hidup "di bawah tudung" WorkStealingPool. Secara umum, Rangka Kerja Gabungan Fork muncul dalam Java 1.7. Dan walaupun Java 11 sudah berada di halaman, ia masih patut diingati. Bukan tugas yang paling biasa, tetapi agak menarik. Terdapat ulasan yang baik tentang topik ini di Internet: " Rangka Kerja Fork/Join di Java 7 ". Fork/JoinPoolberoperasi dalam kerjanya dengan konsep seperti java.util.concurrent.RecursiveTask. Terdapat juga analog - java.util.concurrent.RecursiveAction. RecursiveActions tidak mengembalikan hasil. Oleh itu RecursiveTaskserupa dengan Callable, dan RecursiveActionserupa dengan Runnable. Nah, melihat nama, kita melihat dua kaedah utama - forkdan join. Kaedah ini forkmenjalankan tugas secara tidak segerak dalam benang yang berasingan. Dan kaedah itu joinmembolehkan anda menunggu sehingga kerja selesai. Terdapat beberapa cara untuk menggunakannya: Anda tidak boleh merosakkan Java dengan benang: Bahagian V - Executor, ThreadPool, Fork Join - 5Gambar ini adalah sebahagian daripada slaid daripada laporan Alexey Shipilev " Fork/Join: implementation, use, performance ." Untuk menjadikannya lebih jelas, anda patut menonton laporannya di JEE CONF: " Ciri pelaksanaan Fork Join ."

Merumuskan

Jadi, di sini kita sedang menyelesaikan bahagian semakan seterusnya. Kami mengetahui perkara yang pertama kali kami buat Executoruntuk melaksanakan utas. Kemudian kami memutuskan untuk meneruskan idea itu dan menghasilkannya ExecutorService. ExecutorServicemembolehkan anda menghantar tugasan untuk pelaksanaan menggunakan submitdan invoke, serta mengurus perkhidmatan dengan mematikannya. Kerana ExecutorService'kami memerlukan pelaksanaan, kami menulis kelas dengan kaedah kilang dan memanggilnya Executors. Ia membolehkan anda mencipta kumpulan benang ThreadPoolExecutor. Pada masa yang sama, terdapat kumpulan benang yang turut membenarkan anda menentukan jadual untuk pelaksanaan, tetapi ia WorkStealingPooltersembunyi di belakang ForkJoinPool. Saya harap apa yang ditulis di atas bukan sahaja menarik kepada anda, tetapi juga boleh difahami) Saya sentiasa gembira menerima cadangan dan komen. #Viacheslav
Komen
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION