Java流parallelStream:如何将多个函数映射到流图

问题描述 投票:0回答:2

我有这段适用于 Arraylist 的代码。数组列表中的每个元素都需要由 ArithmeticBBB 和 ArithmeticCCC 处理,并创建 2 个元素返回到流中。

我在循环中使用

stream = stream.map(a::work)
,或使用 flatMap。

class ArithmeticBBB implements ArithmeticManagerStream.ArithmeticAction {
        @Override
        public String arithmetic(String n) {
            String ss  = n+" 2" + " ,Thread ID:" +Thread.currentThread().getId();
            return ss;
        }
    }
    class ArithmeticCCC implements ArithmeticManagerStream.ArithmeticAction {
        @Override
        public String arithmetic(String n) {
            String ss  = n+" 3" + " ,Thread ID:" +Thread.currentThread().getId();
            return ss;
        }
    }
    
    public class ArithmeticManagerStream {
        private List<String> integerList = null;
        private List<String> resultArray = null;
    
    
        public ArithmeticManagerStream(List<String> dataFromUser) {
            this.integerList =  dataFromUser;
        }
    
        public List<String> invokerActions(List<ArithmeticAction> actions) throws
                InterruptedException {
    
            Stream<String> stream = integerList.parallelStream();
            for (final ArithmeticManagerStream.ArithmeticAction a : actions) {
                stream = stream.flatMap(str -> actions.stream().map(b -> b.arithmetic(str)));
             }
            return resultArray = stream.collect(Collectors.toList());
        }
        public interface ArithmeticAction {
            String arithmetic(String n);
        }
    
        public static void main(String[] args) {
            List<ArithmeticAction> actions = new ArrayList();
            actions.add(new ArithmeticBBB());
            actions.add(new ArithmeticCCC());
            List<String> intData = new ArrayList<>();
            intData.add("1");
            intData.add("2");
            intData.add("3");
    
            ArithmeticManagerStream arithmeticManagerStream = new ArithmeticManagerStream(intData);
    
            try {
                List<String> result = arithmeticManagerStream.invokerActions(actions);
                System.out.println("***********************************************");
                for(String i : result) {
                    System.out.println(i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

目前,如果您运行此程序,您将看到结果已连接到字符串。

1 2 ,Thread ID:12 2 ,Thread ID:12
1 2 ,Thread ID:12 3 ,Thread ID:12
1 3 ,Thread ID:12 2 ,Thread ID:12
1 3 ,Thread ID:12 3 ,Thread ID:12
2 2 ,Thread ID:1 2 ,Thread ID:1
2 2 ,Thread ID:1 3 ,Thread ID:1
2 3 ,Thread ID:1 2 ,Thread ID:1
2 3 ,Thread ID:1 3 ,Thread ID:1
3 2 ,Thread ID:13 2 ,Thread ID:13
3 2 ,Thread ID:13 3 ,Thread ID:13
3 3 ,Thread ID:13 2 ,Thread ID:13
3 3 ,Thread ID:13 3 ,Thread ID:13

我需要每个元素都在自己的元素中,例如:

1 2 ,Thread ID:12 
1 2 ,Thread ID:12  
1 3 ,Thread ID:12  
1 3 ,Thread ID:12 
2 2 ,Thread ID:1  
2 2 ,Thread ID:1 
2 3 ,Thread ID:1 
2 3 ,Thread ID:1 
3 2 ,Thread ID:13 
3 2 ,Thread ID:13  
3 3 ,Thread ID:13  
3 3 ,Thread ID:13 
java multithreading parallel-processing java-stream
2个回答
1
投票

显然,由于拼写错误和其他问题的数量,所发布问题中的代码尚未从实际运行的代码中剥离。所以不可能说出真正的问题是什么。

但是,以下代码确实会运行:

public class ParallelStreams {

    public static void main(String[] args) {

        List<String> strList = List.of("a1", "a2", "a3", "a4", "a5");
        List<IWork> w = List.of(new Work1(), new Work2());

        Stream<String> stream = strList.parallelStream();
        for (final IWork a : w) {
            stream = stream.map(a::work);
        }
        List<String> finalArray = stream.collect(Collectors.toList());

        for (String s : finalArray) {
            System.out.println(s);
        }
    }
}


interface IWork {
    String work(String s);
}

class Work1 implements IWork {
    @Override
    public String work(String s) {
        return s + " * " + Thread.currentThread().getId();
    }
}

class Work2 implements IWork {
    @Override
    public String work(String s) {
        return s + " ! " + Thread.currentThread().getId();
    }
}

并产生以下输出:

a1 * 13 ! 13
a2 * 13 ! 13
a3 * 1 ! 1
a4 * 1 ! 1
a5 * 1 ! 1

为了更清楚地了解哪个 IWork 实现正在运行,我添加了“*”或“!”来代替原始实现中的空格。


0
投票

首先,感谢您将其更改为可运行的代码,以便我们可以看到发生了什么。

你让它变得有点难以理解,因为数据只是个位数,而你的操作也只是个位数,线程 ID 也可以是个位数。所以我将数据更改为“A”、“B”和“C”以使其更容易。

for
循环中,您正在循环
actions
,但实际上并不将
action
用于循环中的任何内容。因此,它本质上只是一个循环,它会根据您的操作运行多次,但每次迭代都没有不同。由于您将循环变量称为
a
,因此使捕获变得更加困难。尝试将其命名为更有意义的名称。

真正令人困惑的是,您正在循环

actions
,然后在循环内流式传输操作。所以这就是所应用的操作数量的平方。我不敢相信这就是您正在寻找的东西,但您似乎并不对输出的这方面感到不安,只是它针对每个输入进行了合并。

此外,还不清楚您对线程本身有多关心。输出中有它们,但为什么呢?为什么要使用

ParallelStream()

通过将 Stream 视为变量并逐步移动它,也使事情变得更加复杂。在我发布的第一个答案中,我发现将它们组合在一起会导致整个流通过单个线程运行,因此我没有在该答案中这样做,因为我不确定线程对您来说有多重要(并且他们不应该是)。

最后,如果您转储

for
循环,然后将其全部合并到一个 Stream 语句中,它就会完美地工作:

class ArithmeticBBB implements ArithmeticManagerStream.ArithmeticAction {
    @Override
    public String arithmetic(String n) {
        String ss = n + " 2" + " ,Thread ID:" + Thread.currentThread().getId();
        return ss;
    }
}

class ArithmeticCCC implements ArithmeticManagerStream.ArithmeticAction {
    @Override
    public String arithmetic(String n) {
        String ss = n + " 3" + " ,Thread ID:" + Thread.currentThread().getId();
        return ss;
    }
}

public class ArithmeticManagerStream {
    private List<String> integerList = null;
    private List<String> resultArray = null;


    public ArithmeticManagerStream(List<String> dataFromUser) {
        this.integerList = dataFromUser;
    }

    public List<String> invokerActions(List<ArithmeticAction> actions) throws InterruptedException {
        return integerList.parallelStream()
                          .flatMap(str -> actions.stream().map(action -> action.arithmetic(str)))
                          .collect(Collectors.toList());
    }

    public interface ArithmeticAction {
        String arithmetic(String n);
    }

    public static void main(String[] args) {
        List<ArithmeticAction> actions = new ArrayList();
        actions.add(new ArithmeticBBB());
        actions.add(new ArithmeticCCC());
        List<String> intData = new ArrayList<>();
        intData.add("A");
        intData.add("B");
        intData.add("C");

        ArithmeticManagerStream arithmeticManagerStream = new ArithmeticManagerStream(intData);

        try {
            List<String> result = arithmeticManagerStream.invokerActions(actions);
            System.out.println("***********************************************");
            for (String i : result) {
                System.out.println(i);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

这会导致:

***********************************************
A 2 ,Thread ID:13
A 3 ,Thread ID:13
B 2 ,Thread ID:1
B 3 ,Thread ID:1
C 2 ,Thread ID:1
C 3 ,Thread ID:1

这对我来说似乎是正确的。每个源项目都会经过每个操作,生成它自己的输出,该输出被平面映射回原始流。

最后一点。我一直在寻求简化代码,并注意到您的界面是一个

Functional
构造。因此,您实际上可以放弃实现类,并在加载
actions
数组时使用 lambda 声明它们。然后,当然,很明显你的界面基本上是一个
Function<String, String>
。所以我完全放弃了界面,然后清理了代码,使用 Stream 作为最终输出。

结果本质上是微不足道的,我添加了更多动作只是为了表明一点:

public class ArithmeticManagerStream {
    private List<String> integerList = null;

    public ArithmeticManagerStream(List<String> dataFromUser) {
        this.integerList = dataFromUser;
    }

    public List<String> invokerActions(List<Function<String, String>> actions) {
        return integerList.parallelStream().flatMap(str -> actions.stream().map(action -> action.apply(str))).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Function<String, String>> actions = new ArrayList();
        actions.add(n -> n + " 2" + " ,Thread ID:" + Thread.currentThread().getId());
        actions.add(n -> n + " 3" + " ,Thread ID:" + Thread.currentThread().getId());
        actions.add(n -> n + " 4" + " ,Thread ID:" + Thread.currentThread().getId());
        actions.add(n -> n + " 5" + " ,Thread ID:" + Thread.currentThread().getId());
        actions.add(n -> n + " 6" + " ,Thread ID:" + Thread.currentThread().getId());
        List<String> intData = new ArrayList<>();
        intData.add("A");
        intData.add("B");
        intData.add("C");

        ArithmeticManagerStream arithmeticManagerStream = new ArithmeticManagerStream(intData);
        List<String> result = arithmeticManagerStream.invokerActions(actions);
        System.out.println("***********************************************");
        result.forEach(System.out::println);
    }
}

输出为:

***********************************************
A 2 ,Thread ID:13
A 3 ,Thread ID:13
A 4 ,Thread ID:13
A 5 ,Thread ID:13
A 6 ,Thread ID:13
B 2 ,Thread ID:1
B 3 ,Thread ID:1
B 4 ,Thread ID:1
B 5 ,Thread ID:1
B 6 ,Thread ID:1
C 2 ,Thread ID:14
C 3 ,Thread ID:14
C 4 ,Thread ID:14
C 5 ,Thread ID:14
C 6 ,Thread ID:14
© www.soinside.com 2019 - 2024. All rights reserved.