Reputation: 190
gabr's answer to another question shows an example of using Parallel.Pipeline for data processing.
At the moment I need to know when the Pipeline was started and when all its stages are completed. I read gabr's other answer on this problem, "How to monitor Pipeline stages in OmniThreadLibrary?". I tried to do it like this (modified according to the answer):
unit Unit1;
interface
uses
Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
Dialogs, StdCtrls, superobject,
OtlCommon, OtlCollections, OtlParallel, OtlComm, OtlTask, ExtCtrls;
const
WM_STARTED = WM_USER;
WM_ENDED = WM_USER + 1;
type
// Main form: owns the Start/Stop buttons, the log list box and the three
// methods that serve as the stages of the OmniThreadLibrary pipeline.
TForm1 = class(TForm)
btnStart: TButton;
btnStop: TButton;
lbLog: TListBox;
procedure btnStartClick(Sender: TObject);
procedure btnStopClick(Sender: TObject);
private
// Number of files found by the latest scan; created with -1 in btnStartClick
// and assigned by Async_Files after a scan has finished.
FCounterTotal: IOmniCounter;
// Number of items handled by Async_JSON; created with 0 in btnStartClick.
FCounterProcessed: IOmniCounter;
// Set to True by WMStarted and to False by WMEnded.
FIsBusy: boolean;
// The running pipeline; set to nil again in btnStopClick.
FPipeline: IOmniPipeline;
// Handlers for the messages the stages send through task.Comm.
procedure WMStarted(var msg: TOmniMessage); message WM_STARTED;
procedure WMEnded(var msg: TOmniMessage); message WM_ENDED;
strict protected
// Pipeline stages, in the order they are registered in btnStartClick.
procedure Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
procedure Async_Parse(const input: TOmniValue; var output: TOmniValue);
procedure Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
end;
var
Form1: TForm1;
// Imported from my.dll (stdcall). Receives text through AData and returns its result in Output.
// NOTE(review): Async_Parse feeds Output to SO(), so it presumably holds JSON text - confirm against the DLL.
procedure GetJSON_(const AData: PChar; var Output: WideString); stdcall; external 'my.dll';
implementation
uses IOUtils;
{$R *.dfm}
// Pipeline stage 1: until the pipeline input is completed, repeatedly scans the
// executable's folder for *.txt files and queues every file name for the next stage.
// Sends WM_STARTED at the beginning of each scan and stores the number of files
// found in FCounterTotal once the scan has finished.
procedure TForm1.Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
i, cnt: integer;
f: string;
begin
while not input.IsCompleted do begin
task.Comm.Send(WM_STARTED); // message is sent once every 1 min
cnt := 0;
for f in TDirectory.GetFiles(ExtractFilePath(Application.ExeName), '*.txt') do
begin
output.TryAdd(f); // the Boolean result of TryAdd is not checked
Inc(cnt);
Sleep(1000); // simulate a work
end;
// NOTE(review): the total is published only after every file name has been queued
// (with a one-second pause after each one), so the later stages may already have
// handled all items while FCounterTotal still holds its previous value.
FCounterTotal.Value := cnt;
// I need to continuously check a specified folder for new files, with
// a period of 1 minute (60 sec) for an unlimited period of time.
// NOTE(review): i runs from 60 down to 0 inclusive, so up to 61 one-second sleeps happen.
i := 60;
repeat
Sleep(1000); // Check if we should stop every second (if Stop button is pushed)
if input.IsCompleted then Break;
dec(i);
until i < 0;
end;
end;
// Pipeline stage 2: loads the text file named by input, hands its text to
// GetJSON_ (my.dll) and emits the returned text, converted with SO(), as output.
procedure TForm1.Async_Parse(const input: TOmniValue; var output: TOmniValue);
var
  fileLines: TStringList;
  fileText: string;
  jsonText: WideString;
begin
  fileLines := TStringList.Create;
  try
    fileLines.LoadFromFile(input.AsString);
    fileText := fileLines.Text;
    GetJSON_(PChar(fileText), jsonText); // DLL procedure fills jsonText
    output := SO(jsonText); // passed on as ISuperObject
    // TFile.Delete(input.AsString); // For testing purposes only - Continue without Deleting Processed File
  finally
    fileLines.Free;
  end;
end;
// Pipeline stage 3: consumes the objects produced by stage 2, counts them in
// FCounterProcessed and sends WM_ENDED when that count equals FCounterTotal.
// Nothing is written to output here.
// NOTE(review): the comparison is made only when an item arrives. If Async_Files
// assigns FCounterTotal after the last item has been handled here, this loop is
// already waiting for more input and WM_ENDED is never sent.
// NOTE(review): FCounterProcessed is never reset in this unit while FCounterTotal
// is reassigned on every scan - verify that this is intended.
procedure TForm1.Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
value: TOmniValue;
JSON: ISuperObject;
cnt: integer;
begin
for value in input do begin
JSON := value.AsInterface as ISuperObject;
// do something with JSON
cnt := FCounterProcessed.Increment;
if FCounterTotal.Value = cnt then
task.Comm.Send(WM_ENDED); // !!! message is not sent
end;
end;
//
// Start button: disables itself, creates fresh counters and launches the
// three-stage pipeline. Stages 1 and 3 dispatch their task messages to this form.
procedure TForm1.btnStartClick(Sender: TObject);
var
  filesConfig, jsonConfig: IOmniTaskConfig;
begin
  btnStart.Enabled := False;

  FCounterTotal := CreateCounter(-1);
  FCounterProcessed := CreateCounter(0);

  // One task configuration per message-sending stage.
  filesConfig := Parallel.TaskConfig.OnMessage(Self);
  jsonConfig := Parallel.TaskConfig.OnMessage(Self);

  FPipeline := Parallel.Pipeline
    .Stage(Async_Files, filesConfig)
    .Stage(Async_Parse)
    .Stage(Async_JSON, jsonConfig)
    .Run;
end;
// Stop button: if a pipeline exists, completes its input collection and drops
// the form's reference to it; then re-enables the Start button.
procedure TForm1.btnStopClick(Sender: TObject);
begin
  if FPipeline <> nil then
  begin
    // Async_Files polls input.IsCompleted to notice this request.
    FPipeline.Input.CompleteAdding;
    FPipeline := nil;
  end;

  btnStart.Enabled := True;
end;
//
// WM_ENDED handler: clears the busy flag and appends a timestamped line to the
// log, selecting the newly added entry.
procedure TForm1.WMEnded(var msg: TOmniMessage);
var
  logLine: string;
  newIndex: Integer;
begin
  FIsBusy := False;
  logLine := Format('%s - Pipeline stage 3 ended', [DateTimeToStr(Now)]);
  newIndex := lbLog.Items.Add(logLine);
  lbLog.ItemIndex := newIndex;
end;
// WM_STARTED handler: sets the busy flag and appends a timestamped line to the
// log, selecting the newly added entry.
procedure TForm1.WMStarted(var msg: TOmniMessage);
var
  logLine: string;
  newIndex: Integer;
begin
  FIsBusy := True;
  logLine := Format('%s - Pipeline stage 1 starting', [DateTimeToStr(Now)]);
  newIndex := lbLog.Items.Add(logLine);
  lbLog.ItemIndex := newIndex;
end;
end.
With task.Comm.Send(WM_STARTED) everything works, but the line task.Comm.Send(WM_ENDED) is never executed. How do I know when the last stage has been completed? What is the correct way?
Upvotes: 1
Views: 450
Reputation: 190
Thanks to gabr, whose advice to use a special sentinel value helped me find a solution to my problem. This code works as expected:
unit Unit1;
interface
uses
Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
Dialogs, StdCtrls, superobject,
OtlCommon, OtlCollections, OtlParallel, OtlComm, OtlTask, ExtCtrls;
const
WM_STARTED = WM_USER;
WM_ENDED = WM_USER + 1;
type
// Main form: owns the Start/Stop buttons, the log list box and the three
// methods that serve as the stages of the OmniThreadLibrary pipeline.
TForm1 = class(TForm)
btnStart: TButton;
btnStop: TButton;
lbLog: TListBox;
procedure btnStartClick(Sender: TObject);
procedure btnStopClick(Sender: TObject);
private
// Set to True by WMStarted and to False by WMEnded.
FIsBusy: boolean;
// The running pipeline; set to nil again in btnStopClick.
FPipeline: IOmniPipeline;
// Handlers for the messages the stages send through task.Comm.
procedure WMStarted(var msg: TOmniMessage); message WM_STARTED;
procedure WMEnded(var msg: TOmniMessage); message WM_ENDED;
strict protected
// Pipeline stages, in the order they are registered in btnStartClick.
// An integer 0 travelling through the stages marks the end of one folder scan.
procedure Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
procedure Async_Parse(const input: TOmniValue; var output: TOmniValue);
procedure Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
end;
var
Form1: TForm1;
// Imported from my.dll (stdcall). Receives text through AData and returns its result in Output.
// NOTE(review): Async_Parse feeds Output to SO(), so it presumably holds JSON text - confirm against the DLL.
procedure GetJSON_(const AData: PChar; var Output: WideString); stdcall; external 'my.dll';
implementation
uses IOUtils;
{$R *.dfm}
// Pipeline stage 1: until the pipeline input is completed (Stop button),
// repeatedly scans the executable's folder for *.txt files and queues every
// file name for the next stage.
//   input  - only polled for IsCompleted, which signals the stop request
//   output - receives the file names, followed by the integer 0 as a sentinel
//            marking the end of one scan
//   task   - used to send WM_STARTED at the beginning of every scan
// The stage ends early if the output collection no longer accepts items.
procedure TForm1.Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
const
  CScanPeriodSec = 60; // pause between two folder scans, in seconds
var
  secondsLeft: integer;
  fileName: string;
begin
  while not input.IsCompleted do
  begin
    task.Comm.Send(WM_STARTED); // sent once per scan

    for fileName in TDirectory.GetFiles(ExtractFilePath(Application.ExeName), '*.txt') do
    begin
      // Honour a stop request in the middle of a scan instead of queueing
      // (and sleeping for) every remaining file first.
      if input.IsCompleted then
        Break;
      // TryAdd fails once the output collection is completed; nothing more
      // can be delivered then, so leave the stage.
      if not output.TryAdd(fileName) then
        Exit;
      Sleep(1000); // simulate a work
    end;

    // Sentinel: tells the following stages that this scan is finished. It is
    // sent after an interrupted scan too, so every WM_STARTED is paired with
    // a WM_ENDED.
    if not output.TryAdd(0) then
      Exit;

    // Wait one scan period, checking every second whether we should stop.
    secondsLeft := CScanPeriodSec;
    while (secondsLeft > 0) and not input.IsCompleted do
    begin
      Sleep(1000);
      Dec(secondsLeft);
    end;
  end;
end;
// Pipeline stage 2: forwards the integer 0 sentinel unchanged; for any other
// input loads the named text file, hands its text to GetJSON_ (my.dll) and
// emits the returned text, converted with SO(), as output.
procedure TForm1.Async_Parse(const input: TOmniValue; var output: TOmniValue);
var
  fileLines: TStringList;
  fileText: string;
  jsonText: WideString;
begin
  if not (input.IsInteger and (input.AsInteger = 0)) then
  begin
    fileLines := TStringList.Create;
    try
      fileLines.LoadFromFile(input.AsString);
      fileText := fileLines.Text;
      GetJSON_(PChar(fileText), jsonText); // DLL procedure fills jsonText
      output := SO(jsonText); // passed on as ISuperObject
      // TFile.Delete(input.AsString); // For testing purposes only - Continue without Deleting Processed File
    finally
      fileLines.Free;
    end;
  end
  else
    output := 0; // 'sentinel' value: pass it on to the next stage
end;
// Pipeline stage 3: consumes the values produced by stage 2. The integer 0
// sentinel triggers a WM_ENDED message; every other value is taken as an
// ISuperObject. Nothing is written to output here.
procedure TForm1.Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
  item: TOmniValue;
  JSON: ISuperObject;
begin
  for item in input do
    if item.IsInteger and (item.AsInteger = 0) then
      task.Comm.Send(WM_ENDED) // 'sentinel' value received
    else
    begin
      JSON := item.AsInterface as ISuperObject;
      // do something with JSON
    end;
end;
//
// Start button: disables itself and launches the three-stage pipeline.
// Stages 1 and 3 dispatch their task messages to this form.
procedure TForm1.btnStartClick(Sender: TObject);
var
  filesConfig, jsonConfig: IOmniTaskConfig;
begin
  btnStart.Enabled := False;

  // One task configuration per message-sending stage.
  filesConfig := Parallel.TaskConfig.OnMessage(Self);
  jsonConfig := Parallel.TaskConfig.OnMessage(Self);

  FPipeline := Parallel.Pipeline
    .Stage(Async_Files, filesConfig)
    .Stage(Async_Parse)
    .Stage(Async_JSON, jsonConfig)
    .Run;
end;
// Stop button: if a pipeline exists, completes its input collection and drops
// the form's reference to it; then re-enables the Start button.
procedure TForm1.btnStopClick(Sender: TObject);
begin
  if FPipeline <> nil then
  begin
    // Async_Files polls input.IsCompleted to notice this request.
    FPipeline.Input.CompleteAdding;
    FPipeline := nil;
  end;

  btnStart.Enabled := True;
end;
//
// WM_ENDED handler: clears the busy flag and appends a timestamped line to the
// log, selecting the newly added entry.
procedure TForm1.WMEnded(var msg: TOmniMessage);
var
  logLine: string;
  newIndex: Integer;
begin
  FIsBusy := False;
  logLine := Format('%s - Pipeline stage 3 ended', [DateTimeToStr(Now)]);
  newIndex := lbLog.Items.Add(logLine);
  lbLog.ItemIndex := newIndex;
end;
// WM_STARTED handler: sets the busy flag and appends a timestamped line to the
// log, selecting the newly added entry.
procedure TForm1.WMStarted(var msg: TOmniMessage);
var
  logLine: string;
  newIndex: Integer;
begin
  FIsBusy := True;
  logLine := Format('%s - Pipeline stage 1 starting', [DateTimeToStr(Now)]);
  newIndex := lbLog.Items.Add(logLine);
  lbLog.ItemIndex := newIndex;
end;
end.
An alternative using an exception as a sentinel (not working yet, but I'm probably doing something wrong):
unit Unit1;
interface
uses
Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
Dialogs, StdCtrls, superobject,
OtlCommon, OtlCollections, OtlParallel, OtlComm, OtlTask, ExtCtrls;
const
WM_STARTED = WM_USER;
WM_ENDED = WM_USER + 1;
type
// Exception class raised by Async_Files after a folder scan and checked for in Async_JSON.
ESentinelException = class(Exception);
// Main form: owns the Start/Stop buttons, the log list box and the three
// methods that serve as the stages of the OmniThreadLibrary pipeline.
TForm1 = class(TForm)
btnStart: TButton;
btnStop: TButton;
lbLog: TListBox;
procedure btnStartClick(Sender: TObject);
procedure btnStopClick(Sender: TObject);
private
// Set to True by WMStarted and to False by WMEnded.
FIsBusy: boolean;
// The running pipeline; set to nil again in btnStopClick.
FPipeline: IOmniPipeline;
// Handlers for the messages the stages send through task.Comm.
procedure WMStarted(var msg: TOmniMessage); message WM_STARTED;
procedure WMEnded(var msg: TOmniMessage); message WM_ENDED;
strict protected
// Pipeline stages, in the order they are registered in btnStartClick.
procedure Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
procedure Async_Parse(const input: TOmniValue; var output: TOmniValue);
procedure Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
end;
var
Form1: TForm1;
// Imported from my.dll (stdcall). Receives text through AData and returns its result in Output.
// NOTE(review): Async_Parse feeds Output to SO(), so it presumably holds JSON text - confirm against the DLL.
procedure GetJSON_(const AData: PChar; var Output: WideString); stdcall; external 'my.dll';
implementation
uses IOUtils;
{$R *.dfm}
// Pipeline stage 1: sends WM_STARTED, scans the executable's folder for *.txt
// files, queues every file name for the next stage and then raises
// ESentinelException to mark the end of the scan.
procedure TForm1.Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
i: integer;
f: string;
begin
while not input.IsCompleted do begin
task.Comm.Send(WM_STARTED); // message is sent once every 1 min
for f in TDirectory.GetFiles(ExtractFilePath(Application.ExeName), '*.txt') do
begin
output.TryAdd(f); // the Boolean result of TryAdd is not checked
Sleep(1000); // simulate a work
end;
// NOTE(review): this raise is unconditional and is not enclosed in a try block
// in this procedure, so the wait loop below is never reached and control leaves
// the procedure after the first scan - the folder is not rescanned.
raise ESentinelException.Create('sentinel');
// I need to continuously check a specified folder for new files, with
// a period of 1 minute (60 sec) for an unlimited period of time.
i := 60;
repeat
Sleep(1000); // Check if we should stop every second (if Stop button is pushed)
if input.IsCompleted then Break;
dec(i);
until i < 0;
end;
end;
// Pipeline stage 2: loads the text file named by input, hands its text to
// GetJSON_ (my.dll) and emits the returned text, converted with SO(), as output.
procedure TForm1.Async_Parse(const input: TOmniValue; var output: TOmniValue);
var
  fileLines: TStringList;
  fileText: string;
  jsonText: WideString;
begin
  fileLines := TStringList.Create;
  try
    fileLines.LoadFromFile(input.AsString);
    fileText := fileLines.Text;
    GetJSON_(PChar(fileText), jsonText); // DLL procedure fills jsonText
    output := SO(jsonText); // passed on as ISuperObject
    // TFile.Delete(input.AsString); // For testing purposes only - Continue without Deleting Processed File
  finally
    fileLines.Free;
  end;
end;
// Pipeline stage 3: consumes the values produced by stage 2. A value carrying
// an ESentinelException triggers a WM_ENDED message, after which the exception
// object is freed; every other value is taken as an ISuperObject.
// Nothing is written to output here.
procedure TForm1.Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
  item: TOmniValue;
  JSON: ISuperObject;
  isSentinel: Boolean;
begin
  for item in input do
  begin
    isSentinel := item.IsException and (item.AsException is ESentinelException);
    if not isSentinel then
    begin
      JSON := item.AsInterface as ISuperObject;
      // do something with JSON
      Continue;
    end;

    task.Comm.Send(WM_ENDED); // 'sentinel' Exception received
    item.AsException.Free;
  end;
end;
//
// Start button: disables itself and launches the three-stage pipeline.
// Stages 1 and 3 dispatch their task messages to this form; HandleExceptions
// is requested after the last stage has been added.
procedure TForm1.btnStartClick(Sender: TObject);
var
  filesConfig, jsonConfig: IOmniTaskConfig;
begin
  btnStart.Enabled := False;

  // One task configuration per message-sending stage.
  filesConfig := Parallel.TaskConfig.OnMessage(Self);
  jsonConfig := Parallel.TaskConfig.OnMessage(Self);

  FPipeline := Parallel.Pipeline
    .Stage(Async_Files, filesConfig)
    .Stage(Async_Parse)
    .Stage(Async_JSON, jsonConfig)
    .HandleExceptions
    .Run;
end;
// Stop button: if a pipeline exists, completes its input collection and drops
// the form's reference to it; then re-enables the Start button.
procedure TForm1.btnStopClick(Sender: TObject);
begin
  if FPipeline <> nil then
  begin
    // Async_Files polls input.IsCompleted to notice this request.
    FPipeline.Input.CompleteAdding;
    FPipeline := nil;
  end;

  btnStart.Enabled := True;
end;
//
// WM_ENDED handler: clears the busy flag and appends a timestamped line to the
// log, selecting the newly added entry.
procedure TForm1.WMEnded(var msg: TOmniMessage);
var
  logLine: string;
  newIndex: Integer;
begin
  FIsBusy := False;
  logLine := Format('%s - Pipeline stage 3 ended', [DateTimeToStr(Now)]);
  newIndex := lbLog.Items.Add(logLine);
  lbLog.ItemIndex := newIndex;
end;
// WM_STARTED handler: sets the busy flag and appends a timestamped line to the
// log, selecting the newly added entry.
procedure TForm1.WMStarted(var msg: TOmniMessage);
var
  logLine: string;
  newIndex: Integer;
begin
  FIsBusy := True;
  logLine := Format('%s - Pipeline stage 1 starting', [DateTimeToStr(Now)]);
  newIndex := lbLog.Items.Add(logLine);
  lbLog.ItemIndex := newIndex;
end;
end.
Upvotes: 2
Reputation: 26830
Your approach (which I initially proposed) has a race condition which prevents it from working. (Sorry, that was a flaw in my initial design.)
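Basically, what happens is:
1. Async_Files queues the file names one by one and sets FCounterTotal only after the last one has been queued.
2. Meanwhile, Async_Parse and Async_JSON have already processed every queued item, so each comparison in Async_JSON was made against the not-yet-updated FCounterTotal value.
3. Async_Files finally assigns FCounterTotal.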
At that moment, Async_JSON is already waiting for the next data, which never comes, and is not checking the FCounterTotal value anymore.
An alternative approach would be to send a special sentinel value into the pipeline as the last item.
An exception could also be used as a sentinel. If you raise an exception in the first stage, it will 'flow' through the pipeline to the end, where you can process it. No special work has to be done in any specific stage - by default a stage will just re-raise the exception.
Upvotes: 2