JavaRush /Java Blog /Random EN /You Can't Spoil Java with a Thread: Part V - Executor, Th...
Viacheslav
Level 3

You Can't Spoil Java with a Thread: Part V - Executor, ThreadPool, Fork Join

Published in the Random EN group

Introduction

So, we know that there are threads in Java, which you can read about in the review “ You Can’t Spoil Java with a Thread: Part I - Threads ”. You can't spoil Java with a Thread: Part V - Executor, ThreadPool, Fork Join - 1Let's look at the sample code again:
public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
As we can see, the code for launching the task is quite standard, but for each new launch we will have to repeat it. One solution is to move it into a separate method, for example execute(Runnable runnable). But the Java developers have already worried about us and come up with an interface Executor:
public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
As you can see, the code has become more concise and allowed us to simply write code to run it Runnablein a thread. Great, isn't it? But this is just the beginning: You can't spoil Java with a thread: Part V - Executor, ThreadPool, Fork Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

As you can see, the interface Executorhas a descendant interface ExecutorService. The JavaDoc of this interface says that ExecutorServiceit is a description of a special Executor'a' that provides methods for stopping work Executor'a' and allows you to get java.util.concurrent.Futureto track the progress of execution. Previously, in “ You Can’t Spoil Java with Thread: Part IV - Callable, Future and Friends, ” we briefly reviewed the possibilities Future. If you forgot or haven’t read it, I advise you to refresh your memory ;) What other interesting things are written in JavaDoc? That we have a special factory java.util.concurrent.Executorsthat allows us to create implementations that are available by default ExecutorService.

ExecutorService

Let's remember again. We have Executorto execute (i.e. execute) a certain task in a thread, when the implementation of creating a thread is hidden from us. We have ExecutorServicea special one Executorthat has a set of capabilities for managing the progress of execution. And we have a factory Executorsthat allows you to create ExecutorService. Let's do it ourselves now:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
As we can see, we have specified a fixed thread pool ( Fixed Thread Pool) of size 2. After which we send tasks to the pool one by one. Each task returns a string ( String) containing the thread name ( currentThread().getName()). It is important to shutdown at the very end ExecutorService, because otherwise our program will not exit. ExecutorsThere are other factory methods in the factory . For example, we can create a pool of just one thread - newSingleThreadExecutoror a pool with caching newCachedThreadPool, where threads will be removed from the pool if they are idle for 1 minute. In fact, behind these ExecutorServicethere is a blocking queue into which tasks are placed and from which these tasks are executed. More information about blocking queues can be seen in the video " Blocking queue - Collections #5 - Advanced Java ". You can also read the review “ Blocking queues of the concurrent package ” and the answer to the question “ When to prefer LinkedBlockingQueue over ArrayBlockingQueue? ” Super simplified - BlockingQueue(blocking queue) blocks a thread, in two cases:
  • a thread is trying to get elements from an empty queue
  • the thread is trying to put elements into a full queue
If we look at the implementation of factory methods, we can see how they are structured. For example:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
or
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
As we can see, implementations are created inside factory methods ExecutorService. And that's basically it ThreadPoolExecutor. Only the attributes that affect the work change. You can't ruin Java with a thread: Part V - Executor, ThreadPool, Fork Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

As we previously saw, inside factory methods ThreadPoolExecutor, . The functionality is affected by what values ​​are passed as the maximum and minimum threads, as well as what queue is used. And any implementation of the interface can be used java.util.concurrent.BlockingQueue. Speaking of ThreadPoolExecutor'ahs, it is worth noting interesting features during operation. For example, you cannot send tasks to ThreadPoolExecutorif there is no space there:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
This code will fail with an error like:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
That is, taskyou cannot submit, because SynchronousQueueit is designed in such a way that it actually consists of one element and does not allow you to put more there. As we can see, queued tasksthere is 0 here, and there is nothing strange in this, because this is specific SynchronousQueue- in fact, it is a queue of 1 element, which is always empty. (!) When one thread puts an element into the queue, it will wait until another thread takes the element from the queue. Therefore, we can replace with new LinkedBlockingQueue<>(1)and what will be indicated in the error will change queued tasks = 1. Because the queue is only 1 element, then we cannot add the second one. And we will fall on this. Continuing the theme of the queue, it is worth noting that the class ThreadPoolExecutorhas additional methods for servicing the queue. For example, the method threadPoolExecutor.purge()will remove all canceled tasks from the queue to free up space in the queue. Another interesting feature related to the queue is the unaccepted task handler:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
For example, the handler simply prints a word Rejectedfor each refusal to accept a task into the queue. Convenient, isn't it? In addition, ThreadPoolExecutorhe has an interesting heir - ScheduledThreadPoolExecutorwho is ScheduledExecutorService. It provides the ability to perform a task on a timer.

ScheduledExecutorService

ExecutorServicetype ScheduledExecutorServiceallow you to run tasks according to a schedule. Let's look at an example:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Everything is simple here. Tasks are sent, we receive a “scheduled task” java.util.concurrent.ScheduledFuture. The following case may also be useful with the schedule:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Here we send Runnablethe task to be executed at a fixed rate (Fixed Rate) with a certain delay. In this case, after 1 second every 2 seconds, start executing the task. There is a similar option:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
But here tasks are executed with a given interval BETWEEN the execution of different tasks. That is, the task taskwill be completed in 1 second. Next, as soon as it is completed, 2 seconds will pass, and then a new task will be launched. You can read the following materials on this topic: You can't ruin Java with a thread: Part V - Executor, ThreadPool, Fork Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

In addition to the thread pools mentioned above, there is one more. You could say he's a little special. Its name is Work Stealing Pool. In short, Work Stealing is a work algorithm in which idle threads begin to take tasks from other threads or tasks from the general queue. Let's look at an example:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
If we run this code, ExecutorServiceit will create 5 threads, because each thread will join the wait queue at the object's location lock. We have already discussed about monitors and locks on it in “ You Can’t Spoil Java with a Thread: Part II - Synchronization .” And now we will replace it Executors.newCachedThreadPoolwith Executors.newWorkStealingPool(). What will change? We will see that our tasks are performed not in 5 threads, but in fewer. Remember that cachedThreadPoolyou created your own thread for each task? Because waitit blocked the thread, but the next tasks wanted to be executed and new threads were created in the pool for them. In the case of StealingPoolthreads, they will not idle forever in wait, they will start executing neighboring tasks. How is this so different from other thread pools WorkStealingPool? Because there is actually something magical living inside of him ForkJoinPool:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
There is actually one more difference. Threads that are created by ForkJoinPooldefault are daemon threads, as opposed to threads created through regular ThreadPool. In general, it is worth remembering about daemon threads, because... for example, CompletableFuturedaemon threads are also used, if you do not specify your own ThreadFactory, which will create non-daemon threads. These are the kind of surprises that can await you in an unexpected place!)

Fork/Join Pool

In this part we will talk about the same one ForkJoinPool(also called fork/join framework) that lives “under the hood” of WorkStealingPool. In general, the Fork Join Framework appeared in Java 1.7. And even if Java 11 is already in the yard, it’s still worth remembering. Not the most common task, but quite interesting. There is a good review on this topic on the Internet: “ Fork/Join Framework in Java 7 ”. Fork/JoinPooloperates in his work with such a concept as java.util.concurrent.RecursiveTask. There is also an analogue - java.util.concurrent.RecursiveAction. RecursiveActions do not return a result. Thus RecursiveTasksimilar to Callable, and RecursiveActionsimilar to Runnable. Well, looking at the name, we see two key methods - forkand join. The method forkruns a task asynchronously in a separate thread. And the method joinallows you to wait for the work to complete. There are several ways to use it: You can't ruin Java with a thread: Part V - Executor, ThreadPool, Fork Join - 5This picture is part of a slide from Alexey Shipilev’s report “ Fork/Join: implementation, use, performance .” To make it clearer, it’s worth watching his report at JEE CONF: “ Fork Join implementation features .”

Summarizing

So, here we are, finishing the next part of the review. We figured out what we first came up with Executorfor executing threads. Then we decided to continue the idea and came up with it ExecutorService. ExecutorServiceallows you to send tasks for execution using submitand invoke, as well as manage the service by turning it off. Because ExecutorService'we need implementations, we wrote a class with factory methods and called it Executors. It allows you to create thread pools ThreadPoolExecutor. At the same time, there are thread pools that also allow you to specify a schedule for execution, but it WorkStealingPoolis hidden behind ForkJoinPool. I hope that what was written above was not only interesting to you, but also understandable) I am always happy to receive suggestions and comments. #Viacheslav
Comments
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION