JavaRush /בלוג Java /Random-HE /אתה לא יכול לקלקל את ג'אווה עם שרשור: חלק V - Executor, T...
Viacheslav
רָמָה

אתה לא יכול לקלקל את ג'אווה עם שרשור: חלק V - Executor, ThreadPool, Fork Join

פורסם בקבוצה

מבוא

אז, אנחנו יודעים שיש שרשורים ב-Java, עליהם תוכלו לקרוא בסקירה " You Can't Spoil Java with a Thread: Part I - Threads ". אתה לא יכול לקלקל את ג'אווה עם שרשור: חלק V - Executor, ThreadPool, Fork Join - 1בואו נסתכל שוב על הקוד לדוגמה:
public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
כפי שאנו יכולים לראות, הקוד להפעלת המשימה הוא די סטנדרטי, אך עבור כל השקה חדשה נצטרך לחזור עליו. פתרון אחד הוא להעביר אותו לשיטה נפרדת, למשל execute(Runnable runnable). אבל מפתחי Java כבר דאגו לנו והגיעו עם ממשק Executor:
public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
כפי שאתה יכול לראות, הקוד נעשה תמציתי יותר ואיפשר לנו פשוט לכתוב קוד כדי להפעיל אותו Runnableבשרשור. נהדר, לא? אבל זו רק ההתחלה: אתה לא יכול לקלקל את Java עם שרשור: חלק V - Executor, ThreadPool, Fork Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

כפי שאתה יכול לראות, לממשק Executorיש ממשק צאצא ExecutorService. ה-JavaDoc של ממשק זה אומר שזהו ExecutorServiceתיאור של Executor'a' מיוחד המספק שיטות לעצירת עבודה Executor'a' ומאפשר לך לעקוב java.util.concurrent.Futureאחר התקדמות הביצוע. בעבר, ב"אתה לא יכול לקלקל ג'אווה עם שרשור: חלק IV - ניתן להתקשרות, עתיד וחברים ", סקרנו בקצרה את האפשרויות Future. אם שכחת או לא קראת אותו, אני ממליץ לך לרענן את הזיכרון ;) אילו עוד דברים מעניינים כתובים ב-JavaDoc? שיש לנו מפעל מיוחד java.util.concurrent.Executorsשמאפשר לנו ליצור יישומים שזמינים כברירת מחדל ExecutorService.

ExecutorService

בואו נזכור שוב. עלינו Executorלבצע (כלומר לבצע) משימה מסוימת בשרשור, כאשר היישום של יצירת שרשור מוסתר מאיתנו. יש לנו ExecutorServiceאחד מיוחד Executorשיש לו סט של יכולות לניהול התקדמות הביצוע. ויש לנו מפעל Executorsשמאפשר לך ליצור ExecutorService. בואו נעשה את זה בעצמנו עכשיו:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
כפי שאנו יכולים לראות, ציינו מאגר חוטים קבוע ( Fixed Thread Pool) בגודל 2. לאחר מכן אנו שולחים משימות לבריכה אחת אחת. כל משימה מחזירה מחרוזת ( String) המכילה את שם השרשור ( currentThread().getName()). חשוב לסגור ממש בסוף ExecutorService, כי אחרת התוכנית שלנו לא תצא. Executorsישנן שיטות מפעל אחרות במפעל . לדוגמה, אנו יכולים ליצור מאגר של חוט אחד בלבד - newSingleThreadExecutorאו מאגר עם מטמון newCachedThreadPool, שבו חוטים יוסרו מהבריכה אם הם לא פעילים במשך דקה. למעשה, מאחורי אלה ExecutorServiceישנו תור חסימה שלתוכו ממוקמות משימות וממנו מבוצעות המשימות הללו. מידע נוסף על חסימת תורים ניתן לראות בסרטון " תור חסימה - אוספים #5 - ג'אווה מתקדמת ". אתה יכול גם לקרוא את הסקירה " חסימת תורים של החבילה במקביל " ואת התשובה לשאלה " מתי להעדיף LinkedBlockingQueue על פני ArrayBlockingQueue? " סופר פשוט - BlockingQueue(תור חסימה) חוסם שרשור, בשני מקרים:
  • שרשור מנסה להשיג אלמנטים מתור ריק
  • השרשור מנסה להכניס אלמנטים לתור מלא
אם נסתכל על יישום שיטות המפעל, נוכל לראות כיצד הן בנויות. לדוגמה:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
אוֹ
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
כפי שאנו יכולים לראות, יישומים נוצרים בתוך שיטות המפעל ExecutorService. וזהו בעצם ThreadPoolExecutor. רק התכונות שמשפיעות על העבודה משתנות. אתה לא יכול להרוס את ג'אווה עם שרשור: חלק V - Executor, ThreadPool, Fork Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

כפי שראינו בעבר, בתוך שיטות המפעל ThreadPoolExecutor, . הפונקציונליות מושפעת מאילו ערכים מועברים בתור השרשורים המקסימליים והמינימליים, כמו גם באיזה תור נעשה שימוש. וניתן להשתמש בכל יישום של הממשק java.util.concurrent.BlockingQueue. אם כבר מדברים על ThreadPoolExecutor'אה, כדאי לשים לב לתכונות מעניינות במהלך הפעולה. לדוגמה, לא ניתן לשלוח משימות אל ThreadPoolExecutorאם אין שם מקום:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
קוד זה ייכשל עם שגיאה כמו:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
כלומר, taskאתה לא יכול להגיש, כי SynchronousQueueהוא מעוצב בצורה כזו שהוא למעשה מורכב מאלמנט אחד ולא מאפשר לך לשים שם יותר. כפי שאנו יכולים לראות, queued tasksיש כאן 0, ואין בזה שום דבר מוזר, כי זה ספציפי SynchronousQueue- למעשה, זה תור של אלמנט אחד, שתמיד ריק. (!) כאשר שרשור אחד מכניס אלמנט לתור, הוא יחכה עד שרשור אחר ייקח את האלמנט מהתור. לכן, נוכל להחליף עם new LinkedBlockingQueue<>(1)ומה שיצוין בשגיאה ישתנה queued tasks = 1. כי התור הוא רק אלמנט אחד, אז אנחנו לא יכולים להוסיף את השני. ואנחנו ניפול על זה. בהמשך לנושא התור, ראוי לציין שלכיתה ThreadPoolExecutorיש שיטות נוספות לשירות התור. לדוגמה, השיטה threadPoolExecutor.purge()תסיר את כל המשימות שבוטלו מהתור כדי לפנות מקום בתור. תכונה מעניינת נוספת הקשורה לתור היא מטפל המשימות הלא מקובל:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
לדוגמה, המטפל פשוט מדפיס מילה Rejectedעבור כל סירוב לקבל משימה לתור. נוח, לא? בנוסף, ThreadPoolExecutorיש לו יורש מעניין - ScheduledThreadPoolExecutorשהוא ScheduledExecutorService. הוא מספק את היכולת לבצע משימה בטיימר.

ScheduledExecutorService

ExecutorServiceסוג ScheduledExecutorServiceמאפשר לך להריץ משימות לפי לוח זמנים. בואו נסתכל על דוגמה:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
הכל פשוט כאן. משימות נשלחות, אנו מקבלים "משימה מתוזמנת" java.util.concurrent.ScheduledFuture. המקרה הבא עשוי להיות שימושי גם עם לוח הזמנים:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
כאן אנו שולחים Runnableאת המשימה לביצוע בקצב קבוע (Fixed Rate) עם עיכוב מסוים. במקרה זה, לאחר שנייה אחת כל 2 שניות, התחל לבצע את המשימה. יש אפשרות דומה:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
אבל כאן משימות מבוצעות במרווח נתון בין ביצוע משימות שונות. כלומר, המשימה taskתושלם תוך שנייה אחת. לאחר מכן, ברגע שהיא תושלם, יעברו 2 שניות, ולאחר מכן תושק משימה חדשה. אתה יכול לקרוא את החומרים הבאים בנושא זה: אתה לא יכול להרוס את ג'אווה עם שרשור: חלק V - Executor, ThreadPool, Fork Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

בנוסף לבריכות החוטים שהוזכרו לעיל, יש עוד אחת. אפשר לומר שהוא קצת מיוחד. השם שלו הוא Work Stealing Pool. בקיצור, Work Stealing הוא אלגוריתם עבודה שבו שרשורים סרק מתחילים לקחת משימות משרשורים אחרים או משימות מהתור הכללי. בואו נסתכל על דוגמה:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
אם נריץ את הקוד הזה, ExecutorServiceהוא יצור 5 שרשורים, כי כל שרשור יצטרף לתור ההמתנה במיקום האובייקט lock. כבר דיברנו על צגים ומנעולים עליו ב"אתה לא יכול לקלקל ג'אווה עם שרשור: חלק ב' - סנכרון ." ועכשיו נחליף אותו Executors.newCachedThreadPoolב Executors.newWorkStealingPool(). מה ישתנה? נראה שהמשימות שלנו מבוצעות לא ב-5 שרשורים, אלא בפחות. זוכר cachedThreadPoolשיצרת שרשור משלך עבור כל משימה? כי waitזה חסם את השרשור, אבל המשימות הבאות רצו להתבצע ונוצרו עבורם שרשורים חדשים במאגר. במקרה של StealingPoolחוטים, הם לא יבטלו לנצח ב wait, הם יתחילו לבצע משימות שכנות. במה זה כל כך שונה ממאגר שרשורים אחרים WorkStealingPool? כי בעצם חי בתוכו משהו קסום ForkJoinPool:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
למעשה יש עוד הבדל אחד. שרשורים שנוצרים ForkJoinPoolכברירת מחדל הם שרשורי דמון, בניגוד לשרשורים שנוצרו באמצעות רגיל ThreadPool. באופן כללי, כדאי לזכור את שרשורי הדמונים, כי... לדוגמה, CompletableFutureנעשה שימוש גם בשרשורי דמון, אם אינך מציין משלך ThreadFactory, מה שייצור שרשורים שאינם דמונים. אלו סוג ההפתעות שיכולות לחכות לך במקום לא צפוי!)

מזלג/הצטרפות לבריכה

בחלק זה נדבר על אותו אחד ForkJoinPool(נקרא גם fork/join framework) שחי "מתחת למכסה המנוע" של WorkStealingPool. באופן כללי, Fork Join Framework הופיע ב-Java 1.7. וגם אם Java 11 כבר בחצר, עדיין כדאי לזכור. לא המשימה הנפוצה ביותר, אבל די מעניינת. יש סקירה טובה בנושא זה באינטרנט: " Fork/Join Framework in Java 7 ". Fork/JoinPoolפועל בעבודתו עם מושג כמו java.util.concurrent.RecursiveTask. יש גם אנלוגי - java.util.concurrent.RecursiveAction. פעולות רקורסיביות אינן מחזירות תוצאה. כך RecursiveTaskדומה ל Callable, ודומה RecursiveActionל Runnable. ובכן, בהסתכלות על השם, אנו רואים שתי שיטות מפתח - forkו join. השיטה forkמפעילה משימה באופן אסינכרוני בשרשור נפרד. והשיטה joinמאפשרת לחכות לסיום העבודה. ישנן מספר דרכים להשתמש בו: אתה לא יכול להרוס את ג'אווה עם שרשור: חלק V - Executor, ThreadPool, Fork Join - 5תמונה זו היא חלק משקופית מהדוח של אלכסיי שיפילב " Fork/Join: implementation, use, performance ." כדי להבהיר את זה, כדאי לצפות בדו"ח שלו ב-JEE CONF: " תכונות יישום Fork Join ."

תִמצוּת

אז הנה אנחנו מסיימים את החלק הבא של הסקירה. הבנו מה הגענו לראשונה Executorלביצוע שרשורים. ואז החלטנו להמשיך את הרעיון והמצאנו אותו ExecutorService. ExecutorServiceמאפשר לשלוח משימות לביצוע באמצעות submitו invoke, כמו גם לנהל את השירות על ידי כיבויו. כי ExecutorService'אנחנו צריכים יישומים, כתבנו מחלקה עם שיטות מפעל וקראו לזה Executors. זה מאפשר לך ליצור בריכות חוטים ThreadPoolExecutor. יחד עם זאת, ישנם בריכות שרשורים המאפשרות גם לציין לוח זמנים לביצוע, אך הוא WorkStealingPoolמוסתר מאחורי ForkJoinPool. אני מקווה שמה שנכתב למעלה לא רק היה מעניין אותך, אלא גם מובן) אני תמיד שמח לקבל הצעות והערות. #ויאצ'סלב
הערות
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION