pengenalan
Jadi, kami tahu bahawa terdapat benang dalam Java, yang boleh anda baca dalam ulasan " Anda Tidak Boleh Merosakkan Java dengan Benang: Bahagian I - Benang ". Mari lihat kod sampel sekali lagi:public static void main(String []args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
Seperti yang kita lihat, kod untuk melancarkan tugas adalah agak standard, tetapi untuk setiap pelancaran baharu kita perlu mengulanginya. Satu penyelesaian adalah untuk memindahkannya ke kaedah yang berasingan, contohnya execute(Runnable runnable)
. Tetapi pembangun Java sudah bimbang tentang kami dan menghasilkan antara muka Executor
:
public static void main(String []args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Seperti yang anda lihat, kod itu telah menjadi lebih ringkas dan membolehkan kami menulis kod untuk menjalankannya Runnable
dalam urutan. Hebat, bukan? Tetapi ini hanya permulaan:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
mempunyai antara muka turunan ExecutorService
. JavaDoc antara muka ini mengatakan bahawa ExecutorService
ia adalah perihalan Executor
'a' khas yang menyediakan kaedah untuk menghentikan kerja Executor
'a' dan membolehkan anda menjejaki java.util.concurrent.Future
kemajuan pelaksanaan. Sebelum ini, dalam " You Can't Spoil Java with Thread: Part IV - Callable, Future and Friends, " kami menyemak secara ringkas kemungkinan Future
. Jika anda terlupa atau belum membacanya, saya nasihatkan anda untuk menyegarkan ingatan anda ;) Apa lagi yang menarik dalam JavaDoc? Bahawa kami mempunyai kilang khas java.util.concurrent.Executors
yang membolehkan kami membuat pelaksanaan yang tersedia secara lalai ExecutorService
.
Perkhidmatan Pelaksana
Mari kita ingat lagi. Kami perluExecutor
melaksanakan (iaitu melaksanakan) tugas tertentu dalam utas, apabila pelaksanaan mencipta utas disembunyikan daripada kami. Kami mempunyai ExecutorService
yang istimewa Executor
yang mempunyai satu set keupayaan untuk menguruskan kemajuan pelaksanaan. Dan kami mempunyai kilang Executors
yang membolehkan anda mencipta ExecutorService
. Mari lakukan sendiri sekarang:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
Seperti yang kita dapat lihat, kami telah menetapkan kumpulan benang tetap ( Fixed Thread Pool
) bersaiz 2. Selepas itu kami menghantar tugasan ke kumpulan satu demi satu. Setiap tugasan mengembalikan rentetan ( String
) yang mengandungi nama benang ( currentThread().getName()
). Adalah penting untuk menutup pada akhir ExecutorService
, kerana jika tidak, program kami tidak akan keluar. Executors
Terdapat kaedah kilang lain di kilang . Sebagai contoh, kita boleh membuat kumpulan hanya satu utas - newSingleThreadExecutor
atau kumpulan dengan caching newCachedThreadPool
, yang mana utas akan dialih keluar daripada kumpulan jika ia melahu selama 1 minit. Sebenarnya, di belakang ini ExecutorService
terdapat barisan menyekat di mana tugas diletakkan dan dari mana tugasan ini dilaksanakan. Maklumat lanjut tentang menyekat baris gilir boleh dilihat dalam video " Menyekat baris gilir - Koleksi #5 - Java Lanjutan ". Anda juga boleh membaca ulasan " Menyekat baris gilir pakej serentak " dan jawapan kepada soalan " Bilakah lebih suka LinkedBlockingQueue daripada ArrayBlockingQueue? " Sangat mudah - BlockingQueue
(menyekat baris gilir) menyekat urutan, dalam dua kes:
- benang cuba mendapatkan elemen daripada baris gilir kosong
- benang cuba meletakkan elemen ke dalam baris gilir penuh
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
atau
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Seperti yang kita lihat, pelaksanaan dibuat di dalam kaedah kilang ExecutorService
. Dan itu pada asasnya ThreadPoolExecutor
. Hanya atribut yang mempengaruhi kerja berubah.
https://en.wikipedia.org/wiki/Thread_pool#/media/Fail:Thread_pool.svg
ThreadPoolExecutor
Seperti yang kita lihat sebelum ini, kaedah di dalam kilangThreadPoolExecutor
, . Fungsi dipengaruhi oleh nilai yang diluluskan sebagai benang maksimum dan minimum, serta baris gilir yang digunakan. Dan sebarang pelaksanaan antara muka boleh digunakan java.util.concurrent.BlockingQueue
. Bercakap tentang ThreadPoolExecutor
'ahs, perlu diperhatikan ciri-ciri menarik semasa operasi. Sebagai contoh, anda tidak boleh menghantar tugas ThreadPoolExecutor
jika tiada ruang di sana:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Kod ini akan gagal dengan ralat seperti:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Iaitu, task
anda tidak boleh menyerahkan, kerana SynchronousQueue
ia direka sedemikian rupa sehingga ia sebenarnya terdiri daripada satu elemen dan tidak membenarkan anda meletakkan lebih banyak di sana. Seperti yang kita dapat lihat, queued tasks
terdapat 0 di sini, dan tidak ada yang pelik dalam hal ini, kerana ini khusus SynchronousQueue
- sebenarnya, ia adalah baris gilir 1 elemen, yang sentiasa kosong. (!) Apabila satu utas meletakkan elemen ke dalam baris gilir, ia akan menunggu sehingga utas lain mengambil elemen itu daripada baris gilir. Oleh itu, kita boleh menggantikan dengan new LinkedBlockingQueue<>(1)
dan apa yang akan ditunjukkan dalam ralat akan berubah queued tasks = 1
. Kerana baris gilir hanya 1 elemen, maka kita tidak boleh menambah yang kedua. Dan kita akan jatuh pada ini. Meneruskan tema baris gilir, perlu diperhatikan bahawa kelas ThreadPoolExecutor
mempunyai kaedah tambahan untuk memberi perkhidmatan kepada baris gilir. Sebagai contoh, kaedah threadPoolExecutor.purge()
akan mengalih keluar semua tugas yang dibatalkan daripada baris gilir untuk mengosongkan ruang dalam baris gilir. Satu lagi ciri menarik yang berkaitan dengan baris gilir ialah pengendali tugas yang tidak diterima:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Sebagai contoh, pengendali hanya mencetak perkataan Rejected
untuk setiap keengganan untuk menerima tugas ke dalam baris gilir. Mudah, bukan? Di samping itu, ThreadPoolExecutor
dia mempunyai waris yang menarik - ScheduledThreadPoolExecutor
iaitu ScheduledExecutorService
. Ia menyediakan keupayaan untuk melaksanakan tugas pada pemasa.
ScheduledExecutorService
ExecutorService
jenis ScheduledExecutorService
membolehkan anda menjalankan tugas mengikut jadual. Mari lihat contoh:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Semuanya mudah di sini. Tugasan dihantar, kami menerima "tugas berjadual" java.util.concurrent.ScheduledFuture
. Kes berikut juga mungkin berguna dengan jadual:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Di sini kami menghantar Runnable
tugas untuk dilaksanakan pada kadar tetap (Kadar Tetap) dengan kelewatan tertentu. Dalam kes ini, selepas 1 saat setiap 2 saat, mula melaksanakan tugas. Terdapat pilihan yang serupa:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Tetapi di sini tugas dilaksanakan dengan selang waktu tertentu ANTARA pelaksanaan tugas yang berbeza. Maksudnya, tugasan task
akan selesai dalam masa 1 saat. Seterusnya, sebaik sahaja ia selesai, 2 saat akan berlalu, dan kemudian tugas baru akan dilancarkan. Anda boleh membaca bahan berikut mengenai topik ini:
- Pengenalan kepada kumpulan benang
- Pengenalan kepada Kolam Benang
- Java Multithreading Steeplechase: Membatalkan Tugas Dalam Pelaksana
- Memilih pelaksana Java yang betul untuk tugas latar belakang
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
Sebagai tambahan kepada kumpulan benang yang disebutkan di atas, terdapat satu lagi. Boleh kata dia istimewa sikit. Namanya Kolam Curi Kerja. Ringkasnya, Work Stealing ialah algoritma kerja di mana benang terbiar mula mengambil tugasan daripada benang lain atau tugasan daripada baris gilir umum. Mari lihat contoh:public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Jika kita menjalankan kod ini, ExecutorService
ia akan mencipta 5 utas, kerana setiap utas akan menyertai baris gilir menunggu di lokasi objek lock
. Kami telah membincangkan tentang monitor dan kunci padanya dalam " Anda Tidak Boleh Merosakkan Java dengan Benang: Bahagian II - Penyegerakan ." Dan sekarang kita akan menggantikannya Executors.newCachedThreadPool
dengan Executors.newWorkStealingPool()
. Apa yang akan berubah? Kami akan melihat bahawa tugas kami dilakukan bukan dalam 5 utas, tetapi dalam lebih sedikit. Ingat bahawa cachedThreadPool
anda membuat urutan anda sendiri untuk setiap tugas? Kerana wait
ia menyekat urutan, tetapi tugas seterusnya mahu dilaksanakan dan urutan baharu telah dibuat dalam kumpulan untuk mereka. Dalam kes StealingPool
benang, mereka tidak akan melahu selama-lamanya dalam wait
, mereka akan mula melaksanakan tugas jiran. Bagaimanakah ini berbeza daripada kumpulan benang lain WorkStealingPool
? Kerana sebenarnya ada sesuatu yang ajaib hidup dalam dirinya ForkJoinPool
:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Sebenarnya ada satu lagi perbezaan. Benang yang dicipta secara ForkJoinPool
lalai ialah benang daemon, berbanding dengan benang yang dibuat melalui ThreadPool
. Secara umum, perlu diingati tentang benang daemon, kerana... contohnya, CompletableFuture
benang daemon juga digunakan, jika anda tidak menentukan sendiri ThreadFactory
, yang akan mencipta benang bukan daemon. Ini adalah jenis kejutan yang boleh menanti anda di tempat yang tidak dijangka!)
Fork/Join Pool
Dalam bahagian ini kita akan bercakap tentang yang samaForkJoinPool
(juga dipanggil rangka kerja garpu/sambung) yang hidup "di bawah tudung" WorkStealingPool
. Secara umum, Rangka Kerja Gabungan Fork muncul dalam Java 1.7. Dan walaupun Java 11 sudah berada di halaman, ia masih patut diingati. Bukan tugas yang paling biasa, tetapi agak menarik. Terdapat ulasan yang baik tentang topik ini di Internet: " Rangka Kerja Fork/Join di Java 7 ". Fork/JoinPool
beroperasi dalam kerjanya dengan konsep seperti java.util.concurrent.RecursiveTask
. Terdapat juga analog - java.util.concurrent.RecursiveAction
. RecursiveActions tidak mengembalikan hasil. Oleh itu RecursiveTask
serupa dengan Callable
, dan RecursiveAction
serupa dengan Runnable
. Nah, melihat nama, kita melihat dua kaedah utama - fork
dan join
. Kaedah ini fork
menjalankan tugas secara tidak segerak dalam benang yang berasingan. Dan kaedah itu join
membolehkan anda menunggu sehingga kerja selesai. Terdapat beberapa cara untuk menggunakannya: Gambar ini adalah sebahagian daripada slaid daripada laporan Alexey Shipilev " Fork/Join: implementation, use, performance ." Untuk menjadikannya lebih jelas, anda patut menonton laporannya di JEE CONF: " Ciri pelaksanaan Fork Join ."
Merumuskan
Jadi, di sini kita sedang menyelesaikan bahagian semakan seterusnya. Kami mengetahui perkara yang pertama kali kami buatExecutor
untuk melaksanakan utas. Kemudian kami memutuskan untuk meneruskan idea itu dan menghasilkannya ExecutorService
. ExecutorService
membolehkan anda menghantar tugasan untuk pelaksanaan menggunakan submit
dan invoke
, serta mengurus perkhidmatan dengan mematikannya. Kerana ExecutorService
'kami memerlukan pelaksanaan, kami menulis kelas dengan kaedah kilang dan memanggilnya Executors
. Ia membolehkan anda mencipta kumpulan benang ThreadPoolExecutor
. Pada masa yang sama, terdapat kumpulan benang yang turut membenarkan anda menentukan jadual untuk pelaksanaan, tetapi ia WorkStealingPool
tersembunyi di belakang ForkJoinPool
. Saya harap apa yang ditulis di atas bukan sahaja menarik kepada anda, tetapi juga boleh difahami) Saya sentiasa gembira menerima cadangan dan komen. #Viacheslav
GO TO FULL VERSION