Kiritonito
Kiritonito

Reputation: 424

Multithread queue in delphi?

This is my second question about this, im having some troubles with this >.<

Well, I just want to create a limited number of threads (in this case, I want 10 threads), and then each thread will pick up a name in my list and get some data in my site.

My system works pretty well, but my multi thread system still fails =(

--

I tried the code posted by LU RD, but the main thread don't wait the threads finish the queue, and just stops =(

The code:

uses
Classes,SyncObjs,Generics.Collections;

Type
TMyConsumerItem = class(TThread)
private
 FQueue : TThreadedQueue<TProc>;
 FSignal : TCountDownEvent;
protected
 procedure Execute; override;
public
 constructor Create( aQueue : TThreadedQueue<TProc>; aSignal : TCountdownEvent);
end;

constructor TMyConsumerItem.Create(aQueue: TThreadedQueue<TProc>; aSignal : TCountDownEvent);
begin
 Inherited Create(false);
 Self.FreeOnTerminate := true;
 FQueue := aQueue;
 FSignal := aSignal;
end;

procedure TMyConsumerItem.Execute;
var
aProc : TProc;
begin
 try
 repeat
  FQueue.PopItem(aProc);
  if not Assigned(aProc) then
   break; // Drop this thread
  aProc();
 until Terminated;
 finally
  FSignal.Signal;
 end;
end;

procedure DoSomeJob(myListItems : TStringList);
const
 cThreadCount = 10;
 cMyQueueDepth = 100;
var
i : Integer;
aQueue : TThreadedQueue<TProc>;
aCounter : TCountDownEvent;
function CaptureJob( const aString : string) : TProc;
begin
 Result :=
  procedure
  begin
    // Do some job with aString
  end;
end;
begin
aQueue := TThreadedQueue<TProc>.Create(cMyQueueDepth);
aCounter := TCountDownEvent.Create(cThreadCount);
try
 for i := 1 to cThreadCount do
  TMyConsumerItem.Create(aQueue,aCounter);
 for i := 0 to myListItems.Count-1 do begin
  aQueue.PushItem( CaptureJob( myListItems[i]));
 end;
finally
 for i := 1 to cThreadCount do
  aQueue.PushItem(nil);
 aCounter.WaitFor;  // Wait for threads to finish
 aCounter.Free;
 aQueue.Free;
end;
end;

My other question: Multi Thread Delphi

Im using Delphi XE3.

Upvotes: 0

Views: 8010

Answers (1)

LU RD
LU RD

Reputation: 34899

  • First, if you want to call the procedure DoSomeJob() and block until ready from the main thread, there is a caveat. If your worker threads are synchronizing with the main thread, there is a dead-lock situation with aCounter.WaitFor and TThread.Synchronize().

I am assuming that this is what is happening to you, guessing here.

There is a way to handle that as I will show in this answer.

  • Second, normally the worker threads should be handled by a thread pool, to avoid create/destroy threads all the time. Pass your job to the thread pool, so everything is run and waited for inside a thread. This avoids blocking the main thread. I will leave this up to you. Once that framework is written, threading will be easier. If this seems complex, try OTL threading framework instead.

Here is an example where the main thread can wait for DoSomeJob() in a safe manner. An anonymous thread is created to wait for the aCounter to signal. This example uses a TMemo and a TButton. Just create a form with these components and connect the button OnClick event to the ButtonClick method.

unit Unit1;

interface

uses
  Winapi.Windows, Winapi.Messages, System.SysUtils, System.Variants,
  System.Classes, Vcl.Graphics,
  Vcl.Controls, Vcl.Forms, Vcl.Dialogs, Vcl.StdCtrls;

type
  TForm1 = class(TForm)
    Button1: TButton;
    Memo1: TMemo;
    procedure Button1Click(Sender: TObject);
  private
    { Private declarations }
    procedure DoSomeJob( myListItems : TStringList);
  public
    { Public declarations }
  end;

var
  Form1: TForm1;

implementation

{$R *.dfm}

uses
  SyncObjs, Generics.Collections;

{- Include TMyConsumerItem class here }

procedure TForm1.Button1Click(Sender: TObject);
var
  aList : TStringList;
  i : Integer;
begin
  aList := TStringList.Create;
  Screen.Cursor := crHourGlass;
  try
    for i := 1 to 20 do aList.Add(IntToStr(i));
    DoSomeJob(aList);
  finally
    aList.Free;
    Screen.Cursor := crDefault;
  end;
end;

procedure TForm1.DoSomeJob(myListItems: TStringList);
const
  cThreadCount = 10;
  cMyQueueDepth = 100;
var
  i: Integer;
  aQueue: TThreadedQueue<TProc>;
  aCounter: TCountDownEvent;

  function CaptureJob(const aString: string): TProc;
  begin
    Result :=
      procedure
      var
        i,j : Integer;
      begin
        // Do some job with aString
        for i := 0 to 1000000 do
          j := i;
        // Report status to main thread
        TThread.Synchronize(nil,
          procedure
          begin
            Memo1.Lines.Add('Job with:'+aString+' done.');
          end
        );

      end;
  end;
var
  aThread : TThread;
begin
  aQueue := TThreadedQueue<TProc>.Create(cMyQueueDepth);
  aCounter := TCountDownEvent.Create(cThreadCount);
  try
    for i := 1 to cThreadCount do
      TMyConsumerItem.Create(aQueue, aCounter);
    for i := 0 to myListItems.Count - 1 do
    begin
      aQueue.PushItem(CaptureJob(myListItems[i]));
    end;
    // Kill the worker threads
    for i := 1 to cThreadCount do
      aQueue.PushItem(nil);
  finally
    // Since the worker threads synchronizes with the main thread,
    // we must wait for them in another thread.
    aThread := TThread.CreateAnonymousThread(
      procedure
      begin
        aCounter.WaitFor; // Wait for threads to finish
        aCounter.Free;
        aQueue.Free;
      end
    );
    aThread.FreeOnTerminate := false;
    aThread.Start;
    aThread.WaitFor;  // Safe to wait for the anonymous thread
    aThread.Free;
  end;
end;

end.  

Upvotes: 5

Related Questions