首页

通过CompletableFuture将一个任务分解多线程异步并发分解多任务处理并合并汇总最终任务结果代码示例

标签:CompletableFuture,多线程     发布时间:2022-12-06   

一、demo说明

基于java.util.concurrent.CompletableFuture实现单一任务分解多任务(基于线程池,异步并发处理),最终合并CompletableFuture.allOf(cfs).join();等待所有子线程任务处理完成后(参考更多子任务同步处理代码示例),打印最终结果“完成!result=”代码示例。,点击更多多线程代码示例&教程下载(《亿级流量Jαva高并发与网络编程实战》-源码)。

二、代码示例

package com.xwood.demo.thread;@b@@b@import java.util.ArrayList;@b@import java.util.List;@b@import java.util.concurrent.CompletableFuture;@b@import java.util.concurrent.CopyOnWriteArrayList;@b@import java.util.concurrent.ExecutorService;@b@import java.util.concurrent.Executors;@b@@b@public class CompletableFutureDemo {@b@    public static void main(String[] args) {@b@        //原始数据集@b@        CopyOnWriteArrayList<Integer> taskList = new CopyOnWriteArrayList();@b@        taskList.add(1);@b@        taskList.add(2);@b@        taskList.add(3);@b@        taskList.add(4);@b@@b@        // 结果集@b@        List<Character> resultList = new ArrayList<>();@b@        //线程池,可容纳四个线程@b@        ExecutorService executorService = Executors.newFixedThreadPool(4);@b@@b@        CompletableFuture[] cfs = taskList.stream()@b@                //第一阶段@b@                .map(integer -> CompletableFuture.supplyAsync(@b@                        () -> calcASCII(integer), executorService)@b@                        //第二阶段@b@                        .thenApply(i -> {@b@                            char c = (char) (i.intValue());@b@                            System.out.println("【阶段2】线程"@b@                                    + Thread.currentThread().getName() + "执行完毕,"@b@                                    + "已将int"@b@                                    + i + "转为了字符" + c);@b@                            return c;@b@                        })@b@                        //第三阶段@b@                        .whenComplete((ch, e) -> {@b@                            resultList.add(ch);@b@                            System.out.println("【阶段3】线程" +@b@                                    Thread.currentThread().getName() + "执行完毕," + "已将"@b@                                    + ch + "增加到了结果集" + resultList + "中");@b@                            executorService.shutdown();@b@                        })@b@                ).toArray(CompletableFuture[]::new);@b@@b@        // 封装后无返回值,必须自己whenComplete()获取@b@        CompletableFuture.allOf(cfs).join();//future.get()@b@@b@@b@        System.out.println("完成!result=" + resultList);@b@    }@b@@b@    //计算i的ASCII值@b@    public static Integer calcASCII(Integer i) {@b@        try {@b@            if (i == 1) {@b@                Thread.sleep(5000);@b@            } else {@b@                Thread.sleep(1000);@b@            }@b@            //数字 -> A-D对应的ascii@b@            i = i + 64;@b@            System.out.println("【阶段1】线程" + Thread.currentThread().getName()@b@                    + "执行完毕," + "已将" + i@b@                    + "转为了A(或B或C或D)对应的ASCII" + i);@b@        } catch (InterruptedException e) {@b@            e.printStackTrace();@b@        }@b@        return i;@b@    }@b@}

控制台打印结果

【阶段1】线程pool-1-thread-2执行完毕,已将66转为了A(或B或C或D)对应的ASCII66@b@【阶段1】线程pool-1-thread-3执行完毕,已将67转为了A(或B或C或D)对应的ASCII67@b@【阶段2】线程pool-1-thread-2执行完毕,已将int66转为了字符B@b@【阶段3】线程pool-1-thread-2执行完毕,已将B增加到了结果集[B]中@b@【阶段1】线程pool-1-thread-4执行完毕,已将68转为了A(或B或C或D)对应的ASCII68@b@【阶段2】线程pool-1-thread-4执行完毕,已将int68转为了字符D@b@【阶段3】线程pool-1-thread-4执行完毕,已将D增加到了结果集[B, D]中@b@【阶段2】线程pool-1-thread-3执行完毕,已将int67转为了字符C@b@【阶段3】线程pool-1-thread-3执行完毕,已将C增加到了结果集[B, D, C]中@b@【阶段1】线程pool-1-thread-1执行完毕,已将65转为了A(或B或C或D)对应的ASCII65@b@【阶段2】线程pool-1-thread-1执行完毕,已将int65转为了字符A@b@【阶段3】线程pool-1-thread-1执行完毕,已将A增加到了结果集[B, D, C, A]中@b@完成!result=[B, D, C, A]@b@@b@Process finished with exit code 0