Introduction
So, we know that there are threads in Java, which you can read about in the review “You Can’t Spoil Java with a Thread: Part I - Threads”. Let's look at the sample code again:
public static void main(String []args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
As we can see, the code for launching a task is quite standard, but we would have to repeat it for every new launch. One solution is to move it into a separate method, for example execute(Runnable runnable). But the Java developers have already taken care of us and come up with the Executor interface:
public static void main(String []args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
As you can see, the code has become more concise and lets us simply write the code that runs a Runnable in a thread. Great, isn't it? But this is just the beginning:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
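Because Executor only promises that a task will be run and says nothing about how, the launch strategy can be swapped without touching the calling code. Here is a minimal sketch of that idea. The class name ExecutorStrategies and its helper methods are made up for this illustration, and CompletableFuture is used only as a convenient way to hand the result back.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical demo class: two launch strategies behind the same Executor interface.
class ExecutorStrategies {

    // Runs the task immediately in the calling thread.
    static Executor direct() {
        return Runnable::run;
    }

    // Starts a new thread for every task, like the lambda in the article.
    static Executor threadPerTask() {
        return runnable -> new Thread(runnable).start();
    }

    // Runs one task on the executor and reports which thread executed it.
    static Thread threadUsedBy(Executor executor) {
        CompletableFuture<Thread> used = new CompletableFuture<>();
        executor.execute(() -> used.complete(Thread.currentThread()));
        return used.join(); // waits until the task has actually run
    }

    public static void main(String[] args) {
        System.out.println("direct:          " + threadUsedBy(direct()).getName());
        System.out.println("thread per task: " + threadUsedBy(threadPerTask()).getName());
    }
}
```

The calling code (threadUsedBy here) stays the same for both strategies, which is the whole point of the interface.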
Executor has a descendant interface, ExecutorService. The JavaDoc of this interface says that ExecutorService describes a special Executor that provides methods for shutting the Executor down and lets you obtain a java.util.concurrent.Future to track the progress of execution. Previously, in “You Can’t Spoil Java with a Thread: Part IV - Callable, Future and Friends”, we briefly reviewed the capabilities of Future. If you have forgotten or haven’t read it, I advise you to refresh your memory ;) What other interesting things does the JavaDoc say? That there is a special factory, java.util.concurrent.Executors, that lets us create the ExecutorService implementations available by default.
ExecutorService
Let's recap. We have Executor to execute a task in a thread while the details of creating the thread are hidden from us. We have ExecutorService, a special Executor with a set of capabilities for managing the progress of execution. And we have the Executors factory, which lets us create an ExecutorService. Let's do it ourselves now:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future<String> result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
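Note that the loop above calls get() right after every submit(), so the next task is only sent once the previous one has finished. If the tasks should really run side by side, one common variation is to submit everything first and collect the results afterwards. The sketch below assumes that variation. The class name FixedPoolDemo and the method runTasks are invented for the example, and checked exceptions are wrapped only to keep the helper easy to call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class: submit all tasks first, then wait for the results.
class FixedPoolDemo {

    // Runs taskCount tasks on a pool of poolSize threads and returns the worker thread names.
    static List<String> runTasks(int poolSize, int taskCount) {
        ExecutorService service = Executors.newFixedThreadPool(poolSize);
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                futures.add(service.submit(task)); // does not block
            }
            List<String> names = new ArrayList<>();
            for (Future<String> future : futures) {
                names.add(future.get()); // blocks until this particular task is done
            }
            return names;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("Task did not complete", e);
        } finally {
            service.shutdown(); // always release the pool, even if a task failed
        }
    }

    public static void main(String[] args) {
        System.out.println(runTasks(2, 5));
    }
}
```

Putting shutdown() into a finally block is a design choice of this sketch: the pool is released even when one of the tasks throws.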
As we can see, we have specified a fixed thread pool (Fixed Thread Pool) of size 2, after which we send tasks to the pool one by one. Each task returns a String containing the thread name (currentThread().getName()). It is important to shut down the ExecutorService at the very end, because otherwise our program will not exit: the pool's threads are not daemon threads and keep the JVM alive. The Executors factory has other factory methods as well. For example, we can create a pool of just one thread with newSingleThreadExecutor, or a caching pool with newCachedThreadPool, where threads are removed from the pool if they are idle for 1 minute. In fact, behind these ExecutorService implementations there is a blocking queue into which tasks are placed and from which they are taken for execution. More information about blocking queues can be found in the video “Blocking queue - Collections #5 - Advanced Java”. You can also read the review “Blocking queues of the concurrent package” and the answer to the question “When to prefer LinkedBlockingQueue over ArrayBlockingQueue?” Put very simply, a BlockingQueue (blocking queue) blocks a thread in two cases:
- a thread is trying to get elements from an empty queue
- a thread is trying to put elements into a full queue
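The two cases can be observed directly. The sketch below uses an ArrayBlockingQueue with room for one element and the timed poll/offer variants, so the demo gives up after a short wait instead of hanging. The untimed take() and put() would block until another thread helps, which the last part shows with a producer thread. The class name BlockingQueueDemo is made up for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: both blocking cases on a queue with capacity 1.
class BlockingQueueDemo {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            // Case 1: empty queue. The timed poll waits 100 ms and then returns null.
            log.add("poll on empty -> " + queue.poll(100, TimeUnit.MILLISECONDS));

            queue.put("first"); // fits, the queue is now full

            // Case 2: full queue. The timed offer waits 100 ms and then returns false.
            log.add("offer on full -> " + queue.offer("second", 100, TimeUnit.MILLISECONDS));

            // A producer blocked in put() resumes as soon as a consumer frees the slot.
            Thread producer = new Thread(() -> {
                try {
                    queue.put("second");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();
            log.add("take -> " + queue.take()); // frees the slot for the producer
            log.add("take -> " + queue.take()); // waits for the producer if necessary
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```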
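If we look at the implementation of the factory methods, we can see how they are arranged. For example:
public static ExecutorService newFixedThreadPool(int nThreads) {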
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
or
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
As we can see, implementations of ExecutorService are created inside the factory methods. And for the most part it is a ThreadPoolExecutor. Only the parameters that affect its behavior change.
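The same parameters can be read back from a running pool. The sketch below casts the result of two factory methods to ThreadPoolExecutor and prints its settings. The cast is an assumption that matches the source shown above but is not promised by the ExecutorService return type, and the class name FactoryInspection is invented for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: reads back the settings chosen by the factory methods.
class FactoryInspection {

    static String describe(ExecutorService service) {
        // Assumption: the factory returned a plain ThreadPoolExecutor, as in the source above.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) service;
        String description = "core=" + pool.getCorePoolSize()
                + ", max=" + pool.getMaximumPoolSize()
                + ", keepAlive=" + pool.getKeepAliveTime(TimeUnit.SECONDS) + "s"
                + ", queue=" + pool.getQueue().getClass().getSimpleName();
        pool.shutdown(); // nothing was submitted, so the pool terminates right away
        return description;
    }

    public static void main(String[] args) {
        System.out.println("fixed(2): " + describe(Executors.newFixedThreadPool(2)));
        System.out.println("cached:   " + describe(Executors.newCachedThreadPool()));
    }
}
```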
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
As we previously saw, a ThreadPoolExecutor is usually created inside the factory methods. Its behavior is affected by the values passed as the minimum (core) and maximum number of threads, as well as by the queue that is used. Any implementation of the java.util.concurrent.BlockingQueue interface can be used. Speaking of ThreadPoolExecutor, it is worth noting some interesting features of how it operates. For example, you cannot send tasks to a ThreadPoolExecutor if there is no room for them:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
This code will fail with a java.util.concurrent.RejectedExecutionException whose message looks like this:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
That is, the task cannot be submitted, because a SynchronousQueue is designed so that it has no capacity of its own and cannot hold tasks in reserve. As we can see, queued tasks is 0 here, and there is nothing strange in that, because this is specific to SynchronousQueue: in effect it is a queue that is always empty (!). When one thread puts an element into the queue, it waits until another thread takes that element. Both threads of the pool are busy, so nobody can take the third task and it is rejected. We can therefore replace the queue with new LinkedBlockingQueue<>(1): one task can then wait in the queue, and once both threads are busy and the queue is full, the next task is rejected and the error shows queued tasks = 1. Because the queue holds only 1 element, we cannot add a second one, and that is where we fail. Continuing the theme of the queue, it is worth noting that the ThreadPoolExecutor class has additional methods for servicing the queue. For example, the threadPoolExecutor.purge() method removes all cancelled tasks from the queue to free up space in it. Another interesting feature related to the queue is the handler for rejected tasks:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
In this example, the handler simply prints the word Rejected for each task that the pool refuses to accept. Convenient, isn't it? In addition, ThreadPoolExecutor has an interesting descendant, ScheduledThreadPoolExecutor, which is a ScheduledExecutorService. It provides the ability to run a task on a timer.
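Coming back to the bounded queue and the rejection handler for a moment: the handler receives the executor itself, so it can report the state of the pool at the moment of rejection. The sketch below is one way to reproduce the queued tasks = 1 situation predictably. It assumes a core pool size of 1 (instead of 0 as in the article) and tasks that wait on a latch, purely to avoid timing races. The class name RejectionDemo is made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: at most 2 threads plus 1 queued task, everything else is rejected.
class RejectionDemo {

    static List<String> run(int taskCount) {
        List<String> rejections = new ArrayList<>(); // only touched by the submitting thread
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2,
                0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));
        // The handler runs in the thread that calls execute().
        pool.setRejectedExecutionHandler((runnable, executor) ->
                rejections.add("rejected: pool size = " + executor.getPoolSize()
                        + ", queued tasks = " + executor.getQueue().size()));
        Runnable task = () -> {
            try {
                release.await(); // keep the worker busy until everything is submitted
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < taskCount; i++) {
            pool.execute(task);
        }
        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        return rejections;
    }

    public static void main(String[] args) {
        System.out.println("3 tasks: " + run(3));
        System.out.println("4 tasks: " + run(4));
    }
}
```

With this configuration, three tasks fit (two running, one queued) and the fourth goes to the handler. If a custom handler is not needed, ThreadPoolExecutor also ships with ready-made policies: AbortPolicy (the default, which throws), CallerRunsPolicy, DiscardPolicy and DiscardOldestPolicy.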
ScheduledExecutorService
An ExecutorService of type ScheduledExecutorService lets you run tasks according to a schedule. Let's look at an example:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Everything is simple here. We submit a task and receive a “scheduled task”, a java.util.concurrent.ScheduledFuture. The following case may also be useful with a schedule:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Here we send a Runnable task to be executed at a fixed rate (Fixed Rate) with an initial delay. In this case, the task starts executing after 1 second and then every 2 seconds. There is a similar option:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
But here the given interval is measured BETWEEN the end of one execution and the start of the next. That is, the task first runs after 1 second. Then, as soon as it completes, 2 seconds pass, and only then does the next run start. You can read the following materials on this topic:
- An introduction to thread pools
- Introduction to Thread Pools
- Java Multithreading Steeplechase: Canceling Tasks In Executors
- Picking correct Java executors for background tasks
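To make the scheduleWithFixedDelay behavior described above more tangible, here is a small measuring sketch. It runs a task that takes a known amount of time and records when each run starts, so the gap between starts should be roughly the work time plus the delay. The class name FixedDelayDemo, the method name and the concrete timings are assumptions chosen for the demo, and real measurements will vary somewhat from machine to machine.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: measures the gaps between runs scheduled with a fixed delay.
class FixedDelayDemo {

    // Returns the gaps in milliseconds between the starts of the first three runs.
    static List<Long> startGapsMillis(long workMillis, long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<Long> starts = new CopyOnWriteArrayList<>();
        CountDownLatch threeRuns = new CountDownLatch(3);
        Runnable task = () -> {
            starts.add(System.nanoTime());
            try {
                Thread.sleep(workMillis); // simulated work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            threeRuns.countDown();
        };
        try {
            ScheduledFuture<?> handle =
                    scheduler.scheduleWithFixedDelay(task, 0, delayMillis, TimeUnit.MILLISECONDS);
            threeRuns.await();
            handle.cancel(false); // a periodic task runs until it is cancelled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < 3; i++) {
            gaps.add((starts.get(i) - starts.get(i - 1)) / 1_000_000);
        }
        return gaps;
    }

    public static void main(String[] args) {
        // 100 ms of work and a 50 ms delay: each gap should be around 150 ms or a bit more.
        System.out.println(startGapsMillis(100, 50));
    }
}
```

Swapping in scheduleAtFixedRate with the same numbers is a quick way to compare the two modes, since a fixed rate does not add the delay on top of the work time.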
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
In addition to the thread pools mentioned above, there is one more. You could say it is a little special. Its name is Work Stealing Pool. In short, work stealing is an algorithm in which idle threads begin to take tasks from other threads or from the shared queue. Let's look at an example:
public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
synchronized (lock) { // wait() must be called while holding the object's monitor
lock.wait(2000);
}
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
If we run this code, the ExecutorService will create 5 threads, because each thread joins the wait set of the lock object. We have already discussed monitors and locks in “You Can’t Spoil Java with a Thread: Part II - Synchronization”. Now let's replace Executors.newCachedThreadPool with Executors.newWorkStealingPool(). What will change? Our tasks may now be performed not in 5 threads but in fewer, depending on the number of available processors. Remember that the cached thread pool created a separate thread for each task? That is because wait blocked the thread, the next tasks still wanted to be executed, and new threads were created in the pool for them. A work stealing pool does not add a thread for every blocked task: its number of workers is bounded by the parallelism level, and a worker that becomes free picks up neighboring tasks. Why is WorkStealingPool so different from the other thread pools? Because there is actually something magical living inside it, a ForkJoinPool:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
There is actually one more difference. Threads created by a ForkJoinPool are daemon threads by default, as opposed to threads created by a regular thread pool. In general, it is worth remembering about daemon threads because, for example, CompletableFuture also uses daemon threads unless you supply your own executor whose ThreadFactory creates non-daemon threads. These are the kind of surprises that can await you in an unexpected place!)
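The daemon difference is easy to check for yourself. The sketch below submits one task to each kind of pool and asks the worker thread whether it is a daemon. The class name DaemonCheck is made up for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: compares worker threads of two pools.
class DaemonCheck {

    // Runs one task on the pool and reports whether its worker thread is a daemon.
    static boolean usesDaemonThreads(ExecutorService service) {
        Callable<Boolean> task = () -> Thread.currentThread().isDaemon();
        try {
            return service.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("work stealing pool: " + usesDaemonThreads(Executors.newWorkStealingPool()));
        System.out.println("fixed thread pool:  " + usesDaemonThreads(Executors.newFixedThreadPool(1)));
    }
}
```

In practice this means a program that only submits work to a work stealing pool and then returns from main may exit before the tasks have run, so waiting for the results (for example with Future.get()) matters more here.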
Fork/Join Pool
In this part we will talk about that same ForkJoinPool (also called the fork/join framework) that lives “under the hood” of WorkStealingPool. The Fork/Join framework appeared in Java 1.7, and even though Java 11 is already out, it is still worth remembering. It is not the most common kind of task, but quite an interesting one. There is a good review on this topic on the Internet: “Fork/Join Framework in Java 7”. ForkJoinPool works with the concept of java.util.concurrent.RecursiveTask. There is also an analogue, java.util.concurrent.RecursiveAction. A RecursiveAction does not return a result. Thus RecursiveTask is similar to Callable, and RecursiveAction is similar to Runnable. Looking at the name, we see two key methods: fork and join. The fork method arranges for a task to run asynchronously, possibly in another thread of the pool, and the join method lets you wait for that work to complete and get its result. There are several ways to use them. The picture for this section is part of a slide from Alexey Shipilev’s report “Fork/Join: implementation, use, performance”. To make it clearer, it’s worth watching his report at JEE CONF: “Fork Join implementation features”.
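As an illustration of fork and join, here is a sketch of a very common usage pattern: a RecursiveTask that sums an array by splitting it in half until the pieces are small enough. The class name SumTask and the threshold of 1,000 elements are assumptions made for this example, not something taken from the report mentioned above.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo class: divide-and-conquer sum of an int array.
class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // assumed cut-off for splitting

    private final int[] numbers;
    private final int from; // inclusive
    private final int to;   // exclusive

    SumTask(int[] numbers, int from, int to) {
        this.numbers = numbers;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0; // small enough: just add the numbers up
            for (int i = from; i < to; i++) {
                sum += numbers[i];
            }
            return sum;
        }
        int middle = (from + to) >>> 1;
        SumTask left = new SumTask(numbers, from, middle);
        SumTask right = new SumTask(numbers, middle, to);
        left.fork();                     // schedule the left half asynchronously
        long rightSum = right.compute(); // work on the right half in the current thread
        return left.join() + rightSum;   // wait for the left half and combine
    }

    static long sum(int[] numbers) {
        return ForkJoinPool.commonPool().invoke(new SumTask(numbers, 0, numbers.length));
    }

    public static void main(String[] args) {
        int[] data = new int[10_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println(sum(data)); // prints 50005000
    }
}
```

Forking one half and computing the other directly keeps the current thread busy instead of letting it just wait in join.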
Summarizing
So, here we are, finishing the next part of the review. We figured out that Executor was invented first, for executing tasks in threads. Then the idea was developed further, and ExecutorService appeared. ExecutorService lets you send tasks for execution using submit and invoke (invokeAll and invokeAny), and manage the service by shutting it down. Because ExecutorService needs implementations, a class with factory methods was written and called Executors. It lets you create ThreadPoolExecutor thread pools. There are also thread pools that let you specify a schedule for execution, and behind WorkStealingPool hides a ForkJoinPool. I hope that what was written above was not only interesting to you, but also understandable) I am always happy to receive suggestions and comments. #Viacheslav