Introduction
We already looked at how threads are created in Part 1. Let's recap. A thread is a Thread, and what runs in it is its run method, so let's use the tutorialspoint Java online compiler and run the following code:
public class HelloWorld {
    public static void main(String[] args) {
        Runnable task = () -> {
            System.out.println("Hello World");
        };
        new Thread(task).start();
    }
}
Is this the only way to start a task in a thread?
java.util.concurrent.Callable
It turns out that java.lang.Runnable has a brother named java.util.concurrent.Callable, and he was born in Java 1.5. What are the differences? If we take a closer look at the JavaDoc of this interface, we see that, unlike Runnable, the new interface declares a call method that returns a result. The method is also declared to throw Exception, which saves us from writing try-catch blocks for checked exceptions. Already good, right? Now, instead of a Runnable, we have a new kind of task:
Callable<String> task = () -> {
    return "Hello, World!";
};
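To make the difference in checked-exception handling concrete, here is a minimal sketch. The class name CallableVsRunnable and the helper readGreeting are hypothetical and exist only for this illustration; the tasks are called directly on the current thread rather than started in a new one.

```java
import java.util.concurrent.Callable;

public class CallableVsRunnable {
    // Hypothetical helper that declares a checked exception.
    static String readGreeting() throws Exception {
        return "Hello, World!";
    }

    // call() is declared with "throws Exception", so no try-catch is needed here.
    static Callable<String> callableTask() {
        return () -> readGreeting();
    }

    // run() declares no checked exceptions, so we have to handle them ourselves.
    static Runnable runnableTask() {
        return () -> {
            try {
                System.out.println(readGreeting());
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        };
    }

    public static void main(String[] args) throws Exception {
        // Both tasks are invoked directly, on the current thread.
        System.out.println(callableTask().call());
        runnableTask().run();
    }
}
```

Note that nothing here runs on a separate thread yet: a Callable on its own is just an object with a call method.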
But what do we do with it? Why do we even need a task that runs on a thread and returns a result? Obviously, we expect to receive, at some point in the future, the result of work that will be performed later. And there is an interface with exactly that name: java.util.concurrent.Future.
java.util.concurrent.Future
The java.util.concurrent.Future interface describes an API for working with tasks whose results we plan to receive in the future: methods for obtaining the result and methods for checking the status. Of the Future implementations, we are interested in java.util.concurrent.FutureTask. That is, it is a Task that will be executed in the Future. What makes this implementation even more interesting is that it also implements Runnable. You can think of it as a sort of adapter between the old threading model and the new one (new in the sense that it was introduced in Java 1.5). Here is an example:
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        // The task returns a String, so the Callable is parameterized accordingly
        Callable<String> task = () -> {
            return "Hello, World!";
        };
        FutureTask<String> future = new FutureTask<>(task);
        new Thread(future).start();
        System.out.println(future.get());
    }
}
As you can see from the example, we get the result of the task using the get method.
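The status-checking side of the Future API can be sketched in the same way. This is only an illustration: the class name FutureStatusDemo and the five-second timeout are assumptions chosen for the example.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

public class FutureStatusDemo {
    static String describe() throws Exception {
        FutureTask<String> future = new FutureTask<>(() -> "Hello, World!");
        // Nobody has run the task yet, so it cannot be done.
        boolean doneBefore = future.isDone();
        new Thread(future).start();
        // Wait for the result, but give up after an (arbitrary) timeout.
        String result = future.get(5, TimeUnit.SECONDS);
        // Once get has returned a value, the task is complete.
        boolean doneAfter = future.isDone();
        return doneBefore + " " + result + " " + doneAfter;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(describe()); // prints "false Hello, World! true"
    }
}
```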
(!) Important: at the moment the result is requested with the get method, execution becomes synchronous. What mechanism do you think is used here? That's right, there is no synchronized block, so in JVisualVM we will see WAITING not as monitor or wait, but as that same park (because the LockSupport mechanism is used).
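The WAITING state can also be observed without JVisualVM. The sketch below is a rough illustration under assumptions: the class name ParkDemo is hypothetical, and the polling loop is just a simple way to wait until the second thread has actually blocked inside get. It shows the state reported by Thread.getState(), not the JVisualVM view.

```java
import java.util.concurrent.FutureTask;

public class ParkDemo {
    static Thread.State stateWhileBlockedInGet() throws Exception {
        FutureTask<String> future = new FutureTask<>(() -> "Hello, World!");
        Thread waiter = new Thread(() -> {
            try {
                future.get(); // blocks, because nobody has run the task yet
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        });
        waiter.start();

        // Poll until the waiter thread is reported as WAITING.
        Thread.State observed = waiter.getState();
        while (observed != Thread.State.WAITING) {
            Thread.sleep(10);
            observed = waiter.getState();
        }

        // Run the task on the current thread; this completes it and wakes the waiter.
        future.run();
        waiter.join();
        return observed;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(stateWhileBlockedInGet()); // prints "WAITING"
    }
}
```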
Functional interfaces
Next, we will talk about classes from Java 1.8, so a brief introduction will not hurt. Let's look at the following code:
Supplier<String> supplier = new Supplier<String>() {
    @Override
    public String get() {
        return "String";
    }
};
Consumer<String> consumer = new Consumer<String>() {
    @Override
    public void accept(String s) {
        System.out.println(s);
    }
};
Function<String, Integer> converter = new Function<String, Integer>() {
    @Override
    public Integer apply(String s) {
        return Integer.valueOf(s);
    }
};
That's a lot of extra code, right? Each of the declared classes performs a single function, but to describe it we write a pile of auxiliary code. The Java developers thought so too. So they introduced a set of "functional interfaces" (@FunctionalInterface) and decided that Java itself would now "figure out" everything for us except the important part:
Supplier<String> supplier = () -> "String";
Consumer<String> consumer = s -> System.out.println(s);
Function<String, Integer> converter = s -> Integer.valueOf(s);
Supplier is a provider. It takes no parameters but returns something, i.e. it supplies it. Consumer is a consumer. It takes something as input (the parameter s) and does something with it, that is, it consumes it. There is also Function. It takes something as input (the parameter s), does something, and returns something. As we can see, generics are used heavily. If in doubt, you can refresh your memory by reading "The theory of generics in Java or how to put brackets in practice".
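To see how the three interfaces fit together, here is a small sketch. The class name FunctionalDemo and the helper method convert are hypothetical and exist only for this illustration.

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class FunctionalDemo {
    // Takes a value from the supplier and passes it through the function.
    static Integer convert(Supplier<String> supplier, Function<String, Integer> converter) {
        return converter.apply(supplier.get());
    }

    public static void main(String[] args) {
        Supplier<String> supplier = () -> "42";                 // supplies a value
        Function<String, Integer> converter = Integer::valueOf; // turns A into B
        Consumer<Integer> printer = System.out::println;        // consumes a value
        printer.accept(convert(supplier, converter));           // prints 42
    }
}
```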
CompletableFuture
As time went on, Java 1.8 introduced a new class called CompletableFuture. It implements the Future interface, that is, our task will be executed in the future, and we can call get to obtain the result. But it also implements CompletionStage. The name already makes its purpose clear: it is a certain stage of some computation. For a brief introduction to the topic, see "Introduction to CompletionStage and CompletableFuture". Let's get straight to the point. The class offers static methods that help us get started. Here are some examples of how to use them:
import java.util.concurrent.CompletableFuture;

public class App {
    public static void main(String[] args) throws Exception {
        // An already completed future holding a ready value
        CompletableFuture<String> completed;
        completed = CompletableFuture.completedFuture("Just a value");
        // Runs a Runnable asynchronously; there is no result
        CompletableFuture<Void> voidCompletableFuture;
        voidCompletableFuture = CompletableFuture.runAsync(() -> {
            System.out.println("run " + Thread.currentThread().getName());
        });
        // Runs a Supplier asynchronously; its return value becomes the result
        CompletableFuture<String> supplier;
        supplier = CompletableFuture.supplyAsync(() -> {
            System.out.println("supply " + Thread.currentThread().getName());
            return "Value";
        });
    }
}
If we execute this code, we will see that creating a CompletableFuture also launches the entire chain. So, despite some similarity with the Stream API from Java 8, this is where the two approaches differ. For example:
List<String> array = Arrays.asList("one", "two");
Stream<String> stringStream = array.stream().map(value -> {
    System.out.println("Executed");
    return value.toUpperCase();
});
This is a Java 8 Stream API example (you can read more about it in "Java 8 Stream API Guide with Pictures and Examples"). If you run this code, Executed will not be displayed. That is, when a stream is created in Java, it does not start immediately but waits until a value is requested from it. A CompletableFuture, on the other hand, launches the chain for execution immediately, without waiting to be asked for the computed value. I think it's important to understand this. So we have a CompletableFuture. How can we build a chain, and what tools do we have? Recall the functional interfaces that we wrote about earlier.
- We have a function (Function) that takes A and returns B. It has a single method, apply.
- We have a consumer (Consumer) that accepts A and returns nothing (Void). It has a single method, accept.
- We have code running on a thread (Runnable) that neither accepts nor returns anything. It has a single method, run.
The second thing to remember is that CompletableFuture uses Runnable, consumers, and functions in its work. Given this, you can always recall that you can do something like this with a CompletableFuture:
public static void main(String[] args) throws Exception {
    AtomicLong longValue = new AtomicLong(0);
    Runnable task = () -> longValue.set(new Date().getTime());
    Function<Long, Date> dateConverter = (longvalue) -> new Date(longvalue);
    Consumer<Date> printer = date -> {
        System.out.println(date);
        System.out.flush();
    };
    CompletableFuture.runAsync(task)
            .thenApply((v) -> longValue.get())
            .thenApply(dateConverter)
            .thenAccept(printer);
}
The thenRun, thenApply, and thenAccept methods have Async versions. This means that these stages will be executed asynchronously, on a thread taken from a special pool (by default, ForkJoinPool.commonPool()), so it is not known in advance which thread it will be, new or old. It all depends on how heavy the tasks are. In addition to these methods, there are three more interesting possibilities. For clarity, let's imagine that we have a service that receives a message from somewhere, and this takes time:
public static class NewsService {
    public static String getMessage() {
        try {
            // sleep is a static method, so it is called on the Thread class itself
            Thread.sleep(3000);
            return "Message";
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
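As a side note on the Async variants of the chaining methods: each of them also has an overload that takes an Executor, which makes it possible to see exactly where a stage runs. The sketch below is only an illustration; the class name AsyncStageDemo and the thread name "my-stage-thread" are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncStageDemo {
    static String stageThreadName() throws Exception {
        // Hypothetical single-thread executor with a recognizable thread name.
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "my-stage-thread");
            t.setDaemon(true);
            return t;
        });
        try {
            // The Async stage is submitted to the given executor
            // instead of the default pool.
            return CompletableFuture.completedFuture("value")
                    .thenApplyAsync(v -> Thread.currentThread().getName(), executor)
                    .get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(stageThreadName()); // prints "my-stage-thread"
    }
}
```

Without the executor argument, the same stage would run on a thread from the default pool, and its name would not be known in advance.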
Now, let's look at other features that CompletableFuture offers. We can combine the result of one CompletableFuture with the result of another CompletableFuture:
Supplier<String> newsSupplier = () -> NewsService.getMessage();
CompletableFuture<String> reader = CompletableFuture.supplyAsync(newsSupplier);
CompletableFuture.completedFuture("!!")
        .thenCombine(reader, (a, b) -> b + a)
        .thenAccept(result -> System.out.println(result))
        .get();
It's worth noting here that the threads are daemon threads by default, so for clarity we use get to wait for the result. And we can not only combine results, but also return a CompletableFuture:
CompletableFuture.completedFuture(2L)
        .thenCompose((val) -> CompletableFuture.completedFuture(val + 2))
        .thenAccept(result -> System.out.println(result));
Here I would like to note that, for brevity, the CompletableFuture.completedFuture method is used. This method does not create a new thread, so the rest of the chain will be executed on the same thread that called completedFuture. There is also a thenAcceptBoth method. It is very similar to thenAccept, but whereas thenAccept takes a Consumer, thenAcceptBoth takes another CompletionStage plus a BiConsumer as input, that is, a consumer that takes two sources as input instead of one. There is also an interesting possibility with the word Either.
These methods (applyToEither, acceptEither, and runAfterEither) accept an alternative CompletionStage and run on the result of whichever CompletionStage completes first. And I would like to finish this review with another interesting feature of CompletableFuture: error handling.
CompletableFuture.completedFuture(2L)
        .thenApply((a) -> {
            throw new IllegalStateException("error");
        })
        .thenApply((a) -> 3L)
        .thenAccept(val -> System.out.println(val));
This code will print nothing: an exception is thrown, so the remaining stages are skipped. But if we add an exceptionally stage to the chain, we define the behavior for that case. On the subject of CompletableFuture, I also advise you to watch the following video:
In my humble opinion, these videos are among the most illustrative on the Internet. They should make it clear how it all works, what kind of arsenal we have, and why all this is needed.
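To round off the error-handling topic, here is a minimal sketch of a chain with an exceptionally stage. The class name ExceptionallyDemo and the fallback value 0L are assumptions chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;

public class ExceptionallyDemo {
    static Long recover() throws Exception {
        return CompletableFuture.completedFuture(2L)
                .<Long>thenApply(a -> {
                    throw new IllegalStateException("error");
                })
                // Skipped: the previous stage completed exceptionally.
                .thenApply(a -> 3L)
                // Replaces the failure with a fallback value.
                .exceptionally(ex -> 0L)
                .get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(recover()); // prints 0
    }
}
```

The stage after the failing one never produces 3L; the value that comes out of the chain is the fallback supplied to exceptionally.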
Conclusion
I hope it's clear now how you can use threads to obtain the results of calculations once they have been computed. Additional material:
#Viacheslav