การแนะนำ
ดังนั้นเราจึงรู้ว่ามีเธรดใน Java ซึ่งคุณสามารถอ่านได้ในบทวิจารณ์ “ คุณไม่สามารถสปอย Java ด้วยเธรด: ตอนที่ 1 - เธรด ”public static void main(String []args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
ดังที่เราเห็นโค้ดสำหรับการเปิดตัวงานนั้นค่อนข้างมาตรฐาน แต่สำหรับการเปิดตัวใหม่แต่ละครั้งเราจะต้องทำซ้ำ วิธีแก้ไขประการหนึ่งคือย้ายไปยังวิธีแยกกันexecute(Runnable runnable)
เช่น แต่นักพัฒนา Java กังวลเกี่ยวกับเราแล้วและสร้างอินเทอร์เฟซขึ้นมาExecutor
:
public static void main(String []args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
อย่างที่คุณเห็น โค้ดมีความกระชับมากขึ้นและช่วยให้เราสามารถเขียนโค้ดเพื่อรันRunnable
ในเธรดได้ เยี่ยมมากใช่มั้ย? แต่นี่เป็นเพียงจุดเริ่มต้น: ![คุณไม่สามารถทำให้ Java เสียด้วยเธรดได้: ตอนที่ V - Executor, ThreadPool, Fork Join - 2](https://cdn.javarush.com/images/article/fe98eee4-5064-4626-bc15-535a728659ca/512.webp)
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
มีอินเทอร์เฟซExecutorService
สืบทอด JavaDoc ของอินเทอร์เฟซนี้ระบุว่าExecutorService
เป็นคำอธิบายของExecutor
'a' พิเศษที่ให้วิธีการหยุดงานExecutor
'a' และช่วยให้คุณสามารถjava.util.concurrent.Future
ติดตามความคืบหน้าของการดำเนินการได้ ก่อนหน้านี้ ใน “ You Can’t Spoil Java with Thread: Part IV - Callable, Future and Friends ” เราได้ตรวจสอบความเป็นไปได้โดยFuture
ย่อ หากคุณลืมหรือยังไม่ได้อ่าน ขอแนะนำให้รีเฟรชหน่วยความจำ ;) JavaDoc มีอะไรน่าสนใจอีกบ้าง? เรามีโรงงานพิเศษjava.util.concurrent.Executors
ที่ช่วยให้เราสามารถสร้างการใช้งานที่พร้อมใช้งานตามค่าเริ่มExecutorService
ต้น
ExecutorService
เรามารำลึกกันอีกครั้ง เราต้องExecutor
ดำเนินการ (เช่น ดำเนินการ) งานบางอย่างในเธรด เมื่อการดำเนินการสร้างเธรดถูกซ่อนจากเรา เรามีExecutorService
สิ่งพิเศษExecutor
ที่มีชุดความสามารถในการจัดการความคืบหน้าของการดำเนินการ และเรามีโรงงานExecutors
ที่ให้คุณสร้างสรรค์ExecutorService
ได้ มาทำเองตอนนี้:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
ดังที่เราเห็น เราได้ระบุเธรดพูลคงที่ ( Fixed Thread Pool
) ขนาด 2 หลังจากนั้นเราจะส่งงานไปที่พูลทีละงาน แต่ละงานส่งคืนสตริง ( String
) ที่มีชื่อเธรด ( currentThread().getName()
) สิ่งสำคัญคือต้องปิดเครื่องในตอนท้ายสุดExecutorService
เพราะไม่เช่นนั้นโปรแกรมของเราจะไม่ออก Executors
มีวิธีโรงงานอื่นในโรงงาน ตัวอย่างเช่น เราสามารถสร้างพูลของเธรดเดียว - newSingleThreadExecutor
หรือพูลที่มีการแคชnewCachedThreadPool
โดยที่เธรดจะถูกลบออกจากพูลหากไม่ได้ใช้งานเป็นเวลา 1 นาที ในความเป็นจริง เบื้องหลังสิ่งเหล่านี้ExecutorService
มีคิวการบล็อกซึ่งงานจะถูกวางและงานเหล่านี้ถูกดำเนินการ ข้อมูลเพิ่มเติมเกี่ยวกับการบล็อกคิวสามารถดูได้ในวิดีโอ " การบล็อกคิว - คอลเลกชั่น #5 - Java ขั้นสูง " คุณยังสามารถอ่านบทวิจารณ์ “ การบล็อกคิวของแพ็คเกจที่เกิดขึ้นพร้อมกัน ” และคำตอบสำหรับคำถาม “ เมื่อใดจึงควรใช้ LinkedBlockingQueue มากกว่า ArrayBlockingQueue ” ง่ายมาก - BlockingQueue
(การบล็อกคิว) บล็อกเธรดในสองกรณี:
- เธรดกำลังพยายามรับองค์ประกอบจากคิวว่าง
- เธรดกำลังพยายามใส่องค์ประกอบลงในคิวเต็ม
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
หรือ
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
ดังที่เราเห็น การใช้งานถูกสร้างขึ้นภายในวิธีการของExecutorService
โรงงาน และนั่นคือโดยพื้นฐานThreadPoolExecutor
แล้ว เฉพาะคุณลักษณะที่ส่งผลต่อการทำงานเท่านั้นที่เปลี่ยนแปลง ![คุณไม่สามารถทำลาย Java ด้วยเธรดได้: ตอนที่ V - Executor, ThreadPool, Fork Join - 3](https://cdn.javarush.com/images/article/decabcf3-8341-429b-9266-a262f8a4152b/800.webp)
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
ดังที่เราเห็นก่อนหน้านี้ ภายในวิธีการของโรงงานThreadPoolExecutor
. ฟังก์ชั่นการทำงานได้รับผลกระทบจากค่าที่ส่งผ่านเป็นเธรดสูงสุดและต่ำสุดรวมถึงคิวที่ใช้ และสามารถใช้อินเทอร์เฟซใด ๆjava.util.concurrent.BlockingQueue
ก็ได้ เมื่อพูดถึงThreadPoolExecutor
'ahs ก็ควรสังเกตคุณสมบัติที่น่าสนใจระหว่างการใช้งาน ตัวอย่างเช่น คุณไม่สามารถส่งงานไปได้ThreadPoolExecutor
หากไม่มีที่ว่าง:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
รหัสนี้จะล้มเหลวโดยมีข้อผิดพลาดเช่น:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
นั่นคือtask
คุณไม่สามารถส่งได้เพราะว่า SynchronousQueue
มันได้รับการออกแบบในลักษณะที่ประกอบด้วยองค์ประกอบเดียวและไม่อนุญาตให้คุณใส่เข้าไปอีก อย่างที่เราเห็นqueued tasks
มี 0 ตรงนี้ และไม่มีอะไรแปลกในเรื่องนี้ เพราะ นี่เป็นสิ่งเฉพาะSynchronousQueue
- อันที่จริงมันคือคิวที่มี 1 องค์ประกอบซึ่งจะว่างเปล่าเสมอ (!) เมื่อเธรดหนึ่งวางองค์ประกอบลงในคิว มันจะรอจนกว่าเธรดอื่นจะดึงองค์ประกอบออกจากคิว ดังนั้นเราจึงสามารถแทนที่ด้วยnew LinkedBlockingQueue<>(1)
และสิ่งที่จะระบุในข้อผิดพลาดจะเปลี่ยนqueued tasks = 1
ไป เพราะ คิวมีเพียง 1 องค์ประกอบ ดังนั้นเราจึงไม่สามารถเพิ่มองค์ประกอบที่สองได้ และเราจะตกอยู่ในสิ่งนี้ เนื่องมาจากธีมของคิว เป็นที่น่าสังเกตว่าคลาสThreadPoolExecutor
มีวิธีการเพิ่มเติมในการให้บริการคิว ตัวอย่างเช่น วิธีการthreadPoolExecutor.purge()
จะลบงานที่ถูกยกเลิกทั้งหมดออกจากคิวเพื่อเพิ่มพื้นที่ว่างในคิว คุณสมบัติที่น่าสนใจอีกอย่างที่เกี่ยวข้องกับคิวคือตัวจัดการงานที่ได้รับการยอมรับ:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
ตัวอย่างเช่น ตัวจัดการเพียงพิมพ์คำRejected
สำหรับการปฏิเสธที่จะรับงานเข้าไปในคิวแต่ละครั้ง สะดวกไม่ใช่เหรอ? นอกจากนี้ThreadPoolExecutor
เขายังมีทายาทที่น่าสนใจอีกคนหนึ่งคือScheduledThreadPoolExecutor
ใคร ScheduledExecutorService
มันให้ความสามารถในการทำงานบนตัวจับเวลา
ScheduledExecutorService
ExecutorService
ประเภทScheduledExecutorService
อนุญาตให้คุณรันงานตามกำหนดเวลา ลองดูตัวอย่าง:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
ทุกอย่างเรียบง่ายที่นี่ งานถูกส่งไปแล้ว เราได้รับ “งานที่กำหนดเวลาไว้java.util.concurrent.ScheduledFuture
” กรณีต่อไปนี้อาจเป็นประโยชน์กับกำหนดการด้วย:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
ที่นี่เราส่งRunnable
งานให้ดำเนินการในอัตราคงที่โดยมีความล่าช้าบางอย่าง ในกรณีนี้ หลังจาก 1 วินาทีทุกๆ 2 วินาที ให้เริ่มดำเนินการงาน มีตัวเลือกที่คล้ายกัน:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
แต่ที่นี่งานจะถูกดำเนินการตามช่วงเวลาที่กำหนดระหว่างการปฏิบัติงานที่แตกต่างกัน นั่นคืองานtask
จะเสร็จสิ้นภายใน 1 วินาที ต่อไปเมื่อเสร็จสิ้น 2 วินาทีก็จะผ่านไป จากนั้นงานใหม่ก็จะเริ่มดำเนินการ คุณสามารถอ่านเนื้อหาต่อไปนี้ในหัวข้อนี้:
- บทนำเกี่ยวกับกลุ่มเธรด
- รู้เบื้องต้นเกี่ยวกับเธรดพูล
- Java Multithreading Steeplechase: การยกเลิกงานใน Executors
- การเลือกตัวดำเนินการ Java ที่ถูกต้องสำหรับงานเบื้องหลัง
![คุณไม่สามารถทำลาย Java ด้วยเธรด: ตอนที่ V - Executor, ThreadPool, Fork Join - 4](https://cdn.javarush.com/images/article/d4ef6ba2-470d-4778-a638-56f0df3150eb/800.webp)
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
นอกจากกลุ่มเธรดที่กล่าวถึงข้างต้นแล้ว ยังมีอีกกลุ่มหนึ่งอีกด้วย คุณสามารถพูดได้ว่าเขาพิเศษนิดหน่อย ชื่อของมันคือสระขโมยงาน กล่าวโดยย่อ Work Stealing เป็นอัลกอริธึมการทำงานที่เธรดที่ไม่ได้ใช้งานเริ่มรับงานจากเธรดอื่นหรืองานจากคิวทั่วไป ลองดูตัวอย่าง:public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
ถ้าเรารันโค้ดนี้ExecutorService
มันจะสร้าง 5 เธรด เพราะว่า แต่ละเธรดจะเข้าร่วมคิวรอที่ตำแหน่งของlock
วัตถุ เราได้พูดคุยกันแล้วเกี่ยวกับมอนิเตอร์และการล็อคใน “ You Can't Spoil Java with a Thread: Part II - Synchronization ” และตอนนี้เราจะแทนที่มันExecutors.newCachedThreadPool
ด้วยExecutors.newWorkStealingPool()
. อะไรจะเปลี่ยนไป? เราจะเห็นว่างานของเราไม่ได้ดำเนินการใน 5 เธรด แต่น้อยกว่า จำไว้ว่าcachedThreadPool
คุณสร้างกระทู้ของตัวเองสำหรับแต่ละงาน? เพราะwait
มันบล็อกเธรด แต่งานต่อไปต้องการดำเนินการและมีการสร้างเธรดใหม่ในพูลสำหรับพวกเขา ในกรณีของStealingPool
เธรด พวกเขาจะไม่ไม่ได้ใช้งานตลอดไปในwait
พวกเขาจะเริ่มดำเนินการงานใกล้เคียง สิ่งนี้แตกต่างจากกลุ่มเธรดอื่นWorkStealingPool
อย่างไร เพราะมีบางสิ่งที่มหัศจรรย์อาศัยอยู่ภายในตัวเขาForkJoinPool
:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
จริงๆ แล้วมีความแตกต่างอีกอย่างหนึ่ง เธรดที่สร้างขึ้นโดยForkJoinPool
ดีฟอลต์คือเธรด daemon ซึ่งตรงข้ามกับเธรดที่สร้างผ่านเธรดThreadPool
ปกติ โดยทั่วไปแล้ว เป็นเรื่องที่ควรค่าแก่การจดจำเกี่ยวกับเธรด daemon เนื่องจาก... ตัวอย่างเช่นCompletableFuture
มีการใช้เธรด daemon หากคุณไม่ได้ระบุ ของคุณเองThreadFactory
ซึ่งจะสร้างเธรดที่ไม่ใช่ daemon สิ่งเหล่านี้คือเรื่องเซอร์ไพรส์ที่อาจรอคุณอยู่ในสถานที่ที่ไม่คาดคิด!)
ส้อม/เข้าร่วมพูล
ในส่วนนี้เราจะพูดถึงสิ่งเดียวกันForkJoinPool
(หรือที่เรียกว่า fork/join framework) ที่ทำงาน "ภายใต้ประทุน" ของWorkStealingPool
. โดยทั่วไป Fork Join Framework จะปรากฏใน Java 1.7 และแม้ว่า Java 11 จะอยู่ในสนามแล้ว แต่ก็ยังคุ้มค่าที่จะจดจำ ไม่ใช่งานที่พบบ่อยที่สุด แต่ค่อนข้างน่าสนใจ มีบทวิจารณ์ที่ดีในหัวข้อนี้บนอินเทอร์เน็ต: “ Fork/Join Framework ใน Java 7 ” Fork/JoinPool
ดำเนินงานในงานของเขาด้วยแนวคิดjava.util.concurrent.RecursiveTask
เช่น นอกจากนี้ยังมีอะนาล็อก - java.util.concurrent.RecursiveAction
. RecursiveActions ไม่ส่งคืนผลลัพธ์ จึงRecursiveTask
คล้ายกับCallable
และRecursiveAction
คล้ายRunnable
กับ เมื่อดูชื่อเราจะเห็นวิธีสำคัญสองวิธี - fork
และjoin
. วิธีการfork
รันงานแบบอะซิงโครนัสในเธรดที่แยกต่างหาก และวิธีนี้join
ช่วยให้คุณรอให้งานเสร็จ มีหลายวิธีในการใช้งาน: ![คุณไม่สามารถทำลาย Java ด้วยเธรดได้: ตอนที่ V - Executor, ThreadPool, Fork Join - 5](https://cdn.javarush.com/images/article/d3a97020-34fd-44cf-b0a2-bf85b9a18fe5/800.webp)
สรุป
เอาล่ะ เรามาจบการรีวิวส่วนต่อไปกัน เราพบว่าเราคิดอะไรขึ้นมาเป็นครั้งแรกExecutor
ในการรันเธรด จากนั้นเราก็ตัดสินใจที่จะสานต่อแนวคิดนี้และเกิดไอเดียขึ้นExecutorService
มา ExecutorService
ช่วยให้คุณสามารถส่งงานเพื่อดำเนินการโดยใช้submit
และinvoke
รวมถึงจัดการบริการโดยการปิดการทำงาน เพราะ ExecutorService
'เราต้องการการนำไปใช้งาน เราเขียนคลาสด้วยวิธีแบบโรงงานและเรียกมันExecutors
ว่า ช่วยให้คุณสร้างกลุ่มThreadPoolExecutor
เธรด ในเวลาเดียวกัน มีเธรดพูลที่ให้คุณระบุกำหนดเวลาสำหรับการดำเนินการได้ แต่จะWorkStealingPool
ซ่อน อยู่หลัง ForkJoinPool
. ฉันหวังว่าสิ่งที่เขียนไว้ข้างต้นไม่เพียงแต่น่าสนใจสำหรับคุณ แต่ยังเข้าใจได้) ฉันยินดีรับข้อเสนอแนะและความคิดเห็นเสมอ #เวียเชสลาฟ
GO TO FULL VERSION