JavaRush /Java Blog /Random EN /You Can't Spoil Java with Threads: Part V - Executor, Thr...
Viacheslav
Level 3

You Can't Spoil Java with Threads: Part V - Executor, ThreadPool, Fork Join

Published in the Random EN group

Introduction

So, we know that Java has threads, as you can read about in the overview " Threads Don't Screw Java: Part I - Threads ". You Can't Spoil Java with Threads: Part V - Executor, ThreadPool, Fork Join - 1Let's look at the sample code again:
public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
As we can see, the code for launching the task is quite typical, but for each new launch we will have to repeat it. One solution is to move it to a separate method, for example, execute(Runnable runnable). But the Java developers have already taken care of us and come up with an interface Executor:
public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
As you can see, the code has become more concise and allowed us to simply write the code to run Runnablein the thread. It's great, isn't it? But this is just the beginning: You Can't Spoil Java with Threads: Part V - Executor, ThreadPool, Fork Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

As you can see, the interface Executorhas a child interface ExecutorService. The JavaDoc for this interface says that ExecutorServiceit is a description of a special Executor'a that provides methods to stop 'a's work Executorand allows you to get 'a's java.util.concurrent.Futureto track progress. Earlier, in " You Can't Mess with Java Threads: Part IV - Callable, Future, and Friends " we already had a brief look at the possibilities Future. Who forgot or didn't read - I advise you to refresh your memory;) What else is interesting in JavaDoc? That we have a special factory java.util.concurrent.Executorsthat allows us to create default implementations ExecutorService.

ExecutorService

Let's remember again. We have Executorto execute (i.e., execute) a certain task in a thread, when the implementation of creating a thread is hidden from us. We have ExecutorService- special Executor, which has a set of features for managing the progress of execution. And we have a factory Executorsthat allows us to create ExecutorService. Let's do it ourselves now:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
As we can see, we have specified a fixed thread pool ( Fixed Thread Pool) of size 2. After that, we send tasks to the pool one by one. Each task returns a string ( String) containing the name of the thread ( currentThread().getName()). It is important to shutdown at the very end for ExecutorService, because otherwise our program will not end. ExecutorsThere are other factory methods in the factory . For example, we can create a pool of just one thread - newSingleThreadExecutoror a caching pool newCachedThreadPoolwhen threads are removed from the pool if they are idle for 1 minute. In fact, behind these ExecutorServiceis a blocking queue , in which tasks are placed and from which these tasks are executed. You can see more about blocking queues in the video "Blocking Queue - Collections #5 - Advanced Java ". You can also read the overview " Blocking queues of the concurrent package " and the answer to the question " When to prefer LinkedBlockingQueue over ArrayBlockingQueue? ". Super simplified - BlockingQueue(blocking queue) blocks a thread, in two cases:
  • the thread is trying to get elements from an empty queue
  • the thread is trying to put elements in the full queue
If we look at the implementation of factory methods, we will see how they are arranged. For example:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
or
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
As we can see, implementations are created inside the factory methods ExecutorService. And basically this ThreadPoolExecutor. Only the attributes that affect the work change. You Can't Spoil Java with Threads: Part V - Executor, ThreadPool, Fork Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

As we saw earlier, inside factory methods ThreadPoolExecutor, . The functionality is affected by what values ​​are passed as maximum and minimum threads, and which queue is used. And any implementation of the interface can be used java.util.concurrent.BlockingQueue. Speaking of ThreadPoolExecutor'ah, it is worth noting interesting features at work. For example, you cannot send tasks to ThreadPoolExecutorif there is no space there:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
This code will fail with an error like:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
That is, taskyou cannot submit, because SynchronousQueueis arranged in such a way that it actually consists of one element and does not allow you to put more there. As we can see, queued taskshere is 0, and there is nothing strange in this, because this is specific SynchronousQueue- in fact, this is a queue of 1 element, which is always empty. (!) When one thread puts an element into the queue, it will wait until another thread picks up the element from the queue. Therefore, we can replace with new LinkedBlockingQueue<>(1)and the error will change what will be indicated queued tasks = 1. Because the queue is only 1 element, then we cannot put the second one. And let's fall on it. Continuing the theme of the queue, it is worth noting that the class ThreadPoolExecutorhas additional methods for servicing the queue. For example, the methodthreadPoolExecutor.purge()will remove all canceled tasks from the queue to free up space in the queue. Another interesting queue-related feature is the pending task handler:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
For example, the handler simply outputs a word Rejectedfor each refusal to accept a task in the queue. Convenient, isn't it? In addition, ThreadPoolExecutorit has an interesting successor - ScheduledThreadPoolExecutorwhich is ScheduledExecutorService. It provides the ability to execute a task on a timer.

ScheduledExecutorService

ExecutorServicetypes ScheduledExecutorServiceallow you to run tasks on a schedule (schedule). Let's look at an example:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Everything is simple here. Tasks are sent, we get a "scheduled task" java.util.concurrent.ScheduledFuture. With a schedule, the following case may also be useful:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Here we send Runnablethe task to be executed at a fixed rate (Fixed Rate) with a certain delay. In this case, after 1 second, every 2 seconds, start executing the task. There is a similar option:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
But here the tasks are executed with a given interval BETWEEN the execution of different tasks. That is, the task taskwill be completed in 1 second. Further, as soon as it is completed, 2 seconds will pass, and then the new task will be launched. On this topic, you can read the following materials: You Can't Spoil Java with Threads: Part V - Executor, ThreadPool, Fork Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

In addition to the thread pools mentioned above, there is one more. You can tell he's a little special. His name is Work Stealing Pool. In short, Work Stealing is a work algorithm in which idle threads start picking up tasks from other threads or tasks from a common queue. Let's look at an example:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
If we run this code, then ExecutorServicewe will create 5 threads, because each thread will queue up in the wait queue for the object's lock lock. About monitors and locks on it, we have previously dealt with " You can't spoil Java thread: Part II - synchronization ". And now we will replace Executors.newCachedThreadPoolwith Executors.newWorkStealingPool(). What will change? We will see that our tasks are not executed in 5 threads, but less. Remember that cachedThreadPoolyou created your own thread for each task? Because waitit blocked the thread, and the following tasks wanted to be executed and new threads were created in the pool for them. In the case of with, StealingPoolthe threads will not be idle forever in wait, they will begin to perform neighboring tasks. What makes this thread so different from other thread pools WorkStealingPool? The fact that inside it lives actually magicalForkJoinPool:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Actually there is another difference. The threads that are created for ForkJoinPooldefault are daemon threads, as opposed to the threads created via regular ThreadPool. In general, it is worth remembering about daemon threads, because. for example, when CompletableFuturedaemon threads are also used, if you do not specify your own ThreadFactory, which will create non-daemon threads. These are the surprises that can await in an unexpected place!)

Fork/Join Pool

In this part, we will talk about the very one ForkJoinPool(it is also called the fork / join framework) that lives "under the hood" of WorkStealingPool. In general, Fork Join Framework appeared in Java 1.7. And even though Java 11 is already in the yard, it’s still worth remembering. Not the most common task, but quite interesting. There is a good overview on the net about this topic: " Fork/Join Framework in Java 7 ". Fork/JoinPooloperates in his work with such a concept as java.util.concurrent.RecursiveTask. There is also an analogue - java.util.concurrent.RecursiveAction. RecursiveAction does not return a result. Thus RecursiveTasksimilar to Callable, but RecursiveActionsimilar to Runnable. Well, looking at the name, we see two key methods - forkand join. The method forkstarts some task asynchronously in a separate thread. A methodjoinallows you to wait for the completion of the work. There are several ways to use it: You Can't Spoil Java with Threads: Part V - Executor, ThreadPool, Fork Join - 5This picture is part of a slide in Aleksey Shipilev's report " Fork/Join: Implementation, Usage, Performance ". To make it clearer, it's worth looking at his talk at JEE CONF: " Fork Join Implementation Features ". You Can't Spoil Java with Threads: Part V - Executor, ThreadPool, Fork Join - 6

Summarizing

So, here we have finished the next part of the review. We figured out what we first came up with Executorfor executing threads. Then we decided to continue the idea and came up with ExecutorService. ExecutorServiceallows you to send tasks for execution using submitand invoke, as well as manage the service by turning it off. Because ExecutorService'u need implementations, wrote a class with factory methods and named it Executors. It allows you to create pools of thread ThreadPoolExecutor's. At the same time, there are such thread pools that allow you to also specify a schedule for execution, but WorkStealingPoolhides ForkJoinPool. I hope you were not only interested in what was written above, but also understandable) I am always glad to suggestions and comments. #Viacheslav
Comments
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION