Link Search Menu Expand Document

섹션 6. CompletableFuture

자바 Concurrent 프로그래밍 소개

Concurrent 소프트웨어

  • 동시에 여러 작업을 할 수 있는 소프트웨어
    • 웹 브라우저로 유튜브를 보면서 키보드로 문서에 타이핑을 할 수 있다.
    • 녹화를 하면서 인텔리J로 코딩을 하고 워드에 적어둔 문서를 보거나 수정할 수 있다.

자바에서 지원하는 컨커런트 프로그래밍

  • 멀티프로세싱 (ProcessBuilder)
  • 멀티쓰레드

자바 멀티쓰레드 프로그래밍

  • Thread / Runnable

자바 멀티쓰레드 프로그래밍 예제

Thread 상속

public class Practice1 {
    public static void main(String[] args) {
        HelloThread helloThread = new HelloThread();
        helloThread.start();
        System.out.println("hello : " + Thread.currentThread().getName());
    }

    static class HelloThread extends Thread {
        @Override
        public void run() {
            System.out.println("world : " + Thread.currentThread().getName());
        }
    }
}
/*
world : Thread-0
hello : main
-------or-------
hello : main
world : Thread-0
*/

Runnable 구현

public class Practice1 {
    public static void main(String[] args) { 
		Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("Thread: " + Thread.currentThread().getName());
            }
        });
        System.out.println("hello : " + Thread.currentThread().getName());
    }
}
/*
hello : main
Thread: Thread-0
-------or-------
Thread: Thread-0
hello : main
*/

Lambda Expression

public class Practice1 {
    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            System.out.println("world : " + Thread.currentThread().getName());
        });
        thread.start();
        System.out.println("hello : " + Thread.currentThread().getName());
    }
}
/*
world : Thread-0
hello : main
-------or-------
hello : main
world : Thread-0
*/

쓰레드 주요 기능

  • 현재 쓰레드 멈춰두기 (sleep): 다른 쓰레드가 처리할 수 있도록 기회를 주지만 그렇다고 락을 놔주진 않는다. (잘못하면 데드락 걸릴 수 있겠죠.)
public class Practice1 {
    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Thread : " + Thread.currentThread().getName());
        });
        thread.start();
        System.out.println("hello : " + Thread.currentThread().getName());
    }
}
/*
hello : main
Thread : Thread-0
*/
  • 다른 쓰레드 깨우기 (interupt): 다른 쓰레드를 깨워서 interruptedExeption을 발생 시킨다. 그 에러가 발생했을 때 할 일은 코딩하기 나름. 종료 시킬 수도 있고 계속 하던 일 할 수도 있고.
public class Practice1 {
    public static void main(String[] args) throws InterruptedException { // Thread.sleep()
        Thread thread = new Thread(() -> {
            while(true) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                System.out.println("exit!");
                return;
            }
            System.out.println("Thread : " + Thread.currentThread().getName());
            }
        });
        thread.start();

        System.out.println("hello : " + Thread.currentThread().getName());
        Thread.sleep(3000L);
        thread.interrupt();
    }
}
/*
hello : main
Thread : Thread-0
Thread : Thread-0
Thread : Thread-0
exit!
*/
  • 다른 쓰레드 기다리기 (join): 다른 쓰레드가 끝날 때까지 기다린다.
public class Practice1 {
    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                throw new IllegalStateException();
            }
            System.out.println("Thread : " + Thread.currentThread().getName());
        });
        thread.start();

        System.out.println("hello : " + Thread.currentThread().getName());
        try {
            thread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(thread + "is finished");
    }
}
/*
hello : main
---after 3000ms---
Thread : Thread-0
Thread[Thread-0,5,]is finished
*/

개발자가 이렇게 일일이 쓰레드를 제어하는 것은 굉장히 비효율적이다.

Executors

고수준 (High-Level) Concurrency 프로그래밍

  • 쓰레드를 만들고 관리하는 작업을 애플리케이션에서 분리.
  • 그런 기능을 Executors에게 위임.

Executors가 하는 일

  • 쓰레드 만들기: 애플리케이션이 사용할 쓰레드 풀을 만들어 관리한다.
  • 쓰레드 관리: 쓰레드 생명 주기를 관리한다.
  • 작업 처리 및 실행: 쓰레드로 실행할 작업을 제공할 수 있는 API를 제공한다.

주요 인터페이스

  • Executor: execute(Runnable) -> 실제로 사용할 일은 없다.

  • ExecutorService: Executor 상속 받은 인터페이스로, Callable도 실행할 수 있으며, Executor를 종료 시키거나, 여러 Callable을 동시에 실행하는 등의 기능을 제공한다.

  • ScheduledExecutorService: ExecutorService를 상속 받은 인터페이스로 특정 시간 이후에 또는 주기적으로 작업을 실행할 수 있다.

ExecutorService로 작업 실행하기

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Practice2 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(() -> {
                System.out.println("Thread " + Thread.currentThread().getName());
        });
    }
}
/*
Thread pool-1-thread-1
---Program Continues---
*/

ExecutorService로 멈추기

ExecutorService는 작업을 실행한 뒤 새로운 작업이 들어올 때까지 대기하므로 프로세스를 명시적으로 shutdown 해야 한다.

		executorService.shutdown();  // 처리중인 작업 기다렸다가 종료 (Graceful Shutdown)
		executorService.shutdownNow(); // 당장 종료

ExecutorService로 멀티쓰레드 프로그래밍

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Practice2 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.submit(getRunnable("Hello"));
        executorService.submit(getRunnable("MELT"));
        executorService.submit(getRunnable("The"));
        executorService.submit(getRunnable("Java"));
        executorService.submit(getRunnable("Thread"));

        executorService.shutdown();
    }

    private static Runnable getRunnable(String message) {
        return () -> System.out.println(message + Thread.currentThread().getName());
    }
}
/*
Hellopool-1-thread-1
MELTpool-1-thread-2
Thepool-1-thread-1
Javapool-1-thread-1
Threadpool-1-thread-1
*/

ExecutorService 동작 원리

02 ExecutorService

  • Thread가 모두 사용 중일 경우 Task는 Blocking Queue에서 대기

  • Thread는 생성 비용이 많이 드는 자원이므로 Thread를 한정된 수만큼 생성하고 Thread Pool로 관리하는 것이 효율적

ScheduledExecutorService

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Practice2 {
    public static void main(String[] args) {
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
//        executorService.schedule(getRunnable("Hello"), 3, TimeUnit.SECONDS);
        executorService.scheduleAtFixedRate(getRunnable("Hello"), 1, 2, TimeUnit.SECONDS);
    }

    private static Runnable getRunnable(String message) {
        return () -> System.out.println(message + Thread.currentThread().getName());
    }
}
/*
sec 1: Hellopool-1-thread-1
sec 3: Hellopool-1-thread-1
sec 5: Hellopool-1-thread-1
... repeat every 2 secs ...
*/

Fork/Join 프레임워크

  • ExecutorService의 구현체로 손쉽게 멀티 프로세서를 활용할 수 있게끔 도와줌

Callable과 Future

Callable

Runnable과 유사하지만 작업의 결과를 받을 수 있다.

Future

비동기적인 작업의 현재 상태를 조회하거나 결과를 가져올 수 있다.

  • get(): 결과를 가져온다.
    • Blocking Call이므로 결과를 가져올 때까지 대기한다.
    • 타임아웃(최대한으로 기다릴 시간)을 설정할 수 있다.
import java.util.concurrent.*;

public class Practice3 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Callable<String> hello = () -> {
            Thread.sleep(2000L);
            return "hello";
        };

        Future<String> submit = executorService.submit(hello);
        System.out.println("Start!");

        submit.get(); // Blocking Call

        System.out.println("End!");
        executorService.shutdown();
    }
}
/*
Start!
---after 2 secs---
End!
*/
  • isDone(): 작업 상태를 확인한다.
    • 완료했으면 true 아니면 false를 반환한다.
  • cancel(): 작업을 취소한다.
    • 취소했으면 true 못 했으면 false를 반환한다.
    • parameter로 true를 전달하면 현재 진행중인 쓰레드를 interrupt하고 그렇지 않으면 현재 진행 중인 작업이 끝날 때까지 기다린다.
import java.util.concurrent.*;

public class Practice3 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Callable<String> hello = () -> {
            Thread.sleep(2000L);
            return "hello";
        };

        Future<String> helloFuture = executorService.submit(hello);
        System.out.println(helloFuture.isDone());
        System.out.println("Started!");

        helloFuture.cancel(false);

        System.out.println(helloFuture.isDone());
        System.out.println("End!");
        executorService.shutdown();
    }
}
/*
false
Started!
true -> cancel()로 인하여 작업이 취소되었을 뿐, get()으로 값을 가져올 수는 없다.
End!
*/
  • invokeAll(): 여러 작업을 동시에 실행한다.
    • 동시에 실행한 작업 중에 제일 오래 걸리는 작업만큼 시간이 걸린다.
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class Practice3 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Callable<String> hello = () -> {
            Thread.sleep(2000L);
            return "hello";
        };
        Callable<String> java = () -> {
            Thread.sleep(4000L);
            return "java";
        };
        Callable<String> MELT = () -> {
            Thread.sleep(1000L);
            return "MELT";
        };
        
        List<Future<String>> futures = executorService.invokeAll(Arrays.asList(hello, java, MELT));
        for (Future<String> f : futures) {
            System.out.println(f.get());
        }
        
        executorService.shutdown();
    }
}
/*
---after 4 sec-
hello
java
MELT
*/
  • invokeAny(): 여러 작업 중에 하나라도 먼저 응답이 오면 끝낸다.
    • 동시에 실행한 작업 중 제일 짧게 걸리는 작업만큼 시간이 걸린다.
    • Blocking Call이다.
import java.util.Arrays;
import java.util.concurrent.*;

public class Practice3 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(4);

        Callable<String> hello = () -> {
            Thread.sleep(2000L);
            return "hello";
        };
        Callable<String> java = () -> {
            Thread.sleep(4000L);
            return "java";
        };
        Callable<String> MELT = () -> {
            Thread.sleep(1000L);
            return "MELT";
        };

        String s = executorService.invokeAny(Arrays.asList(hello, java, MELT));
        System.out.println(s);

        executorService.shutdown();
    }
}
/*
---after 1 sec---
MELT
*/

CompletableFuture

CompletableFuture: 자바에서 비동기(Asynchronous) 프로그래밍을 가능케 하는 인터페이스

  • Future를 사용해도 어느 정도 가능하지만 하기 힘든 일들이 많았다.
    • Future를 외부에서 완료시킬 수 없다. 취소하거나 get()에 타임아웃을 설정할 수는 있다.
    • 블로킹 코드(get())를 사용하지 않고서는 작업이 끝났을 때 콜백을 실행할 수 없다.
    • 여러 Future를 조합할 수 없다. 예) Event 정보 가져온 다음 Event에 참석하는 회원 목록 가져오기
    • 예외 처리용 API를 제공하지 않는다.

CompletableFuture

  • implements Future
  • implements CompletionStage: 외부에서 완료시킬 수 있다.

비동기로 작업 실행하기

  • 리턴값이 없는 경우: runAsync()
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    System.out.println("Hello + " + Thread.currentThread().getName());
});
future.get();
/*
Hello ForkJoinPool.commonPool-worker-3
*/
  • 리턴값이 있는 경우: supplyAsync()
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
});
System.out.println(future.get());
/*
Hello ForkJoinPool.commonPool-worker-3
Hello
*/
  • 원하는 Executor(쓰레드풀)를 사용해서 실행할 수도 있다. (기본은 ForkJoinPool.commonPool())
    • runAsync()의 2번째 인자로 ExecutorService를 제공하면 된다.
ExecutorService executorService = Executors.newFixedThreadPool(4);
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    System.out.println("Hello " + Thread.currentThread().getName());
}, executorService).thenAccept((s) -> {
    System.out.println(Thread.currentThread().getName());
});
future.get();
/*
Hello pool-1-thread-1
pool-1-thread-1
*/

콜백 제공하기

  • thenApply(Function): 리턴값을 받아서 다른 값으로 바꾸는 콜백
  • thenAccept(Consumer): 리턴값을 리턴이 없는 또 다른 작업을 처리하는 콜백
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello + " + Thread.currentThread().getName());
return "Hello";
}).thenApply((s) -> {
System.out.println(Thread.currentThread().getName());
return s.toUpperCase();
});
System.out.println(future.get());
/*
Hello ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-3
HELLO
*/
  • thenRun(Runnable): 리턴값도 받지 않고 다른 작업을 처리하는 콜백
  • 콜백 자체를 또 다른 쓰레드에서 실행할 수 있다.
ExecutorService executorService = Executors.newFixedThreadPool(4);
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
}).thenRunAsync(() -> {
System.out.println(Thread.currentThread().getName());
}, executorService);
future.get();
/*
Hello ForkJoinPool.commonPool-worker-3
pool-1-thread-1
*/

조합하기

  • thenCompose(): 두 작업이 서로 이어서 실행하도록 조합
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class Practice5 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        });
        CompletableFuture<String> future = hello.thenCompose(Practice5::getWorld);
        
        System.out.println(future.get());
    }

    private static CompletableFuture<String> getWorld(String message) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("World " + Thread.currentThread().getName());
            return message + " World";
        });
    }
}
/*
Hello ForkJoinPool.commonPool-worker-3
World ForkJoinPool.commonPool-worker-3
Hello World
*/
  • thenCombine(): 두 작업을 독립적으로 실행하고 둘 다 종료했을 때 콜백 실행
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class Practice5 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        });

        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
            System.out.println("World " + Thread.currentThread().getName());
            return "World";
        });

        CompletableFuture<String> future = hello.thenCombine(world, (h, w) -> h + " " + w);
        System.out.println(future.get());
    }
}
  • allOf(): 여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행
        CompletableFuture<Void> future = CompletableFuture.allOf(hello, world)
                .thenAccept(System.out::println);

        System.out.println(future.get());
/*
World ForkJoinPool.commonPool-worker-5
Hello ForkJoinPool.commonPool-worker-3
null
null
*/

모든 작업의 결과값의 타입이 동일하리라는 보장도, 모든 작업이 에러 없이 실행된다는 보장도 없음. 따라서 모두 null을 반환함.

List<CompletableFuture> futures = Arrays.asList(hello, world);
CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[futures.size()]);

CompletableFuture<List<Object>> results = CompletableFuture.allOf(futuresArray)
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));

results.get().forEach(System.out::println);
/*
Hello ForkJoinPool.commonPool-worker-3
World ForkJoinPool.commonPool-worker-5
Hello
World
*/

반환값을 참조하기 위해서는 위와 같이 구현한다.

  • anyOf(): 여러 작업 중에 가장 빨리 끝난 하나의 결과에 콜백 실행
CompletableFuture<Void> future = CompletableFuture.anyOf(hello, world).thenAccept(System.out::println);
future.get();
/* 
Hello
---or---
World
*/

예외처리

  • exeptionally(Function)
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;


public class Practice5 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        boolean throwError = true;

        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            if (throwError) {
                throw new IllegalArgumentException();
            }
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        }).exceptionally(ex -> {
            System.out.println(ex);
            return "Error!";
        });

        System.out.println(hello.get());
    }
}
/*
java.util.concurrent.CompletionException: java.lang.IllegalArgumentException
Error!
*/
  • handle(BiFunction): 예외가 발생하지 않은 경우와 발생한 경우를 모두 허용
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;


public class Practice5 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        boolean throwError = false;

        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            if (throwError) {
                throw new IllegalArgumentException();
            }
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        }).handle((result, ex) -> {
            if (ex != null) {
                System.out.println(ex);
                return "Error!";
            }
            return result;
        });

        System.out.println(hello.get());
    }
}
/*
1. When throwError == true
java.util.concurrent.CompletionException: java.lang.IllegalArgumentException
Error!
2. When throwError == false
Hello ForkJoinPool.commonPool-worker-3
Hello
*/