Reputation: 17108
Me again, hopefully with more specific threading question.
I note that if I run the fine demos that Chris Rolliston provides here:
http://delphihaven.wordpress.com/2011/05/06/using-tmonitor-2/
with FastMM turned on to report memory leaks, that the threads themselves are leaked.
That's no problem for a small demo, but for my app, with tens of thousands of iterations of using the thread, it runs my humble 32-bit app out of memory. (I can't compile for 64 bit because I'm using the 32-bit only CrossTalk).
How do you ensure that the thread is freed when used with a threaded queue?
NEW CODE ADDED
program SimpleThreadQueueConsole;
{$APPTYPE CONSOLE}
{$R *.res}
uses
FastMM4,
System.SysUtils,
System.SyncObjs,
uPrimeThread in 'uPrimeThread.pas',
uPrimeThreadRunner in 'uPrimeThreadRunner.pas';
var
PR: TPrimeThreadRunner;
begin
Randomize;
ReportMemoryLeaksOnShutdown := True;
PR := TPrimeThreadRunner.Create;
try
PR.DoIt;
finally
PR.Free;
end;
end.
uPrimeThread.pas
unit uPrimeThread;
interface
uses
System.Classes
, Generics.Collections
;
type
TPrimeThread = class(TThread)
private
FOutQueue: TThreadedQueue<string>;
FInQueue: TThreadedQueue<string>;
function IsPrime(const NumberToCheck: integer): boolean;
public
constructor Create(aCreateSuspended: Boolean; aInQueue: TThreadedQueue<string>; aOutQueue: TThreadedQueue<string>);
procedure Execute; override;
end;
implementation
uses
System.SysUtils
, System.SyncObjs
;
const
MaxPrime = 999;
{ TPrimeThread }
constructor TPrimeThread.Create(aCreateSuspended: Boolean; aInQueue, aOutQueue: TThreadedQueue<string>);
begin
inherited Create(aCreateSuspended);
FOutQueue := aOutQueue;
FInQueue := aInQueue;
FreeOnTerminate := True;
end;
procedure TPrimeThread.Execute;
var
S: string;
ThreadID: TThreadID;
NumberToCheck: integer;
begin
ThreadID := TThread.CurrentThread.ThreadID;
FOutQueue.PushItem(Format('Thread %d started...', [ThreadID]));
while (FInQueue.PopItem(S) = wrSignaled) do
begin
NumberToCheck := Random(MaxPrime);
if IsPrime(NumberToCheck) then
begin
FOutQueue.PushItem(Format('%s using thread %d: %d is prime', [S, ThreadID, NumberToCheck]));
end else
begin
FOutQueue.PushItem(Format('%s using thread %d: %d is NOT prime', [S, ThreadID, NumberToCheck]));
end;
end;
end;
function TPrimeThread.IsPrime(const NumberToCheck: Integer): boolean;
// This is really bad on purpose to make the threads work a little harder
var
i: integer;
begin
Result := True;
if NumberToCheck in [0, 1] then
begin
Result := False;
Exit;
end;
for i := 2 to NumberToCheck - 1 do
begin
if NumberToCheck mod i = 0 then
begin
Result := False;
Exit;
end;
end;
end;
end.
uPrimeThreadRunner.pas
unit uPrimeThreadRunner;
interface
uses
System.SyncObjs
, Generics.Collections
, System.SysUtils
, uPrimeThread
;
const
ThreadCount = 4;
type
TPrimeThreadRunner = class
private
FTotalThreads: TCountdownEvent;
FInQueue, FOutQueue: TThreadedQueue<string>;
FCurrentEntry: integer;
procedure DrainTheQueue;
procedure AddEntry;
public
ThreadArray: array[1..ThreadCount] of TPrimeThread;
procedure DoIt;
end;
implementation
const
NumberOfEntries = 10;
procedure TPrimeThreadRunner.DrainTheQueue;
var
S: string;
begin
while FOutQueue.PopItem(S) = wrSignaled do
WriteLn(S);
end;
procedure TPrimeThreadRunner.AddEntry;
var
S: string;
begin
Inc(FCurrentEntry);
S := Format('Entry %d:', [FCurrentEntry]) ;
FInQueue.PushItem(S);
end;
procedure TPrimeThreadRunner.DoIt;
var
i: integer;
begin
FCurrentEntry := 0;
FTotalThreads := TCountdownEvent.Create(1);
FInQueue := TThreadedQueue<string>.Create(10, 1000, 1000);
FOutQueue := TThreadedQueue<string>.Create(10, 1000, 1000);
for i := 1 to ThreadCount do
begin
FTotalThreads.AddCount;
try
ThreadArray[i] := TPrimeThread.Create(True, FInQueue, FOutQueue);
ThreadArray[i].Start;
finally
FTotalThreads.Signal;
end;
end;
for I := 1 to NumberOfEntries do
begin
AddEntry;
end;
DrainTheQueue;
FTotalThreads.Signal;
FTotalThreads.WaitFor;
FTotalThreads.Free;
FInQueue.Free;
FOutQueue.Free;
Readln;
end;
end.
Upvotes: 0
Views: 760
Reputation: 34899
There is an error in your code, I can replicate it in XE3 and XE6.
The critical error is here:
for I := 1 to NumberOfEntries do
begin
AddEntry;
end;
DrainTheQueue; // <-- All threads may not be ready when this call finish
FTotalThreads.Signal; // This makes nothing
FTotalThreads.WaitFor; // This makes nothing
FTotalThreads.Free; // This makes nothing
FInQueue.Free; // You may now free a queue in operation
FOutQueue.Free; // You may now free a queue in operation
You can't rely on DrainTheQueue
to collect all items until the threads are done.
When freeing the queues while one or more primethreads are running, the threads will operate on freed objects.
The easiest way to syncronize the finish of the threads, is to finish them with an empty string.
Call this before DrainTheQueue
:
for i := 1 to ThreadCount do
FInQueue.PushItem('');
And change the PrimeThread.Execute like this:
procedure TPrimeThread.Execute;
var
S: string;
ThreadID: TThreadID;
NumberToCheck: integer;
begin
ThreadID := TThread.CurrentThread.ThreadID;
FOutQueue.PushItem(Format('Thread %d started...', [ThreadID]));
try
while NOT Terminated do
begin
if (FInQueue.PopItem(S) = wrSignaled) then
begin
if (S = '') then // Stop executing
Exit;
NumberToCheck := Random(MaxPrime);
if IsPrime(NumberToCheck) then
begin
FOutQueue.PushItem(Format('%s using thread %d: %d is prime', [S, ThreadID, NumberToCheck]));
end else
begin
FOutQueue.PushItem(Format('%s using thread %d: %d is NOT prime', [S, ThreadID, NumberToCheck]));
end;
end;
end;
finally
FOutQueue.PushItem(Format('Thread %d ended ...', [ThreadID]));
end;
end;
You should also pass the FTotalThreads
object to the worker threads and let them do the signal when they finish executing.
DrainTheQueue
must be called after FTotalThreads.Waitfor
.
Upvotes: 2