我有这段适用于 Arraylist 的代码。数组列表中的每个元素都需要由 ArithmeticBBB 和 ArithmeticCCC 处理,并创建 2 个元素返回到流中。
我在循环中使用
stream = stream.map(a::work)
,或使用 flatMap。
class ArithmeticBBB implements ArithmeticManagerStream.ArithmeticAction {
@Override
public String arithmetic(String n) {
String ss = n+" 2" + " ,Thread ID:" +Thread.currentThread().getId();
return ss;
}
}
class ArithmeticCCC implements ArithmeticManagerStream.ArithmeticAction {
@Override
public String arithmetic(String n) {
String ss = n+" 3" + " ,Thread ID:" +Thread.currentThread().getId();
return ss;
}
}
public class ArithmeticManagerStream {
private List<String> integerList = null;
private List<String> resultArray = null;
public ArithmeticManagerStream(List<String> dataFromUser) {
this.integerList = dataFromUser;
}
public List<String> invokerActions(List<ArithmeticAction> actions) throws
InterruptedException {
Stream<String> stream = integerList.parallelStream();
for (final ArithmeticManagerStream.ArithmeticAction a : actions) {
stream = stream.flatMap(str -> actions.stream().map(b -> b.arithmetic(str)));
}
return resultArray = stream.collect(Collectors.toList());
}
public interface ArithmeticAction {
String arithmetic(String n);
}
public static void main(String[] args) {
List<ArithmeticAction> actions = new ArrayList();
actions.add(new ArithmeticBBB());
actions.add(new ArithmeticCCC());
List<String> intData = new ArrayList<>();
intData.add("1");
intData.add("2");
intData.add("3");
ArithmeticManagerStream arithmeticManagerStream = new ArithmeticManagerStream(intData);
try {
List<String> result = arithmeticManagerStream.invokerActions(actions);
System.out.println("***********************************************");
for(String i : result) {
System.out.println(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
目前,如果您运行此程序,您将看到结果已连接到字符串。
1 2 ,Thread ID:12 2 ,Thread ID:12
1 2 ,Thread ID:12 3 ,Thread ID:12
1 3 ,Thread ID:12 2 ,Thread ID:12
1 3 ,Thread ID:12 3 ,Thread ID:12
2 2 ,Thread ID:1 2 ,Thread ID:1
2 2 ,Thread ID:1 3 ,Thread ID:1
2 3 ,Thread ID:1 2 ,Thread ID:1
2 3 ,Thread ID:1 3 ,Thread ID:1
3 2 ,Thread ID:13 2 ,Thread ID:13
3 2 ,Thread ID:13 3 ,Thread ID:13
3 3 ,Thread ID:13 2 ,Thread ID:13
3 3 ,Thread ID:13 3 ,Thread ID:13
我需要每个元素都在自己的元素中,例如:
1 2 ,Thread ID:12
1 2 ,Thread ID:12
1 3 ,Thread ID:12
1 3 ,Thread ID:12
2 2 ,Thread ID:1
2 2 ,Thread ID:1
2 3 ,Thread ID:1
2 3 ,Thread ID:1
3 2 ,Thread ID:13
3 2 ,Thread ID:13
3 3 ,Thread ID:13
3 3 ,Thread ID:13
显然,由于拼写错误和其他问题的数量,所发布问题中的代码尚未从实际运行的代码中剥离。所以不可能说出真正的问题是什么。
但是,以下代码确实会运行:
public class ParallelStreams {
public static void main(String[] args) {
List<String> strList = List.of("a1", "a2", "a3", "a4", "a5");
List<IWork> w = List.of(new Work1(), new Work2());
Stream<String> stream = strList.parallelStream();
for (final IWork a : w) {
stream = stream.map(a::work);
}
List<String> finalArray = stream.collect(Collectors.toList());
for (String s : finalArray) {
System.out.println(s);
}
}
}
interface IWork {
String work(String s);
}
class Work1 implements IWork {
@Override
public String work(String s) {
return s + " * " + Thread.currentThread().getId();
}
}
class Work2 implements IWork {
@Override
public String work(String s) {
return s + " ! " + Thread.currentThread().getId();
}
}
并产生以下输出:
a1 * 13 ! 13
a2 * 13 ! 13
a3 * 1 ! 1
a4 * 1 ! 1
a5 * 1 ! 1
为了更清楚地了解哪个 IWork 实现正在运行,我添加了“*”或“!”来代替原始实现中的空格。
首先,感谢您将其更改为可运行的代码,以便我们可以看到发生了什么。
你让它变得有点难以理解,因为数据只是个位数,而你的操作也只是个位数,线程 ID 也可以是个位数。所以我将数据更改为“A”、“B”和“C”以使其更容易。
在
for
循环中,您正在循环 actions
,但实际上并不将 action
用于循环中的任何内容。因此,它本质上只是一个循环,它会根据您的操作运行多次,但每次迭代都没有不同。由于您将循环变量称为 a
,因此使捕获变得更加困难。尝试将其命名为更有意义的名称。
真正令人困惑的是,您正在循环
actions
,然后在循环内流式传输操作。所以这就是所应用的操作数量的平方。我不敢相信这就是您正在寻找的东西,但您似乎并不对输出的这方面感到不安,只是它针对每个输入进行了合并。
此外,还不清楚您对线程本身有多关心。输出中有它们,但为什么呢?为什么要使用
ParallelStream()
?
通过将 Stream 视为变量并逐步移动它,也使事情变得更加复杂。在我发布的第一个答案中,我发现将它们组合在一起会导致整个流通过单个线程运行,因此我没有在该答案中这样做,因为我不确定线程对您来说有多重要(并且他们不应该是)。
最后,如果您转储
for
循环,然后将其全部合并到一个 Stream 语句中,它就会完美地工作:
class ArithmeticBBB implements ArithmeticManagerStream.ArithmeticAction {
@Override
public String arithmetic(String n) {
String ss = n + " 2" + " ,Thread ID:" + Thread.currentThread().getId();
return ss;
}
}
class ArithmeticCCC implements ArithmeticManagerStream.ArithmeticAction {
@Override
public String arithmetic(String n) {
String ss = n + " 3" + " ,Thread ID:" + Thread.currentThread().getId();
return ss;
}
}
public class ArithmeticManagerStream {
private List<String> integerList = null;
private List<String> resultArray = null;
public ArithmeticManagerStream(List<String> dataFromUser) {
this.integerList = dataFromUser;
}
public List<String> invokerActions(List<ArithmeticAction> actions) throws InterruptedException {
return integerList.parallelStream()
.flatMap(str -> actions.stream().map(action -> action.arithmetic(str)))
.collect(Collectors.toList());
}
public interface ArithmeticAction {
String arithmetic(String n);
}
public static void main(String[] args) {
List<ArithmeticAction> actions = new ArrayList();
actions.add(new ArithmeticBBB());
actions.add(new ArithmeticCCC());
List<String> intData = new ArrayList<>();
intData.add("A");
intData.add("B");
intData.add("C");
ArithmeticManagerStream arithmeticManagerStream = new ArithmeticManagerStream(intData);
try {
List<String> result = arithmeticManagerStream.invokerActions(actions);
System.out.println("***********************************************");
for (String i : result) {
System.out.println(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
这会导致:
***********************************************
A 2 ,Thread ID:13
A 3 ,Thread ID:13
B 2 ,Thread ID:1
B 3 ,Thread ID:1
C 2 ,Thread ID:1
C 3 ,Thread ID:1
这对我来说似乎是正确的。每个源项目都会经过每个操作,生成它自己的输出,该输出被平面映射回原始流。
最后一点。我一直在寻求简化代码,并注意到您的界面是一个
Functional
构造。因此,您实际上可以放弃实现类,并在加载 actions
数组时使用 lambda 声明它们。然后,当然,很明显你的界面基本上是一个Function<String, String>
。所以我完全放弃了界面,然后清理了代码,使用 Stream 作为最终输出。
结果本质上是微不足道的,我添加了更多动作只是为了表明一点:
public class ArithmeticManagerStream {
private List<String> integerList = null;
public ArithmeticManagerStream(List<String> dataFromUser) {
this.integerList = dataFromUser;
}
public List<String> invokerActions(List<Function<String, String>> actions) {
return integerList.parallelStream().flatMap(str -> actions.stream().map(action -> action.apply(str))).collect(Collectors.toList());
}
public static void main(String[] args) {
List<Function<String, String>> actions = new ArrayList();
actions.add(n -> n + " 2" + " ,Thread ID:" + Thread.currentThread().getId());
actions.add(n -> n + " 3" + " ,Thread ID:" + Thread.currentThread().getId());
actions.add(n -> n + " 4" + " ,Thread ID:" + Thread.currentThread().getId());
actions.add(n -> n + " 5" + " ,Thread ID:" + Thread.currentThread().getId());
actions.add(n -> n + " 6" + " ,Thread ID:" + Thread.currentThread().getId());
List<String> intData = new ArrayList<>();
intData.add("A");
intData.add("B");
intData.add("C");
ArithmeticManagerStream arithmeticManagerStream = new ArithmeticManagerStream(intData);
List<String> result = arithmeticManagerStream.invokerActions(actions);
System.out.println("***********************************************");
result.forEach(System.out::println);
}
}
输出为:
***********************************************
A 2 ,Thread ID:13
A 3 ,Thread ID:13
A 4 ,Thread ID:13
A 5 ,Thread ID:13
A 6 ,Thread ID:13
B 2 ,Thread ID:1
B 3 ,Thread ID:1
B 4 ,Thread ID:1
B 5 ,Thread ID:1
B 6 ,Thread ID:1
C 2 ,Thread ID:14
C 3 ,Thread ID:14
C 4 ,Thread ID:14
C 5 ,Thread ID:14
C 6 ,Thread ID:14