معرفی
بنابراین، ما می دانیم که موضوعاتی در جاوا وجود دارد که می توانید در بررسی " You Can't Spoil Java with a Thread: Part I - Threads " در مورد آنها بخوانید. بیایید دوباره به نمونه کد نگاه کنیم:public static void main(String []args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
همانطور که می بینیم، کد راه اندازی وظیفه کاملا استاندارد است، اما برای هر راه اندازی جدید باید آن را تکرار کنیم. یک راه حل این است که آن را به یک روش جداگانه منتقل کنید، به عنوان مثال execute(Runnable runnable)
. اما توسعه دهندگان جاوا قبلاً نگران ما بوده اند و یک رابط را ارائه کرده اند Executor
:
public static void main(String []args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
همانطور که می بینید، کد مختصرتر شده است و به ما اجازه می دهد تا به سادگی کد بنویسیم تا آن را Runnable
در یک رشته اجرا کنیم. عالی است، اینطور نیست؟ اما این تازه شروع کار است:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
دارای یک رابط نسل است ExecutorService
. JavaDoc این رابط می گوید که ExecutorService
توصیفی از یک Executor
'a' ویژه است که روش هایی برای توقف کار Executor
'a' ارائه می دهد و به شما امکان می دهد java.util.concurrent.Future
پیشرفت اجرا را پیگیری کنید. قبلاً در « شما نمیتوانید جاوا را با موضوع خراب کنید: قسمت چهارم - Callable، Future and Friends »، به طور خلاصه احتمالات را بررسی کردیم Future
. اگر آن را فراموش کرده اید یا نخوانده اید، به شما توصیه می کنم که حافظه خود را تازه کنید ;) چه چیزهای جالب دیگری در JavaDoc نوشته شده است؟ اینکه ما یک کارخانه ویژه داریم java.util.concurrent.Executors
که به ما امکان می دهد پیاده سازی هایی را ایجاد کنیم که به طور پیش فرض در دسترس هستند ExecutorService
.
سرویس مجری
دوباره به یاد بیاوریم. ما بایدExecutor
یک کار خاص را در یک نخ اجرا کنیم (یعنی اجرا کنیم)، زمانی که اجرای ایجاد یک نخ از ما پنهان است. ما ExecutorService
یک ویژه داریم Executor
که مجموعه ای از قابلیت ها برای مدیریت پیشرفت اجرا دارد. و ما کارخانه ای داریم Executors
که به شما امکان می دهد ایجاد کنید ExecutorService
. بیایید خودمان این کار را انجام دهیم:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
همانطور که می بینیم، یک thread pool ثابت ( ) به اندازه 2 مشخص کرده ایم. Fixed Thread Pool
پس از آن وظایف را یکی یکی به استخر ارسال می کنیم. هر وظیفه یک رشته ( String
) حاوی نام رشته ( currentThread().getName()
) برمی گرداند. مهم است که در پایان خاموش شود ExecutorService
، زیرا در غیر این صورت برنامه ما خارج نخواهد شد. روش های کارخانه دیگری در کارخانه Executors
وجود دارد . به عنوان مثال، ما میتوانیم یک Pool فقط از یک رشته ایجاد کنیم - newSingleThreadExecutor
یا یک Pool با حافظه پنهان newCachedThreadPool
، که در آن رشتهها اگر به مدت 1 دقیقه بیکار باشند، از استخر حذف میشوند. در واقع پشت اینها یک صف مسدود کنندهExecutorService
وجود دارد که وظایف در آن قرار می گیرند و این وظایف از آنجا اجرا می شوند. اطلاعات بیشتر در مورد مسدود کردن صف ها را می توانید در ویدیوی " صف مسدود کردن - مجموعه های شماره 5 - جاوا پیشرفته " مشاهده کنید. همچنین میتوانید بررسی « مسدود کردن صفهای بسته همزمان » و پاسخ به سؤال « چه زمانی LinkedBlockingQueue را به ArrayBlockingQueue ترجیح دهیم؟ » را بخوانید. فوق العاده ساده شده - (صف مسدود کردن) یک رشته را در دو مورد مسدود می کند: BlockingQueue
- یک رشته در حال تلاش برای گرفتن عناصر از یک صف خالی است
- thread در تلاش است تا عناصر را در یک صف کامل قرار دهد
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
یا
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
همانطور که می بینیم، پیاده سازی ها در روش های کارخانه ایجاد می شوند ExecutorService
. و اساساً همین است ThreadPoolExecutor
. فقط ویژگی هایی که بر کار تأثیر می گذارند تغییر می کنند.
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
همانطور که قبلا دیدیم، روش های داخل کارخانهThreadPoolExecutor
، . عملکرد تحت تأثیر مقادیری است که به عنوان حداکثر و حداقل رشته ارسال می شود و همچنین چه صفی استفاده می شود. و هر پیاده سازی رابط را می توان استفاده کرد java.util.concurrent.BlockingQueue
. وقتی صحبت از ThreadPoolExecutor
'ahs شد، باید به ویژگی های جالب در حین کار اشاره کرد. ThreadPoolExecutor
به عنوان مثال، اگر فضایی وجود نداشته باشد، نمی توانید وظایف را به آن ارسال کنید :
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
این کد با خطایی مانند:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
یعنی task
نمی توانید ارسال کنید، زیرا SynchronousQueue
به گونه ای طراحی شده است که در واقع از یک عنصر تشکیل شده است و به شما اجازه نمی دهد چیزهای بیشتری در آنجا قرار دهید. همانطور که می بینیم، queued tasks
در اینجا 0 وجود دارد و هیچ چیز عجیبی در این وجود ندارد، زیرا این خاص است SynchronousQueue
- در واقع یک صف از 1 عنصر است که همیشه خالی است. (!) هنگامی که یک رشته یک عنصر را در صف قرار می دهد، منتظر می ماند تا رشته دیگری عنصر را از صف خارج کند. بنابراین، می توانیم با جایگزین کنیم new LinkedBlockingQueue<>(1)
و آنچه در خطا نشان داده می شود تغییر می کند queued tasks = 1
. زیرا صف فقط 1 عنصر است، پس نمی توانیم عنصر دوم را اضافه کنیم. و ما در این مورد سقوط خواهیم کرد. در ادامه موضوع صف، شایان ذکر است که کلاس ThreadPoolExecutor
دارای روش های اضافی برای سرویس دهی به صف است. به عنوان مثال، این روش threadPoolExecutor.purge()
تمام کارهای لغو شده را از صف حذف می کند تا فضایی در صف آزاد شود. یکی دیگر از ویژگی های جالب مربوط به صف، کنترل کننده وظیفه پذیرفته نشده است:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
به عنوان مثال، کنترل کننده به سادگی یک کلمه را Rejected
برای هر امتناع از پذیرش یک کار در صف چاپ می کند. راحت است، اینطور نیست؟ علاوه بر این، ThreadPoolExecutor
او یک وارث جالب دارد - ScheduledThreadPoolExecutor
که ScheduledExecutorService
. این امکان را برای انجام یک کار بر روی یک تایمر فراهم می کند.
ScheduledExecutorService
ExecutorService
نوع ScheduledExecutorService
به شما امکان می دهد وظایف را طبق یک برنامه اجرا کنید. بیایید به یک مثال نگاه کنیم:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
اینجا همه چیز ساده است. وظایف ارسال می شوند، ما یک "کار برنامه ریزی شده" دریافت می کنیم java.util.concurrent.ScheduledFuture
. مورد زیر نیز ممکن است با برنامه مفید باشد:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
در اینجا کار را می فرستیم Runnable
تا با نرخ ثابت (Fixed Rate) با تاخیر مشخص اجرا شود. در این صورت پس از 1 ثانیه هر 2 ثانیه، شروع به اجرای کار کنید. گزینه مشابهی وجود دارد:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
اما در اینجا وظایف با یک فاصله معین بین اجرای وظایف مختلف اجرا می شوند. یعنی کار task
در 1 ثانیه تکمیل می شود. بعد، به محض تکمیل، 2 ثانیه می گذرد و سپس یک کار جدید راه اندازی می شود. می توانید مطالب زیر را در مورد این موضوع بخوانید:
- مقدمه ای بر استخرهای نخ
- مقدمه ای بر Thread Pools
- Java Multithreading Steeplechase: Canceling Tasks in Executors
- انتخاب اجرای صحیح جاوا برای کارهای پس زمینه
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
علاوه بر استخرهای نخ ذکر شده در بالا، یک مورد دیگر نیز وجود دارد. می توان گفت که او کمی خاص است. اسمش Work Stealing Pool است. به طور خلاصه، Work Stealing یک الگوریتم کاری است که در آن نخ های بیکار شروع به گرفتن وظایف از رشته های دیگر یا وظایف از صف عمومی می کنند. بیایید به یک مثال نگاه کنیم:public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
اگر این کد را اجرا کنیم، ExecutorService
5 رشته ایجاد می کند، زیرا هر رشته به صف انتظار در محل شی می پیوندد lock
. قبلاً در مورد مانیتورها و قفلهای روی آن در « شما نمیتوانید جاوا را با موضوع خراب کنید: قسمت دوم - همگامسازی » صحبت کردهایم . و اکنون آن را Executors.newCachedThreadPool
با Executors.newWorkStealingPool()
. چه چیزی تغییر خواهد کرد؟ خواهیم دید که وظایف ما نه در 5 رشته، بلکه در تعداد کمتری انجام می شود. به یاد داشته باشید که cachedThreadPool
برای هر کار موضوع خود را ایجاد کرده اید؟ چون wait
thread را مسدود می کرد، اما کارهای بعدی می خواستند اجرا شوند و تاپیک های جدیدی در pool برای آنها ایجاد شد. در مورد StealingPool
نخ ها، آنها برای همیشه در بیکار نخواهند بود wait
، آنها شروع به اجرای وظایف همسایه می کنند. این چه تفاوتی با دیگر استخرهای نخ دارد WorkStealingPool
؟ زیرا واقعاً چیزی جادویی در درون او زندگی می کند ForkJoinPool
:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
در واقع یک تفاوت دیگر وجود دارد. رشتههایی که بهطور ForkJoinPool
پیشفرض ایجاد میشوند، رشتههای شبح هستند، برخلاف رشتههایی که از طریق معمولی ایجاد میشوند ThreadPool
. به طور کلی، لازم است در مورد موضوعات دیمون یادآوری کنیم، زیرا ... به عنوان مثال، CompletableFuture
نخهای شبح نیز استفاده میشوند، اگر نخهای خود را مشخص نکنید ThreadFactory
، که رشتههای غیر شبح ایجاد میکند. اینها از نوع شگفتی هایی هستند که می توانند در مکانی غیر منتظره در انتظار شما باشند!)
چنگال/به استخر بپیوندید
در این قسمت ما در مورد همانForkJoinPool
(که به آن چارچوب چنگال/پیوستن نیز گفته می شود) صحبت خواهیم کرد که در "زیر کاپوت" زندگی می کند WorkStealingPool
. به طور کلی، Fork Join Framework در جاوا 1.7 ظاهر شد. و حتی اگر جاوا 11 از قبل در حیاط باشد، باز هم ارزش به خاطر سپردن دارد. رایج ترین کار نیست، اما بسیار جالب است. یک بررسی خوب در مورد این موضوع در اینترنت وجود دارد: " فورک/پیوستن چارچوب در جاوا 7 ". Fork/JoinPool
در کار خود با مفهومی مانند java.util.concurrent.RecursiveTask
. همچنین یک آنالوگ وجود دارد - java.util.concurrent.RecursiveAction
. RecursiveActions نتیجه ای را بر نمی گرداند. بنابراین RecursiveTask
شبیه به Callable
و RecursiveAction
شبیه به Runnable
. خوب، با نگاه کردن به نام، دو روش کلیدی را می بینیم - fork
و join
. این روش fork
یک کار را به صورت ناهمزمان در یک رشته مجزا اجرا می کند. و این روش join
به شما این امکان را می دهد که منتظر بمانید تا کار کامل شود. چندین راه برای استفاده از آن وجود دارد: این تصویر بخشی از اسلایدی از گزارش الکسی شیپیلف " Fork/Join: پیاده سازی، استفاده، عملکرد " است. برای شفافتر شدن، ارزش تماشای گزارش او در JEE CONF را دارد: « ویژگیهای پیادهسازی Fork Join ».
خلاصه کردن
بنابراین، در اینجا ما در حال اتمام بخش بعدی بررسی هستیم.Executor
ما متوجه شدیم که اولین بار برای اجرای رشته ها به چه چیزی رسیدیم . سپس تصمیم گرفتیم این ایده را ادامه دهیم و به آن رسیدیم ExecutorService
. به شما این امکان را می دهد که با استفاده از و ExecutorService
وظایفی را برای اجرا ارسال کنید ، همچنین با خاموش کردن آن، سرویس را مدیریت کنید. زیرا ما به پیاده سازی نیاز داریم، یک کلاس با متدهای کارخانه نوشتیم و آن را نام بردیم . این به شما امکان می دهد تا استخرهای نخ ایجاد کنید . در عین حال، thread pool هایی وجود دارد که به شما امکان می دهد یک زمان بندی برای اجرا مشخص کنید، اما پنهان می کند . امیدوارم آنچه در بالا نوشته شد نه تنها برای شما جالب باشد، بلکه قابل درک باشد) من همیشه از دریافت پیشنهادات و نظرات خوشحالم. #ویاچسلاوsubmit
invoke
ExecutorService
Executors
ThreadPoolExecutor
WorkStealingPool
ForkJoinPool
GO TO FULL VERSION