我正在审查多线程并尝试实现一个创建单独线程来运行收集过程的应用程序。这个过程中的主要方法需要一个变量arraylist,我试图想出一种方法将arraylist传递给每个线程。
ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);
ThreadPoolTaskExecutor taskExecutor = (ThreadPoolTaskExecutor) context.getBean("taskExecutor");
MainTask mxTask = (MainTask) context.getBean("MainTask");
mxTask.setName("Thread 1");
taskExecutor.execute(mxTask);
MainTask mxTask2 = (MainTask) context.getBean("MainTask");
mxTask2.setName("Thread 2");
taskExecutor.execute(mxTask2);
上面是声明线程的类方法,MainTask是执行run()方法的类,然后调用其他main方法。
@Override
public void run() {
System.out.println(name + " is running.");
getConfigurations();
try {
mainRun();
} catch (MessagingException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
getconfigurations()方法读入所有必需的配置变量。下面是每个线程需要运行的主要过程。
public static void mainRun() throws IOException {
ArrayList<String> filterList = new ArrayList<String>();
ArrayList<String> brokerList = new ArrayList<String>();
if(kafkaServers1 != null) {
brokerList.add(kafkaServers1);
} else if(!kafkaServers2.isEmpty()) {
brokerList.add(kafkaServers2);
} else if(!kafkaServers3.isEmpty()) {
brokerList.add(kafkaServers3);
}
ArrayList<String> serverList = new ArrayList<String>();
for(int x=0;x<brokerList.size();x++){
String[] serverBrokers = brokerList.get(x).split(",");
serverList.add(serverBrokers[0]);
serverList.add(serverBrokers[1]);
serverList.add(serverBrokers[2]);
}
try {
while(true){
for (String temp : serverList) {
kafkaServer = temp;
hostName = kafkaServer;
InetAddress addr = InetAddress.getByName(hostName);
hostName = addr.getHostName();
kafkaServer= hostName;
retrieveData(hostName);
...
变量kafkaServers1包含3个ips的列表,它们被拆分并且每个都添加到serverList arrayList。我想要做的只是为每个线程分配一个ip。这可能吗?有人可以建议吗?
您可以检索Kafka配置,然后为每个主机创建一个新的Runnable,而不是让您的Runnable对象成为bean。将其提交给遗嘱执行人。
MyRunnable infoGetter = new MyRunnable(hostInfo);
taskExecutor.execute(infoGetter);
如果你需要对结果做一些事情,CompletableFuture可能是更好的选择,如果你需要合并它们,可以选择fork / join RecursiveTask<V>
。