JavaRush /จาวาบล็อก /Random-TH /คุณไม่สามารถสปอย Java ด้วย Thread: ตอนที่ V - Executor, T...
Viacheslav
ระดับ

คุณไม่สามารถสปอย Java ด้วย Thread: ตอนที่ V - Executor, ThreadPool, Fork Join

เผยแพร่ในกลุ่ม

การแนะนำ

ดังนั้นเราจึงรู้ว่ามีเธรดใน Java ซึ่งคุณสามารถอ่านได้ในบทวิจารณ์ “ คุณไม่สามารถสปอย Java ด้วยเธรด: ตอนที่ 1 - เธรดคุณไม่สามารถทำให้ Java เสียด้วย Thread: Part V - Executor, ThreadPool, Fork Join - 1ลองดูโค้ดตัวอย่างอีกครั้ง:
public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
ดังที่เราเห็นโค้ดสำหรับการเปิดตัวงานนั้นค่อนข้างมาตรฐาน แต่สำหรับการเปิดตัวใหม่แต่ละครั้งเราจะต้องทำซ้ำ วิธีแก้ไขประการหนึ่งคือย้ายไปยังวิธีแยกกันexecute(Runnable runnable)เช่น แต่นักพัฒนา Java กังวลเกี่ยวกับเราแล้วและสร้างอินเทอร์เฟซขึ้นมาExecutor:
public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
อย่างที่คุณเห็น โค้ดมีความกระชับมากขึ้นและช่วยให้เราสามารถเขียนโค้ดเพื่อรันRunnableในเธรดได้ เยี่ยมมากใช่มั้ย? แต่นี่เป็นเพียงจุดเริ่มต้น: คุณไม่สามารถทำให้ Java เสียด้วยเธรดได้: ตอนที่ V - Executor, ThreadPool, Fork Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

อย่างที่คุณเห็น อินเทอร์เฟซExecutorมีอินเทอร์เฟซExecutorServiceสืบทอด JavaDoc ของอินเทอร์เฟซนี้ระบุว่าExecutorServiceเป็นคำอธิบายของExecutor'a' พิเศษที่ให้วิธีการหยุดงานExecutor'a' และช่วยให้คุณสามารถjava.util.concurrent.Futureติดตามความคืบหน้าของการดำเนินการได้ ก่อนหน้านี้ ใน “ You Can’t Spoil Java with Thread: Part IV - Callable, Future and Friends ” เราได้ตรวจสอบความเป็นไปได้โดยFutureย่อ หากคุณลืมหรือยังไม่ได้อ่าน ขอแนะนำให้รีเฟรชหน่วยความจำ ;) JavaDoc มีอะไรน่าสนใจอีกบ้าง? เรามีโรงงานพิเศษjava.util.concurrent.Executorsที่ช่วยให้เราสามารถสร้างการใช้งานที่พร้อมใช้งานตามค่าเริ่มExecutorServiceต้น

ExecutorService

เรามารำลึกกันอีกครั้ง เราต้องExecutorดำเนินการ (เช่น ดำเนินการ) งานบางอย่างในเธรด เมื่อการดำเนินการสร้างเธรดถูกซ่อนจากเรา เรามีExecutorServiceสิ่งพิเศษExecutorที่มีชุดความสามารถในการจัดการความคืบหน้าของการดำเนินการ และเรามีโรงงานExecutorsที่ให้คุณสร้างสรรค์ExecutorServiceได้ มาทำเองตอนนี้:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
ดังที่เราเห็น เราได้ระบุเธรดพูลคงที่ ( Fixed Thread Pool) ขนาด 2 หลังจากนั้นเราจะส่งงานไปที่พูลทีละงาน แต่ละงานส่งคืนสตริง ( String) ที่มีชื่อเธรด ( currentThread().getName()) สิ่งสำคัญคือต้องปิดเครื่องในตอนท้ายสุดExecutorServiceเพราะไม่เช่นนั้นโปรแกรมของเราจะไม่ออก Executorsมีวิธีโรงงานอื่นในโรงงาน ตัวอย่างเช่น เราสามารถสร้างพูลของเธรดเดียว - newSingleThreadExecutorหรือพูลที่มีการแคชnewCachedThreadPoolโดยที่เธรดจะถูกลบออกจากพูลหากไม่ได้ใช้งานเป็นเวลา 1 นาที ในความเป็นจริง เบื้องหลังสิ่งเหล่านี้ExecutorServiceมีคิวการบล็อกซึ่งงานจะถูกวางและงานเหล่านี้ถูกดำเนินการ ข้อมูลเพิ่มเติมเกี่ยวกับการบล็อกคิวสามารถดูได้ในวิดีโอ " การบล็อกคิว - คอลเลกชั่น #5 - Java ขั้นสูง " คุณยังสามารถอ่านบทวิจารณ์ “ การบล็อกคิวของแพ็คเกจที่เกิดขึ้นพร้อมกัน ” และคำตอบสำหรับคำถาม “ เมื่อใดจึงควรใช้ LinkedBlockingQueue มากกว่า ArrayBlockingQueue ” ง่ายมาก - BlockingQueue(การบล็อกคิว) บล็อกเธรดในสองกรณี:
  • เธรดกำลังพยายามรับองค์ประกอบจากคิวว่าง
  • เธรดกำลังพยายามใส่องค์ประกอบลงในคิวเต็ม
หากเราดูที่การนำวิธีการของโรงงานไปใช้ เราก็จะเห็นว่าวิธีการเหล่านั้นมีโครงสร้างอย่างไร ตัวอย่างเช่น:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
หรือ
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
ดังที่เราเห็น การใช้งานถูกสร้างขึ้นภายในวิธีการของExecutorServiceโรงงาน และนั่นคือโดยพื้นฐานThreadPoolExecutorแล้ว เฉพาะคุณลักษณะที่ส่งผลต่อการทำงานเท่านั้นที่เปลี่ยนแปลง คุณไม่สามารถทำลาย Java ด้วยเธรดได้: ตอนที่ V - Executor, ThreadPool, Fork Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

ดังที่เราเห็นก่อนหน้านี้ ภายในวิธีการของโรงงานThreadPoolExecutor. ฟังก์ชั่นการทำงานได้รับผลกระทบจากค่าที่ส่งผ่านเป็นเธรดสูงสุดและต่ำสุดรวมถึงคิวที่ใช้ และสามารถใช้อินเทอร์เฟซใด ๆjava.util.concurrent.BlockingQueueก็ได้ เมื่อพูดถึงThreadPoolExecutor'ahs ก็ควรสังเกตคุณสมบัติที่น่าสนใจระหว่างการใช้งาน ตัวอย่างเช่น คุณไม่สามารถส่งงานไปได้ThreadPoolExecutorหากไม่มีที่ว่าง:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
รหัสนี้จะล้มเหลวโดยมีข้อผิดพลาดเช่น:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
นั่นคือtaskคุณไม่สามารถส่งได้เพราะว่า SynchronousQueueมันได้รับการออกแบบในลักษณะที่ประกอบด้วยองค์ประกอบเดียวและไม่อนุญาตให้คุณใส่เข้าไปอีก อย่างที่เราเห็นqueued tasksมี 0 ตรงนี้ และไม่มีอะไรแปลกในเรื่องนี้ เพราะ นี่เป็นสิ่งเฉพาะSynchronousQueue- อันที่จริงมันคือคิวที่มี 1 องค์ประกอบซึ่งจะว่างเปล่าเสมอ (!) เมื่อเธรดหนึ่งวางองค์ประกอบลงในคิว มันจะรอจนกว่าเธรดอื่นจะดึงองค์ประกอบออกจากคิว ดังนั้นเราจึงสามารถแทนที่ด้วยnew LinkedBlockingQueue<>(1)และสิ่งที่จะระบุในข้อผิดพลาดจะเปลี่ยนqueued tasks = 1ไป เพราะ คิวมีเพียง 1 องค์ประกอบ ดังนั้นเราจึงไม่สามารถเพิ่มองค์ประกอบที่สองได้ และเราจะตกอยู่ในสิ่งนี้ เนื่องมาจากธีมของคิว เป็นที่น่าสังเกตว่าคลาสThreadPoolExecutorมีวิธีการเพิ่มเติมในการให้บริการคิว ตัวอย่างเช่น วิธีการthreadPoolExecutor.purge()จะลบงานที่ถูกยกเลิกทั้งหมดออกจากคิวเพื่อเพิ่มพื้นที่ว่างในคิว คุณสมบัติที่น่าสนใจอีกอย่างที่เกี่ยวข้องกับคิวคือตัวจัดการงานที่ได้รับการยอมรับ:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
ตัวอย่างเช่น ตัวจัดการเพียงพิมพ์คำRejectedสำหรับการปฏิเสธที่จะรับงานเข้าไปในคิวแต่ละครั้ง สะดวกไม่ใช่เหรอ? นอกจากนี้ThreadPoolExecutorเขายังมีทายาทที่น่าสนใจอีกคนหนึ่งคือScheduledThreadPoolExecutorใคร ScheduledExecutorServiceมันให้ความสามารถในการทำงานบนตัวจับเวลา

ScheduledExecutorService

ExecutorServiceประเภทScheduledExecutorServiceอนุญาตให้คุณรันงานตามกำหนดเวลา ลองดูตัวอย่าง:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
ทุกอย่างเรียบง่ายที่นี่ งานถูกส่งไปแล้ว เราได้รับ “งานที่กำหนดเวลาไว้java.util.concurrent.ScheduledFuture” กรณีต่อไปนี้อาจเป็นประโยชน์กับกำหนดการด้วย:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
ที่นี่เราส่งRunnableงานให้ดำเนินการในอัตราคงที่โดยมีความล่าช้าบางอย่าง ในกรณีนี้ หลังจาก 1 วินาทีทุกๆ 2 วินาที ให้เริ่มดำเนินการงาน มีตัวเลือกที่คล้ายกัน:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
แต่ที่นี่งานจะถูกดำเนินการตามช่วงเวลาที่กำหนดระหว่างการปฏิบัติงานที่แตกต่างกัน นั่นคืองานtaskจะเสร็จสิ้นภายใน 1 วินาที ต่อไปเมื่อเสร็จสิ้น 2 วินาทีก็จะผ่านไป จากนั้นงานใหม่ก็จะเริ่มดำเนินการ คุณสามารถอ่านเนื้อหาต่อไปนี้ในหัวข้อนี้: คุณไม่สามารถทำลาย Java ด้วยเธรด: ตอนที่ V - Executor, ThreadPool, Fork Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

นอกจากกลุ่มเธรดที่กล่าวถึงข้างต้นแล้ว ยังมีอีกกลุ่มหนึ่งอีกด้วย คุณสามารถพูดได้ว่าเขาพิเศษนิดหน่อย ชื่อของมันคือสระขโมยงาน กล่าวโดยย่อ Work Stealing เป็นอัลกอริธึมการทำงานที่เธรดที่ไม่ได้ใช้งานเริ่มรับงานจากเธรดอื่นหรืองานจากคิวทั่วไป ลองดูตัวอย่าง:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
ถ้าเรารันโค้ดนี้ExecutorServiceมันจะสร้าง 5 เธรด เพราะว่า แต่ละเธรดจะเข้าร่วมคิวรอที่ตำแหน่งของlockวัตถุ เราได้พูดคุยกันแล้วเกี่ยวกับมอนิเตอร์และการล็อคใน “ You Can't Spoil Java with a Thread: Part II - Synchronization ” และตอนนี้เราจะแทนที่มันExecutors.newCachedThreadPoolด้วยExecutors.newWorkStealingPool(). อะไรจะเปลี่ยนไป? เราจะเห็นว่างานของเราไม่ได้ดำเนินการใน 5 เธรด แต่น้อยกว่า จำไว้ว่าcachedThreadPoolคุณสร้างกระทู้ของตัวเองสำหรับแต่ละงาน? เพราะwaitมันบล็อกเธรด แต่งานต่อไปต้องการดำเนินการและมีการสร้างเธรดใหม่ในพูลสำหรับพวกเขา ในกรณีของStealingPoolเธรด พวกเขาจะไม่ไม่ได้ใช้งานตลอดไปในwaitพวกเขาจะเริ่มดำเนินการงานใกล้เคียง สิ่งนี้แตกต่างจากกลุ่มเธรดอื่นWorkStealingPoolอย่างไร เพราะมีบางสิ่งที่มหัศจรรย์อาศัยอยู่ภายในตัวเขาForkJoinPool:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
จริงๆ แล้วมีความแตกต่างอีกอย่างหนึ่ง เธรดที่สร้างขึ้นโดยForkJoinPoolดีฟอลต์คือเธรด daemon ซึ่งตรงข้ามกับเธรดที่สร้างผ่านเธรดThreadPoolปกติ โดยทั่วไปแล้ว เป็นเรื่องที่ควรค่าแก่การจดจำเกี่ยวกับเธรด daemon เนื่องจาก... ตัวอย่างเช่นCompletableFutureมีการใช้เธรด daemon หากคุณไม่ได้ระบุ ของคุณเองThreadFactoryซึ่งจะสร้างเธรดที่ไม่ใช่ daemon สิ่งเหล่านี้คือเรื่องเซอร์ไพรส์ที่อาจรอคุณอยู่ในสถานที่ที่ไม่คาดคิด!)

ส้อม/เข้าร่วมพูล

ในส่วนนี้เราจะพูดถึงสิ่งเดียวกันForkJoinPool(หรือที่เรียกว่า fork/join framework) ที่ทำงาน "ภายใต้ประทุน" ของWorkStealingPool. โดยทั่วไป Fork Join Framework จะปรากฏใน Java 1.7 และแม้ว่า Java 11 จะอยู่ในสนามแล้ว แต่ก็ยังคุ้มค่าที่จะจดจำ ไม่ใช่งานที่พบบ่อยที่สุด แต่ค่อนข้างน่าสนใจ มีบทวิจารณ์ที่ดีในหัวข้อนี้บนอินเทอร์เน็ต: “ Fork/Join Framework ใน Java 7Fork/JoinPoolดำเนินงานในงานของเขาด้วยแนวคิดjava.util.concurrent.RecursiveTaskเช่น นอกจากนี้ยังมีอะนาล็อก - java.util.concurrent.RecursiveAction. RecursiveActions ไม่ส่งคืนผลลัพธ์ จึงRecursiveTaskคล้ายกับCallableและRecursiveActionคล้ายRunnableกับ เมื่อดูชื่อเราจะเห็นวิธีสำคัญสองวิธี - forkและjoin. วิธีการforkรันงานแบบอะซิงโครนัสในเธรดที่แยกต่างหาก และวิธีนี้joinช่วยให้คุณรอให้งานเสร็จ มีหลายวิธีในการใช้งาน: คุณไม่สามารถทำลาย Java ด้วยเธรดได้: ตอนที่ V - Executor, ThreadPool, Fork Join - 5รูปภาพนี้เป็นส่วนหนึ่งของสไลด์จากรายงานของ Alexey Shipilev “ Fork/Join: Implement, Use, Performance ” เพื่อให้ชัดเจนยิ่งขึ้น ควรดูรายงานของเขาที่ JEE CONF: “ คุณลักษณะการใช้งาน Fork Join

สรุป

เอาล่ะ เรามาจบการรีวิวส่วนต่อไปกัน เราพบว่าเราคิดอะไรขึ้นมาเป็นครั้งแรกExecutorในการรันเธรด จากนั้นเราก็ตัดสินใจที่จะสานต่อแนวคิดนี้และเกิดไอเดียขึ้นExecutorServiceมา ExecutorServiceช่วยให้คุณสามารถส่งงานเพื่อดำเนินการโดยใช้submitและinvokeรวมถึงจัดการบริการโดยการปิดการทำงาน เพราะ ExecutorService'เราต้องการการนำไปใช้งาน เราเขียนคลาสด้วยวิธีแบบโรงงานและเรียกมันExecutorsว่า ช่วยให้คุณสร้างกลุ่มThreadPoolExecutorเธรด ในเวลาเดียวกัน มีเธรดพูลที่ให้คุณระบุกำหนดเวลาสำหรับการดำเนินการได้ แต่จะWorkStealingPoolซ่อน อยู่หลัง ForkJoinPool. ฉันหวังว่าสิ่งที่เขียนไว้ข้างต้นไม่เพียงแต่น่าสนใจสำหรับคุณ แต่ยังเข้าใจได้) ฉันยินดีรับข้อเสนอแนะและความคิดเห็นเสมอ #เวียเชสลาฟ
ความคิดเห็น
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION