מבוא
אז, אנחנו יודעים שיש שרשורים ב-Java, עליהם תוכלו לקרוא בסקירה " You Can't Spoil Java with a Thread: Part I - Threads ". בואו נסתכל שוב על הקוד לדוגמה:public static void main(String []args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
כפי שאנו יכולים לראות, הקוד להפעלת המשימה הוא די סטנדרטי, אך עבור כל השקה חדשה נצטרך לחזור עליו. פתרון אחד הוא להעביר אותו לשיטה נפרדת, למשל execute(Runnable runnable)
. אבל מפתחי Java כבר דאגו לנו והגיעו עם ממשק Executor
:
public static void main(String []args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
כפי שאתה יכול לראות, הקוד נעשה תמציתי יותר ואיפשר לנו פשוט לכתוב קוד כדי להפעיל אותו Runnable
בשרשור. נהדר, לא? אבל זו רק ההתחלה:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
יש ממשק צאצא ExecutorService
. ה-JavaDoc של ממשק זה אומר שזהו ExecutorService
תיאור של Executor
'a' מיוחד המספק שיטות לעצירת עבודה Executor
'a' ומאפשר לך לעקוב java.util.concurrent.Future
אחר התקדמות הביצוע. בעבר, ב"אתה לא יכול לקלקל ג'אווה עם שרשור: חלק IV - ניתן להתקשרות, עתיד וחברים ", סקרנו בקצרה את האפשרויות Future
. אם שכחת או לא קראת אותו, אני ממליץ לך לרענן את הזיכרון ;) אילו עוד דברים מעניינים כתובים ב-JavaDoc? שיש לנו מפעל מיוחד java.util.concurrent.Executors
שמאפשר לנו ליצור יישומים שזמינים כברירת מחדל ExecutorService
.
ExecutorService
בואו נזכור שוב. עלינוExecutor
לבצע (כלומר לבצע) משימה מסוימת בשרשור, כאשר היישום של יצירת שרשור מוסתר מאיתנו. יש לנו ExecutorService
אחד מיוחד Executor
שיש לו סט של יכולות לניהול התקדמות הביצוע. ויש לנו מפעל Executors
שמאפשר לך ליצור ExecutorService
. בואו נעשה את זה בעצמנו עכשיו:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
כפי שאנו יכולים לראות, ציינו מאגר חוטים קבוע ( Fixed Thread Pool
) בגודל 2. לאחר מכן אנו שולחים משימות לבריכה אחת אחת. כל משימה מחזירה מחרוזת ( String
) המכילה את שם השרשור ( currentThread().getName()
). חשוב לסגור ממש בסוף ExecutorService
, כי אחרת התוכנית שלנו לא תצא. Executors
ישנן שיטות מפעל אחרות במפעל . לדוגמה, אנו יכולים ליצור מאגר של חוט אחד בלבד - newSingleThreadExecutor
או מאגר עם מטמון newCachedThreadPool
, שבו חוטים יוסרו מהבריכה אם הם לא פעילים במשך דקה. למעשה, מאחורי אלה ExecutorService
ישנו תור חסימה שלתוכו ממוקמות משימות וממנו מבוצעות המשימות הללו. מידע נוסף על חסימת תורים ניתן לראות בסרטון " תור חסימה - אוספים #5 - ג'אווה מתקדמת ". אתה יכול גם לקרוא את הסקירה " חסימת תורים של החבילה במקביל " ואת התשובה לשאלה " מתי להעדיף LinkedBlockingQueue על פני ArrayBlockingQueue? " סופר פשוט - BlockingQueue
(תור חסימה) חוסם שרשור, בשני מקרים:
- שרשור מנסה להשיג אלמנטים מתור ריק
- השרשור מנסה להכניס אלמנטים לתור מלא
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
אוֹ
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
כפי שאנו יכולים לראות, יישומים נוצרים בתוך שיטות המפעל ExecutorService
. וזהו בעצם ThreadPoolExecutor
. רק התכונות שמשפיעות על העבודה משתנות.
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
כפי שראינו בעבר, בתוך שיטות המפעלThreadPoolExecutor
, . הפונקציונליות מושפעת מאילו ערכים מועברים בתור השרשורים המקסימליים והמינימליים, כמו גם באיזה תור נעשה שימוש. וניתן להשתמש בכל יישום של הממשק java.util.concurrent.BlockingQueue
. אם כבר מדברים על ThreadPoolExecutor
'אה, כדאי לשים לב לתכונות מעניינות במהלך הפעולה. לדוגמה, לא ניתן לשלוח משימות אל ThreadPoolExecutor
אם אין שם מקום:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
קוד זה ייכשל עם שגיאה כמו:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
כלומר, task
אתה לא יכול להגיש, כי SynchronousQueue
הוא מעוצב בצורה כזו שהוא למעשה מורכב מאלמנט אחד ולא מאפשר לך לשים שם יותר. כפי שאנו יכולים לראות, queued tasks
יש כאן 0, ואין בזה שום דבר מוזר, כי זה ספציפי SynchronousQueue
- למעשה, זה תור של אלמנט אחד, שתמיד ריק. (!) כאשר שרשור אחד מכניס אלמנט לתור, הוא יחכה עד שרשור אחר ייקח את האלמנט מהתור. לכן, נוכל להחליף עם new LinkedBlockingQueue<>(1)
ומה שיצוין בשגיאה ישתנה queued tasks = 1
. כי התור הוא רק אלמנט אחד, אז אנחנו לא יכולים להוסיף את השני. ואנחנו ניפול על זה. בהמשך לנושא התור, ראוי לציין שלכיתה ThreadPoolExecutor
יש שיטות נוספות לשירות התור. לדוגמה, השיטה threadPoolExecutor.purge()
תסיר את כל המשימות שבוטלו מהתור כדי לפנות מקום בתור. תכונה מעניינת נוספת הקשורה לתור היא מטפל המשימות הלא מקובל:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
לדוגמה, המטפל פשוט מדפיס מילה Rejected
עבור כל סירוב לקבל משימה לתור. נוח, לא? בנוסף, ThreadPoolExecutor
יש לו יורש מעניין - ScheduledThreadPoolExecutor
שהוא ScheduledExecutorService
. הוא מספק את היכולת לבצע משימה בטיימר.
ScheduledExecutorService
ExecutorService
סוג ScheduledExecutorService
מאפשר לך להריץ משימות לפי לוח זמנים. בואו נסתכל על דוגמה:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
הכל פשוט כאן. משימות נשלחות, אנו מקבלים "משימה מתוזמנת" java.util.concurrent.ScheduledFuture
. המקרה הבא עשוי להיות שימושי גם עם לוח הזמנים:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
כאן אנו שולחים Runnable
את המשימה לביצוע בקצב קבוע (Fixed Rate) עם עיכוב מסוים. במקרה זה, לאחר שנייה אחת כל 2 שניות, התחל לבצע את המשימה. יש אפשרות דומה:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
אבל כאן משימות מבוצעות במרווח נתון בין ביצוע משימות שונות. כלומר, המשימה task
תושלם תוך שנייה אחת. לאחר מכן, ברגע שהיא תושלם, יעברו 2 שניות, ולאחר מכן תושק משימה חדשה. אתה יכול לקרוא את החומרים הבאים בנושא זה:
- מבוא לבריכות חוטים
- מבוא לבריכות חוטים
- ג'אווה Multithreading Steplechase: ביטול משימות ב-Executors
- בחירת מנהלי Java נכונים למשימות רקע
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
בנוסף לבריכות החוטים שהוזכרו לעיל, יש עוד אחת. אפשר לומר שהוא קצת מיוחד. השם שלו הוא Work Stealing Pool. בקיצור, Work Stealing הוא אלגוריתם עבודה שבו שרשורים סרק מתחילים לקחת משימות משרשורים אחרים או משימות מהתור הכללי. בואו נסתכל על דוגמה:public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
אם נריץ את הקוד הזה, ExecutorService
הוא יצור 5 שרשורים, כי כל שרשור יצטרף לתור ההמתנה במיקום האובייקט lock
. כבר דיברנו על צגים ומנעולים עליו ב"אתה לא יכול לקלקל ג'אווה עם שרשור: חלק ב' - סנכרון ." ועכשיו נחליף אותו Executors.newCachedThreadPool
ב Executors.newWorkStealingPool()
. מה ישתנה? נראה שהמשימות שלנו מבוצעות לא ב-5 שרשורים, אלא בפחות. זוכר cachedThreadPool
שיצרת שרשור משלך עבור כל משימה? כי wait
זה חסם את השרשור, אבל המשימות הבאות רצו להתבצע ונוצרו עבורם שרשורים חדשים במאגר. במקרה של StealingPool
חוטים, הם לא יבטלו לנצח ב wait
, הם יתחילו לבצע משימות שכנות. במה זה כל כך שונה ממאגר שרשורים אחרים WorkStealingPool
? כי בעצם חי בתוכו משהו קסום ForkJoinPool
:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
למעשה יש עוד הבדל אחד. שרשורים שנוצרים ForkJoinPool
כברירת מחדל הם שרשורי דמון, בניגוד לשרשורים שנוצרו באמצעות רגיל ThreadPool
. באופן כללי, כדאי לזכור את שרשורי הדמונים, כי... לדוגמה, CompletableFuture
נעשה שימוש גם בשרשורי דמון, אם אינך מציין משלך ThreadFactory
, מה שייצור שרשורים שאינם דמונים. אלו סוג ההפתעות שיכולות לחכות לך במקום לא צפוי!)
מזלג/הצטרפות לבריכה
בחלק זה נדבר על אותו אחדForkJoinPool
(נקרא גם fork/join framework) שחי "מתחת למכסה המנוע" של WorkStealingPool
. באופן כללי, Fork Join Framework הופיע ב-Java 1.7. וגם אם Java 11 כבר בחצר, עדיין כדאי לזכור. לא המשימה הנפוצה ביותר, אבל די מעניינת. יש סקירה טובה בנושא זה באינטרנט: " Fork/Join Framework in Java 7 ". Fork/JoinPool
פועל בעבודתו עם מושג כמו java.util.concurrent.RecursiveTask
. יש גם אנלוגי - java.util.concurrent.RecursiveAction
. פעולות רקורסיביות אינן מחזירות תוצאה. כך RecursiveTask
דומה ל Callable
, ודומה RecursiveAction
ל Runnable
. ובכן, בהסתכלות על השם, אנו רואים שתי שיטות מפתח - fork
ו join
. השיטה fork
מפעילה משימה באופן אסינכרוני בשרשור נפרד. והשיטה join
מאפשרת לחכות לסיום העבודה. ישנן מספר דרכים להשתמש בו: תמונה זו היא חלק משקופית מהדוח של אלכסיי שיפילב " Fork/Join: implementation, use, performance ." כדי להבהיר את זה, כדאי לצפות בדו"ח שלו ב-JEE CONF: " תכונות יישום Fork Join ."
תִמצוּת
אז הנה אנחנו מסיימים את החלק הבא של הסקירה. הבנו מה הגענו לראשונהExecutor
לביצוע שרשורים. ואז החלטנו להמשיך את הרעיון והמצאנו אותו ExecutorService
. ExecutorService
מאפשר לשלוח משימות לביצוע באמצעות submit
ו invoke
, כמו גם לנהל את השירות על ידי כיבויו. כי ExecutorService
'אנחנו צריכים יישומים, כתבנו מחלקה עם שיטות מפעל וקראו לזה Executors
. זה מאפשר לך ליצור בריכות חוטים ThreadPoolExecutor
. יחד עם זאת, ישנם בריכות שרשורים המאפשרות גם לציין לוח זמנים לביצוע, אך הוא WorkStealingPool
מוסתר מאחורי ForkJoinPool
. אני מקווה שמה שנכתב למעלה לא רק היה מעניין אותך, אלא גם מובן) אני תמיד שמח לקבל הצעות והערות. #ויאצ'סלב
GO TO FULL VERSION