CompletableFutureObserver.java 1.5 KB
Newer Older
xieshaojun's avatar
xieshaojun committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
package vc.thinker.utils;

import com.google.common.collect.Maps;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * 合并未来的观察者
 * 用于维护一批需要合并同步结果的线程关系
 *
 * @author HeTongHao
 * @since 2022/8/10 16:43
 */
public class CompletableFutureObserver<T> {
    private final Map<String, CompletableFuture<T>> completableFutureMap = Maps.newConcurrentMap();

    public CompletableFuture<T> subscribe(String key) {
        CompletableFuture<T> completableFuture = new CompletableFuture<>();
        completableFutureMap.put(key, completableFuture);
        return completableFuture;
    }

    public void onMessage(String key, T msg) {
        CompletableFuture<T> completableFuture = completableFutureMap.get(key);
        if (completableFuture != null) {
            completableFuture.complete(msg);
            completableFutureMap.remove(key);
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            completableFuture.complete("666");
        }).start();
        String o = completableFuture.get();
        System.out.println(o);
    }
}