如何在 Java 中并行处理对象列表?

问题描述 投票:0回答:3

我有一个大约一千个 Java 对象的列表,并且正在迭代一个

List
容器来处理它们,对每个对象进行相同的处理。这种顺序方法需要花费大量时间进行处理,因此我想尝试通过并行处理来加快速度。我检查了 Java 执行器框架,但卡住了。

我想到了一种方法来实现我的要求。我想指定每个线程要处理的最小固定数量的对象,以便每个线程都能快速处理。我怎样才能实现这个目标?我应该使用其他方法吗?

例如:

List<Object> objects = new List<Object>(); 

for (Object object : objects) {
  // Doing some common operation for all Objects 
}
java multithreading concurrency multiprocessing executorservice
3个回答
10
投票

并行处理列表有很多选项:

使用并行流:

objects.stream().parallel().forEach(object -> {
    //Your work on each object goes here, using object
})
如果您想使用比 fork-join 池更多线程的池,请使用执行器服务来提交任务:

ExecutorService es = Executors.newFixedThreadPool(10); for(Object o: objects) { es.submit(() -> { //code here using Object o... } }

前面的示例本质上与传统的执行器服务相同,在单独的线程上运行任务。

作为这些的替代方案,您还可以使用完整的未来提交:

//You can also just run a for-each and manually add each //feature to a list List<CompletableFuture<Void>> futures = objects.stream().map(object -> CompletableFuture.runAsync(() -> { //Your work on each object goes here, using object })

如果需要,您可以使用
futures
对象检查每个执行的状态。


您可以使用

9
投票
,它将负责负载平衡。任务将分布在不同的线程上。

这是一个例子:

import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Test { public static void main(String[] args) { // Fixed thread number ExecutorService service = Executors.newFixedThreadPool(10); // Or un fixed thread number // The number of threads will increase with tasks // ExecutorService service = Executors.newCachedThreadPool(10); List<Object> objects = new ArrayList<>(); for (Object o : objects) { service.execute(new MyTask(o)); } // shutdown // this will get blocked until all task finish service.shutdown(); try { service.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } public static class MyTask implements Runnable { Object target; public MyTask(Object target) { this.target = target; } @Override public void run() { // business logic at here } } }


  

2
投票

public class ParallelProcessListElements { public void processList (int numberofthreads,List<Object>tempList, Object obj, Method method){ final int sizeofList=tempList.size(); final int sizeofsublist = sizeofList/numberofthreads; List<Thread> threadlist = new ArrayList<Thread>(); for(int i=0;i<numberofthreads;i++) { int firstindex = i*sizeofsublist; int lastindex = i*sizeofsublist+sizeofsublist; if(i==numberofthreads-1) lastindex=sizeofList; List<Object> subList=tempList.subList(firstindex,lastindex ); Thread th = new Thread(()->{ try{method.invoke(obj, subList);}catch(Exception e) {e.printStackTrace();} }); threadlist.add(th); } threadlist.forEach(th->{th.start();try{Thread.sleep(10);}catch(Exception e) {}}); } }

public class Demo { public static void main(String[] args) { List<Object> tempList= new ArrayList<Object>(); /** * Adding values to list... For Demo purpose.. */ for(int i=0;i<500;i++) tempList.add(i); ParallelProcessListElements process = new ParallelProcessListElements(); final int numberofthreads = 5; Object obj = new Demo(); Method method=null; try{ method=Demo.class.getMethod("printList", List.class);}catch(Exception e) {} /** * Method Call... */ process.processList(numberofthreads,tempList,obj,method); } public void printList(List<Integer>list) { /** * Business logic to process the list... */ list.forEach(item->{ try{Thread.sleep(1000);}catch(Exception e) {} System.out.println(item); }); } }
    

© www.soinside.com 2019 - 2024. All rights reserved.