对于这种情况,我应该如何使用线程在java中实现并行性?

问题描述 投票:0回答:1

所以我的场景是我收到一条执行作业的消息。该作业有一个 sourceId。现在,一次应运行具有一种类型的 sourceId 的一项作业,其他作业应排队。当一个作业启动时,它应该再次将自己的工作分解为多个小执行。对于每个小执行,我需要更新数据库以了解作业的多少部分已完成。一旦整个作业结束,我需要更新作业完成的数据库。如果出现错误,我需要将数据库中的作业标记为失败。 这是我想出的一个大概的草图。如果我遗漏了一些东西,你们能帮我吗?我应该如何进行数据库更新。是否有任何 Java 语言功能可以让我的生活变得轻松。 我还需要在 quarkus 应用程序中执行此操作。

class PrimaryWorker{
    ConcurrentHashMap<String,ArrayBlockingQueue> staging;

    public void submit(){
        staging.checkIfEntryExistForSource()
        if(yes){
            getTheEntry()
            createSecondaryWorkerWithQueue();
            pushTheJobToQueue();
        }else{
            createEntryForDataSource()
            createSecondaryWorkerWithQueue();
            pushTheJobToQueue();
        }
    }
}
class BusinessJobWorker{
    ArrayBlockingQueue input;
    ArrayBlockingQueue commonOutput;


    public SecondaryWorker(input,commonOutput){

    }

    public void run(){
        BusinessJob br = input.poll();
        ArrayBlockingQueue queue = new ArrayBlockingQueue(10);
        if(canBeParallel()){
            queue = new ArrayBlockingQueue(1);
        }
        do{
            Batch batch = getEntities(br,batchSize,pageNumber);
            workerQueue.push(batch);
        }while(batch.hasNext);
        updateDatabase();
    }
    public void updateDatabase(){

    }
    public boolean canBeParallel(){

    }
}
class BatchWorker{
    ArrayBlockingQueue commonInput;
    ArrayBlockingQueue output;

    public BatchWorker(input){

    }
    
}

我仍然没有找到正确的方法来做到这一点,我希望获得关于如何完成多线程任务的专家意见。

java multithreading concurrency quarkus producer-consumer
1个回答
0
投票

你看过《叛变》吗?这是 Quarkus 中用于反应式编程的库,请参阅此链接:

https://smallrye.io/smallrye-mutiny/latest/tutorials/getting-mutiny/

无论如何,我还没有测试下面的代码,但它应该是这样的:

假设您有一个带有 MyJobEntity 的数据库(并且我在本示例中使用 Mongo):

public class MyJobEntity extends ReactivePanacheMongoEntity {
     public State state;
     public List<Execution> executions;     
}

然后您可以按如下方式处理作业:

public Uni<Void> processAllJobs() {
        return MyJobEntity.<MyJobEntity>streamAll()
                .call(job -> processJob(job))
                .call(job -> setJobState(job, "FINISHED"))
                .invoke(job -> Log.infof("Job(id=%s, state=COMPLETED) finished successfully", job.id))
                .onItem().ignoreAsUni();
    }

    public Uni<Void> processJob(MyJobEntity job) {
        return Multi.createFrom().iterable(job.executions)
                .onItem().transformToUniAndConcatenate(execution -> executePartOfJob(execution)
                        .onFailure()
                        .invoke(() -> new JobFailedException("Job(id=%s, executionId=%s) could not be completed".formatted(jobId, execution.id)))
                )
                .onItem().ignoreAsUni()
                .onFailure(JobFailedException.class).call(() -> setJobState(job, "FAILED"));
    }

此代码将并行处理每个作业,执行部分将按顺序处理。如果执行完成,作业的状态将更改为“COMPETED”之类的状态。否则,它将记录错误并将状态设置为 FAILED。

© www.soinside.com 2019 - 2024. All rights reserved.