JavaRush /Blog Java /Random-VI /Bạn không thể làm hỏng Java bằng một chủ đề: Phần V - Exe...
Viacheslav
Mức độ

Bạn không thể làm hỏng Java bằng một chủ đề: Phần V - Executor, ThreadPool, Fork Join

Xuất bản trong nhóm

Giới thiệu

Vì vậy, chúng tôi biết rằng có những luồng trong Java mà bạn có thể đọc trong bài đánh giá “ Bạn không thể làm hỏng Java bằng một luồng: Phần I - Chủ đề ”. Bạn không thể làm hỏng Java bằng một Chủ đề: Phần V - Executor, ThreadPool, Fork Join - 1Hãy xem lại mã mẫu:
public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Như chúng ta có thể thấy, mã để khởi chạy tác vụ khá chuẩn, nhưng với mỗi lần khởi chạy mới, chúng ta sẽ phải lặp lại nó. Một giải pháp là chuyển nó sang một phương thức riêng, ví dụ execute(Runnable runnable). Nhưng các nhà phát triển Java đã lo lắng cho chúng tôi và đưa ra một giao diện Executor:
public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Như bạn có thể thấy, mã đã trở nên ngắn gọn hơn và cho phép chúng ta chỉ cần viết mã để chạy nó Runnabletrong một luồng. Tuyệt vời phải không? Nhưng điều này chỉ là khởi đầu: Bạn không thể làm hỏng Java bằng một luồng: Phần V - Executor, ThreadPool, Fork Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Như bạn có thể thấy, giao diện Executorcó giao diện con cháu ExecutorService. JavaDoc của giao diện này nói rằng ExecutorServiceđó là mô tả về một Executor'a' đặc biệt cung cấp các phương thức để dừng công việc Executor'a' và cho phép bạn theo java.util.concurrent.Futuredõi tiến trình thực thi. Trước đây, trong “ Bạn không thể làm hỏng Java với Chủ đề: Phần IV - Có thể gọi được, Tương lai và Bạn bè, ” chúng tôi đã xem xét ngắn gọn các khả năng Future. Nếu bạn quên hoặc chưa đọc thì tôi khuyên bạn nên nhớ lại ;) JavaDoc còn có gì thú vị nữa? Rằng chúng tôi có một nhà máy đặc biệt java.util.concurrent.Executorscho phép chúng tôi tạo các triển khai có sẵn theo mặc định ExecutorService.

Dịch vụ thực thi

Chúng ta hãy nhớ lại. Chúng ta phải Executorthực thi (tức là thực thi) một tác vụ nhất định trong một luồng, khi việc triển khai tạo một luồng bị ẩn khỏi chúng ta. Chúng tôi có ExecutorServicemột công cụ đặc biệt Executorcó tập hợp các khả năng để quản lý tiến độ thực hiện. Và chúng tôi có một nhà máy Executorscho phép bạn tạo các tệp ExecutorService. Hãy tự mình làm điều đó ngay bây giờ:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Như chúng ta có thể thấy, chúng ta đã chỉ định một nhóm luồng cố định ( Fixed Thread Pool) có kích thước 2. Sau đó, chúng ta gửi từng tác vụ đến nhóm đó. Mỗi tác vụ trả về một chuỗi ( String) chứa tên luồng ( currentThread().getName()). Điều quan trọng là phải tắt máy ở cuối ExecutorService, vì nếu không chương trình của chúng tôi sẽ không thoát. ExecutorsCó những phương pháp sản xuất khác trong nhà máy . Ví dụ: chúng ta có thể tạo một nhóm chỉ có một luồng - newSingleThreadExecutorhoặc một nhóm có bộ nhớ đệm newCachedThreadPool, trong đó các luồng sẽ bị xóa khỏi nhóm nếu chúng không hoạt động trong 1 phút. Trên thực tế, đằng sau những thứ này ExecutorServicemột hàng đợi chặn trong đó các tác vụ được đặt và từ đó các tác vụ này được thực thi. Bạn có thể xem thêm thông tin về việc chặn hàng đợi trong video " Chặn hàng đợi - Bộ sưu tập #5 - Java nâng cao ". Bạn cũng có thể đọc bài đánh giá “ Chặn hàng đợi của gói đồng thời ” và câu trả lời cho câu hỏi “ Khi nào nên ưu tiên LinkedBlockingQueue hơn ArrayBlockingQueue? ” Siêu đơn giản - BlockingQueue(chặn hàng đợi) chặn một luồng, trong hai trường hợp:
  • một luồng đang cố lấy các phần tử từ một hàng đợi trống
  • luồng đang cố gắng đưa các phần tử vào một hàng đợi đầy đủ
Nếu chúng ta xem xét việc triển khai các phương thức Factory, chúng ta có thể thấy chúng được cấu trúc như thế nào. Ví dụ:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
hoặc
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Như chúng ta có thể thấy, việc triển khai được tạo bên trong các phương thức xuất xưởng ExecutorService. Và về cơ bản là vậy ThreadPoolExecutor. Chỉ những thuộc tính ảnh hưởng đến công việc mới thay đổi. Bạn không thể phá hỏng Java bằng một luồng: Phần V - Executor, ThreadPool, Fork Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

Như chúng ta đã thấy trước đây, bên trong các phương thức xuất xưởng ThreadPoolExecutor, . Chức năng bị ảnh hưởng bởi giá trị nào được truyền dưới dạng luồng tối đa và tối thiểu, cũng như hàng đợi nào được sử dụng. Và bất kỳ việc triển khai giao diện nào cũng có thể được sử dụng java.util.concurrent.BlockingQueue. Nói đến ThreadPoolExecutor'à, cần lưu ý những tính năng thú vị trong quá trình hoạt động. Ví dụ: bạn không thể gửi tác vụ tới ThreadPoolExecutornếu không còn chỗ trống ở đó:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Mã này sẽ thất bại với một lỗi như:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Tức là taskbạn không thể gửi, bởi vì SynchronousQueuenó được thiết kế theo cách nó thực sự bao gồm một phần tử và không cho phép bạn đặt nhiều hơn ở đó. Như chúng ta có thể thấy, queued tasksở đây có số 0 và không có gì lạ ở đây, bởi vì đây là điều cụ thể SynchronousQueue- trên thực tế, nó là hàng đợi gồm 1 phần tử, luôn trống. (!) Khi một luồng đưa một phần tử vào hàng đợi, nó sẽ đợi cho đến khi một luồng khác lấy phần tử đó ra khỏi hàng đợi. Do đó, chúng ta có thể thay thế bằng new LinkedBlockingQueue<>(1)và những gì được chỉ ra trong lỗi sẽ thay đổi queued tasks = 1. Bởi vì hàng đợi chỉ có 1 phần tử nên chúng ta không thể thêm phần tử thứ hai. Và chúng ta sẽ rơi vào điều này. Tiếp tục chủ đề về hàng đợi, điều đáng chú ý là lớp này ThreadPoolExecutorcó các phương thức bổ sung để phục vụ hàng đợi. Ví dụ: phương thức này threadPoolExecutor.purge()sẽ xóa tất cả tác vụ bị hủy khỏi hàng đợi để giải phóng dung lượng trong hàng đợi. Một tính năng thú vị khác liên quan đến hàng đợi là trình xử lý tác vụ không được chấp nhận:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Ví dụ: trình xử lý chỉ in một từ Rejectedcho mỗi lần từ chối chấp nhận một tác vụ vào hàng đợi. Thật tiện lợi phải không? Ngoài ra, ThreadPoolExecutoranh ta còn có một người thừa kế thú vị - ScheduledThreadPoolExecutorđó là ScheduledExecutorService. Nó cung cấp khả năng thực hiện một nhiệm vụ trên một bộ đếm thời gian.

Dịch vụ thực thi theo lịch trình

ExecutorServicetype ScheduledExecutorServicecho phép bạn chạy các tác vụ theo lịch trình. Hãy xem một ví dụ:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Mọi thứ đều đơn giản ở đây. Nhiệm vụ được gửi đi, chúng ta nhận được “nhiệm vụ theo lịch trình” java.util.concurrent.ScheduledFuture. Trường hợp sau đây cũng có thể hữu ích với lịch trình:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Ở đây chúng tôi gửi Runnabletác vụ sẽ được thực thi ở tốc độ cố định với độ trễ nhất định. Trong trường hợp này, cứ sau 1 giây cứ sau 2 giây, hãy bắt đầu thực hiện tác vụ. Có một lựa chọn tương tự:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Nhưng ở đây các nhiệm vụ được thực thi với một khoảng thời gian nhất định GIỮA việc thực hiện các nhiệm vụ khác nhau. Tức là nhiệm vụ tasksẽ hoàn thành trong 1 giây. Tiếp theo, ngay sau khi hoàn thành, 2 giây sẽ trôi qua và sau đó một nhiệm vụ mới sẽ được triển khai. Bạn có thể đọc các tài liệu sau về chủ đề này: Bạn không thể phá hỏng Java bằng một luồng: Phần V - Executor, ThreadPool, Fork Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

Công việcTrộm cắpHồ bơi

Ngoài các nhóm chủ đề được đề cập ở trên, còn có một nhóm nữa. Có thể nói anh ấy hơi đặc biệt. Tên của nó là Work Stealing Pool. Nói tóm lại, Đánh cắp công việc là một thuật toán công việc trong đó các luồng nhàn rỗi bắt đầu nhận nhiệm vụ từ các luồng khác hoặc nhiệm vụ từ hàng đợi chung. Hãy xem một ví dụ:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Nếu chúng ta chạy đoạn mã này, ExecutorServicenó sẽ tạo ra 5 luồng, bởi vì mỗi luồng sẽ tham gia hàng đợi tại vị trí của đối tượng lock. Chúng ta đã thảo luận về màn hình và khóa trên nó trong “ Bạn không thể làm hỏng Java bằng một chủ đề: Phần II - Đồng bộ hóa ”. Và bây giờ chúng ta sẽ thay thế nó Executors.newCachedThreadPoolbằng Executors.newWorkStealingPool(). Điều gì sẽ thay đổi? Chúng ta sẽ thấy rằng các nhiệm vụ của chúng ta được thực hiện không phải trong 5 luồng mà ít hơn. Hãy nhớ rằng cachedThreadPoolbạn đã tạo chủ đề của riêng mình cho từng nhiệm vụ? Bởi vì waitnó đã chặn luồng, nhưng các tác vụ tiếp theo muốn được thực thi và các luồng mới đã được tạo trong nhóm cho chúng. Trong trường hợp StealingPoolcác luồng, chúng sẽ không ở trạng thái nhàn rỗi mãi mãi waitmà sẽ bắt đầu thực thi các tác vụ lân cận. Điều này khác với các nhóm chủ đề khác như thế nào WorkStealingPool? Bởi vì thực sự có điều gì đó kỳ diệu đang sống bên trong anh ấy ForkJoinPool:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Thực tế còn có một sự khác biệt nữa. Các luồng được tạo theo ForkJoinPoolmặc định là các luồng daemon, trái ngược với các luồng được tạo thông qua ThreadPool. Nói chung, cần nhớ về các luồng daemon, bởi vì... ví dụ: CompletableFuturecác luồng daemon cũng được sử dụng, nếu bạn không chỉ định các luồng của riêng mình ThreadFactory, điều này sẽ tạo ra các luồng không phải daemon. Đây là những điều bất ngờ có thể chờ đợi bạn ở một nơi không ngờ tới!)

Ngã ba/Tham gia nhóm

Trong phần này, chúng ta sẽ nói về cùng một khung ForkJoinPool(còn được gọi là khung fork/join) tồn tại “dưới vỏ bọc” của WorkStealingPool. Nói chung, Fork Join Framework đã xuất hiện trong Java 1.7. Và ngay cả khi Java 11 đã có sẵn thì nó vẫn đáng được ghi nhớ. Không phải là nhiệm vụ phổ biến nhất, nhưng khá thú vị. Có một bài đánh giá hay về chủ đề này trên Internet: “ Fork/Join Framework in Java 7 ”. Fork/JoinPoolhoạt động trong công việc của mình với một khái niệm như java.util.concurrent.RecursiveTask. Ngoài ra còn có một chất tương tự - java.util.concurrent.RecursiveAction. RecursiveActions không trả về kết quả. Do đó RecursiveTasktương tự như Callable, và RecursiveActiontương tự như Runnable. Chà, nhìn vào tên, chúng ta thấy hai phương thức chính - forkjoin. Phương thức này forkchạy một tác vụ không đồng bộ trong một luồng riêng biệt. Và phương pháp joincho phép bạn đợi công việc hoàn thành. Có một số cách để sử dụng nó: Bạn không thể phá hỏng Java bằng một luồng: Phần V - Executor, ThreadPool, Fork Join - 5Hình ảnh này là một phần của trang trình bày từ báo cáo của Alexey Shipilev “ Fork/Join: triển khai, sử dụng, hiệu suất .” Để làm rõ hơn, bạn nên xem báo cáo của anh ấy tại JEE CONF: “ Các tính năng triển khai Fork Join ”.

Tóm tắt

Vậy là chúng ta đã hoàn thành phần tiếp theo của bài đánh giá. Chúng tôi đã tìm ra điều đầu tiên chúng tôi nghĩ ra Executorđể thực thi các luồng. Sau đó, chúng tôi quyết định tiếp tục ý tưởng và nghĩ ra nó ExecutorService. ExecutorServicecho phép bạn gửi các tác vụ để thực thi bằng cách sử dụng submitinvokecũng như quản lý dịch vụ bằng cách tắt nó. Bởi vì ExecutorService'chúng tôi cần triển khai, chúng tôi đã viết một lớp với các phương thức xuất xưởng và gọi nó là Executors. Nó cho phép bạn tạo các nhóm chủ đề ThreadPoolExecutor. Đồng thời, có các nhóm luồng cũng cho phép bạn chỉ định lịch thực hiện nhưng nó WorkStealingPoolbị ẩn đằng sau ForkJoinPool. Tôi hy vọng rằng những gì viết ở trên không chỉ thú vị với bạn mà còn dễ hiểu) Tôi luôn vui vẻ nhận được những gợi ý và nhận xét. #Viacheslav
Bình luận
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION