如何在其他工作完成后才开始工作(Hangfire)

问题描述 投票:0回答:1

我有以下代码

public string ManterRequisicoesComprasJob(string routingKey, string message)
{
    try
    {
        var request = message.FromJson<HookOutDto>();
        var keys = request.GetKeys();
        var empresa = keys[0];
        var sistema = keys[1].ToInteger();
        var fornecedor = keys[2].ToInteger();
        var negociacao = keys[3];
        var periodo = keys[4].ToInteger();

        return BatchJob.StartNew(batch =>
        {
            var jobId = batch.Enqueue<IHookTargetService>(x => x.Adicionar(null, request.Id, TargetHook.PROCUREMENT));
            jobId = batch.ContinueJobWith<IRequisicaoCompraService>(jobId, x => x.GetGrupoRequisicoes(null, empresa, fornecedor, sistema, periodo, negociacao));
            jobId = batch.ContinueJobWith<IRequisicaoCompraService>(jobId, x => x.ProcessarRequisicoes(null, empresa, fornecedor, request.Id));
        },
          request.GetJobDescription()
      );
    }
    catch (Exception exception)
    {
        throw exception.Failin();
    }
}

在 ProcessarRequisicoes 工作中,我正在继续代码

public async Task<RequisicaoCompraRequest> ProcessarRequisicoes(PerformContext context, string empresa, int fornecedor, long parentId)
{
    try
    {
        var result = context.GetAntecedentResult<List<ReqFornecSistemaDto>>();
        var jobId = context.BackgroundJob.Id;
        context.SetJobParameter("PROCESSED", jobId);
        var requisicaoJobs = new List<string>();
        BatchJob.Attach(context.GetBatchId(), batch =>
        {
           foreach (var item in result)
                batch.ContinueJobWith<IRequisicaoCompraService>(jobId, x => x.GetRequisicao(null, empresa, item.Sistema, item.Local, item.Produto)); //duvida

            jobId = batch.ContinueJobWith<IRequisicaoCompraService>(jobId, x => x.GetCachedRequisicoes(null));
            jobId = batch.ContinueJobWith<IRequisicaoCompraService>(jobId, x => x.GetIdFornecedorProcurement(null, empresa, fornecedor));
            jobId = batch.ContinueJobWith<IMapperService>(jobId, x => x.MapFromAntecedentResult(null, typeof(Lar.Procurement.Client.RequisicaoCompra.RequestWrapper), null));
            jobId = batch.ContinueJobWith<IRequisicaoCompraService>(jobId, x => x.Send(null));
            batch.ContinueJobWith<IHookTargetService>(jobId, x => x.AtualizarFinalizado(null, parentId, TargetHook.PROCUREMENT));
        });

        return null;
    }
    catch (Exception exception)
    {
        throw exception.LimTargetException(context, TargetHook.PROCUREMENT);
    }
}

我想处理所有 GetRequisicao 作业,并在它们全部成功完成后,我想转到下一个作业 GetCachedRequisicoes。如果我按照以下方式执行,我什至可以做到这一点,但问题是执行是连续的,一项接着一项。

foreach (var item in result)
                jobId = batch.ContinueJobWith<IRequisicaoCompraService>(jobId, x => x.GetRequisicao(null, empresa, item.Sistema, item.Local, item.Produto));

但我想一次运行多个 GetRequisicao 作业,并且只有当它们全部完成时,才转到下一个 GetCachedRequisicoes 作业。有人可以帮助我吗?

c# jobs hangfire
1个回答
-3
投票

要同时处理多个

GetRequisicao
作业,然后仅在所有
GetCachedRequisicoes
作业完成后才继续处理
GetRequisicao
作业,您可以在 Hangfire 中使用批处理延续的概念。 Hangfire 的批处理功能允许您将作业分组在一起,并定义在批处理中的所有作业完成后运行的延续。

以下是如何修改代码来实现此目的:

  1. GetRequisicao
    作业创建新批次:

    • 不要将每个
      GetRequisicao
      作业链接到前一个作业,而是为这些作业创建一个新批次。这允许它们同时运行。
    • 使用
      BatchJob.StartNew
      在现有批次中启动新批次。
  2. GetRequisicao
    职位添加到新批次:

    • 将所有
      GetRequisicao
      作业添加到这个新批次中。
  3. 定义新批次的延续:

    • 添加所有
      GetRequisicao
      作业后,为将启动
      GetCachedRequisicoes
      作业的新批次定义延续。此延续仅在所有
      GetRequisicao
      作业完成后才会执行。

这是一个基于您的代码的示例:

public async Task<RequisicaoCompraRequest> ProcessarRequisicoes(PerformContext context, string empresa, int fornecedor, long parentId)
{
    try
    {
        var result = context.GetAntecedentResult<List<ReqFornecSistemaDto>>();
        var jobId = context.BackgroundJob.Id;
        context.SetJobParameter("PROCESSED", jobId);

        // Start a new batch for GetRequisicao jobs
        var getRequisicaoBatchId = BatchJob.StartNew(batch =>
        {
            foreach (var item in result)
            {
                batch.Enqueue<IRequisicaoCompraService>(x => x.GetRequisicao(null, empresa, item.Sistema, item.Local, item.Produto));
            }
        });

        // Define a continuation for the new batch
        BatchJob.ContinueBatchWith(getRequisicaoBatchId, batch =>
        {
            jobId = batch.Enqueue<IRequisicaoCompraService>(x => x.GetCachedRequisicoes(null));
            jobId = batch.ContinueJobWith<IRequisicaoCompraService>(jobId, x => x.GetIdFornecedorProcurement(null, empresa, fornecedor));
            jobId = batch.ContinueJobWith<IMapperService>(jobId, x => x.MapFromAntecedentResult(null, typeof(Lar.Procurement.Client.RequisicaoCompra.RequestWrapper), null));
            jobId = batch.ContinueJobWith<IRequisicaoCompraService>(jobId, x => x.Send(null));
            batch.ContinueJobWith<IHookTargetService>(jobId, x => x.AtualizarFinalizado(null, parentId, TargetHook.PROCUREMENT));
        });

        return null;
    }
    catch (Exception exception)
    {
        throw exception.LimTargetException(context, TargetHook.PROCUREMENT);
    }
}

在此修订后的方法中:

  • 所有
    GetRequisicao
    作业都添加到单独的批次中并同时运行。
  • GetCachedRequisicoes
    作业和后续作业被设置为仅在所有
    GetRequisicao
    作业完成后才运行的延续。
  • 这可确保
    GetCachedRequisicoes
    作业仅在所有
    GetRequisicao
    作业成功完成后启动,同时仍允许
    GetRequisicao
    作业同时运行。
© www.soinside.com 2019 - 2024. All rights reserved.