Jerry Dodge
Jerry Dodge

Reputation: 27296

How to make a thread finish its work before being free'd?

I'm writing a thread which writes event logs. When the application is closed (gracefully), I need to make sure this thread finishes its job saving the logs before it's free'd. If I call Free directly to the thread, it shouldn't immediately be destroyed, it should wait until the thread is done and there's no more work left to do.

Here is how I have my thread's execution laid out:

procedure TEventLogger.Execute;
var
  L: TList;
  E: PEventLog; //Custom record pointer
begin
  while not Terminated do begin //Repeat continuously until terminated
    try
      E:= nil;
      L:= LockList; //Acquire locked queue of logs to be written
      try
        if L.Count > 0 then begin //Check if any logs exist in queue
          E:= PEventLog(L[0]); //Get next log from queue
          L.Delete(0); //Remove log from queue
        end;
      finally
        UnlockList;
      end;
      if E <> nil then begin
        WriteEventLog(E); //Actual call to save log
      end;
    except
      //Handle exception...
    end;
    Sleep(1);
  end;
end;

And here's the destructor...

destructor TEventLogger.Destroy;
begin
  ClearQueue; //I'm sure this should be removed
  FQueue.Free;
  DeleteCriticalSection(FListLock);
  inherited;
end;

Now I already know that at the time when Free is called, I should raise a flag making it impossible to add any more logs to the queue - it just needs to finish what's already there. My issue is that I know the above code will forcefully be cut off when the thread is free'd.

How should I make this thread finish its work when Free has been called? Or if that's not possible, how in general should this thread be structured for this to happen?

Upvotes: 3

Views: 1667

Answers (4)

David Heffernan
David Heffernan

Reputation: 613511

This is my take on how to write a consumer thread. The first piece of the jigsaw is a blocking queue. Mine looks like this:

unit BlockingQueue;

interface

uses
  Windows, SyncObjs, Generics.Collections;

type
  TBlockingQueue<T> = class
  //see Duffy, Concurrent Programming on Windows, pp248
  private
    FCapacity: Integer;
    FQueue: TQueue<T>;
    FLock: TCriticalSection;
    FNotEmpty: TEvent;
    function DoEnqueue(const Value: T; IgnoreCapacity: Boolean): Boolean;
  public
    constructor Create(Capacity: Integer=-1);//default to unbounded
    destructor Destroy; override;
    function Enqueue(const Value: T): Boolean;
    procedure ForceEnqueue(const Value: T);
    function Dequeue: T;
  end;

implementation

{ TBlockingQueue<T> }

constructor TBlockingQueue<T>.Create(Capacity: Integer);
begin
  inherited Create;
  FCapacity := Capacity;
  FQueue := TQueue<T>.Create;
  FLock := TCriticalSection.Create;
  FNotEmpty := TEvent.Create(nil, True, False, '');
end;

destructor TBlockingQueue<T>.Destroy;
begin
  FNotEmpty.Free;
  FLock.Free;
  FQueue.Free;
  inherited;
end;

function TBlockingQueue<T>.DoEnqueue(const Value: T; IgnoreCapacity: Boolean): Boolean;
var
  WasEmpty: Boolean;
begin
  FLock.Acquire;
  Try
    Result := IgnoreCapacity or (FCapacity=-1) or (FQueue.Count<FCapacity);
    if Result then begin
      WasEmpty := FQueue.Count=0;
      FQueue.Enqueue(Value);
      if WasEmpty then begin
        FNotEmpty.SetEvent;
      end;
    end;
  Finally
    FLock.Release;
  End;
end;

function TBlockingQueue<T>.Enqueue(const Value: T): Boolean;
begin
  Result := DoEnqueue(Value, False);
end;

procedure TBlockingQueue<T>.ForceEnqueue(const Value: T);
begin
  DoEnqueue(Value, True);
end;

function TBlockingQueue<T>.Dequeue: T;
begin
  FLock.Acquire;
  Try
    while FQueue.Count=0 do begin
      FLock.Release;
      Try
        FNotEmpty.WaitFor;
      Finally
        FLock.Acquire;
      End;
    end;
    Result := FQueue.Dequeue;
    if FQueue.Count=0 then begin
      FNotEmpty.ResetEvent;
    end;
  Finally
    FLock.Release;
  End;
end;

end.

It is completely threadsafe. Any thread can enqueue. Any thread can dequeue. The dequeue function will block if the queue is empty. The queue can be operated in either bounded or unbounded modes.

Next up we need a thread that works with such a queue. The thread simply pulls jobs off the queue until it is told to terminate. My consumer thread looks like this:

unit ConsumerThread;

interface

uses
  SysUtils, Classes, BlockingQueue;

type
  TConsumerThread = class(TThread)
  private
    FQueue: TBlockingQueue<TProc>;
    FQueueFinished: Boolean;
    procedure SetQueueFinished;
  protected
    procedure TerminatedSet; override;
    procedure Execute; override;
  public
    constructor Create(Queue: TBlockingQueue<TProc>);
  end;

implementation

{ TConsumerThread }

constructor TConsumerThread.Create(Queue: TBlockingQueue<TProc>);
begin
  inherited Create(False);
  FQueue := Queue;
end;

procedure TConsumerThread.SetQueueFinished;
begin
  FQueueFinished := True;
end;

procedure TConsumerThread.TerminatedSet;
begin
  inherited;
  //ensure that, if the queue is empty, we wake up the thread so that it can quit
  FQueue.ForceEnqueue(SetQueueFinished);
end;

procedure TConsumerThread.Execute;
var
  Proc: TProc;
begin
  while not FQueueFinished do begin
    Proc := FQueue.Dequeue();
    Proc();
    Proc := nil;//clear Proc immediately, rather than waiting for Dequeue to return since it blocks
  end;
end;

end.

This has the very property that you are looking for. Namely that when the thread is destroyed, it will process all pending tasks before completing the destructor.

To see it in action, here's a short demonstration program:

unit Main;

interface

uses
  Windows, SysUtils, Classes, Controls, Forms, StdCtrls,
  BlockingQueue, ConsumerThread;

type
  TMainForm = class(TForm)
    Memo1: TMemo;
    TaskCount: TEdit;
    Start: TButton;
    Stop: TButton;
    procedure StartClick(Sender: TObject);
    procedure StopClick(Sender: TObject);
  private
    FQueue: TBlockingQueue<TProc>;
    FThread: TConsumerThread;
    procedure Proc;
    procedure Output(const Msg: string);
  end;

implementation

{$R *.dfm}

procedure TMainForm.Output(const Msg: string);
begin
  TThread.Synchronize(FThread,
    procedure
    begin
      Memo1.Lines.Add(Msg);
    end
  );
end;

procedure TMainForm.Proc;
begin
  Output(Format('Consumer thread ID: %d', [GetCurrentThreadId]));
  Sleep(1000);
end;

procedure TMainForm.StartClick(Sender: TObject);
var
  i: Integer;
begin
  Memo1.Clear;
  Output(Format('Main thread ID: %d', [GetCurrentThreadId]));
  FQueue := TBlockingQueue<TProc>.Create;
  FThread := TConsumerThread.Create(FQueue);
  for i := 1 to StrToInt(TaskCount.Text) do
    FQueue.Enqueue(Proc);
end;

procedure TMainForm.StopClick(Sender: TObject);
begin
  Output('Stop clicked, calling thread destructor');
  FreeAndNil(FThread);
  Output('Thread destroyed');
  FreeAndNil(FQueue);
end;

end.

object MainForm: TMainForm
  Caption = 'MainForm'
  ClientHeight = 560
  ClientWidth = 904
  object Memo1: TMemo
    Left = 0
    Top = 96
    Width = 904
    Height = 464
    Align = alBottom
  end
  object TaskCount: TEdit
    Left = 8
    Top = 8
    Width = 121
    Height = 21
    Text = '10'
  end
  object Start: TButton
    Left = 8
    Top = 48
    Width = 89
    Height = 23
    Caption = 'Start'
    OnClick = StartClick
  end
  object Stop: TButton
    Left = 120
    Top = 48
    Width = 75
    Height = 23
    Caption = 'Stop'
    OnClick = StopClick
  end
end

Upvotes: 6

Stijn Sanders
Stijn Sanders

Reputation: 36850

Modifying your code, I would suggest checking the last queue count in the while as well, notice variable LastCount I introduced here:

procedure TEventLogger.Execute;
var
  L: TList;
  E: PEventLog; //Custom record pointer
  LastCount: integer;
begin
  LastCount:=0;//counter warning
  while not (Terminated and (LastCount=0)) do begin //Repeat continuously until terminated
    try
      E:= nil;
      L:= LockList; //Acquire locked queue of logs to be written
      try
        LastCount:=L.Count;
        if LastCount > 0 then begin //Check if any logs exist in queue
          E:= PEventLog(L[0]); //Get next log from queue
          L.Delete(0); //Remove log from queue
        end;
      finally
        UnlockList;
      end;
      if E <> nil then begin
        WriteEventLog(E); //Actual call to save log
      end;
    except
      //Handle exception...
    end;
    Sleep(1);
  end;
end;

Upvotes: 3

Sir Rufo
Sir Rufo

Reputation: 19106

Here is a "lazy" EventLogger thread which will save all events in the queue.

unit EventLogger;

interface

uses
  Classes, SyncObjs, Contnrs;

type
  TEventItem = class
    TimeStamp : TDateTime;
    Info : string;
  end;

  TEventLogger = class( TThread )
  private
    FStream : TStream;
    FEvent :  TEvent;
    FQueue :  TThreadList;
  protected
    procedure TerminatedSet; override;
    procedure Execute; override;
    procedure WriteEvents;
    function GetFirstItem( out AItem : TEventItem ) : Boolean;
  public
    constructor Create; overload;
    constructor Create( CreateSuspended : Boolean ); overload;
    destructor Destroy; override;

    procedure LogEvent( const AInfo : string );
  end;

implementation

uses
  Windows, SysUtils;

{ TEventLogger }

constructor TEventLogger.Create( CreateSuspended : Boolean );
begin
  FEvent := TEvent.Create;
  FQueue := TThreadList.Create;

  inherited;
end;

constructor TEventLogger.Create;
begin
  Create( False );
end;

destructor TEventLogger.Destroy;
begin
  // first the inherited part
  inherited;
  // now freeing the internal instances
  FStream.Free;
  FQueue.Free;
  FEvent.Free;
end;

procedure TEventLogger.Execute;
var
  LFinished : Boolean;
begin
  inherited;
  LFinished := False;
  while not LFinished do
    begin

      // waiting for event with 20 seconds timeout
      // maybe terminated or full queue
      WaitForSingleObject( FEvent.Handle, 20000 );

      // thread will finished if terminated
      LFinished := Terminated;

      // write all events from queue
      WriteEvents;

      // if the thread gets terminated while writing
      // it will be still not finished ... and therefor one more loop

    end;
end;

function TEventLogger.GetFirstItem( out AItem : TEventItem ) : Boolean;
var
  LList : TList;
begin
  LList := FQueue.LockList;
  try
    if LList.Count > 0
    then
      begin
        AItem := TEventItem( LList[0] );
        LList.Delete( 0 );
        Result := True;
      end
    else
      Result := False;
  finally
    FQueue.UnlockList;
  end;
end;

procedure TEventLogger.LogEvent( const AInfo : string );
var
  LList : TList;
  LItem : TEventItem;
begin
  if Terminated
  then
    Exit;

  LItem           := TEventItem.Create;
  LItem.TimeStamp := now;
  LItem.Info      := AInfo;

  LList := FQueue.LockList;
  try

    LList.Add( LItem );

    // if the queue is "full" we will set the event

    if LList.Count > 50
    then
      FEvent.SetEvent;

  finally
    FQueue.UnlockList;
  end;

end;

procedure TEventLogger.TerminatedSet;
begin
  // this is called if the thread is terminated
  inherited;
  FEvent.SetEvent;
end;

procedure TEventLogger.WriteEvents;
var
  LItem :   TEventItem;
  LStream : TStream;
begin
  // retrieve the first event in list
  while GetFirstItem( LItem ) do
    try

      // writing the event to a file

      if not Assigned( FStream )
      then
        FStream := TFileStream.Create( ChangeFileExt( ParamStr( 0 ), '.log' ), fmCreate or fmShareDenyWrite );

      // just a simple log row
      LStream := 
        TStringStream.Create( 
          Format( 
            '[%s] %s : %s', 
             // when it is written to file
            [FormatDateTime( 'dd.mm.yyyy hh:nn:ss.zzz', now ),
             // when did it happend
             FormatDateTime( 'dd.mm.yyyy hh:nn:ss.zzz', LItem.TimeStamp ),
             // whats about 
             LItem.Info] ) + sLineBreak, 
          TEncoding.UTF8 );
      try
        LStream.Seek( 0, soFromBeginning );
        FStream.CopyFrom( LStream, LStream.Size );
      finally
        LStream.Free;
      end;

    finally
      LItem.Free;
    end;
end;

end.

Upvotes: 3

David Heffernan
David Heffernan

Reputation: 613511

If I call Free directly to the thread, it shouldn't immediately be destroyed, it should wait until the thread is done and there's no more work left to do.

I think you have a slight mis-understanding of what happens when you destroy a thread. When you call Free on a TThread, the following happens in the destructor:

  1. Terminate is called.
  2. WaitFor is called.
  3. The remainder of the thread's destructor then runs.

In other words, calling Free already does what you ask for, namely notifying the thread method that it needs to terminate, and then waiting for it to do so.

Since you are in control of the thread's Execute method, you can do as much or as little work there once you detect that the Terminated flag has been set. As Remy suggests, you could override DoTerminate and do your last pieces of work there.


For what it is worth, this is a poor way to implement a queue. That call to Sleep(1) jumps right out at me. What you need is a blocking queue. You empty the queue and then wait on an event. When the producer adds to the queue the event is signaled so that your thread can wake up.

Upvotes: 13

Related Questions