Introduction
So, we know that Java has threads, as you can read about in the overview " Threads Don't Screw Java: Part I - Threads ". Let's look at the sample code again:public static void main(String []args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
As we can see, the code for launching the task is quite typical, but for each new launch we will have to repeat it. One solution is to move it to a separate method, for example, execute(Runnable runnable)
. But the Java developers have already taken care of us and come up with an interface Executor
:
public static void main(String []args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
As you can see, the code has become more concise and allowed us to simply write the code to run Runnable
in the thread. It's great, isn't it? But this is just the beginning:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
has a child interface ExecutorService
. The JavaDoc for this interface says that ExecutorService
it is a description of a special Executor
'a that provides methods to stop 'a's work Executor
and allows you to get 'a's java.util.concurrent.Future
to track progress. Earlier, in " You Can't Mess with Java Threads: Part IV - Callable, Future, and Friends " we already had a brief look at the possibilities Future
. Who forgot or didn't read - I advise you to refresh your memory;) What else is interesting in JavaDoc? That we have a special factory java.util.concurrent.Executors
that allows us to create default implementations ExecutorService
.
ExecutorService
Let's remember again. We haveExecutor
to execute (i.e., execute) a certain task in a thread, when the implementation of creating a thread is hidden from us. We have ExecutorService
- special Executor
, which has a set of features for managing the progress of execution. And we have a factory Executors
that allows us to create ExecutorService
. Let's do it ourselves now:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
As we can see, we have specified a fixed thread pool ( Fixed Thread Pool
) of size 2. After that, we send tasks to the pool one by one. Each task returns a string ( String
) containing the name of the thread ( currentThread().getName()
). It is important to shutdown at the very end for ExecutorService
, because otherwise our program will not end. Executors
There are other factory methods in the factory . For example, we can create a pool of just one thread - newSingleThreadExecutor
or a caching pool newCachedThreadPool
when threads are removed from the pool if they are idle for 1 minute. In fact, behind these ExecutorService
is a blocking queue , in which tasks are placed and from which these tasks are executed. You can see more about blocking queues in the video "Blocking Queue - Collections #5 - Advanced Java ". You can also read the overview " Blocking queues of the concurrent package " and the answer to the question " When to prefer LinkedBlockingQueue over ArrayBlockingQueue? ". Super simplified - BlockingQueue
(blocking queue) blocks a thread, in two cases:
- the thread is trying to get elements from an empty queue
- the thread is trying to put elements in the full queue
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
or
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
As we can see, implementations are created inside the factory methods ExecutorService
. And basically this ThreadPoolExecutor
. Only the attributes that affect the work change.
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
As we saw earlier, inside factory methodsThreadPoolExecutor
, . The functionality is affected by what values are passed as maximum and minimum threads, and which queue is used. And any implementation of the interface can be used java.util.concurrent.BlockingQueue
. Speaking of ThreadPoolExecutor
'ah, it is worth noting interesting features at work. For example, you cannot send tasks to ThreadPoolExecutor
if there is no space there:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
This code will fail with an error like:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
That is, task
you cannot submit, because SynchronousQueue
is arranged in such a way that it actually consists of one element and does not allow you to put more there. As we can see, queued tasks
here is 0, and there is nothing strange in this, because this is specific SynchronousQueue
- in fact, this is a queue of 1 element, which is always empty. (!) When one thread puts an element into the queue, it will wait until another thread picks up the element from the queue. Therefore, we can replace with new LinkedBlockingQueue<>(1)
and the error will change what will be indicated queued tasks = 1
. Because the queue is only 1 element, then we cannot put the second one. And let's fall on it. Continuing the theme of the queue, it is worth noting that the class ThreadPoolExecutor
has additional methods for servicing the queue. For example, the methodthreadPoolExecutor.purge()
will remove all canceled tasks from the queue to free up space in the queue. Another interesting queue-related feature is the pending task handler:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
For example, the handler simply outputs a word Rejected
for each refusal to accept a task in the queue. Convenient, isn't it? In addition, ThreadPoolExecutor
it has an interesting successor - ScheduledThreadPoolExecutor
which is ScheduledExecutorService
. It provides the ability to execute a task on a timer.
ScheduledExecutorService
ExecutorService
types ScheduledExecutorService
allow you to run tasks on a schedule (schedule). Let's look at an example:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Everything is simple here. Tasks are sent, we get a "scheduled task" java.util.concurrent.ScheduledFuture
. With a schedule, the following case may also be useful:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Here we send Runnable
the task to be executed at a fixed rate (Fixed Rate) with a certain delay. In this case, after 1 second, every 2 seconds, start executing the task. There is a similar option:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
But here the tasks are executed with a given interval BETWEEN the execution of different tasks. That is, the task task
will be completed in 1 second. Further, as soon as it is completed, 2 seconds will pass, and then the new task will be launched. On this topic, you can read the following materials:
- An introduction to thread pools
- Introduction to Thread Pools
- Java Multithreading Steeplechase: Cancelling Tasks In Executors
- Picking correct Java executors for background tasks
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
In addition to the thread pools mentioned above, there is one more. You can tell he's a little special. His name is Work Stealing Pool. In short, Work Stealing is a work algorithm in which idle threads start picking up tasks from other threads or tasks from a common queue. Let's look at an example:public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
If we run this code, then ExecutorService
we will create 5 threads, because each thread will queue up in the wait queue for the object's lock lock
. About monitors and locks on it, we have previously dealt with " You can't spoil Java thread: Part II - synchronization ". And now we will replace Executors.newCachedThreadPool
with Executors.newWorkStealingPool()
. What will change? We will see that our tasks are not executed in 5 threads, but less. Remember that cachedThreadPool
you created your own thread for each task? Because wait
it blocked the thread, and the following tasks wanted to be executed and new threads were created in the pool for them. In the case of with, StealingPool
the threads will not be idle forever in wait
, they will begin to perform neighboring tasks. What makes this thread so different from other thread pools WorkStealingPool
? The fact that inside it lives actually magicalForkJoinPool
:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Actually there is another difference. The threads that are created for ForkJoinPool
default are daemon threads, as opposed to the threads created via regular ThreadPool
. In general, it is worth remembering about daemon threads, because. for example, when CompletableFuture
daemon threads are also used, if you do not specify your own ThreadFactory
, which will create non-daemon threads. These are the surprises that can await in an unexpected place!)
Fork/Join Pool
In this part, we will talk about the very oneForkJoinPool
(it is also called the fork / join framework) that lives "under the hood" of WorkStealingPool
. In general, Fork Join Framework appeared in Java 1.7. And even though Java 11 is already in the yard, it’s still worth remembering. Not the most common task, but quite interesting. There is a good overview on the net about this topic: " Fork/Join Framework in Java 7 ". Fork/JoinPool
operates in his work with such a concept as java.util.concurrent.RecursiveTask
. There is also an analogue - java.util.concurrent.RecursiveAction
. RecursiveAction does not return a result. Thus RecursiveTask
similar to Callable
, but RecursiveAction
similar to Runnable
. Well, looking at the name, we see two key methods - fork
and join
. The method fork
starts some task asynchronously in a separate thread. A methodjoin
allows you to wait for the completion of the work. There are several ways to use it: This picture is part of a slide in Aleksey Shipilev's report " Fork/Join: Implementation, Usage, Performance ". To make it clearer, it's worth looking at his talk at JEE CONF: " Fork Join Implementation Features ".
Summarizing
So, here we have finished the next part of the review. We figured out what we first came up withExecutor
for executing threads. Then we decided to continue the idea and came up with ExecutorService
. ExecutorService
allows you to send tasks for execution using submit
and invoke
, as well as manage the service by turning it off. Because ExecutorService
'u need implementations, wrote a class with factory methods and named it Executors
. It allows you to create pools of thread ThreadPoolExecutor
's. At the same time, there are such thread pools that allow you to also specify a schedule for execution, but WorkStealingPool
hides ForkJoinPool
. I hope you were not only interested in what was written above, but also understandable) I am always glad to suggestions and comments. #Viacheslav