john_who_is_doe
john_who_is_doe

Reputation: 389

Thread pool, Threadsafe Queue, OOP

Lets say there are some non-PC devices on an ethernet network and based on the given API, i can communicate with them using UDP.

I would like to use a thread pool and a thread safe queue for the UDP communication. Every worker thread would have its own indy TIdUDPclient instance. The communication is like sending one UDP datagram to a device then waiting for the answer. The answer is a UDP datagram with >2 bytes at the data segment.

The device is represented by the TDevice class instances.

TDevice = class(TObject)
private
  ...
  FStatus: byte;
  ...
public
  ...
  function GetStatus(): integer; //API commandID = 68 (Getting device Status)
end;

thPool is for creating/managing the threads and pushing the job to the queue:

TthPool = class(TObject)
private
  FQueue: TThreadedQueue<TrUDPdirectJob*>;
  FthWorkers: TList<TthWorker>;
public
  constructor Create( AthCount, AQueueDepth: integer); //creating the pool here
  function SendCommand( ArSendJob: TrSendJob );
end;

function SendCommand(ArSendJob TrSendJob): integer; 
begin
  ...
  FQueue.PushItem( ArSendJob );
end;

One function of TDevice is for getting the status of the hardware it represents and set the value of it's FStatus based on the received answer:

function TDevice.GetStatus(): integer;  //command byte: 68
const
  PARAMSLENGTH = 4;
var
  rSendJob: TrSendJob*        
begin
  rSendJob.IP := self.FIP;
  rSendJob.port := self.Fport;
  rSendJob.commandID := 68;
  rSendJob.paramslength := PARAMLENGTH ; //need to send the length (API req) 
  SetLength( rSendJob.params, PARAMSLENGTH  );  
  rSendJob.params[0] := $A; //just some parameters along the commandID
  rSendJob.params[1] := $B;
  rSendJob.params[2] := $C;
  rSendJob.params[3] := $D;
  ...
  thPool.SendCommand( rSendJob );   //pushing the job to the queue   
end;

*TrSendJob = record  //Job define
  ip: string;
  port: integer;
  commandID: byte;        
  params: Tparams; //Array of byte
  paramslength: byte;
end;

Sending out the UDP datagram from the worker thread:

procedure TthWorker.Execute;
var
  sendBuffer: TIDbytes;
  rBuffer: TIdBytes;
  rSendJob: TrSendJob;
begin
  inherited;
  repeat
    FQueue.PopItem(rSendJob); //Getting the job from the queue
    //Building the sending buffer
    sendBuffer[0] := rSendJob. ... ; 
    ...
    FIdUDPclient.SendBuffer( rSendJob.IP, rSendJob.port, sendBuffer );
    if FIdUDPclient.receiveBuffer(rbuffer, RECTIMEOUT_GLOBAL) >= 1 then
    begin
      //
    end;
  until Terminated;
end;

In sum, thPool's SendCommand is pushing the jobs into the queue, the worker thread pulling the Job from the queue and sending out the UDP datagram, then its getting back the answer bytes in rbuffer. But at this point, how to send the contents of receivebuffer back to TDevice instance for processing? What is the proper (OOP) way of doing this? I need to do the processing inside a TDevice method.

Is that a proper solution, if i define a plus method pointer field at TrSendJob so the worker thread knows what method to call for processing the buffer and set FState value of TDevice?

TProcessReceiveBuffer = function(ArBuffer: TIdBytes): integer;

TrSendJob = record  //Job define
  ip: string;
  port: integer;
  commandID: byte;        
  params: Tparams; //Tparams array of byte
  paramslength: byte;
  *ProcessReceiveBuffer: TProcessReceiveBuffer; //pointer to a TDevice method thats processing the receivebuffer
end;

Or, the whole concept is wrong.

Upvotes: 2

Views: 256

Answers (1)

Remy Lebeau
Remy Lebeau

Reputation: 596332

But at this point, how to send the contents of receivebuffer back to TDevice instance for processing?

You could include a TDevice object pointer in TrSendJob, then TthWorker will know which TDevice to pass the buffer to.

Or, you could put a buffer and TEvent into TrSendJob, then have TthWorker fill the buffer and signal the event, then have TDevice.GetStatus() wait for the event signal and process the buffer.

Upvotes: 2

Related Questions