JavaRush /Java Blog /Random-KO /스레드로 Java를 망칠 수는 없습니다: 5부 - 실행자, ThreadPool, 포크 조인
Viacheslav
레벨 3

스레드로 Java를 망칠 수는 없습니다: 5부 - 실행자, ThreadPool, 포크 조인

Random-KO 그룹에 게시되었습니다

소개

따라서 우리는 " 스레드로 Java를 망칠 수 없습니다: 1부 - 스레드 " 리뷰에서 읽을 수 있는 스레드가 Java에 있다는 것을 알고 있습니다 . 스레드로 Java를 망칠 수는 없습니다: 5부 - 실행자, ThreadPool, Fork Join - 1샘플 코드를 다시 살펴보겠습니다.
public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
보시다시피 작업을 시작하는 코드는 매우 표준적이지만 새로 시작할 때마다 이를 반복해야 합니다. 한 가지 해결책은 이를 별도의 메서드(예: )로 옮기는 것입니다 execute(Runnable runnable). 그러나 Java 개발자는 이미 우리를 걱정하고 인터페이스를 제시했습니다 Executor.
public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
보시다시피, 코드가 더욱 간결해졌고 Runnable스레드에서 실행하기 위한 코드를 간단히 작성할 수 있게 되었습니다. 좋아요, 그렇죠? 그러나 이것은 시작에 불과합니다. 스레드로 Java를 망칠 수는 없습니다: 파트 V - 실행자, ThreadPool, 포크 조인 - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

보시다시피 인터페이스에는 Executor하위 인터페이스가 있습니다 ExecutorService. 이 인터페이스의 JavaDoc에서는 작업 'a'를 중지하는 방법을 제공 하고 실행 진행 상황을 추적 할 수 있는 ExecutorService특수 'a'에 대한 설명이라고 말합니다. 이전에 " Thread로 Java를 망칠 수는 없습니다: Part IV - Callable, Future 및 Friends "에서 가능성에 대해 간략하게 검토했습니다 . 잊어버렸거나 읽지 않았다면 기억을 되새기시기 바랍니다. ;) JavaDoc의 또 다른 흥미로운 점은 무엇입니까? 기본적으로 사용 가능한 구현을 생성할 수 있는 특수 팩토리가 있습니다 . ExecutorExecutorjava.util.concurrent.FutureFuturejava.util.concurrent.ExecutorsExecutorService

실행자 서비스

다시 기억합시다. Executor스레드 생성 구현이 우리에게 숨겨져 있을 때 스레드에서 특정 작업을 실행(즉, 실행)해야 합니다 . 실행 진행 상황을 관리하는 일련의 기능을 갖춘 ExecutorService특별한 기능이 있습니다 . 그리고 우리는 당신이 만들 수 있는 Executor공장을 가지고 있습니다 . 이제 스스로 해봅시다: ExecutorsExecutorService
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
보시다시피 Fixed Thread Pool크기 2의 고정 스레드 풀( )을 지정했습니다. 그 후 작업을 하나씩 풀로 보냅니다. 각 작업은 String스레드 이름( currentThread().getName())이 포함된 문자열( )을 반환합니다. 마지막에 종료하는 것이 중요합니다 ExecutorService. 그렇지 않으면 프로그램이 종료되지 않기 때문입니다. 팩토리에는 Executors다른 팩토리 메소드가 있습니다 . 예를 들어, 단 하나의 스레드로 구성된 풀을 만들 newSingleThreadExecutor거나 캐싱이 있는 풀을 만들 수 있습니다 newCachedThreadPool. 여기서 스레드는 1분 동안 유휴 상태이면 풀에서 제거됩니다. 실제로 이들 뒤에는 작업이 배치되고 해당 작업이 실행되는 차단 대기열이ExecutorService 있습니다 . 차단 대기열에 대한 자세한 내용은 " Blocking queue - Collections #5 - Advanced Java " 비디오에서 확인할 수 있습니다 . 또한 " 동시 패키지의 대기열 차단 " 리뷰 와 " ArrayBlockingQueue보다 LinkedBlockingQueue를 선호하는 경우는 언제입니까? " 라는 질문에 대한 답변을 읽을 수도 있습니다. 매우 단순화됨 - (차단 대기열)은 두 가지 경우에 스레드를 차단합니다. BlockingQueue
  • 스레드가 빈 큐에서 요소를 가져오려고 합니다.
  • 스레드가 요소를 전체 대기열에 넣으려고 합니다.
팩토리 메소드의 구현을 살펴보면 메소드가 어떻게 구성되어 있는지 확인할 수 있습니다. 예를 들어:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
또는
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
보시다시피 구현은 팩토리 메소드 내부에서 생성됩니다 ExecutorService. 그리고 그것은 기본적으로 그것입니다 ThreadPoolExecutor. 작업에 영향을 미치는 속성만 변경됩니다. 스레드로 Java를 망칠 수는 없습니다: Part V - Executor, ThreadPool, Fork Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

이전에 살펴본 것처럼 팩토리 메소드 내부에서는 ThreadPoolExecutor. 기능은 최대 및 최소 스레드로 전달되는 값과 사용되는 큐에 따라 영향을 받습니다. 그리고 인터페이스의 모든 구현을 사용할 수 있습니다 java.util.concurrent.BlockingQueue. '아' 라고 말하면 ThreadPoolExecutor작동 중에 흥미로운 기능을 주목할 가치가 있습니다. ThreadPoolExecutor예를 들어, 공간이 없으면 작업을 보낼 수 없습니다 .
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
이 코드는 다음과 같은 오류로 인해 실패합니다.
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
즉, task제출할 수 없습니다. SynchronousQueue실제로는 하나의 요소로 구성되며 거기에 더 많은 요소를 넣을 수 없도록 설계되었습니다. 보시다시피 queued tasks여기에는 0이 있고 여기에는 이상한 것이 없습니다. 이는 구체적입니다 SynchronousQueue. 실제로는 항상 비어 있는 1개 요소의 대기열입니다. (!) 한 스레드가 요소를 대기열에 넣으면 다른 스레드가 대기열에서 요소를 가져올 때까지 기다립니다. 따라서 로 대체할 수 new LinkedBlockingQueue<>(1)있으며 오류에 표시되는 내용은 변경됩니다 queued tasks = 1. 왜냐하면 대기열에 요소가 1개뿐이면 두 번째 요소를 추가할 수 없습니다. 그리고 우리는 이것에 빠질 것입니다. 대기열의 주제를 계속해서 살펴보면, 클래스에 ThreadPoolExecutor대기열 서비스를 위한 추가 메서드가 있다는 점에 주목할 가치가 있습니다. 예를 들어 이 메서드는 threadPoolExecutor.purge()대기열에서 취소된 모든 작업을 제거하여 대기열의 공간을 확보합니다. 대기열과 관련된 또 다른 흥미로운 기능은 허용되지 않는 작업 처리기입니다.
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Rejected예를 들어, 처리기는 대기열에 대한 작업 수락을 거부할 때마다 단어를 인쇄합니다 . 편리하지 않나요? 게다가 ThreadPoolExecutor그에게는 흥미로운 상속인이 ScheduledThreadPoolExecutor있습니다 ScheduledExecutorService. 타이머에 따라 작업을 수행하는 기능을 제공합니다.

ScheduledExecutorService

ExecutorService유형을 사용하면 ScheduledExecutorService일정에 따라 작업을 실행할 수 있습니다. 예를 살펴보겠습니다:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
여기에서는 모든 것이 간단합니다. 작업이 전송되면 "예약된 작업"이 수신됩니다 java.util.concurrent.ScheduledFuture. 다음과 같은 경우도 일정에 유용할 수 있습니다.
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Runnable여기에서는 특정 지연을 포함하여 고정된 속도로 실행될 작업을 보냅니다 . 이 경우 2초마다 1초 후에 작업 실행을 시작합니다. 비슷한 옵션이 있습니다:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
그러나 여기서 작업은 다른 작업 실행 사이에 지정된 간격으로 실행됩니다. 즉, 작업은 task1초 안에 완료됩니다. 다음 작업이 완료되자마자 2초가 지나면 새 작업이 시작됩니다. 이 주제에 대해 다음 자료를 읽을 수 있습니다. 스레드로 Java를 망칠 수는 없습니다: 5부 - 실행자, ThreadPool, Fork Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

일훔치기풀

위에서 언급한 스레드 풀 외에도 하나 더 있습니다. 그 사람은 좀 특별하다고 할 수 있죠. 그 이름은 Work Stealing Pool입니다. 간단히 말해서, Work Stealing은 유휴 스레드가 다른 스레드에서 작업을 가져오거나 일반 대기열에서 작업을 가져오기 시작하는 작업 알고리즘입니다. 예를 살펴보겠습니다:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
이 코드를 실행하면 ExecutorService5개의 스레드가 생성됩니다. 각 스레드는 개체 위치의 대기 대기열에 합류합니다 lock. 우리는 이미 " You Can't Spoil Java with a Thread: Part II -Synchronization " 에서 모니터와 잠금에 대해 논의했습니다 . 이제 이를 Executors.newCachedThreadPool. Executors.newWorkStealingPool()무엇이 바뀔까요? 우리의 작업이 5개의 스레드가 아닌 더 적은 스레드에서 수행되는 것을 볼 수 있습니다. cachedThreadPool각 작업에 대해 자신만의 스레드를 생성했다는 것을 기억하시나요 ? 스레드를 차단 했지만 wait다음 작업이 실행되기를 원했고 이를 위해 풀에 새 스레드가 생성되었습니다. 스레드 의 경우 에서는 StealingPool영원히 유휴 상태가 아니며 wait인접한 작업을 실행하기 시작합니다. 이것이 다른 스레드 풀과 어떻게 다릅니까 WorkStealingPool? 실제로 그 사람 안에는 마법 같은 무언가가 살고 있기 때문입니다 ForkJoinPool.
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
사실 차이점이 하나 더 있습니다. 기본적 으로 생성되는 스레드는 ForkJoinPool일반 스레드를 통해 생성되는 스레드와 달리 데몬 스레드입니다 ThreadPool. 일반적으로 데몬 스레드에 대해 기억해 두는 것이 좋습니다. 왜냐하면... 예를 들어, 데몬 스레드가 아닌 스레드를 생성하는 CompletableFuture자신만의 를 지정하지 않으면 데몬 스레드도 사용됩니다 . ThreadFactory예상치 못한 곳에서 여러분을 기다릴 수 있는 놀라운 일들이 있습니다!)

포크/조인 풀

ForkJoinPool이 부분에서는 의 "내부"에 있는 동일한 프레임워크(포크/조인 프레임워크라고도 함)에 대해 이야기하겠습니다 WorkStealingPool. 일반적으로 Fork Join Framework는 Java 1.7에 등장했습니다. Java 11이 이미 사용 중이더라도 여전히 기억할 가치가 있습니다. 가장 일반적인 작업은 아니지만 매우 흥미롭습니다. 인터넷에는 이 주제에 대한 좋은 리뷰가 있습니다: " Java 7의 Fork/Join Framework ". Fork/JoinPool와 같은 개념으로 그의 작업을 수행합니다 java.util.concurrent.RecursiveTask. 아날로그도 있습니다 - java.util.concurrent.RecursiveAction. RecursiveActions는 결과를 반환하지 않습니다. 따라서 와 유사하고 와 RecursiveTask유사 합니다 . 글쎄, 이름을 보면 두 가지 주요 메소드인 및 가 표시됩니다 . 이 메서드는 별도의 스레드에서 비동기적으로 작업을 실행합니다. 그리고 이 방법을 사용 하면 작업이 완료될 때까지 기다릴 수 있습니다. 이를 사용하는 방법에는 여러 가지가 있습니다. 이 그림은 Alexey Shipilev의 보고서 " 포크/조인: 구현, 사용, 성능 " 슬라이드의 일부입니다 . 더 명확하게 설명하려면 JEE CONF에서 그가 작성한 보고서인 " Fork Join 구현 기능 "을 살펴보는 것이 좋습니다 . CallableRecursiveActionRunnableforkjoinforkjoin스레드로 Java를 망칠 수는 없습니다: Part V - Executor, ThreadPool, Fork Join - 5

요약

그럼, 여기까지 리뷰의 다음 부분을 마치겠습니다. 우리는 스레드를 실행하기 위해 처음 생각해낸 것이 무엇인지 알아냈습니다 Executor. 그런 다음 우리는 아이디어를 계속하기로 결정하고 아이디어를 내놓았습니다 ExecutorService. 및 를 ExecutorService사용하여 실행할 작업을 보낼 수 있을 뿐만 아니라 서비스를 꺼서 관리할 수도 있습니다. 왜냐하면 '우리는 구현이 필요합니다. 우리는 팩토리 메소드를 사용하여 클래스를 작성하고 이를 . 스레드 풀을 생성할 수 있습니다 . 동시에 실행 일정을 지정할 수 있지만 숨기는 스레드 풀도 있습니다 . 위에 쓴 내용이 여러분에게 흥미로울 뿐만 아니라 이해하기 쉽기를 바랍니다.) 제안과 의견을 받는 것은 언제나 기쁘게 생각합니다. #비아체슬라프submitinvokeExecutorServiceExecutorsThreadPoolExecutorWorkStealingPoolForkJoinPool
코멘트
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION