miller
发布于

CompletableFuture 并发获取 方法结果

package thread;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.google.common.collect.Lists;

/**
 *
 * @ClassName:CompletableFutureDemo
 * @Description:多线程并发任务,取结果归集
 * @author diandian.zhang
 * @date 2017年6月14日下午12:44:01
 */
public class CompletableFutureDemo {
    public static void main(String[] args) {
        Long start = System.currentTimeMillis();
        //结果集
        List<String> list = new ArrayList<String>();
        List<String> list2 = new ArrayList<String>(); // todo 并发问题
        //定长10线程池
        ExecutorService exs = Executors.newFixedThreadPool(10);
        List<CompletableFuture<String>> futureList = new ArrayList<>();
        final List<Integer> taskList = Lists.newArrayList(2,1,3,4,5,6,7,8,9,10);
        try {
            ////方式一:循环创建CompletableFuture list,调用sequence()组装返回一个有返回值的CompletableFuture,返回结果get()获取
            //for(int i=0;i<taskList.size();i++){
            //    final int j=i;
            //    //异步执行
            //    CompletableFuture<String> future = CompletableFuture.supplyAsync(()->calc(taskList.get(j)), exs)
            //        //Integer转换字符串    thenAccept只接受不返回不影响结果
            //        .thenApply(e->Integer.toString(e))
            //        //如需获取任务完成先后顺序,此处代码即可
            //        .whenComplete((v, e) -> {
            //            System.out.println("任务"+v+"完成!result="+v+",异常 e="+e+","+new Date());
            //            list2.add(v);
            //        })
            //        ;
            //    futureList.add(future);
            //}
            ////流式获取结果:此处是根据任务添加顺序获取的结果
            //list = sequence(futureList).get();

            //方式二:全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取
            CompletableFuture[] cfs = taskList.stream().map(object-> CompletableFuture.supplyAsync(()->calc(object), exs)
                    .thenApply(h->Integer.toString(h))
                    //如需获取任务完成先后顺序,此处代码即可
                    .whenComplete((v, e) -> {
                        System.out.println("任务"+v+"完成!result="+v+",异常 e="+e+","+new Date());
                        list2.add(v);
                    })).toArray(CompletableFuture[]::new);
            //等待总任务完成,但是封装后无返回值,必须自己whenComplete()获取
            CompletableFuture.allOf(cfs).join();
            System.out.println("任务完成先后顺序,结果list2="+list2+";任务提交顺序,结果list="+list+",耗时="+(System.currentTimeMillis()-start));
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            exs.shutdown();
        }
    }

    public static Integer calc(Integer i){
        try {
            if(i==1){
                //任务1耗时3秒
                Thread.sleep(3000);
            }else if(i==5){
                //任务5耗时5秒
                Thread.sleep(5000);
            }else{
                //其它任务耗时1秒
                Thread.sleep(1000);
            }
            System.out.println("task线程:"+Thread.currentThread().getName()+"任务i="+i+",完成!+"+new Date());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }

    /**
     *
     * @Description 组合多个CompletableFuture为一个CompletableFuture,所有子任务全部完成,组合后的任务才会完成。带返回值,可直接get.
     * @param futures List
     * @return
     * @author diandian.zhang
     * @date 2017年6月19日下午3:01:09
     * @since JDK1.8
     */
    public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        //1.构造一个空CompletableFuture,子任务数为入参任务list size
        CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
        //2.流式(总任务完成后,每个子任务join取结果,后转换为list)
        return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    }

    /**
     *
     * @Description Stream流式类型futures转换成一个CompletableFuture,所有子任务全部完成,组合后的任务才会完成。带返回值,可直接get.
     * @param futures Stream
     * @return
     * @author diandian.zhang
     * @date 2017年6月19日下午6:23:40
     * @since JDK1.8
     */
    public static <T> CompletableFuture<List<T>> sequence(Stream<CompletableFuture<T>> futures) {
        List<CompletableFuture<T>> futureList = futures.filter(f -> f != null).collect(Collectors.toList());
        return sequence(futureList);
    }
}

上边list.add 在内部可能有并发concurrent 问题, 借助stream 直接返回,最后collect(Collectors.toList())

    public static CompletableFuture<String> printStr(String str, ExecutorService executorService) {
        return CompletableFuture.supplyAsync(() -> {
            log.info("str:{}", str);
            return str;
        }, executorService);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        List<String> list = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
        List<CompletableFuture<String>> futureList = list.stream()
                .map(str -> printStr(str, executorService)).collect(Collectors.toList());
        CompletableFuture<Void> allFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));
        CompletableFuture<List<String>> resultFuture = allFuture.thenApply(v -> futureList.stream().map(future -> future.join()).collect(Collectors.toList()));
        log.info("result:{}", resultFuture.get());
        executorService.shutdown();
    }

allOf:等待所有任务完成,anyOf:只要有一个任务完成。

golang版本 https://www.maocaoying.com/topic/933

浏览 (1063)
点赞
收藏
评论