接收单个任务完成通知 OmniThreadLibrary Parallel.ForEach

问题描述 投票:0回答:1

我使用 Delphi 10.4 和 OmniThreadLibrary,利用 Parallel.ForEach 构造来执行任务。但是,我遇到一个问题,只有在所有任务完成后我才会收到通知。有没有一种方法可以在单个任务完成时接收通知,类似于调度机制或 async foreach?以下是我的代码片段供参考:

`procedure TfrmForEachOutput.MSGNewData(var msg: TOmniMessage);
begin
  OutputDebugString('Finish');
end;

procedure TfrmForEachOutput.ProcessTransactions(input: TList<TSSRINTBWTH>; output: TList<TTransaction>);
var
  outQueue   : IOmniBlockingCollection;
  transaction: TOmniValue;
begin
  outQueue := TOmniBlockingCollection.Create;
  Parallel.ForEach(0, input.Count - 1)
    .TaskConfig(Parallel.TaskConfig.OnMessage(Self))
    .NoWait
    .Into(outQueue)
    .Execute(
    procedure(const task: IOmniTask;const value: integer; var result: TOmniValue)
    begin
     result := TTransaction.Create(input[value],task);
    end
  );
  for transaction in outQueue do
  begin
   output.Add(transaction.AsObject as TTransaction);
  end;
end;
    ....
    ....
begin
  task.Comm.Send(MSG_NEWDATA, '');//this executed only when all tasks has been completed
end;`

谢谢你

我尝试了时间表但没有成功。

multithreading delphi omnithreadlibrary
1个回答
0
投票

要接收每个任务的通知,请在任务执行后使用

task.comm.send
中的
Execute()
发送消息:

.Execute(
  procedure(const task: IOmniTask; const value: integer; var result: TOmniValue)
  begin
    result := TTransaction.Create(input[value], task);
    // => send notification after task's execution
    task.Comm.Send(MSG_NEWDATA, Format('task input %d', [value]));
  end
);

另请注意,在 OTL 中,您需要对要调用的方法的特定引用。

.TaskConfig(Parallel.TaskConfig.OnMessage(Self.MSGNewData)) // => call Self.MSGNewData on notification

检查并尝试使用更正后的代码:

procedure TfrmForEachOutput.MSGNewData(var msg: TOmniMessage);
begin
  OutputDebugString('msg: ' + msg.MsgData.AsString); // added task/msg result for debugging purpose
end;

procedure TfrmForEachOutput.ProcessTransactions(input: TList<TSSRINTBWTH>; output: TList<TTransaction>);
var
  outQueue   : IOmniBlockingCollection;
  transaction: TOmniValue;
begin
  outQueue := TOmniBlockingCollection.Create;
  Parallel.ForEach(0, input.Count - 1)
    .TaskConfig(Parallel.TaskConfig.OnMessage(Self.MSGNewData))
    .NoWait
    .Into(outQueue)
    .Execute(
      procedure(const task: IOmniTask; const value: integer; var result: TOmniValue)
      begin
        result := TTransaction.Create(input[value], task);
        // => send notification after task's execution
        task.Comm.Send(MSG_NEWDATA, Format('task value %d', [value]));
      end
    );
    
  for transaction in outQueue do
  begin
    output.Add(transaction.AsObject as TTransaction);
  end;
end;

    ....
begin
  task.Comm.Send(MSG_NEWDATA, 'all tasks completed'); // this executed only when all tasks has been completed
end;
© www.soinside.com 2019 - 2024. All rights reserved.