JavaRush /وبلاگ جاوا /Random-FA /شما نمی توانید جاوا را با یک موضوع خراب کنید: قسمت V - Ex...
Viacheslav
مرحله

شما نمی توانید جاوا را با یک موضوع خراب کنید: قسمت V - Executor، ThreadPool، Fork Join

در گروه منتشر شد

معرفی

بنابراین، ما می دانیم که موضوعاتی در جاوا وجود دارد که می توانید در بررسی " You Can't Spoil Java with a Thread: Part I - Threads " در مورد آنها بخوانید. شما نمی توانید جاوا را با یک Thread خراب کنید: قسمت V - Executor، ThreadPool، Fork Join - 1بیایید دوباره به نمونه کد نگاه کنیم:
public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
همانطور که می بینیم، کد راه اندازی وظیفه کاملا استاندارد است، اما برای هر راه اندازی جدید باید آن را تکرار کنیم. یک راه حل این است که آن را به یک روش جداگانه منتقل کنید، به عنوان مثال execute(Runnable runnable). اما توسعه دهندگان جاوا قبلاً نگران ما بوده اند و یک رابط را ارائه کرده اند Executor:
public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
همانطور که می بینید، کد مختصرتر شده است و به ما اجازه می دهد تا به سادگی کد بنویسیم تا آن را Runnableدر یک رشته اجرا کنیم. عالی است، اینطور نیست؟ اما این تازه شروع کار است: شما نمی توانید جاوا را با یک موضوع خراب کنید: قسمت V - Executor، ThreadPool، Fork Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

همانطور که می بینید، اینترفیس Executorدارای یک رابط نسل است ExecutorService. JavaDoc این رابط می گوید که ExecutorServiceتوصیفی از یک Executor'a' ویژه است که روش هایی برای توقف کار Executor'a' ارائه می دهد و به شما امکان می دهد java.util.concurrent.Futureپیشرفت اجرا را پیگیری کنید. قبلاً در « شما نمی‌توانید جاوا را با موضوع خراب کنید: قسمت چهارم - Callable، Future and Friends »، به طور خلاصه احتمالات را بررسی کردیم Future. اگر آن را فراموش کرده اید یا نخوانده اید، به شما توصیه می کنم که حافظه خود را تازه کنید ;) چه چیزهای جالب دیگری در JavaDoc نوشته شده است؟ اینکه ما یک کارخانه ویژه داریم java.util.concurrent.Executorsکه به ما امکان می دهد پیاده سازی هایی را ایجاد کنیم که به طور پیش فرض در دسترس هستند ExecutorService.

سرویس مجری

دوباره به یاد بیاوریم. ما باید Executorیک کار خاص را در یک نخ اجرا کنیم (یعنی اجرا کنیم)، زمانی که اجرای ایجاد یک نخ از ما پنهان است. ما ExecutorServiceیک ویژه داریم Executorکه مجموعه ای از قابلیت ها برای مدیریت پیشرفت اجرا دارد. و ما کارخانه ای داریم Executorsکه به شما امکان می دهد ایجاد کنید ExecutorService. بیایید خودمان این کار را انجام دهیم:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
همانطور که می بینیم، یک thread pool ثابت ( ) به اندازه 2 مشخص کرده ایم. Fixed Thread Poolپس از آن وظایف را یکی یکی به استخر ارسال می کنیم. هر وظیفه یک رشته ( String) حاوی نام رشته ( currentThread().getName()) برمی گرداند. مهم است که در پایان خاموش شود ExecutorService، زیرا در غیر این صورت برنامه ما خارج نخواهد شد. روش های کارخانه دیگری در کارخانه Executorsوجود دارد . به عنوان مثال، ما می‌توانیم یک Pool فقط از یک رشته ایجاد کنیم - newSingleThreadExecutorیا یک Pool با حافظه پنهان newCachedThreadPool، که در آن رشته‌ها اگر به مدت 1 دقیقه بیکار باشند، از استخر حذف می‌شوند. در واقع پشت اینها یک صف مسدود کنندهExecutorService وجود دارد که وظایف در آن قرار می گیرند و این وظایف از آنجا اجرا می شوند. اطلاعات بیشتر در مورد مسدود کردن صف ها را می توانید در ویدیوی " صف مسدود کردن - مجموعه های شماره 5 - جاوا پیشرفته " مشاهده کنید. همچنین می‌توانید بررسی « مسدود کردن صف‌های بسته همزمان » و پاسخ به سؤال « چه زمانی LinkedBlockingQueue را به ArrayBlockingQueue ترجیح دهیم؟ » را بخوانید. فوق العاده ساده شده - (صف مسدود کردن) یک رشته را در دو مورد مسدود می کند: BlockingQueue
  • یک رشته در حال تلاش برای گرفتن عناصر از یک صف خالی است
  • thread در تلاش است تا عناصر را در یک صف کامل قرار دهد
اگر به اجرای روش های کارخانه ای نگاه کنیم، می توانیم ساختار آنها را ببینیم. مثلا:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
یا
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
همانطور که می بینیم، پیاده سازی ها در روش های کارخانه ایجاد می شوند ExecutorService. و اساساً همین است ThreadPoolExecutor. فقط ویژگی هایی که بر کار تأثیر می گذارند تغییر می کنند. شما نمی توانید جاوا را با یک موضوع خراب کنید: قسمت V - Executor، ThreadPool، Fork Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

همانطور که قبلا دیدیم، روش های داخل کارخانه ThreadPoolExecutor، . عملکرد تحت تأثیر مقادیری است که به عنوان حداکثر و حداقل رشته ارسال می شود و همچنین چه صفی استفاده می شود. و هر پیاده سازی رابط را می توان استفاده کرد java.util.concurrent.BlockingQueue. وقتی صحبت از ThreadPoolExecutor'ahs شد، باید به ویژگی های جالب در حین کار اشاره کرد. ThreadPoolExecutorبه عنوان مثال، اگر فضایی وجود نداشته باشد، نمی توانید وظایف را به آن ارسال کنید :
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
این کد با خطایی مانند:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
یعنی taskنمی توانید ارسال کنید، زیرا SynchronousQueueبه گونه ای طراحی شده است که در واقع از یک عنصر تشکیل شده است و به شما اجازه نمی دهد چیزهای بیشتری در آنجا قرار دهید. همانطور که می بینیم، queued tasksدر اینجا 0 وجود دارد و هیچ چیز عجیبی در این وجود ندارد، زیرا این خاص است SynchronousQueue- در واقع یک صف از 1 عنصر است که همیشه خالی است. (!) هنگامی که یک رشته یک عنصر را در صف قرار می دهد، منتظر می ماند تا رشته دیگری عنصر را از صف خارج کند. بنابراین، می توانیم با جایگزین کنیم new LinkedBlockingQueue<>(1)و آنچه در خطا نشان داده می شود تغییر می کند queued tasks = 1. زیرا صف فقط 1 عنصر است، پس نمی توانیم عنصر دوم را اضافه کنیم. و ما در این مورد سقوط خواهیم کرد. در ادامه موضوع صف، شایان ذکر است که کلاس ThreadPoolExecutorدارای روش های اضافی برای سرویس دهی به صف است. به عنوان مثال، این روش threadPoolExecutor.purge()تمام کارهای لغو شده را از صف حذف می کند تا فضایی در صف آزاد شود. یکی دیگر از ویژگی های جالب مربوط به صف، کنترل کننده وظیفه پذیرفته نشده است:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
به عنوان مثال، کنترل کننده به سادگی یک کلمه را Rejectedبرای هر امتناع از پذیرش یک کار در صف چاپ می کند. راحت است، اینطور نیست؟ علاوه بر این، ThreadPoolExecutorاو یک وارث جالب دارد - ScheduledThreadPoolExecutorکه ScheduledExecutorService. این امکان را برای انجام یک کار بر روی یک تایمر فراهم می کند.

ScheduledExecutorService

ExecutorServiceنوع ScheduledExecutorServiceبه شما امکان می دهد وظایف را طبق یک برنامه اجرا کنید. بیایید به یک مثال نگاه کنیم:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
اینجا همه چیز ساده است. وظایف ارسال می شوند، ما یک "کار برنامه ریزی شده" دریافت می کنیم java.util.concurrent.ScheduledFuture. مورد زیر نیز ممکن است با برنامه مفید باشد:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
در اینجا کار را می فرستیم Runnableتا با نرخ ثابت (Fixed Rate) با تاخیر مشخص اجرا شود. در این صورت پس از 1 ثانیه هر 2 ثانیه، شروع به اجرای کار کنید. گزینه مشابهی وجود دارد:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
اما در اینجا وظایف با یک فاصله معین بین اجرای وظایف مختلف اجرا می شوند. یعنی کار taskدر 1 ثانیه تکمیل می شود. بعد، به محض تکمیل، 2 ثانیه می گذرد و سپس یک کار جدید راه اندازی می شود. می توانید مطالب زیر را در مورد این موضوع بخوانید: شما نمی توانید جاوا را با یک موضوع خراب کنید: قسمت V - Executor، ThreadPool، Fork Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

علاوه بر استخرهای نخ ذکر شده در بالا، یک مورد دیگر نیز وجود دارد. می توان گفت که او کمی خاص است. اسمش Work Stealing Pool است. به طور خلاصه، Work Stealing یک الگوریتم کاری است که در آن نخ های بیکار شروع به گرفتن وظایف از رشته های دیگر یا وظایف از صف عمومی می کنند. بیایید به یک مثال نگاه کنیم:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
اگر این کد را اجرا کنیم، ExecutorService5 رشته ایجاد می کند، زیرا هر رشته به صف انتظار در محل شی می پیوندد lock. قبلاً در مورد مانیتورها و قفل‌های روی آن در « شما نمی‌توانید جاوا را با موضوع خراب کنید: قسمت دوم - همگام‌سازی » صحبت کرده‌ایم . و اکنون آن را Executors.newCachedThreadPoolبا Executors.newWorkStealingPool(). چه چیزی تغییر خواهد کرد؟ خواهیم دید که وظایف ما نه در 5 رشته، بلکه در تعداد کمتری انجام می شود. به یاد داشته باشید که cachedThreadPoolبرای هر کار موضوع خود را ایجاد کرده اید؟ چون waitthread را مسدود می کرد، اما کارهای بعدی می خواستند اجرا شوند و تاپیک های جدیدی در pool برای آنها ایجاد شد. در مورد StealingPoolنخ ها، آنها برای همیشه در بیکار نخواهند بود wait، آنها شروع به اجرای وظایف همسایه می کنند. این چه تفاوتی با دیگر استخرهای نخ دارد WorkStealingPool؟ زیرا واقعاً چیزی جادویی در درون او زندگی می کند ForkJoinPool:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
در واقع یک تفاوت دیگر وجود دارد. رشته‌هایی که به‌طور ForkJoinPoolپیش‌فرض ایجاد می‌شوند، رشته‌های شبح هستند، برخلاف رشته‌هایی که از طریق معمولی ایجاد می‌شوند ThreadPool. به طور کلی، لازم است در مورد موضوعات دیمون یادآوری کنیم، زیرا ... به عنوان مثال، CompletableFutureنخ‌های شبح نیز استفاده می‌شوند، اگر نخ‌های خود را مشخص نکنید ThreadFactory، که رشته‌های غیر شبح ایجاد می‌کند. اینها از نوع شگفتی هایی هستند که می توانند در مکانی غیر منتظره در انتظار شما باشند!)

چنگال/به استخر بپیوندید

در این قسمت ما در مورد همان ForkJoinPool(که به آن چارچوب چنگال/پیوستن نیز گفته می شود) صحبت خواهیم کرد که در "زیر کاپوت" زندگی می کند WorkStealingPool. به طور کلی، Fork Join Framework در جاوا 1.7 ظاهر شد. و حتی اگر جاوا 11 از قبل در حیاط باشد، باز هم ارزش به خاطر سپردن دارد. رایج ترین کار نیست، اما بسیار جالب است. یک بررسی خوب در مورد این موضوع در اینترنت وجود دارد: " فورک/پیوستن چارچوب در جاوا 7 ". Fork/JoinPoolدر کار خود با مفهومی مانند java.util.concurrent.RecursiveTask. همچنین یک آنالوگ وجود دارد - java.util.concurrent.RecursiveAction. RecursiveActions نتیجه ای را بر نمی گرداند. بنابراین RecursiveTaskشبیه به Callableو RecursiveActionشبیه به Runnable. خوب، با نگاه کردن به نام، دو روش کلیدی را می بینیم - forkو join. این روش forkیک کار را به صورت ناهمزمان در یک رشته مجزا اجرا می کند. و این روش joinبه شما این امکان را می دهد که منتظر بمانید تا کار کامل شود. چندین راه برای استفاده از آن وجود دارد: شما نمی توانید جاوا را با یک موضوع خراب کنید: قسمت V - Executor، ThreadPool، Fork Join - 5این تصویر بخشی از اسلایدی از گزارش الکسی شیپیلف " Fork/Join: پیاده سازی، استفاده، عملکرد " ​​است. برای شفاف‌تر شدن، ارزش تماشای گزارش او در JEE CONF را دارد: « ویژگی‌های پیاده‌سازی Fork Join ».

خلاصه کردن

بنابراین، در اینجا ما در حال اتمام بخش بعدی بررسی هستیم. Executorما متوجه شدیم که اولین بار برای اجرای رشته ها به چه چیزی رسیدیم . سپس تصمیم گرفتیم این ایده را ادامه دهیم و به آن رسیدیم ExecutorService. به شما این امکان را می دهد که با استفاده از و ExecutorServiceوظایفی را برای اجرا ارسال کنید ، همچنین با خاموش کردن آن، سرویس را مدیریت کنید. زیرا ما به پیاده سازی نیاز داریم، یک کلاس با متدهای کارخانه نوشتیم و آن را نام بردیم . این به شما امکان می دهد تا استخرهای نخ ایجاد کنید . در عین حال، thread pool هایی وجود دارد که به شما امکان می دهد یک زمان بندی برای اجرا مشخص کنید، اما پنهان می کند . امیدوارم آنچه در بالا نوشته شد نه تنها برای شما جالب باشد، بلکه قابل درک باشد) من همیشه از دریافت پیشنهادات و نظرات خوشحالم. #ویاچسلاوsubmitinvokeExecutorServiceExecutorsThreadPoolExecutorWorkStealingPoolForkJoinPool
نظرات
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION