java, CompletableFuture를 활용한 병렬처리와 함수형 프로그래밍

자바 8 이전의 Future의 사용

  • Future는 미래의 결과를 얻기 위한 인터페이스다.
  • Executor에 수행할 작업(Callable, Runnable)을 인자로 주입하면 Future 객체를 리턴 받는다. Future#get()으로 미래의 결과를 받을 때까지의 메인 스레드의 코드블럭은 논블로킹으로 동작한다.
ExecutorService executor = Executors.newCachedThreadPool();

Future<String> future = executor.submit(new Callable<String>() {
    @Override
    public String call() throws Exception {
        return doSomethingLongComputation();
    }
});

// future.get()이 별도의 작업을 할 때까지, 메인스레드가 추가작업을 할 수 있다.

try {
    String result = future.get(); // future의 결과가 발생하면 블록킹이 풀린다.
} catch (InterruptedException e) {
    throw new RuntimeException(e);
} catch (ExecutionException e) {
    throw new RuntimeException(e);
}

CompletableFuture

  • Future의 기능을 확장한 CompletableFuture는 Future를 대체한다.
  • Future는 하위 스레드의 동작 결과로서 한정되나 CompletableFuture의 경우 스레드와 관계 없이 그것의 결과와 행동을 조정할 수 있다.
  • CompletableFuture의 동작의 완료는 해당 객체의 complete() 메서드를 통해 결정되며, 아래와 같은 방식으로 코드가 작성된다.
CompletableFuture<String> future = new CompletableFuture<>();
assert !future.isDone();

future.complete("good");
assert future.isDone();
String result = future.join();
assert result.equals("good");
  • isDone은 스레드가 종료되었는지를 확인하는 메서드이다.
  • join(혹은 get)은 앞서의 Future#get과 동일한 역할을 수행한다. 다만, get은 InterruptException 등 checked exception을 명시적으로 처리해야 하지만 join은 그렇지 않다. CompletableFuture 객체에서 예외에 대한 처리 기능이 이미 내장되어 있으므로, 코드 작성에 간편한 join을 주로 사용한다.

  • 예외 상황이 발생할 경우 아래와 같이 completeExceptionally()를 사용하여 해당 예외 객체를 수집한다.
CompletableFuture<String> future = new CompletableFuture<>();
future.completeExceptionally(new RuntimeException("예외 발생!"));
assert future.isCompletedExceptionally();

CompletableFuture와 스레드

  • CompletableFuture를 Thread 객체와 함께 사용할 때는 다음과 같이 코드를 작성한다.
CompletableFuture<String> future = new CompletableFuture<>();
new Thread(() -> future.complete(Thread.currentThread().getName())).start(); 
System.out.println(future.join()); // Thread-3
  • CompletableFuture를 사용하여 좀 더 확장성 있게 통제하는 코드는 아래와 같다.
void main() {
    CompletableFuture<Integer> future = getPriceAsync("apple");
    CompletableFuture<Integer> result = future
            .exceptionally(t -> -1) // 예외가 발생시 -1을 응답
            .completeOnTimeout(-2, 10, TimeUnit.MILLISECONDS); // 타임아웃이 발생시 -2를 응답

    Integer join = result.join();
    System.out.println(join);
}

public CompletableFuture<Integer> getPriceAsync(String product){
    CompletableFuture<Integer> future = new CompletableFuture<>();
    new Thread(() -> {
        try{
            future.complete(doSomeLongTakenJob()); // 성공에 대한 값을 삽입
        }catch (Exception e){
            future.completeExceptionally(e); // 예외에 대한 값을 삽입
        }
    }).start();
    return future;
}

private Integer doSomeLongTakenJob() {
    // throw new IllegalArgumentException("!"); // exceptionally에 따라 -1을 응답
    // delay(100); return 1; // completeOnTimeout에 따라 -2를 응답
    // return 1; // // 별다른 예외상황이 없으므로 정상적으로 1을 응답
}
  • CompletableFuture 덕분에 표준 인터페이스를 활용하여 스레드의 동작 완료와 예외를 쉽고 명시적으로 통제할 수 있게 되었다.
  • 메서드 체이닝을 사용하여 exceptionally와 completeOnTimeout 등 다양한 에러에 대응 가능하다.

  • 아래는 직접 Thread 객체를 사용하지 않고 팩터리 메서드로 간편하게 CompletableFuture를 생성하는 방법이다.
public CompletableFuture<Integer> getPriceAsync(String product){
    return CompletableFuture.supplyAsync(() -> doSomeLongTakenJob())
}
  • supplyAsync의 경우 값을 리턴하는 로직에 사용하여 Supplier를 인자로 한다. 리턴값이 필요 없다면 runAsync을 사용하여 Runnable을 인자로 삽입한다.
  • supplyAsync에는 두 번째 인자를 삽입할 수 있으며 스레드풀인 Executor을 그 타입으로 한다. 인자로 삽입하지 않을 경우 JVM에서 생성한 ForkJoinPool의 스레드를 사용한다.
  • 결과적으로 CompletableFuture란 표준 인터페이스를 활용하여 쉽고 단순하고 명확한 방식으로 병렬 프로그래밍을 할 수 있게 되었다.

Executor의 활용

  • CompletableFuture를 스레드와 사용할 때는, 쓰레드 객체를 직접 생성할 수도 있고, 앞서의 예제처럼 JVM이 자동으로 생성하는 ForkJoinPool을 사용할 수 있다. 사실 성능 및 사용성과 편의성 문제로 ForkJoinPool을 사용하는 것이 합리적인 선택이다.
  • 다만 ForkJoinPool의 스레드 갯수는 많지 않다. IO 대기가 자주 발생하는 상황에서 스레드의 갯수가 부족할 수 있고, 그 여파가 어플리케이션 전역으로 전파될 수도 있다. 왜냐하면 다른 로직 또한 같은 ForkJoinPool을 바라보고 있기 때문이다.
  • 그러므로 IO를 사용하거나 기타 이유로 별도의 스레드풀 객체인 Executor를 생성하여 사용할 수 있다.
  • 아래는 Executor의 스레드를 사용하였으며, 스트림을 사용하여 본격적으로 병렬처리를 수행하는 코드이다.
ExecutorService executor = Executors.newFixedThreadPool(10, r -> {
    Thread t = new Thread(r);
    t.setDaemon(true);
    return t;
});

List<CompletableFuture<String>> futures = Stream.of("kim", "lee", "choi")
        .map(n -> CompletableFuture.supplyAsync(() -> String.format("%s 학생의 점수는 %d 입니다.", n, evaluateGrade(n)), executor))
        .toList();

List<String> result = futures.stream()
        .map(CompletableFuture::join)
        .toList();

System.out.println(result);
private Integer evaluateGrade(String name) {
    delay(100); // 100 ms 만큼의 시간이 걸리는 작업이다.
    return ThreadLocalRandom.current().nextInt(name.length(), 100);
}
  • 위의 코드는 학생 세 명의 성적을 병렬처리로 판정(evaluateGrade)하고 그 결과를 프린트하는 로직이다.
  • supplyAsync의 두 번째 인자로 ExecutorService 객체를 삽입한다. 이 경우 ForkJoinPool이 아닌 해당 스레드 풀을 사용하게 된다.
  • ExecutorService를 초기화 할 때, 데몬 스레드를 설정하는 setDaemon을 true로 설정하였다. JVM은 종료하기 전 모든 스레드의 종료를 기다리지만 데몬의 경우 기다리지 않는다.

CompletableFuture의 값의 조작과 합성

  • stream api가 map과 filter를 사용하여 Loop Fusion을 하는 것과 같이 CompletableFuture 역시 CompletableFuture를 합성할 수 있다. 각 퓨처의 결과를 다른 퓨처가 인수 받아 그 값을 조작하거나 추가적인 비동기 작업을 수행할 수 있다. 이때 사용되는 체이닝 메서드의 이름은 thenSomething의 형태이다. 이번에 다룰 메서드는 thenCompose와 thenApply, thenCombine이다.
List<CompletableFuture<Void>> futures = Stream.of("kim", "lee", "choi")
    .map(n -> CompletableFuture.supplyAsync(() -> String.format("%s 학생의 점수는 %d 입니다.", n, evaluateGrade(n))))
    .map(f -> f.thenApply(s -> String.format("<span class='grade'>%s</span>", s)))
    .map(f -> f.thenCompose(s -> CompletableFuture.runAsync(() -> advertiseGrades(s))))
    .toList();
private void advertiseGrades(String s) {
    delay(100);
    System.out.println("학교 모든 게시판에 다음의 게시글이 업로드 되었습니다 : " + s);
}
  • 첫 번째 map에서 supplyAsync를 사용해 evaluateGrade로 성적 계산을 병렬처리하고 그것의 결과를 문자열로 변환한다.
  • 두 번재 map에서 thenApply를 사용해 퓨처의 결과값을 html 형태에 맞춰 수정한다. 그리고 다시 CompletableFuture로 리턴한다.
  • 세 번째 map에서 thenCompose를 통해 추가적인 비동기 업무를 수행한다. 앞서의 CompletableFuture가 처리한 결과값을 advertiseGrades 메서드의 인자로 하여 처리한다.
  • thenApply와 thenCompose는 thenApplyAsync와 thenComposeAsync로 대체할 수 있다. 전자의 경우 앞의 퓨처가 사용한 스레드를 사용하고 후자는 별도의 스레드를 사용한다. 위의 로직은 오버헤드를 지불하면서까지 별도의 스레드를 사용할 이유가 없으므로 전자의 방식을 택했다.

  • 앞서의 코드는 인자의 결과가 순차적인 로직에 따라 전달 및 처리되었다.
  • 어떤 경우는 서로의 결과에 의존하지 않아 각 각의 로직을 병렬로 처리한 후, 그 결과물을 합성하는 것이 더 효과적일 수 있다. 이런 경우 thenCombine을 사용한다.
  • 아래 예제는 thenCombine을 사용하여 두 개의 퓨처가 1) 학생의 신상정보(findPersonalInformation)와 2) 학생의 성적(evaluateGrade)을 병렬적으로 처리한 후, BiFunction을 사용하여 두 퓨처의 결과값을 합성하는 로직이다.
List<CompletableFuture<Void>> futures = Stream.of("kim", "lee", "choi")
    .map(n -> CompletableFuture.supplyAsync(() -> findPersonalInformation(n)) // 첫 번째 퓨처
                .thenCombine(CompletableFuture.supplyAsync(() -> evaluateGrade(n)) // 두 번째 퓨처
                    , (info, grade) -> String.format("%s의 점수는 %d입니다.", info, grade) // 첫 번째와 두 번째의 결과를 합성
                )
    )
    .map(f -> f.thenApply(s -> String.format("<span class='grade'>%s</span>", s)))
    .map(f -> f.thenCompose(s -> CompletableFuture.runAsync(() -> advertiseGrades(s))))
    .toList();
private String findPersonalInformation(String name) {
    delay(100);
    String region = List.of("서울", "대전", "부산", "광주", "제주", "강원").get(ThreadLocalRandom.current().nextInt(5));
    return String.format("%s 출신의 학생 %s", region, name);
}

allOf, anyOf

  • 앞서의 예제에서는 CompletableFuture#join을 사용하여 병렬처리를 블로킹하고 결과값을 응답 받았다.
  • CompletableFuture#allOf를 사용할 경우 명시적으로 모든 퓨처를 실행하고 결과값이 생성될 때까지 블록킹한다.
List<CompletableFuture<String>> futures = Stream.of("kim", "lee", "choi")
        .map(n -> CompletableFuture.supplyAsync(() -> String.format("%s 학생의 점수는 %d 입니다.", n, evaluateGrade(n)), executor))
        .toList();

// List<String> result = futures.stream().map(CompletableFuture::join).toList();
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));

futures.stream().map(CompletableFuture::join).forEach(System.out::println);
  • anyOf는 가장 빨리 끝나는 퓨처 하나를 리턴한다.
CompletableFuture<Object> result = CompletableFuture.anyOf(futures.toArray(CompletableFuture[]::new));
System.out.println(result.join()); // kim 학생의 점수는 83 입니다.