JavaRush /مدونة جافا /Random-AR /لا يمكنك إفساد Java بخيط: الجزء الخامس - Executor، Thread...
Viacheslav
مستوى

لا يمكنك إفساد Java بخيط: الجزء الخامس - Executor، ThreadPool، Fork Join

نشرت في المجموعة

مقدمة

لذلك، نحن نعلم أن هناك مواضيع في جافا، والتي يمكنك أن تقرأ عنها في المراجعة " لا يمكنك إفساد جافا بموضوع: الجزء الأول - المواضيع ". لا يمكنك إفساد Java بخيط: الجزء الخامس - Executor، ThreadPool، Fork Join - 1دعونا نلقي نظرة على نموذج التعليمات البرمجية مرة أخرى:
public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
كما نرى، فإن رمز بدء المهمة قياسي تمامًا، ولكن لكل إطلاق جديد سيتعين علينا تكراره. أحد الحلول هو نقله إلى طريقة منفصلة، ​​على سبيل المثال execute(Runnable runnable). لكن مطوري Java قد قلقوا علينا بالفعل وتوصلوا إلى واجهة Executor:
public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
كما ترون، أصبح الكود أكثر إيجازًا وسمح لنا بكتابة الكود ببساطة لتشغيله Runnableفي سلسلة رسائل. عظيم، أليس كذلك؟ ولكن هذا هو مجرد بداية: لا يمكنك إفساد Java بخيط: الجزء الخامس - Executor، ThreadPool، Fork Join - 2

https://docs.Oracle.com/javase/7/docs/api/Java/util/concurrent/Executor.html

كما ترى، Executorتحتوي الواجهة على واجهة تنازلية ExecutorService. يقول JavaDoc الخاص بهذه الواجهة إنه ExecutorServiceوصف لـ Executor"a" خاص يوفر طرقًا لإيقاف العمل Executor"a" ويسمح لك بتتبع java.util.concurrent.Futureتقدم التنفيذ. سابقًا، في " لا يمكنك إفساد Java بالخيط: الجزء الرابع - القابل للاستدعاء والمستقبل والأصدقاء "، قمنا بمراجعة الاحتمالات بإيجاز Future. إذا نسيته أو لم تقرأه، أنصحك بتحديث ذاكرتك؛) ما الأشياء الأخرى المثيرة للاهتمام في JavaDoc؟ أن لدينا مصنعًا خاصًا java.util.concurrent.Executorsيسمح لنا بإنشاء تطبيقات متاحة افتراضيًا ExecutorService.

ExecutorService

دعونا نتذكر مرة أخرى. يتعين علينا Executorتنفيذ (أي تنفيذ) مهمة معينة في سلسلة رسائل، عندما يكون تنفيذ إنشاء سلسلة رسائل مخفيًا عنا. لدينا ExecutorServiceواحدة خاصة Executorلديها مجموعة من الإمكانيات لإدارة تقدم التنفيذ. ولدينا مصنع Executorsيسمح لك بإنشاء ملفات ExecutorService. دعونا نفعل ذلك بأنفسنا الآن:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
كما نرى، لقد حددنا مجموعة مؤشرات ترابط ثابتة ( Fixed Thread Pool) بحجم 2. وبعد ذلك نرسل المهام إلى المجموعة واحدة تلو الأخرى. تقوم كل مهمة بإرجاع سلسلة ( String) تحتوي على اسم مؤشر الترابط ( currentThread().getName()). من المهم إيقاف التشغيل في النهاية ExecutorService، وإلا فلن يخرج برنامجنا. Executorsهناك طرق مصنعية أخرى في المصنع . على سبيل المثال، يمكننا إنشاء مجموعة من مؤشر ترابط واحد فقط - newSingleThreadExecutorأو مجموعة بها تخزين مؤقت newCachedThreadPool، حيث ستتم إزالة سلاسل الرسائل من المجموعة إذا كانت خاملة لمدة دقيقة واحدة. في الواقع، خلف هذه ExecutorServiceهناك قائمة انتظار محظورة يتم وضع المهام فيها ويتم تنفيذ هذه المهام منها. يمكن مشاهدة المزيد من المعلومات حول حظر قوائم الانتظار في الفيديو " حظر قائمة الانتظار - المجموعات رقم 5 - Java المتقدمة ". يمكنك أيضًا قراءة المراجعة " حظر قوائم الانتظار للحزمة المتزامنة " والإجابة على السؤال " متى تفضل LinkedBlockingQueue على ArrayBlockingQueue؟ " مبسط للغاية - BlockingQueue(قائمة الانتظار المحظورة) تحظر سلسلة رسائل، في حالتين:
  • يحاول الخيط الحصول على عناصر من قائمة انتظار فارغة
  • يحاول الخيط وضع العناصر في قائمة انتظار كاملة
إذا نظرنا إلى تنفيذ أساليب المصنع، يمكننا أن نرى كيف يتم تنظيمها. على سبيل المثال:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
أو
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
كما نرى، يتم إنشاء التطبيقات داخل أساليب المصنع ExecutorService. وهذا هو في الأساس ThreadPoolExecutor. فقط السمات التي تؤثر على العمل تتغير. لا يمكنك تدمير Java باستخدام مؤشر ترابط: الجزء الخامس - Executor، ThreadPool، Fork Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

كما رأينا سابقًا، طرق داخل المصنع ThreadPoolExecutor. تتأثر الوظيفة بالقيم التي يتم تمريرها كحد أقصى وأدنى للخيوط، بالإضافة إلى قائمة الانتظار المستخدمة. ويمكن استخدام أي تطبيق للواجهة java.util.concurrent.BlockingQueue. عند الحديث عن ThreadPoolExecutor"آه"، تجدر الإشارة إلى ميزات مثيرة للاهتمام أثناء التشغيل. على سبيل المثال، لا يمكنك إرسال المهام إلى ThreadPoolExecutorإذا لم يكن هناك مساحة هناك:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
سيفشل هذا الرمز مع وجود خطأ مثل:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
أي أنه taskلا يمكنك التقديم، لأنه SynchronousQueueإنه مصمم بحيث يتكون بالفعل من عنصر واحد ولا يسمح لك بوضع المزيد هناك. كما نرى، queued tasksيوجد 0 هنا، ولا يوجد شيء غريب في هذا، لأن هذا محدد SynchronousQueue- في الواقع، إنها قائمة انتظار مكونة من عنصر واحد، وهو فارغ دائمًا. (!) عندما يقوم أحد الخيوط بوضع عنصر في قائمة الانتظار، فسوف ينتظر حتى يأخذ مؤشر ترابط آخر العنصر من قائمة الانتظار. ولذلك يمكننا أن نستبدل بـ new LinkedBlockingQueue<>(1)وسيتغير ما سيشار إليه في الخطأ queued tasks = 1. لأن قائمة الانتظار مكونة من عنصر واحد فقط، لذا لا يمكننا إضافة العنصر الثاني. وسوف نقع على هذا. استمرارًا لموضوع قائمة الانتظار، تجدر الإشارة إلى أن الفصل ThreadPoolExecutorلديه طرق إضافية لخدمة قائمة الانتظار. على سبيل المثال، سيقوم الأسلوب threadPoolExecutor.purge()بإزالة كافة المهام الملغاة من قائمة الانتظار لتحرير مساحة في قائمة الانتظار. ميزة أخرى مثيرة للاهتمام تتعلق بقائمة الانتظار هي معالج المهام غير المقبول:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
على سبيل المثال، يقوم المعالج ببساطة بطباعة كلمة Rejectedلكل رفض لقبول مهمة في قائمة الانتظار. مريحة، أليس كذلك؟ وبالإضافة إلى ذلك، ThreadPoolExecutorلديه وريث مثير للاهتمام - ScheduledThreadPoolExecutorوهو ScheduledExecutorService. ويوفر القدرة على أداء مهمة على جهاز توقيت.

خدمة تنفيذية مجدولة

ExecutorServiceالنوع ScheduledExecutorServiceيسمح لك بتشغيل المهام وفقًا لجدول زمني. لنلقي نظرة على مثال:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
كل شيء بسيط هنا. يتم إرسال المهام، ونتلقى "مهمة مجدولة" java.util.concurrent.ScheduledFuture. قد تكون الحالة التالية مفيدة أيضًا مع الجدول الزمني:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
هنا نرسل Runnableالمهمة ليتم تنفيذها بمعدل ثابت (Fixed Rate) مع تأخير معين. في هذه الحالة، بعد ثانية واحدة كل ثانيتين، ابدأ في تنفيذ المهمة. هناك خيار مماثل:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
ولكن هنا يتم تنفيذ المهام بفاصل زمني محدد بين تنفيذ المهام المختلفة. أي أن المهمة taskستكتمل خلال ثانية واحدة. بعد ذلك، بمجرد اكتماله، ستمر ثانيتان، ثم سيتم إطلاق مهمة جديدة. يمكنك قراءة المواد التالية حول هذا الموضوع: لا يمكنك تدمير Java بخيط: الجزء الخامس - Executor، ThreadPool، Fork Join - 4

https://dzone.com/articles/diving-into-Java-8s-newworkstealingpools

WorkStealingPool

بالإضافة إلى تجمعات الخيوط المذكورة أعلاه، هناك واحدة أخرى. يمكنك القول أنه مميز بعض الشيء. اسمها هو تجمع سرقة العمل. باختصار، سرقة العمل هي خوارزمية عمل تبدأ فيها سلاسل العمليات الخاملة في أخذ المهام من سلاسل رسائل أخرى أو مهام من قائمة الانتظار العامة. لنلقي نظرة على مثال:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
إذا قمنا بتشغيل هذا الكود، ExecutorServiceفسيؤدي إلى إنشاء 5 سلاسل رسائل، لأن سينضم كل مؤشر ترابط إلى قائمة الانتظار في موقع الكائن lock. لقد ناقشنا بالفعل حول الشاشات والأقفال الموجودة عليها في " لا يمكنك إفساد Java بخيط: الجزء الثاني - المزامنة ". والآن سوف نستبدله Executors.newCachedThreadPoolبـ Executors.newWorkStealingPool(). ما الذي سيتغير؟ سنرى أن مهامنا لا يتم تنفيذها في 5 مواضيع، ولكن في عدد أقل. هل تتذكر أنك cachedThreadPoolأنشأت موضوعك الخاص لكل مهمة؟ لأنه waitقام بحظر مؤشر الترابط، ولكن المهام التالية مطلوب تنفيذها وتم إنشاء سلاسل رسائل جديدة لها في التجمع. في حالة StealingPoolالخيوط، لن تكون في وضع الخمول إلى الأبد wait، بل ستبدأ في تنفيذ المهام المجاورة. كيف يختلف هذا عن تجمعات الخيوط الأخرى WorkStealingPool؟ لأنه في الواقع هناك شيء سحري يعيش بداخله ForkJoinPool:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
هناك في الواقع فرق واحد آخر. المواضيع التي يتم إنشاؤها بشكل ForkJoinPoolافتراضي هي خيوط خفية، على عكس الخيوط التي تم إنشاؤها من خلال ThreadPool. بشكل عام، يجدر بنا أن نتذكر المواضيع الخفية، لأن... على سبيل المثال، CompletableFutureيتم أيضًا استخدام سلاسل الرسائل الخفية، إذا لم تحدد سلاسل المحادثات الخاصة بك ThreadFactory، والتي ستؤدي إلى إنشاء سلاسل رسائل غير خفية. هذه هي أنواع المفاجآت التي يمكن أن تنتظرك في مكان غير متوقع!)

شوكة/انضم إلى حمام السباحة

سنتحدث في هذا الجزء عن نفس الإطار ForkJoinPool(الذي يُسمى أيضًا إطار الشوكة/الانضمام) الذي يعيش "تحت الغطاء" لملفات WorkStealingPool. بشكل عام، ظهر Fork Join Framework في Java 1.7. وحتى لو كان Java 11 موجودًا بالفعل، فلا يزال يستحق التذكر. ليست المهمة الأكثر شيوعا، ولكنها مثيرة للاهتمام للغاية. توجد مراجعة جيدة حول هذا الموضوع على الإنترنت: " Fork/Join Framework in Java 7 ". Fork/JoinPoolيعمل في عمله بمفهوم مثل java.util.concurrent.RecursiveTask. هناك أيضًا نظير - java.util.concurrent.RecursiveAction. RecursiveActions لا ترجع نتيجة. وهكذا RecursiveTaskتشبه Callable، RecursiveActionوتشبه Runnable. حسنًا، بالنظر إلى الاسم، نرى طريقتين رئيسيتين - forkو join. تقوم الطريقة forkبتشغيل مهمة بشكل غير متزامن في موضوع منفصل. وتسمح لك الطريقة joinبالانتظار حتى يكتمل العمل. هناك عدة طرق لاستخدامها: لا يمكنك تدمير Java باستخدام مؤشر ترابط: الجزء الخامس - Executor، ThreadPool، Fork Join - 5هذه الصورة جزء من شريحة من تقرير أليكسي شيبيليف " الشوكة/الانضمام: التنفيذ، الاستخدام، الأداء ". لتوضيح الأمر أكثر، من المفيد مشاهدة تقريره في JEE CONF: " ميزات تنفيذ Fork Join ".

تلخيص

إذن، ها نحن قد أنهينا الجزء التالي من المراجعة. لقد اكتشفنا ما توصلنا إليه أولاً Executorلتنفيذ المواضيع. ومن ثم قررنا الاستمرار في الفكرة وتوصلنا إليها ExecutorService. ExecutorServiceيسمح لك بإرسال المهام للتنفيذ باستخدام submitو invoke، وكذلك إدارة الخدمة عن طريق إيقاف تشغيلها. لأن ExecutorService'نحن بحاجة إلى تطبيقات، لقد كتبنا فصلًا دراسيًا بأساليب المصنع وأطلقنا عليه اسم Executors. انها تسمح لك بإنشاء تجمعات المواضيع ThreadPoolExecutor. في الوقت نفسه، هناك تجمعات مؤشرات ترابط تسمح لك أيضًا بتحديد جدول زمني للتنفيذ، ولكنها WorkStealingPoolمخفية خلف ForkJoinPool. آمل أن ما كتب أعلاه لم يكن مثيرًا للاهتمام بالنسبة لك فحسب، بل كان مفهومًا أيضًا) ويسعدني دائمًا تلقي الاقتراحات والتعليقات. # فياتشيسلاف
تعليقات
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION