LuFang
LuFang

Reputation: 190

How to know the state of Pipeline stages in OmniThreadLibrary?

gabr's answer to another question shows an example of using Parallel.Pipeline for data processing.
At the moment I need to know when the Pipeline was started and when all its stages are completed. I read gabr's other answer to this problem, "How to monitor Pipeline stages in OmniThreadLibrary?", and tried to do it like this (modified according to the answer):

unit Unit1;

interface

uses
  Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
  Dialogs, StdCtrls, superobject,
  OtlCommon, OtlCollections, OtlParallel, OtlComm, OtlTask, ExtCtrls;

const
  WM_STARTED = WM_USER;
  WM_ENDED = WM_USER + 1;

type
  // Main form: starts and stops a three-stage OmniThreadLibrary pipeline and
  // logs the notifications its stages send back.
  TForm1 = class(TForm)
    btnStart: TButton;
    btnStop: TButton;
    lbLog: TListBox;
    procedure btnStartClick(Sender: TObject);
    procedure btnStopClick(Sender: TObject);
  private
    // Number of files queued by the latest scan; written by Async_Files and
    // read by Async_JSON. Created with the value -1 in btnStartClick.
    FCounterTotal: IOmniCounter;
    // Number of items Async_JSON has handled so far (created with value 0).
    FCounterProcessed: IOmniCounter;
    // Set to True by WMStarted and back to False by WMEnded.
    FIsBusy: boolean;
    // Pipeline created in btnStartClick; reset to nil in btnStopClick.
    FPipeline: IOmniPipeline;
    // Handlers for the WM_STARTED / WM_ENDED notifications sent by the stages.
    procedure WMStarted(var msg: TOmniMessage); message WM_STARTED;
    procedure WMEnded(var msg: TOmniMessage); message WM_ENDED;
  strict protected
    // Pipeline stages, listed in the order btnStartClick registers them.
    procedure Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
    procedure Async_Parse(const input: TOmniValue; var output: TOmniValue);
    procedure Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
  end;

var
  Form1: TForm1;

  procedure GetJSON_(const AData: PChar; var Output: WideString); stdcall; external 'my.dll';

implementation

uses IOUtils;

{$R *.dfm}

// Pipeline stage 1: until the stage input is completed, repeatedly scans the
// executable's folder for *.txt files, queues every file name for the next
// stage, publishes the number of queued files and then pauses before rescanning.
procedure TForm1.Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
  i, cnt: integer;
  f: string;
begin
  while not input.IsCompleted do begin

    task.Comm.Send(WM_STARTED); // sent once at the beginning of every scan
    cnt := 0;

    for f in TDirectory.GetFiles(ExtractFilePath(Application.ExeName), '*.txt') do
    begin
      output.TryAdd(f);
      Inc(cnt);
      Sleep(1000); // simulate work
    end;
    // NOTE: the total is published only after every file has been queued (and
    // after the per-file sleeps), so a later stage may already have consumed
    // the last item before this value becomes visible.
    FCounterTotal.Value := cnt;

    // The folder has to be re-checked for new files indefinitely, roughly once
    // per minute.
    i := 60;
    repeat
      Sleep(1000); // wake up every second so a Stop request is noticed
      if input.IsCompleted then Break;
      dec(i);
    until i < 0; // i runs 60..0, i.e. up to 61 one-second sleeps
  end;
end;

// Pipeline stage 2 (called once per item): reads the text file named by
// `input`, passes its content to the external GetJSON_ routine and stores the
// resulting JSON object in `output`.
procedure TForm1.Async_Parse(const input: TOmniValue; var output: TOmniValue);
var
  fileLines: TStringList;
  rawText: string;
  jsonText: WideString;
begin
  fileLines := TStringList.Create;
  try
    fileLines.LoadFromFile(input.AsString);
    rawText := fileLines.Text;
    // The DLL fills jsonText; SO() turns it into an ISuperObject.
    GetJSON_(PChar(rawText), jsonText);
    output := SO(jsonText);
    // TFile.Delete(input.AsString); // disabled for testing: processed files are kept
  finally
    fileLines.Free;
  end;
end;

// Pipeline stage 3: consumes the parsed JSON objects and tries to detect the
// end of a batch by comparing the processed count with FCounterTotal.
procedure TForm1.Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
  value: TOmniValue;
  JSON: ISuperObject;
  cnt: integer;
begin
  for value in input do begin
    JSON := value.AsInterface as ISuperObject;
    // do something with JSON

    // The comparison happens only when an item arrives. If FCounterTotal is
    // updated after the last item was handled here, nothing re-evaluates it.
    // Also note that FCounterProcessed is never reset between scans in this
    // unit, while FCounterTotal holds the count of the latest scan only.
    cnt := FCounterProcessed.Increment;
    if FCounterTotal.Value = cnt then
      task.Comm.Send(WM_ENDED); // !!! message is not sent
  end;
end;

//
// Start button: disables itself, creates fresh counters and builds and runs
// the three-stage pipeline.
procedure TForm1.btnStartClick(Sender: TObject);
begin
  btnStart.Enabled := False;

  // -1 can never equal a processed count, so no "ended" match is possible
  // before Async_Files has published a real total.
  FCounterTotal := CreateCounter(-1);
  FCounterProcessed := CreateCounter(0);

  // Stages 1 and 3 are configured with OnMessage(Self) because they call
  // task.Comm.Send; the form handles those messages in WMStarted / WMEnded.
  FPipeline := Parallel.Pipeline
    .Stage(Async_Files, Parallel.TaskConfig.OnMessage(Self))
    .Stage(Async_Parse)
    .Stage(Async_JSON, Parallel.TaskConfig.OnMessage(Self))
    .Run;
end;

// Stop button: marks the pipeline input as completed (the signal Async_Files
// polls for), drops the pipeline reference and re-enables the Start button.
procedure TForm1.btnStopClick(Sender: TObject);
begin
  if FPipeline <> nil then
  begin
    FPipeline.Input.CompleteAdding;
    FPipeline := nil;
  end;

  btnStart.Enabled := True;
end;

//
// WM_ENDED handler: clears the busy flag and appends a timestamped line to the
// log, selecting the new entry.
procedure TForm1.WMEnded(var msg: TOmniMessage);
var
  logLine: string;
  newIndex: integer;
begin
  FIsBusy := False;
  logLine := Format('%s - Pipeline stage 3 ended', [DateTimeToStr(Now)]);
  newIndex := lbLog.Items.Add(logLine);
  lbLog.ItemIndex := newIndex;
end;

// WM_STARTED handler: sets the busy flag and appends a timestamped line to the
// log, selecting the new entry.
procedure TForm1.WMStarted(var msg: TOmniMessage);
var
  logLine: string;
  newIndex: integer;
begin
  FIsBusy := True;
  logLine := Format('%s - Pipeline stage 1 starting', [DateTimeToStr(Now)]);
  newIndex := lbLog.Items.Add(logLine);
  lbLog.ItemIndex := newIndex;
end;

end.

With task.Comm.Send(WM_STARTED) all is OK, but the line task.Comm.Send(WM_ENDED) is never executed. How do I know when the last stage has been completed? What is the correct way?

Upvotes: 1

Views: 450

Answers (2)

LuFang
LuFang

Reputation: 190

Thanks to gabr, whose advice to use a special sentinel value helped me find a solution to my problem. This code works as expected:

unit Unit1;

interface

uses
  Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
  Dialogs, StdCtrls, superobject,
  OtlCommon, OtlCollections, OtlParallel, OtlComm, OtlTask, ExtCtrls;

const
  WM_STARTED = WM_USER;
  WM_ENDED = WM_USER + 1;

type
  // Main form: starts and stops a three-stage OmniThreadLibrary pipeline and
  // logs the notifications its stages send back.
  TForm1 = class(TForm)
    btnStart: TButton;
    btnStop: TButton;
    lbLog: TListBox;
    procedure btnStartClick(Sender: TObject);
    procedure btnStopClick(Sender: TObject);
  private
    // Set to True by WMStarted and back to False by WMEnded.
    FIsBusy: boolean;
    // Pipeline created in btnStartClick; reset to nil in btnStopClick.
    FPipeline: IOmniPipeline;
    // Handlers for the WM_STARTED / WM_ENDED notifications sent by the stages.
    procedure WMStarted(var msg: TOmniMessage); message WM_STARTED;
    procedure WMEnded(var msg: TOmniMessage); message WM_ENDED;
  strict protected
    // Pipeline stages, listed in the order btnStartClick registers them.
    // An integer 0 travelling through the stages is the end-of-batch sentinel.
    procedure Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
    procedure Async_Parse(const input: TOmniValue; var output: TOmniValue);
    procedure Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
  end;

var
  Form1: TForm1;

  procedure GetJSON_(const AData: PChar; var Output: WideString); stdcall; external 'my.dll';

implementation

uses IOUtils;

{$R *.dfm}

// Pipeline stage 1: until the stage input is completed, repeatedly scans the
// executable's folder for *.txt files, queues every file name for the next
// stage, queues an integer 0 as end-of-batch sentinel and then waits up to
// CPollIntervalSec seconds before rescanning.
//
// input  - only polled for completion (the Stop request)
// output - receives the file names followed by the sentinel
// task   - used to notify the form with WM_STARTED at the start of each scan
procedure TForm1.Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
const
  CPollIntervalSec = 60; // pause between two folder scans, in seconds
var
  secondsWaited: integer;
  f: string;
begin
  while not input.IsCompleted do begin

    task.Comm.Send(WM_STARTED); // sent once at the beginning of every scan

    for f in TDirectory.GetFiles(ExtractFilePath(Application.ExeName), '*.txt') do
    begin
      output.TryAdd(f);
      Sleep(1000); // simulate work
    end;
    output.TryAdd(0); // sentinel: marks the end of this batch for later stages

    // Wait in one-second slices so a Stop request is noticed quickly. The
    // counter goes 0..CPollIntervalSec-1, giving exactly CPollIntervalSec
    // sleeps (the previous 60-down-to-0 countdown slept 61 times), and the
    // completion check now runs before the first sleep as well.
    secondsWaited := 0;
    while (secondsWaited < CPollIntervalSec) and not input.IsCompleted do begin
      Sleep(1000);
      Inc(secondsWaited);
    end;
  end;
end;

// Pipeline stage 2 (called once per item): forwards the integer-0 sentinel
// unchanged; for any other input reads the named text file, passes its content
// to the external GetJSON_ routine and stores the resulting JSON object in
// `output`.
procedure TForm1.Async_Parse(const input: TOmniValue; var output: TOmniValue);
var
  fileLines: TStringList;
  rawText: string;
  jsonText: WideString;
  isSentinel: boolean;
begin
  isSentinel := input.IsInteger and (input.AsInteger = 0);
  if isSentinel then
    output := 0 // pass the sentinel on to the next stage
  else begin
    fileLines := TStringList.Create;
    try
      fileLines.LoadFromFile(input.AsString);
      rawText := fileLines.Text;
      // The DLL fills jsonText; SO() turns it into an ISuperObject.
      GetJSON_(PChar(rawText), jsonText);
      output := SO(jsonText);
      // TFile.Delete(input.AsString); // disabled for testing: processed files are kept
    finally
      fileLines.Free;
    end;
  end;
end;

// Pipeline stage 3: consumes the items produced by Async_Parse. An integer 0
// is the end-of-batch sentinel and triggers WM_ENDED; everything else is
// treated as an ISuperObject to be processed.
procedure TForm1.Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
  item: TOmniValue;
  jsonObj: ISuperObject;
begin
  for item in input do
  begin
    if item.IsInteger and (item.AsInteger = 0) then
      task.Comm.Send(WM_ENDED) // sentinel reached: the batch is finished
    else
    begin
      jsonObj := item.AsInterface as ISuperObject;
      // do something with jsonObj
    end;
  end;
end;

//
// Start button: disables itself, then builds and runs the three-stage pipeline.
procedure TForm1.btnStartClick(Sender: TObject);
begin
  btnStart.Enabled := False;

  // Stages 1 and 3 are configured with OnMessage(Self) because they call
  // task.Comm.Send; the form handles those messages in WMStarted / WMEnded.
  FPipeline := Parallel.Pipeline
    .Stage(Async_Files, Parallel.TaskConfig.OnMessage(Self))
    .Stage(Async_Parse)
    .Stage(Async_JSON, Parallel.TaskConfig.OnMessage(Self))
    .Run;
end;

// Stop button: marks the pipeline input as completed (the signal Async_Files
// polls for), drops the pipeline reference and re-enables the Start button.
procedure TForm1.btnStopClick(Sender: TObject);
begin
  if FPipeline <> nil then
  begin
    FPipeline.Input.CompleteAdding;
    FPipeline := nil;
  end;

  btnStart.Enabled := True;
end;

//
// WM_ENDED handler: clears the busy flag and appends a timestamped line to the
// log, selecting the new entry.
procedure TForm1.WMEnded(var msg: TOmniMessage);
var
  logLine: string;
  newIndex: integer;
begin
  FIsBusy := False;
  logLine := Format('%s - Pipeline stage 3 ended', [DateTimeToStr(Now)]);
  newIndex := lbLog.Items.Add(logLine);
  lbLog.ItemIndex := newIndex;
end;

// WM_STARTED handler: sets the busy flag and appends a timestamped line to the
// log, selecting the new entry.
procedure TForm1.WMStarted(var msg: TOmniMessage);
var
  logLine: string;
  newIndex: integer;
begin
  FIsBusy := True;
  logLine := Format('%s - Pipeline stage 1 starting', [DateTimeToStr(Now)]);
  newIndex := lbLog.Items.Add(logLine);
  lbLog.ItemIndex := newIndex;
end;

end.

An alternative using an exception as a sentinel (not working yet, but I'm probably doing something wrong):

unit Unit1;

interface

uses
  Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
  Dialogs, StdCtrls, superobject,
  OtlCommon, OtlCollections, OtlParallel, OtlComm, OtlTask, ExtCtrls;

const
  WM_STARTED = WM_USER;
  WM_ENDED = WM_USER + 1;

type
  // Exception class used as the end-of-batch marker: raised in Async_Files and
  // looked for in Async_JSON.
  ESentinelException = class(Exception);

  // Main form: starts and stops a three-stage OmniThreadLibrary pipeline and
  // logs the notifications its stages send back.
  TForm1 = class(TForm)
    btnStart: TButton;
    btnStop: TButton;
    lbLog: TListBox;
    procedure btnStartClick(Sender: TObject);
    procedure btnStopClick(Sender: TObject);
  private
    // Set to True by WMStarted and back to False by WMEnded.
    FIsBusy: boolean;
    // Pipeline created in btnStartClick; reset to nil in btnStopClick.
    FPipeline: IOmniPipeline;
    // Handlers for the WM_STARTED / WM_ENDED notifications sent by the stages.
    procedure WMStarted(var msg: TOmniMessage); message WM_STARTED;
    procedure WMEnded(var msg: TOmniMessage); message WM_ENDED;
  strict protected
    // Pipeline stages, listed in the order btnStartClick registers them.
    procedure Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
    procedure Async_Parse(const input: TOmniValue; var output: TOmniValue);
    procedure Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
  end;

var
  Form1: TForm1;

  procedure GetJSON_(const AData: PChar; var Output: WideString); stdcall; external 'my.dll';

implementation

uses IOUtils;

{$R *.dfm}

// Pipeline stage 1 (exception-sentinel variant): scans the executable's folder
// for *.txt files, queues every file name for the next stage and then raises
// ESentinelException to mark the end of the batch.
procedure TForm1.Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
  i: integer;
  f: string;
begin
  while not input.IsCompleted do begin

    task.Comm.Send(WM_STARTED); // sent at the beginning of the scan

    for f in TDirectory.GetFiles(ExtractFilePath(Application.ExeName), '*.txt') do
    begin
      output.TryAdd(f);
      Sleep(1000); // simulate work
    end;

    // NOTE(review): this raise is unconditional, so control leaves the
    // procedure here on the very first pass. The wait loop below is
    // unreachable and the enclosing while never starts a second scan, which
    // conflicts with the stated goal of re-checking the folder every minute.
    raise ESentinelException.Create('sentinel');

    // The folder has to be re-checked for new files indefinitely, roughly once
    // per minute.
    i := 60;
    repeat
      Sleep(1000); // wake up every second so a Stop request is noticed
      if input.IsCompleted then Break;
      dec(i);
    until i < 0;
  end;
end;

// Pipeline stage 2 (called once per item): reads the text file named by
// `input`, passes its content to the external GetJSON_ routine and stores the
// resulting JSON object in `output`.
procedure TForm1.Async_Parse(const input: TOmniValue; var output: TOmniValue);
var
  fileLines: TStringList;
  rawText: string;
  jsonText: WideString;
begin
  fileLines := TStringList.Create;
  try
    fileLines.LoadFromFile(input.AsString);
    rawText := fileLines.Text;
    // The DLL fills jsonText; SO() turns it into an ISuperObject.
    GetJSON_(PChar(rawText), jsonText);
    output := SO(jsonText);
    // TFile.Delete(input.AsString); // disabled for testing: processed files are kept
  finally
    fileLines.Free;
  end;
end;

// Pipeline stage 3 (exception-sentinel variant): consumes the items produced by
// the previous stage. A value carrying an ESentinelException triggers WM_ENDED
// and the exception object is freed; everything else is treated as an
// ISuperObject to be processed.
procedure TForm1.Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
  item: TOmniValue;
  jsonObj: ISuperObject;
  isSentinel: boolean;
begin
  for item in input do
  begin
    isSentinel := item.IsException and (item.AsException is ESentinelException);
    if not isSentinel then
    begin
      jsonObj := item.AsInterface as ISuperObject;
      // do something with jsonObj
      Continue;
    end;

    // Sentinel exception reached: report the end of the batch and release it.
    task.Comm.Send(WM_ENDED);
    item.AsException.Free;
  end;
end;

//
// Start button: disables itself, then builds and runs the three-stage pipeline
// with exception handling enabled.
procedure TForm1.btnStartClick(Sender: TObject);
begin
  btnStart.Enabled := False;

  // Stages 1 and 3 are configured with OnMessage(Self) because they call
  // task.Comm.Send; the form handles those messages in WMStarted / WMEnded.
  // NOTE(review): HandleExceptions is presumably what lets Async_JSON inspect
  // exception values itself instead of having them re-raised - confirm against
  // the OmniThreadLibrary documentation, including how it affects Async_Parse.
  FPipeline := Parallel.Pipeline
    .Stage(Async_Files, Parallel.TaskConfig.OnMessage(Self))
    .Stage(Async_Parse)
    .Stage(Async_JSON, Parallel.TaskConfig.OnMessage(Self))
    .HandleExceptions
    .Run;
end;

// Stop button: marks the pipeline input as completed (the signal Async_Files
// polls for), drops the pipeline reference and re-enables the Start button.
procedure TForm1.btnStopClick(Sender: TObject);
begin
  if FPipeline <> nil then
  begin
    FPipeline.Input.CompleteAdding;
    FPipeline := nil;
  end;

  btnStart.Enabled := True;
end;

//
// WM_ENDED handler: clears the busy flag and appends a timestamped line to the
// log, selecting the new entry.
procedure TForm1.WMEnded(var msg: TOmniMessage);
var
  logLine: string;
  newIndex: integer;
begin
  FIsBusy := False;
  logLine := Format('%s - Pipeline stage 3 ended', [DateTimeToStr(Now)]);
  newIndex := lbLog.Items.Add(logLine);
  lbLog.ItemIndex := newIndex;
end;

// WM_STARTED handler: sets the busy flag and appends a timestamped line to the
// log, selecting the new entry.
procedure TForm1.WMStarted(var msg: TOmniMessage);
var
  logLine: string;
  newIndex: integer;
begin
  FIsBusy := True;
  logLine := Format('%s - Pipeline stage 1 starting', [DateTimeToStr(Now)]);
  newIndex := lbLog.Items.Add(logLine);
  lbLog.ItemIndex := newIndex;
end;

end.

Upvotes: 2

gabr
gabr

Reputation: 26830

Your approach (which I initially proposed) has a race condition which prevents it from working. (Sorry, that was a flaw in my initial design.)

Basically, what happens is:

  • Async_Files sends last file to the pipeline.
  • Async_Files blocks (simulating some workload).
  • Async_JSON receives and processes the last file.
  • Async_Files now sets the FCounterTotal counter.

At that moment, Async_JSON is already waiting for the next data, which never comes, and is not checking the FCounterTotal value anymore.

An alternative approach would be to send a special sentinel value into the pipeline as the last item.

An exception could also be used as a sentinel. If you raise an exception in the first stage, it will 'flow' through the pipeline to the end, where you can process it. No special work has to be done in any specific stage - by default a stage will just re-raise the exception.

Upvotes: 2

Related Questions