Reputation: 197
I have written an application that has three threads: thread 1, thread 2 and thread 3.
It also has two lists of numbers: list 1 and list 2.
Thread No. 1 inserts data into list No. 1.
Thread No. 2 gets data from list No. 1.
Thread No. 2 inserts data into list No. 2.
Thread No. 3 gets data from list No. 2.
If the threads all run continuously, they occupy a lot of CPU. When no new data has been entered into a list, thread number two and thread number three do not need to be running.
How can the threads be notified that new data has been inserted into a list, so that they start processing the new data?
What is an efficient technique to achieve this?
thanks a lot, a lot.
Upvotes: 3
Views: 673
Reputation: 34889
Here is a sample that adds values in list1 and list2 from your three threads.
Each time a value is put into a list, an Event
is triggered and the thread handling this event pulls out the last value in the list and clears the event flag.
No new value can be put into a list until the event flag is cleared.
The middle thread keeps an intermediate copy of each new value, so as not to stall the first thread.
All events are waitable and thus keep the CPU at ease.
The lists are thread-safe.
program Project62;
{$APPTYPE CONSOLE}
uses
System.SysUtils,
System.Classes,
System.SyncObjs,
System.Generics.Collections;
Type
// Producer thread: Execute adds a random value to the shared list and then
// sets the shared event to announce the addition.
TMyThread1 = Class(TThread)
private
// Event set by Execute after each addition to fMyList.
fMySyncAddList : TSimpleEvent;
// Shared list that receives the generated values.
fMyList : TThreadList<Integer>;
// Most recently generated value.
fAddVal : Integer;
public
// Stores the event and list references passed in; neither object is created here.
Constructor Create(ASyncAddList: TSimpleEvent; AList: TThreadList<Integer>);
// Thread body; loops until Terminated.
procedure Execute; override;
End;
// Middle thread: Execute waits for the list1 event, copies the last value of
// list1, resets that event, then adds the value to list2 and sets the list2 event.
TMyThread2 = Class(TThread)
private
// fMySyncAddList1 is waited on and reset by Execute;
// fMySyncAddList2 is polled and set by Execute.
fMySyncAddList1,fMySyncAddList2 : TSimpleEvent;
// fMyList1 is read (last element); fMyList2 is appended to.
fMyList1,fMyList2 : TThreadList<Integer>;
// Value copied from list1 and held until it has been added to list2.
fAddVal : Integer;
public
// Stores the event and list references passed in; no objects are created here.
Constructor Create(ASyncAddList1,ASyncAddList2: TSimpleEvent; AList1,AList2 : TThreadList<Integer>);
// Thread body; loops until Terminated.
procedure Execute; override;
End;
// Final consumer thread: Execute waits for the list2 event, reads the last
// value of list2 and then resets the event.
TMyThread3 = Class(TThread)
private
// Event waited on and reset by Execute.
fMySyncAddList2 : TSimpleEvent;
// Shared list whose last element is read by Execute.
fMyList2 : TThreadList<Integer>;
// Most recently read value.
fAddVal : Integer;
public
// Stores the event and list references passed in; neither object is created here.
Constructor Create(ASyncAddList2: TSimpleEvent; AList2 : TThreadList<Integer>);
// Thread body; loops until Terminated.
procedure Execute; override;
End;
{ TMyThread1 }
constructor TMyThread1.Create( ASyncAddList : TSimpleEvent; AList: TThreadList<Integer>);
begin
  // Create the thread non-suspended (CreateSuspended = False).
  inherited Create(False);
  // Keep references to the shared list and to the event that announces additions.
  Self.fMyList := AList;
  Self.fMySyncAddList := ASyncAddList;
end;
// Producer loop. Each round adds a random value (0..99) to the list, sets the
// event to announce it, and then waits until the event has been reset again
// before producing the next value.
//
// The event is manual-reset, so waiting on it cannot detect the "reset"
// state: a WaitFor with a timeout returns at once while the event is still
// set, which would turn this loop into a busy spin whenever the consumer is
// slow. The acknowledgement is therefore polled with WaitFor(0) and the
// thread sleeps between polls.
procedure TMyThread1.Execute;
var
  stateAcknowledged : boolean;
begin
  stateAcknowledged := true;
  while (not Terminated) do
  begin
    if stateAcknowledged then
    begin // Do some work and add a value to list1
      fAddVal := Random(100);
      fMyList.Add(fAddVal);
      fMySyncAddList.SetEvent; // Signal a new addition
      stateAcknowledged := false;
      //ShowVal;
      Sleep(1000);
    end
    else if (fMySyncAddList.WaitFor(0) = wrSignaled) then
      Sleep(100) // Event still set: not acknowledged yet, yield the CPU
    else
      stateAcknowledged := true; // Event has been reset: value was taken
  end;
end;
{ TMyThread2 }
constructor TMyThread2.Create(ASyncAddList1, ASyncAddList2: TSimpleEvent;
AList1, AList2: TThreadList<Integer>);
begin
  // Create the thread non-suspended (CreateSuspended = False).
  inherited Create(False);
  // Input side: list1 and its event.
  Self.fMyList1 := AList1;
  Self.fMySyncAddList1 := ASyncAddList1;
  // Output side: list2 and its event.
  Self.fMyList2 := AList2;
  Self.fMySyncAddList2 := ASyncAddList2;
end;
// Relay loop with two states:
//  - nothing pending: block on the list1 event, copy the last value of list1
//    into fAddVal and reset the list1 event;
//  - value pending: once the list2 event is not set, add the value to list2
//    and set the list2 event; otherwise sleep 100 ms and poll again.
procedure TMyThread2.Execute;
var
  waitOutcome : TWaitResult;
  source : TList<Integer>;
  havePending : Boolean;
begin
  havePending := False;
  while not Terminated do
  begin
    if not havePending then
    begin
      // Wait for a new value in list1
      waitOutcome := fMySyncAddList1.WaitFor(INFINITE);
      if Terminated then
        Exit;
      if waitOutcome <> wrSignaled then
        Continue;
      // Pull out the value while holding the list lock
      source := fMyList1.LockList;
      try
        fAddVal := source.Last;
      finally
        fMyList1.UnlockList;
      end;
      // All clear
      havePending := True;
      fMySyncAddList1.ResetEvent;
      //ShowVal;
    end
    else if fMySyncAddList2.WaitFor(0) = wrSignaled then
      Sleep(100) // list2 event still set: previous value not taken yet
    else
    begin
      // Add the value to list2
      fMyList2.Add(fAddVal);
      fMySyncAddList2.SetEvent; // Signal a new addition
      havePending := False;
    end;
  end;
end;
{ TMyThread3 }
constructor TMyThread3.Create(ASyncAddList2: TSimpleEvent;
AList2: TThreadList<Integer>);
begin
  // Create the thread non-suspended (CreateSuspended = False).
  inherited Create(False);
  // Keep references to list2 and to the event that announces additions to it.
  Self.fMyList2 := AList2;
  Self.fMySyncAddList2 := ASyncAddList2;
end;
// Consumer loop: block on the list2 event, copy the last value of list2 into
// fAddVal while holding the list lock, then reset the event.
procedure TMyThread3.Execute;
var
  waitOutcome : TWaitResult;
  locked : TList<Integer>;
begin
  while not Terminated do
  begin
    // Wait for signal
    waitOutcome := fMySyncAddList2.WaitFor(INFINITE);
    if Terminated then
      Break; // Woken for shutdown: leave without touching the list
    if waitOutcome <> wrSignaled then
      Continue;
    // Pull out the value
    locked := fMyList2.LockList;
    try
      fAddVal := locked.Last;
      //ShowVal;
    finally
      fMyList2.UnlockList;
    end;
    // Clear event
    fMySyncAddList2.ResetEvent;
  end;
end;
var
  list1,list2 : TThreadList<Integer>;
  syncList1,syncList2 : TSimpleEvent;
  thread1 : TMyThread1;
  thread2 : TMyThread2;
  thread3 : TMyThread3;
begin
  // Shared state: one event and one list per hand-over point.
  // Events are created with ManualReset = True and InitialState = False.
  syncList1 := TSimpleEvent.Create(Nil,True,False,'',false);
  syncList2 := TSimpleEvent.Create(Nil,True,False,'',false);
  list1 := TThreadList<Integer>.Create;
  list2 := TThreadList<Integer>.Create;

  // Start the consumers before the producer.
  thread3 := TMyThread3.Create(syncList2,list2);
  thread2 := TMyThread2.Create(syncList1,syncList2,list1,list2);
  thread1 := TMyThread1.Create(syncList1,list1);
  try
    WriteLn('Press [Enter] key to stop.');
    ReadLn;
  finally
    // Stop thread 3: flag it, then set its event so the infinite wait returns.
    thread3.Terminate;
    syncList2.SetEvent; // Wake up call
    thread3.Free;

    // Stop thread 2 the same way.
    thread2.Terminate;
    syncList1.SetEvent; // Wake up call
    thread2.Free;

    // Thread 1 is stopped by freeing it.
    thread1.Free;

    // Release the shared objects once no thread uses them any more.
    syncList1.Free;
    syncList2.Free;
    list1.Free;
    list2.Free;
  end;
end.
Added an example where two TThreadedQueue
instances transfer the information between the threads. The threads keep internal lists of the integers instead. The code is much simpler, as @DavidHeffernan points out.
program Project63;
{$APPTYPE CONSOLE}
uses
System.SysUtils,
System.Classes,
System.SyncObjs,
System.Generics.Collections;
Type
// Producer thread: Execute generates a random value once per second, records
// it in its own list and pushes it onto the queue.
TMyThread1 = Class(TThread)
private
// Internal list of generated values; created in Create, freed at the end of Execute.
fMyList : TList<Integer>;
// Queue the generated values are pushed onto (passed to Create).
fMyQueue : TThreadedQueue<Integer>;
// Most recently generated value.
fAddVal : Integer;
public
// Stores the queue reference and creates the internal list.
Constructor Create(AQueue : TThreadedQueue<Integer>);
// Thread body; loops until Terminated.
procedure Execute; override;
End;
// Middle thread: Execute pops values from the first queue and pushes them
// onto the second queue, recording them in two internal lists.
TMyThread2 = Class(TThread)
private
// Internal lists; created in Create, freed at the end of Execute.
// fMyList1 records popped values, fMyList2 records pushed values.
fMyList1,fMyList2 : TList<Integer>;
// fMyQueue1 is popped from, fMyQueue2 is pushed onto (both passed to Create).
fMyQueue1,fMyQueue2 : TThreadedQueue<Integer>;
// Value most recently popped from fMyQueue1.
fAddVal : Integer;
public
// Stores the queue references and creates the internal lists.
Constructor Create(AQueue1,AQueue2: TThreadedQueue<Integer>);
// Thread body; loops until Terminated.
procedure Execute; override;
End;
// Final consumer thread: Execute pops values from the queue and records them
// in its own list.
TMyThread3 = Class(TThread)
private
// Internal list of received values; created in Create, freed at the end of Execute.
fMyList : TList<Integer>;
// Queue the values are popped from (passed to Create).
fMyQueue : TThreadedQueue<Integer>;
// Value most recently popped from the queue.
fAddVal : Integer;
public
// Stores the queue reference and creates the internal list.
Constructor Create(AQueue : TThreadedQueue<Integer>);
// Thread body; loops until Terminated.
procedure Execute; override;
End;
constructor TMyThread1.Create( AQueue : TThreadedQueue<Integer>);
begin
  // Create the thread non-suspended (CreateSuspended = False).
  inherited Create(False);
  // Remember the output queue and set up the internal list of produced values.
  Self.fMyQueue := AQueue;
  Self.fMyList := TList<Integer>.Create;
end;
// Producer loop: once per second generate a random value (0..99), record it
// in the internal list and push it onto the queue.
// The internal list is released in a finally section so that it is not
// leaked if an exception escapes the loop.
procedure TMyThread1.Execute;
begin
  try
    while (not Terminated) do
    begin
      Sleep(1000); // Simulate some work
      fAddVal := Random(100);
      fMyList.Add(fAddVal);
      fMyQueue.PushItem(fAddVal); // Signal a new addition
    end;
  finally
    fMyList.Free;
  end;
end;
constructor TMyThread2.Create(AQueue1,AQueue2: TThreadedQueue<Integer>);
begin
  // Create the thread non-suspended (CreateSuspended = False).
  inherited Create(False);
  // Input side: queue to pop from, plus a list recording what was received.
  Self.fMyQueue1 := AQueue1;
  Self.fMyList1 := TList<Integer>.Create;
  // Output side: queue to push onto, plus a list recording what was sent.
  Self.fMyQueue2 := AQueue2;
  Self.fMyList2 := TList<Integer>.Create;
end;
// Relay loop: pop a value from the first queue, record it, push it onto the
// second queue and record it again. A pop that returns after Terminate has
// been called is ignored.
// Both internal lists are released in a finally section so that they are not
// leaked if an exception escapes the loop.
procedure TMyThread2.Execute;
var
  queueSize : Integer;
begin
  try
    while (not Terminated) do
    begin
      if (fMyQueue1.PopItem(queueSize,fAddVal) = wrSignaled) and
         (not Terminated) then
      begin
        fMyList1.Add(fAddVal);
        // Do some work and send a new value to next thread
        fMyQueue2.PushItem(fAddVal);
        fMyList2.Add(fAddVal);
      end;
    end;
  finally
    fMyList1.Free;
    fMyList2.Free;
  end;
end;
constructor TMyThread3.Create(AQueue : TThreadedQueue<Integer>);
begin
  // Create the thread non-suspended (CreateSuspended = False).
  inherited Create(False);
  // Remember the input queue and set up the internal list of received values.
  Self.fMyQueue := AQueue;
  Self.fMyList := TList<Integer>.Create;
end;
// Consumer loop: pop a value from the queue and record it in the internal
// list. A pop that returns after Terminate has been called is ignored.
// The internal list is released in a finally section so that it is not
// leaked if an exception escapes the loop.
procedure TMyThread3.Execute;
var
  queueSize : Integer;
begin
  try
    while not Terminated do
    begin
      if (fMyQueue.PopItem(queueSize,fAddVal) = wrSignaled) and
         (not Terminated) then
      begin
        fMyList.Add(fAddVal);
        // Do some work on list
      end;
    end;
  finally
    fMyList.Free;
  end;
end;
var
  queue1,queue2 : TThreadedQueue<Integer>;
  thread1 : TMyThread1;
  thread2 : TMyThread2;
  thread3 : TMyThread3;
begin
  // One queue per hand-over point: thread1 -> queue1 -> thread2 -> queue2 -> thread3.
  queue1 := TThreadedQueue<Integer>.Create;
  queue2 := TThreadedQueue<Integer>.Create;
  // Start the consumers before the producer.
  thread3 := TMyThread3.Create(queue2);
  thread2 := TMyThread2.Create(queue1,queue2);
  thread1 := TMyThread1.Create(queue1);
  Try
    WriteLn('Press [Enter] key to stop.');
    ReadLn;
  Finally
    // Flag every thread first, so that each loop ends as soon as its current
    // blocking queue call returns.
    thread1.Terminate;
    thread2.Terminate;
    thread3.Terminate;
    // Shut the queues down to release any thread blocked in PushItem or
    // PopItem. Pushing a dummy "wake up" item instead could block this
    // thread forever if the queue is full and its consumer has already
    // left its loop.
    queue1.DoShutDown;
    queue2.DoShutDown;
    // Free waits for each thread to finish before destroying it.
    thread3.Free;
    thread2.Free;
    thread1.Free;
    queue1.Free;
    queue2.Free;
  End;
end.
Upvotes: 2
Reputation: 21507
First thing that comes to mind is semaphores. Semaphore is a synchronization primitive with a counter inside: one thread tries to decrement the counter, and if it's zero, the thread blocks (i.e. is not scheduled and doesn't eat CPU, just waits harmlessly). Another thread increments the counter, causing blocked thread(s) to continue.
You may have a semaphore for each list, so a consuming thread decrements the semaphore before reading, and a producer thread increments it after writing.
There is another thing to take care of: is it allowed for one thread to get data from list while another thread is modifying it? It depends on how lists (or collections in general) are implemented, so try to find something on "thread safety" of Lists in your documentation. Maybe you need another synchronization primitive: mutex. Mutexes are "taken" and then "released" by threads, and when more than one thread attempts to lock a mutex, one of them has to wait. One mutex per list, with modifications and reads done with this mutex held, may be appropriate here.
Upvotes: 1