java, CompletableFuture를 활용한 병렬처리와 함수형 프로그래밍
자바 8 이전의 Future의 사용
- Future는 미래의 결과를 얻기 위한 인터페이스다.
- Executor에 수행할 작업(Callable, Runnable)을 인자로 주입하면 Future 객체를 리턴 받는다. Future#get()으로 미래의 결과를 받을 때까지의 메인 스레드의 코드블럭은 논블로킹으로 동작한다.
ExecutorService executor = Executors.newCachedThreadPool();
Future<String> future = executor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return doSomethingLongComputation();
}
});
// future.get()이 별도의 작업을 할 때까지, 메인스레드가 추가작업을 할 수 있다.
try {
String result = future.get(); // future의 결과가 발생하면 블록킹이 풀린다.
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
CompletableFuture
- Future의 기능을 확장한 CompletableFuture는 Future를 대체한다.
- Future는 하위 스레드의 동작 결과로서 한정되나 CompletableFuture의 경우 스레드와 관계 없이 그것의 결과와 행동을 조정할 수 있다.
- CompletableFuture의 동작의 완료는 해당 객체의 complete() 메서드를 통해 결정되며, 아래와 같은 방식으로 코드가 작성된다.
CompletableFuture<String> future = new CompletableFuture<>();
assert !future.isDone();
future.complete("good");
assert future.isDone();
String result = future.join();
assert result.equals("good");
- isDone은 스레드가 종료되었는지를 확인하는 메서드이다.
-
join(혹은 get)은 앞서의 Future#get과 동일한 역할을 수행한다. 다만, get은 InterruptException 등 checked exception을 명시적으로 처리해야 하지만 join은 그렇지 않다. CompletableFuture 객체에서 예외에 대한 처리 기능이 이미 내장되어 있으므로, 코드 작성에 간편한 join을 주로 사용한다.
- 예외 상황이 발생할 경우 아래와 같이 completeExceptionally()를 사용하여 해당 예외 객체를 수집한다.
CompletableFuture<String> future = new CompletableFuture<>();
future.completeExceptionally(new RuntimeException("예외 발생!"));
assert future.isCompletedExceptionally();
CompletableFuture와 스레드
- CompletableFuture를 Thread 객체와 함께 사용할 때는 다음과 같이 코드를 작성한다.
CompletableFuture<String> future = new CompletableFuture<>();
new Thread(() -> future.complete(Thread.currentThread().getName())).start();
System.out.println(future.join()); // Thread-3
- CompletableFuture를 사용하여 좀 더 확장성 있게 통제하는 코드는 아래와 같다.
void main() {
CompletableFuture<Integer> future = getPriceAsync("apple");
CompletableFuture<Integer> result = future
.exceptionally(t -> -1) // 예외가 발생시 -1을 응답
.completeOnTimeout(-2, 10, TimeUnit.MILLISECONDS); // 타임아웃이 발생시 -2를 응답
Integer join = result.join();
System.out.println(join);
}
public CompletableFuture<Integer> getPriceAsync(String product){
CompletableFuture<Integer> future = new CompletableFuture<>();
new Thread(() -> {
try{
future.complete(doSomeLongTakenJob()); // 성공에 대한 값을 삽입
}catch (Exception e){
future.completeExceptionally(e); // 예외에 대한 값을 삽입
}
}).start();
return future;
}
private Integer doSomeLongTakenJob() {
// throw new IllegalArgumentException("!"); // exceptionally에 따라 -1을 응답
// delay(100); return 1; // completeOnTimeout에 따라 -2를 응답
// return 1; // // 별다른 예외상황이 없으므로 정상적으로 1을 응답
}
- CompletableFuture 덕분에 표준 인터페이스를 활용하여 스레드의 동작 완료와 예외를 쉽고 명시적으로 통제할 수 있게 되었다.
-
메서드 체이닝을 사용하여 exceptionally와 completeOnTimeout 등 다양한 에러에 대응 가능하다.
- 아래는 직접 Thread 객체를 사용하지 않고 팩터리 메서드로 간편하게 CompletableFuture를 생성하는 방법이다.
public CompletableFuture<Integer> getPriceAsync(String product){
return CompletableFuture.supplyAsync(() -> doSomeLongTakenJob())
}
- supplyAsync의 경우 값을 리턴하는 로직에 사용하여 Supplier를 인자로 한다. 리턴값이 필요 없다면 runAsync을 사용하여 Runnable을 인자로 삽입한다.
- supplyAsync에는 두 번째 인자를 삽입할 수 있으며 스레드풀인 Executor을 그 타입으로 한다. 인자로 삽입하지 않을 경우 JVM에서 생성한 ForkJoinPool의 스레드를 사용한다.
- 결과적으로 CompletableFuture란 표준 인터페이스를 활용하여 쉽고 단순하고 명확한 방식으로 병렬 프로그래밍을 할 수 있게 되었다.
Executor의 활용
- CompletableFuture를 스레드와 사용할 때는, 쓰레드 객체를 직접 생성할 수도 있고, 앞서의 예제처럼 JVM이 자동으로 생성하는 ForkJoinPool을 사용할 수 있다. 사실 성능 및 사용성과 편의성 문제로 ForkJoinPool을 사용하는 것이 합리적인 선택이다.
- 다만 ForkJoinPool의 스레드 갯수는 많지 않다. IO 대기가 자주 발생하는 상황에서 스레드의 갯수가 부족할 수 있고, 그 여파가 어플리케이션 전역으로 전파될 수도 있다. 왜냐하면 다른 로직 또한 같은 ForkJoinPool을 바라보고 있기 때문이다.
- 그러므로 IO를 사용하거나 기타 이유로 별도의 스레드풀 객체인 Executor를 생성하여 사용할 수 있다.
- 아래는 Executor의 스레드를 사용하였으며, 스트림을 사용하여 본격적으로 병렬처리를 수행하는 코드이다.
ExecutorService executor = Executors.newFixedThreadPool(10, r -> {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
});
List<CompletableFuture<String>> futures = Stream.of("kim", "lee", "choi")
.map(n -> CompletableFuture.supplyAsync(() -> String.format("%s 학생의 점수는 %d 입니다.", n, evaluateGrade(n)), executor))
.toList();
List<String> result = futures.stream()
.map(CompletableFuture::join)
.toList();
System.out.println(result);
private Integer evaluateGrade(String name) {
delay(100); // 100 ms 만큼의 시간이 걸리는 작업이다.
return ThreadLocalRandom.current().nextInt(name.length(), 100);
}
- 위의 코드는 학생 세 명의 성적을 병렬처리로 판정(evaluateGrade)하고 그 결과를 프린트하는 로직이다.
- supplyAsync의 두 번째 인자로 ExecutorService 객체를 삽입한다. 이 경우 ForkJoinPool이 아닌 해당 스레드 풀을 사용하게 된다.
- ExecutorService를 초기화 할 때, 데몬 스레드를 설정하는 setDaemon을 true로 설정하였다. JVM은 종료하기 전 모든 스레드의 종료를 기다리지만 데몬의 경우 기다리지 않는다.
CompletableFuture의 값의 조작과 합성
- stream api가 map과 filter를 사용하여 Loop Fusion을 하는 것과 같이 CompletableFuture 역시 CompletableFuture를 합성할 수 있다. 각 퓨처의 결과를 다른 퓨처가 인수 받아 그 값을 조작하거나 추가적인 비동기 작업을 수행할 수 있다. 이때 사용되는 체이닝 메서드의 이름은 thenSomething의 형태이다. 이번에 다룰 메서드는 thenCompose와 thenApply, thenCombine이다.
List<CompletableFuture<Void>> futures = Stream.of("kim", "lee", "choi")
.map(n -> CompletableFuture.supplyAsync(() -> String.format("%s 학생의 점수는 %d 입니다.", n, evaluateGrade(n))))
.map(f -> f.thenApply(s -> String.format("<span class='grade'>%s</span>", s)))
.map(f -> f.thenCompose(s -> CompletableFuture.runAsync(() -> advertiseGrades(s))))
.toList();
private void advertiseGrades(String s) {
delay(100);
System.out.println("학교 모든 게시판에 다음의 게시글이 업로드 되었습니다 : " + s);
}
- 첫 번째 map에서 supplyAsync를 사용해 evaluateGrade로 성적 계산을 병렬처리하고 그것의 결과를 문자열로 변환한다.
- 두 번재 map에서 thenApply를 사용해 퓨처의 결과값을 html 형태에 맞춰 수정한다. 그리고 다시 CompletableFuture로 리턴한다.
- 세 번째 map에서 thenCompose를 통해 추가적인 비동기 업무를 수행한다. 앞서의 CompletableFuture가 처리한 결과값을 advertiseGrades 메서드의 인자로 하여 처리한다.
-
thenApply와 thenCompose는 thenApplyAsync와 thenComposeAsync로 대체할 수 있다. 전자의 경우 앞의 퓨처가 사용한 스레드를 사용하고 후자는 별도의 스레드를 사용한다. 위의 로직은 오버헤드를 지불하면서까지 별도의 스레드를 사용할 이유가 없으므로 전자의 방식을 택했다.
- 앞서의 코드는 인자의 결과가 순차적인 로직에 따라 전달 및 처리되었다.
- 어떤 경우는 서로의 결과에 의존하지 않아 각 각의 로직을 병렬로 처리한 후, 그 결과물을 합성하는 것이 더 효과적일 수 있다. 이런 경우 thenCombine을 사용한다.
- 아래 예제는 thenCombine을 사용하여 두 개의 퓨처가 1) 학생의 신상정보(findPersonalInformation)와 2) 학생의 성적(evaluateGrade)을 병렬적으로 처리한 후, BiFunction을 사용하여 두 퓨처의 결과값을 합성하는 로직이다.
List<CompletableFuture<Void>> futures = Stream.of("kim", "lee", "choi")
.map(n -> CompletableFuture.supplyAsync(() -> findPersonalInformation(n)) // 첫 번째 퓨처
.thenCombine(CompletableFuture.supplyAsync(() -> evaluateGrade(n)) // 두 번째 퓨처
, (info, grade) -> String.format("%s의 점수는 %d입니다.", info, grade) // 첫 번째와 두 번째의 결과를 합성
)
)
.map(f -> f.thenApply(s -> String.format("<span class='grade'>%s</span>", s)))
.map(f -> f.thenCompose(s -> CompletableFuture.runAsync(() -> advertiseGrades(s))))
.toList();
private String findPersonalInformation(String name) {
delay(100);
String region = List.of("서울", "대전", "부산", "광주", "제주", "강원").get(ThreadLocalRandom.current().nextInt(5));
return String.format("%s 출신의 학생 %s", region, name);
}
allOf, anyOf
- 앞서의 예제에서는 CompletableFuture#join을 사용하여 병렬처리를 블로킹하고 결과값을 응답 받았다.
- CompletableFuture#allOf를 사용할 경우 명시적으로 모든 퓨처를 실행하고 결과값이 생성될 때까지 블록킹한다.
List<CompletableFuture<String>> futures = Stream.of("kim", "lee", "choi")
.map(n -> CompletableFuture.supplyAsync(() -> String.format("%s 학생의 점수는 %d 입니다.", n, evaluateGrade(n)), executor))
.toList();
// List<String> result = futures.stream().map(CompletableFuture::join).toList();
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));
futures.stream().map(CompletableFuture::join).forEach(System.out::println);
- anyOf는 가장 빨리 끝나는 퓨처 하나를 리턴한다.
CompletableFuture<Object> result = CompletableFuture.anyOf(futures.toArray(CompletableFuture[]::new));
System.out.println(result.join()); // kim 학생의 점수는 83 입니다.