JavaRush /Java 博客 /Random-ZH /你不能用线程破坏 Java:第五部分 - 执行器、线程池、Fork Join
Viacheslav
第 3 级

你不能用线程破坏 Java:第五部分 - 执行器、线程池、Fork Join

已在 Random-ZH 群组中发布

介绍

所以,我们知道 Java 中有线程,您可以在评论“你不能用线程破坏 Java:第一部分 - 线程”中阅读有关线程的内容。 你不能用线程破坏 Java:第五部分 - 执行器、线程池、Fork Join - 1我们再看一下示例代码:
public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
正如我们所看到的,启动任务的代码非常标准,但对于每次新的启动,我们都必须重复它。例如,一种解决方案是将其移至单独的方法中execute(Runnable runnable)。但 Java 开发人员已经担心我们了,并提出了一个接口Executor
public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
正如你所看到的,代码变得更加简洁,让我们只需编写代码即可Runnable在线程中运行它。太棒了,不是吗?但这只是开始: 你不能用线程破坏 Java:第五部分 - 执行器、线程池、Fork Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

正如您所看到的,该接口Executor有一个后代接口ExecutorService。这个接口的 JavaDoc 说它ExecutorService是一个特殊的Executor“a”的描述,它提供了停止工作Executor“a”的方法,并允许您跟踪java.util.concurrent.Future执行的进度。之前,在“你不能用线程破坏 Java:第四部分 - Callable、Future 和 Friends ”中,我们简要回顾了可能性Future。如果你忘记了或者没有读过,我建议你重温一下记忆;)JavaDoc 还有什么有趣的地方?我们有一个特殊的工厂java.util.concurrent.Executors,允许我们创建默认可用的实现ExecutorService

执行服务

让我们再回忆一下。当创建线程的实现对我们隐藏时,我们必须Executor在线程中执行(即执行)某个任务。我们有ExecutorService一个特殊的Executor,它具有一组管理执行进度的功能。我们有一家工厂Executors可以让您创造ExecutorService。现在让我们自己做一下:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
正如我们所看到的,我们指定了一个Fixed Thread Pool大小为2的固定线程池( )。之后我们将任务一个接一个地发送到线程池中。String每个任务返回一个包含线程名称 ( ) 的字符串( currentThread().getName())。最后关闭很重要ExecutorService,否则我们的程序将不会退出。工厂里Executors还有其他工厂方法。例如,我们可以创建一个只有一个线程的池,newSingleThreadExecutor或者一个带有缓存的池newCachedThreadPool,如果线程空闲 1 分钟,就会从池中删除它们。事实上,在这些后面ExecutorService一个阻塞队列,任务被放入其中并从中执行这些任务。有关阻塞队列的更多信息,请参阅视频“阻塞队列 - 集合 #5 - 高级 Java ”。您还可以阅读评论“并发包的阻塞队列”以及问题“何时更喜欢 LinkedBlockingQueue 而不是 ArrayBlockingQueue? ”的答案。超级简化—— BlockingQueue(阻塞队列)阻塞一个线程,有两种情况:
  • 线程正在尝试从空队列中获取元素
  • 线程正在尝试将元素放入完整队列中
如果我们查看工厂方法的实现,我们可以看到它们的结构。例如:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
或者
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
正如我们所看到的,实现是在工厂方法内创建的ExecutorService。基本上就是这样ThreadPoolExecutor。仅影响工作的属性发生变化。 你不能用线程毁掉 Java:第五部分 - 执行器、线程池、Fork Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

线程池执行器

正如我们之前看到的,在工厂方法内部ThreadPoolExecutor,. 该功能受到作为最大和最小线程传递的值以及使用的队列的影响。并且可以使用该接口的任何实现java.util.concurrent.BlockingQueue。说到ThreadPoolExecutor“啊”,值得注意的是操作过程中有趣的功能。例如,ThreadPoolExecutor如果那里没有空间,则无法将任务发送到:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
此代码将失败并出现如下错误:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
也就是说,task你不能提交,因为 SynchronousQueue它的设计方式使其实际上由一个元素组成,并且不允许您在其中放置更多元素。我们可以看到,queued tasks这里有0,这并没有什么奇怪的,因为 这是具体的SynchronousQueue- 事实上,它是一个只有 1 个元素的队列,并且始终为空。(!) 当一个线程将一个元素放入队列时,它将等待,直到另一个线程从队列中取出该元素。因此,我们可以替换为new LinkedBlockingQueue<>(1),错误中指示的内容将会改变queued tasks = 1。因为 队列只有 1 个元素,那么我们无法添加第二个元素。我们将在这一点上失败。继续队列的主题,值得注意的是该类ThreadPoolExecutor还有用于服务队列的附加方法。例如,该方法threadPoolExecutor.purge()将从队列中删除所有已取消的任务以释放队列中的空间。与队列相关的另一个有趣的功能是不接受的任务处理程序:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
例如,处理程序只需Rejected为每次拒绝接受队列中的任务打印一个单词。方便不是吗?此外,ThreadPoolExecutor他还有一位有趣的继承人——ScheduledThreadPoolExecutor他就是ScheduledExecutorService。它提供了在计时器上执行任务的能力。

预定执行服务

ExecutorService类型ScheduledExecutorService允许您根据计划运行任务。让我们看一个例子:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
这里一切都很简单。任务发送后,我们收到“计划任务” java.util.concurrent.ScheduledFuture。以下情况对于时间表也可能有用:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
这里我们发送Runnable要以固定速率(Fixed Rate)执行的任务,并有一定的延迟。在这种情况下,每2秒1秒后,开始执行任务。还有一个类似的选项:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
但这里的任务是在不同任务的执行之间以给定的间隔执行的。也就是说,任务task将在1秒内完成。接下来,一旦完成,2秒过去,就会启动新的任务。您可以阅读以下有关此主题的材料: 你不能用线程毁掉 Java:第五部分 - 执行器、线程池、Fork Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

工作偷池

除了上面提到的线程池之外,还有一种。可以说他有点特别。它的名字是工作窃取池。简而言之,Work Stealing 是一种工作算法,其中空闲线程开始从其他线程获取任务或从通用队列中获取任务。让我们看一个例子:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
如果我们运行这段代码,ExecutorService它将创建 5 个线程,因为 每个线程都会加入对象所在位置的等待队列lock我们已经在“你不能用线程破坏 Java:第二部分 - 同步”中讨论了监视器和锁。现在我们将其替换Executors.newCachedThreadPoolExecutors.newWorkStealingPool(). 会发生什么变化?我们将看到我们的任务不是在 5 个线程中执行,而是在更少的线程中执行。还记得cachedThreadPool您为每个任务创建了自己的线程吗?因为wait它阻塞了线程,但接下来的任务想要执行,并且在池中为它们创建了新线程。对于StealingPool线程来说,它们不会永远空闲wait,它们将开始执行相邻的任务。这与其他线程池有何不同WorkStealingPool?因为他的内心深处确实存在着某种神奇的东西ForkJoinPool
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
实际上还有一个区别。默认情况下创建的线程ForkJoinPool是守护线程,而不是通过常规ThreadPool. 一般来说,值得记住守护线程,因为...... 例如,CompletableFuture也使用守护线程,如果您不指定自己的ThreadFactory,这将创建非守护线程。这些惊喜可以在意想不到的地方等待着您!)

分叉/加入池

在这一部分中,我们将讨论ForkJoinPool生活在WorkStealingPool. 一般来说,Fork Join框架出现在Java 1.7中。即使 Java 11 已经出现,它仍然值得记住。这不是最常见的任务,但很有趣。互联网上有一个关于这个主题的很好的评论:“ Fork/Join Framework in Java 7 ”。 Fork/JoinPool他的作品中运用了这样的概念java.util.concurrent.RecursiveTask。还有一个类似物 - java.util.concurrent.RecursiveAction。RecursiveActions 不返回结果。因此RecursiveTask类似于Callable,并且RecursiveAction类似于Runnable。好吧,看名字,我们看到两个关键方法 -forkjoin。该方法fork在单独的线程中异步运行任务。并且该方法join允许您等待工作完成。有多种使用方法: 这张图片是 Alexey Shipilev 报告“ Fork/Join:实现、使用、性能你不能用线程毁掉 Java:第五部分 - 执行器、线程池、Fork Join - 5”幻灯片的一部分。为了更清楚地说明这一点,值得一看他在 JEE CONF 上的报告:“ Fork Join 实现功能”。

总结

所以,我们在这里完成了评论的下一部分。我们弄清楚了我们首先想到的Executor执行线程的方法。然后我们决定继续这个想法并想出了它ExecutorService。允许您使用和ExecutorService发送要执行的任务,以及通过关闭服务来管理该服务。因为 '我们需要实现,我们编写了一个带有工厂方法的类并调用它。它允许您创建线程池。同时,还有线程池也可以让你指定执行的时间表,只不过是隐藏在后面的。我希望上面写的内容不仅对您来说有趣,而且可以理解)我总是很高兴收到建议和评论。#维亚切斯拉夫submitinvokeExecutorServiceExecutorsThreadPoolExecutorWorkStealingPoolForkJoinPool
评论
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION