مقدمة
لذلك، نحن نعلم أن هناك مواضيع في جافا، والتي يمكنك أن تقرأ عنها في المراجعة " لا يمكنك إفساد جافا بموضوع: الجزء الأول - المواضيع ". دعونا نلقي نظرة على نموذج التعليمات البرمجية مرة أخرى:public static void main(String []args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
كما نرى، فإن رمز بدء المهمة قياسي تمامًا، ولكن لكل إطلاق جديد سيتعين علينا تكراره. أحد الحلول هو نقله إلى طريقة منفصلة، على سبيل المثال execute(Runnable runnable)
. لكن مطوري Java قد قلقوا علينا بالفعل وتوصلوا إلى واجهة Executor
:
public static void main(String []args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
كما ترون، أصبح الكود أكثر إيجازًا وسمح لنا بكتابة الكود ببساطة لتشغيله Runnable
في سلسلة رسائل. عظيم، أليس كذلك؟ ولكن هذا هو مجرد بداية:
https://docs.Oracle.com/javase/7/docs/api/Java/util/concurrent/Executor.html
Executor
تحتوي الواجهة على واجهة تنازلية ExecutorService
. يقول JavaDoc الخاص بهذه الواجهة إنه ExecutorService
وصف لـ Executor
"a" خاص يوفر طرقًا لإيقاف العمل Executor
"a" ويسمح لك بتتبع java.util.concurrent.Future
تقدم التنفيذ. سابقًا، في " لا يمكنك إفساد Java بالخيط: الجزء الرابع - القابل للاستدعاء والمستقبل والأصدقاء "، قمنا بمراجعة الاحتمالات بإيجاز Future
. إذا نسيته أو لم تقرأه، أنصحك بتحديث ذاكرتك؛) ما الأشياء الأخرى المثيرة للاهتمام في JavaDoc؟ أن لدينا مصنعًا خاصًا java.util.concurrent.Executors
يسمح لنا بإنشاء تطبيقات متاحة افتراضيًا ExecutorService
.
ExecutorService
دعونا نتذكر مرة أخرى. يتعين عليناExecutor
تنفيذ (أي تنفيذ) مهمة معينة في سلسلة رسائل، عندما يكون تنفيذ إنشاء سلسلة رسائل مخفيًا عنا. لدينا ExecutorService
واحدة خاصة Executor
لديها مجموعة من الإمكانيات لإدارة تقدم التنفيذ. ولدينا مصنع Executors
يسمح لك بإنشاء ملفات ExecutorService
. دعونا نفعل ذلك بأنفسنا الآن:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
كما نرى، لقد حددنا مجموعة مؤشرات ترابط ثابتة ( Fixed Thread Pool
) بحجم 2. وبعد ذلك نرسل المهام إلى المجموعة واحدة تلو الأخرى. تقوم كل مهمة بإرجاع سلسلة ( String
) تحتوي على اسم مؤشر الترابط ( currentThread().getName()
). من المهم إيقاف التشغيل في النهاية ExecutorService
، وإلا فلن يخرج برنامجنا. Executors
هناك طرق مصنعية أخرى في المصنع . على سبيل المثال، يمكننا إنشاء مجموعة من مؤشر ترابط واحد فقط - newSingleThreadExecutor
أو مجموعة بها تخزين مؤقت newCachedThreadPool
، حيث ستتم إزالة سلاسل الرسائل من المجموعة إذا كانت خاملة لمدة دقيقة واحدة. في الواقع، خلف هذه ExecutorService
هناك قائمة انتظار محظورة يتم وضع المهام فيها ويتم تنفيذ هذه المهام منها. يمكن مشاهدة المزيد من المعلومات حول حظر قوائم الانتظار في الفيديو " حظر قائمة الانتظار - المجموعات رقم 5 - Java المتقدمة ". يمكنك أيضًا قراءة المراجعة " حظر قوائم الانتظار للحزمة المتزامنة " والإجابة على السؤال " متى تفضل LinkedBlockingQueue على ArrayBlockingQueue؟ " مبسط للغاية - BlockingQueue
(قائمة الانتظار المحظورة) تحظر سلسلة رسائل، في حالتين:
- يحاول الخيط الحصول على عناصر من قائمة انتظار فارغة
- يحاول الخيط وضع العناصر في قائمة انتظار كاملة
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
أو
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
كما نرى، يتم إنشاء التطبيقات داخل أساليب المصنع ExecutorService
. وهذا هو في الأساس ThreadPoolExecutor
. فقط السمات التي تؤثر على العمل تتغير.
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
كما رأينا سابقًا، طرق داخل المصنعThreadPoolExecutor
. تتأثر الوظيفة بالقيم التي يتم تمريرها كحد أقصى وأدنى للخيوط، بالإضافة إلى قائمة الانتظار المستخدمة. ويمكن استخدام أي تطبيق للواجهة java.util.concurrent.BlockingQueue
. عند الحديث عن ThreadPoolExecutor
"آه"، تجدر الإشارة إلى ميزات مثيرة للاهتمام أثناء التشغيل. على سبيل المثال، لا يمكنك إرسال المهام إلى ThreadPoolExecutor
إذا لم يكن هناك مساحة هناك:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
سيفشل هذا الرمز مع وجود خطأ مثل:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
أي أنه task
لا يمكنك التقديم، لأنه SynchronousQueue
إنه مصمم بحيث يتكون بالفعل من عنصر واحد ولا يسمح لك بوضع المزيد هناك. كما نرى، queued tasks
يوجد 0 هنا، ولا يوجد شيء غريب في هذا، لأن هذا محدد SynchronousQueue
- في الواقع، إنها قائمة انتظار مكونة من عنصر واحد، وهو فارغ دائمًا. (!) عندما يقوم أحد الخيوط بوضع عنصر في قائمة الانتظار، فسوف ينتظر حتى يأخذ مؤشر ترابط آخر العنصر من قائمة الانتظار. ولذلك يمكننا أن نستبدل بـ new LinkedBlockingQueue<>(1)
وسيتغير ما سيشار إليه في الخطأ queued tasks = 1
. لأن قائمة الانتظار مكونة من عنصر واحد فقط، لذا لا يمكننا إضافة العنصر الثاني. وسوف نقع على هذا. استمرارًا لموضوع قائمة الانتظار، تجدر الإشارة إلى أن الفصل ThreadPoolExecutor
لديه طرق إضافية لخدمة قائمة الانتظار. على سبيل المثال، سيقوم الأسلوب threadPoolExecutor.purge()
بإزالة كافة المهام الملغاة من قائمة الانتظار لتحرير مساحة في قائمة الانتظار. ميزة أخرى مثيرة للاهتمام تتعلق بقائمة الانتظار هي معالج المهام غير المقبول:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
على سبيل المثال، يقوم المعالج ببساطة بطباعة كلمة Rejected
لكل رفض لقبول مهمة في قائمة الانتظار. مريحة، أليس كذلك؟ وبالإضافة إلى ذلك، ThreadPoolExecutor
لديه وريث مثير للاهتمام - ScheduledThreadPoolExecutor
وهو ScheduledExecutorService
. ويوفر القدرة على أداء مهمة على جهاز توقيت.
خدمة تنفيذية مجدولة
ExecutorService
النوع ScheduledExecutorService
يسمح لك بتشغيل المهام وفقًا لجدول زمني. لنلقي نظرة على مثال:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
كل شيء بسيط هنا. يتم إرسال المهام، ونتلقى "مهمة مجدولة" java.util.concurrent.ScheduledFuture
. قد تكون الحالة التالية مفيدة أيضًا مع الجدول الزمني:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
هنا نرسل Runnable
المهمة ليتم تنفيذها بمعدل ثابت (Fixed Rate) مع تأخير معين. في هذه الحالة، بعد ثانية واحدة كل ثانيتين، ابدأ في تنفيذ المهمة. هناك خيار مماثل:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
ولكن هنا يتم تنفيذ المهام بفاصل زمني محدد بين تنفيذ المهام المختلفة. أي أن المهمة task
ستكتمل خلال ثانية واحدة. بعد ذلك، بمجرد اكتماله، ستمر ثانيتان، ثم سيتم إطلاق مهمة جديدة. يمكنك قراءة المواد التالية حول هذا الموضوع:
- مقدمة لبرك الموضوع
- مقدمة إلى تجمعات الموضوع
- Java Multithreading Steeplechase: إلغاء المهام في المنفذين
- اختيار منفذي Java الصحيحين لمهام الخلفية
https://dzone.com/articles/diving-into-Java-8s-newworkstealingpools
WorkStealingPool
بالإضافة إلى تجمعات الخيوط المذكورة أعلاه، هناك واحدة أخرى. يمكنك القول أنه مميز بعض الشيء. اسمها هو تجمع سرقة العمل. باختصار، سرقة العمل هي خوارزمية عمل تبدأ فيها سلاسل العمليات الخاملة في أخذ المهام من سلاسل رسائل أخرى أو مهام من قائمة الانتظار العامة. لنلقي نظرة على مثال:public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
إذا قمنا بتشغيل هذا الكود، ExecutorService
فسيؤدي إلى إنشاء 5 سلاسل رسائل، لأن سينضم كل مؤشر ترابط إلى قائمة الانتظار في موقع الكائن lock
. لقد ناقشنا بالفعل حول الشاشات والأقفال الموجودة عليها في " لا يمكنك إفساد Java بخيط: الجزء الثاني - المزامنة ". والآن سوف نستبدله Executors.newCachedThreadPool
بـ Executors.newWorkStealingPool()
. ما الذي سيتغير؟ سنرى أن مهامنا لا يتم تنفيذها في 5 مواضيع، ولكن في عدد أقل. هل تتذكر أنك cachedThreadPool
أنشأت موضوعك الخاص لكل مهمة؟ لأنه wait
قام بحظر مؤشر الترابط، ولكن المهام التالية مطلوب تنفيذها وتم إنشاء سلاسل رسائل جديدة لها في التجمع. في حالة StealingPool
الخيوط، لن تكون في وضع الخمول إلى الأبد wait
، بل ستبدأ في تنفيذ المهام المجاورة. كيف يختلف هذا عن تجمعات الخيوط الأخرى WorkStealingPool
؟ لأنه في الواقع هناك شيء سحري يعيش بداخله ForkJoinPool
:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
هناك في الواقع فرق واحد آخر. المواضيع التي يتم إنشاؤها بشكل ForkJoinPool
افتراضي هي خيوط خفية، على عكس الخيوط التي تم إنشاؤها من خلال ThreadPool
. بشكل عام، يجدر بنا أن نتذكر المواضيع الخفية، لأن... على سبيل المثال، CompletableFuture
يتم أيضًا استخدام سلاسل الرسائل الخفية، إذا لم تحدد سلاسل المحادثات الخاصة بك ThreadFactory
، والتي ستؤدي إلى إنشاء سلاسل رسائل غير خفية. هذه هي أنواع المفاجآت التي يمكن أن تنتظرك في مكان غير متوقع!)
شوكة/انضم إلى حمام السباحة
سنتحدث في هذا الجزء عن نفس الإطارForkJoinPool
(الذي يُسمى أيضًا إطار الشوكة/الانضمام) الذي يعيش "تحت الغطاء" لملفات WorkStealingPool
. بشكل عام، ظهر Fork Join Framework في Java 1.7. وحتى لو كان Java 11 موجودًا بالفعل، فلا يزال يستحق التذكر. ليست المهمة الأكثر شيوعا، ولكنها مثيرة للاهتمام للغاية. توجد مراجعة جيدة حول هذا الموضوع على الإنترنت: " Fork/Join Framework in Java 7 ". Fork/JoinPool
يعمل في عمله بمفهوم مثل java.util.concurrent.RecursiveTask
. هناك أيضًا نظير - java.util.concurrent.RecursiveAction
. RecursiveActions لا ترجع نتيجة. وهكذا RecursiveTask
تشبه Callable
، RecursiveAction
وتشبه Runnable
. حسنًا، بالنظر إلى الاسم، نرى طريقتين رئيسيتين - fork
و join
. تقوم الطريقة fork
بتشغيل مهمة بشكل غير متزامن في موضوع منفصل. وتسمح لك الطريقة join
بالانتظار حتى يكتمل العمل. هناك عدة طرق لاستخدامها: هذه الصورة جزء من شريحة من تقرير أليكسي شيبيليف " الشوكة/الانضمام: التنفيذ، الاستخدام، الأداء ". لتوضيح الأمر أكثر، من المفيد مشاهدة تقريره في JEE CONF: " ميزات تنفيذ Fork Join ".
تلخيص
إذن، ها نحن قد أنهينا الجزء التالي من المراجعة. لقد اكتشفنا ما توصلنا إليه أولاًExecutor
لتنفيذ المواضيع. ومن ثم قررنا الاستمرار في الفكرة وتوصلنا إليها ExecutorService
. ExecutorService
يسمح لك بإرسال المهام للتنفيذ باستخدام submit
و invoke
، وكذلك إدارة الخدمة عن طريق إيقاف تشغيلها. لأن ExecutorService
'نحن بحاجة إلى تطبيقات، لقد كتبنا فصلًا دراسيًا بأساليب المصنع وأطلقنا عليه اسم Executors
. انها تسمح لك بإنشاء تجمعات المواضيع ThreadPoolExecutor
. في الوقت نفسه، هناك تجمعات مؤشرات ترابط تسمح لك أيضًا بتحديد جدول زمني للتنفيذ، ولكنها WorkStealingPool
مخفية خلف ForkJoinPool
. آمل أن ما كتب أعلاه لم يكن مثيرًا للاهتمام بالنسبة لك فحسب، بل كان مفهومًا أيضًا) ويسعدني دائمًا تلقي الاقتراحات والتعليقات. # فياتشيسلاف
GO TO FULL VERSION