Nick Hodges

Reputation: 17108

How do you ensure that a thread is freed when using a threaded queue in Delphi?

Me again, hopefully with a more specific threading question.

I note that if I run the fine demos that Chris Rolliston provides here:

http://delphihaven.wordpress.com/2011/05/06/using-tmonitor-2/

with FastMM turned on to report memory leaks, the threads themselves are leaked.

That's no problem for a small demo, but my app uses the thread tens of thousands of times, and the leak runs my humble 32-bit app out of memory. (I can't compile for 64-bit because I'm using the 32-bit-only CrossTalk.)

How do you ensure that the thread is freed when used with a threaded queue?

NEW CODE ADDED

program SimpleThreadQueueConsole;

{$APPTYPE CONSOLE}

{$R *.res}

uses
  FastMM4,
  System.SysUtils,
  System.SyncObjs,
  uPrimeThread in 'uPrimeThread.pas',
  uPrimeThreadRunner in 'uPrimeThreadRunner.pas';

var
  PR: TPrimeThreadRunner;

begin
  Randomize;
  ReportMemoryLeaksOnShutdown := True;
  PR := TPrimeThreadRunner.Create;
  try
    PR.DoIt;
  finally
    PR.Free;
  end;

end.

uPrimeThread.pas

    unit uPrimeThread;

    interface

    uses
          System.Classes
        , Generics.Collections
        ;

    type

      TPrimeThread = class(TThread)
      private
        FOutQueue: TThreadedQueue<string>;
        FInQueue: TThreadedQueue<string>;
        function IsPrime(const NumberToCheck: integer): boolean;
      public
        constructor Create(aCreateSuspended: Boolean; aInQueue:  TThreadedQueue<string>; aOutQueue:  TThreadedQueue<string>);
        procedure Execute; override;
      end;

    implementation

    uses
          System.SysUtils
        , System.SyncObjs
        ;

    const
      MaxPrime = 999;


    { TPrimeThread }

    constructor TPrimeThread.Create(aCreateSuspended: Boolean; aInQueue, aOutQueue: TThreadedQueue<string>);
    begin
      inherited Create(aCreateSuspended);
      FOutQueue := aOutQueue;
      FInQueue := aInQueue;
      FreeOnTerminate := True;
    end;

    procedure TPrimeThread.Execute;
    var
      S: string;
      ThreadID: TThreadID;
      NumberToCheck: integer;
    begin

        ThreadID := TThread.CurrentThread.ThreadID;
        FOutQueue.PushItem(Format('Thread %d started...', [ThreadID]));

        while (FInQueue.PopItem(S) = wrSignaled) do
        begin
          NumberToCheck := Random(MaxPrime);
          if IsPrime(NumberToCheck) then
          begin
            FOutQueue.PushItem(Format('%s using thread %d: %d is prime', [S, ThreadID, NumberToCheck]));
          end else
          begin
            FOutQueue.PushItem(Format('%s using thread %d: %d is NOT prime', [S, ThreadID, NumberToCheck]));
          end;
        end;
    end;

    function TPrimeThread.IsPrime(const NumberToCheck: Integer): boolean;
    // This is really bad on purpose to make the threads work a little harder
    var
      i: integer;
    begin
      Result := True;
      if NumberToCheck in [0, 1] then
      begin
        Result := False;
        Exit;
      end;

      for i := 2 to NumberToCheck - 1 do
      begin
        if NumberToCheck mod i = 0 then
        begin
          Result := False;
          Exit;
        end;
      end;
    end;

    end.

uPrimeThreadRunner.pas

unit uPrimeThreadRunner;

interface

uses
      System.SyncObjs

    , Generics.Collections
    , System.SysUtils
    , uPrimeThread
    ;

const
  ThreadCount = 4;

type
  TPrimeThreadRunner = class
  private
    FTotalThreads: TCountdownEvent;
    FInQueue, FOutQueue: TThreadedQueue<string>;
    FCurrentEntry: integer;
    procedure DrainTheQueue;
    procedure AddEntry;
  public
    ThreadArray: array[1..ThreadCount] of TPrimeThread;
    procedure DoIt;
  end;

implementation

const
  NumberOfEntries = 10;

procedure TPrimeThreadRunner.DrainTheQueue;
var
  S: string;
begin
  while FOutQueue.PopItem(S) = wrSignaled do
    WriteLn(S);
end;

procedure TPrimeThreadRunner.AddEntry;
var
  S: string;
begin
  Inc(FCurrentEntry);
  S := Format('Entry %d:', [FCurrentEntry]) ;
  FInQueue.PushItem(S);
end;


procedure TPrimeThreadRunner.DoIt;
var
  i: integer;
begin
  FCurrentEntry := 0;

  FTotalThreads := TCountdownEvent.Create(1);
  FInQueue := TThreadedQueue<string>.Create(10, 1000, 1000);
  FOutQueue := TThreadedQueue<string>.Create(10, 1000, 1000);
  for i := 1 to ThreadCount do
  begin
    FTotalThreads.AddCount;
    try
      ThreadArray[i] := TPrimeThread.Create(True, FInQueue, FOutQueue);
      ThreadArray[i].Start;
    finally
      FTotalThreads.Signal;
    end;
  end;


  for I := 1 to NumberOfEntries do
  begin
    AddEntry;
  end;
  DrainTheQueue;

  FTotalThreads.Signal;
  FTotalThreads.WaitFor;
  FTotalThreads.Free;




  FInQueue.Free;
  FOutQueue.Free;

  Readln;
end;

end.

Upvotes: 0

Views: 760

Answers (1)

LU RD

Reputation: 34899

There is an error in your code; I can replicate it in XE3 and XE6.

The critical error is here:

for I := 1 to NumberOfEntries do
  begin
    AddEntry;
  end;
DrainTheQueue;  // <-- The threads may not all be finished when this call returns

FTotalThreads.Signal;  // This does nothing
FTotalThreads.WaitFor; // This does nothing
FTotalThreads.Free;    // This does nothing

FInQueue.Free;         // This may free a queue that is still in use
FOutQueue.Free;        // This may free a queue that is still in use

DrainTheQueue can return before the threads are done, so you can't rely on it to collect every item. If the queues are freed while one or more prime threads are still running, those threads will operate on freed objects.

The easiest way to synchronize the shutdown of the threads is to stop each of them with an empty string.

Call this before DrainTheQueue:

for i := 1 to ThreadCount do
  FInQueue.PushItem('');

And change TPrimeThread.Execute like this:

procedure TPrimeThread.Execute;
var
  S: string;
  ThreadID: TThreadID;
  NumberToCheck: integer;
begin

  ThreadID := TThread.CurrentThread.ThreadID;
  FOutQueue.PushItem(Format('Thread %d started...', [ThreadID]));
  try
    while NOT Terminated do
    begin
      if (FInQueue.PopItem(S) = wrSignaled) then
      begin
        if (S = '') then  // Stop executing 
          Exit;
        NumberToCheck := Random(MaxPrime);
        if IsPrime(NumberToCheck) then
        begin
          FOutQueue.PushItem(Format('%s using thread %d: %d is prime', [S, ThreadID, NumberToCheck]));
        end else
        begin
          FOutQueue.PushItem(Format('%s using thread %d: %d is NOT prime', [S, ThreadID, NumberToCheck]));
        end;
      end;
    end;
  finally
    FOutQueue.PushItem(Format('Thread %d ended ...', [ThreadID]));
  end;
end;

You should also pass the FTotalThreads object to the worker threads and let each of them signal it when it finishes executing. DrainTheQueue must be called after FTotalThreads.WaitFor.
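The last point could be sketched roughly as follows. This is only a minimal illustration, not the original units: TWorker, the variable names, and the queue depths are made up for the example. It also assumes FreeOnTerminate is left at False so that the main thread owns and frees each worker, and that the queues are deep enough for no PushItem to time out before the results are drained.

```pascal
program CountdownSketch;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.SyncObjs,
  System.Generics.Collections;

type
  // Hypothetical worker, standing in for TPrimeThread
  TWorker = class(TThread)
  private
    FInQueue, FOutQueue: TThreadedQueue<string>;
    FDone: TCountdownEvent;
  protected
    procedure Execute; override;
  public
    constructor Create(aInQueue, aOutQueue: TThreadedQueue<string>;
      aDone: TCountdownEvent);
  end;

constructor TWorker.Create(aInQueue, aOutQueue: TThreadedQueue<string>;
  aDone: TCountdownEvent);
begin
  inherited Create(True);  // suspended; FreeOnTerminate stays False
  FInQueue := aInQueue;
  FOutQueue := aOutQueue;
  FDone := aDone;
end;

procedure TWorker.Execute;
var
  S: string;
begin
  try
    while not Terminated do
      if FInQueue.PopItem(S) = wrSignaled then
      begin
        if S = '' then
          Exit;  // empty string is the stop signal
        FOutQueue.PushItem(S + ' processed');
      end;
  finally
    FDone.Signal;  // the last thing this thread does with shared objects
  end;
end;

const
  ThreadCount = 4;
  EntryCount = 10;
var
  InQ, OutQ: TThreadedQueue<string>;
  Done: TCountdownEvent;
  Workers: array[1..ThreadCount] of TWorker;
  i: Integer;
  S: string;
begin
  ReportMemoryLeaksOnShutdown := True;
  // Depths chosen so that no push blocks long enough to time out
  InQ := TThreadedQueue<string>.Create(EntryCount + ThreadCount, 1000, 1000);
  OutQ := TThreadedQueue<string>.Create(EntryCount, 1000, 1000);
  Done := TCountdownEvent.Create(ThreadCount);  // one count per worker
  try
    for i := 1 to ThreadCount do
    begin
      Workers[i] := TWorker.Create(InQ, OutQ, Done);
      Workers[i].Start;
    end;
    for i := 1 to EntryCount do
      InQ.PushItem(Format('Entry %d', [i]));
    for i := 1 to ThreadCount do
      InQ.PushItem('');  // one stop signal per worker
    Done.WaitFor;        // every worker has left its loop
    while OutQ.PopItem(S) = wrSignaled do  // ends when a pop times out
      Writeln(S);
    for i := 1 to ThreadCount do
      Workers[i].Free;   // TThread.Destroy waits for the thread to finish
  finally
    Done.Free;
    OutQ.Free;
    InQ.Free;
  end;
end.
```

Freeing the workers explicitly, rather than relying on FreeOnTerminate, is a design choice of this sketch: it guarantees the thread objects are gone before the queues are freed and before the leak report runs.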

Upvotes: 2
