guinalz

Cloning / Copying / Duplicating Streams in Lazarus

I developed a procedure that receives a TStream, declared as the base type so that any TStream descendant can be passed in.

This procedure is intended to create one thread per core, or several threads. Each thread performs a detailed, read-only analysis of the stream data. Because Pascal class instances are passed by reference and never by value, all threads would share the same stream object and collide: their reads would interleave and keep moving the shared read position.

To fix this, I want the procedure to do all the work of duplicating the received TStream in memory, assigning each copy to a new variable. That way I can duplicate the TStream as many times as needed so that each thread has its own TStream. When a thread finishes, its copy is freed.
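A minimal sketch of that duplication, assuming Free Pascal/Lazarus and a seekable source stream. The helper `CloneToMemory` is hypothetical: it copies the whole source into a new `TMemoryStream` that one thread then owns and frees. The copies should be made in the calling thread before any worker starts, because making a copy reads the shared stream. Memory use is stream size times thread count, which is the cost weighed in the second addition below.

```pascal
{$mode objfpc}{$H+}
uses
  Classes, SysUtils;

{ Hypothetical helper: copies the whole content of Src into a new
  TMemoryStream. Src must be seekable. The caller (or the thread that
  receives the copy) owns the result and must Free it. }
function CloneToMemory(Src: TStream): TMemoryStream;
var
  OldPos: Int64;
begin
  Result := TMemoryStream.Create;
  OldPos := Src.Position;              // remember the caller's position
  try
    try
      Src.Position := 0;
      Result.CopyFrom(Src, Src.Size);  // copy every byte of the source
      Result.Position := 0;            // the copy starts at its beginning
    finally
      Src.Position := OldPos;          // leave the source where we found it
    end;
  except
    Result.Free;                       // do not leak the copy on failure
    raise;
  end;
end;
```

Each copy keeps its own `Position`, so one thread seeking in its copy does not affect any other thread.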

Note: the procedure lives in a DLL, and the threading itself works.

Note 2: The goal is for the procedure to do all the necessary work by itself, i.e. without any help from the calling code. I could easily pass an array of TStream instead of a single TStream, but I do not want that! The aim is for the service to be provided entirely by the procedure.

Do you have any idea how to do this?

Thank you.

Addition:

I had a low-level idea, but my knowledge of Pascal is limited.

  1. Find the object's address in memory and its size.
  2. Allocate a new block of memory with the same size as the original object.
  3. Copy the entire raw contents of the object to this new address.
  4. Create a TStream pointer that points to this new address.

Would this work, or is it a stupid idea? If it works, how do I do it? An example, please!
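A minimal Free Pascal sketch of steps 1 to 4, for illustration only; the helpers `RawShallowCopy` and `ReleaseRawCopy` are hypothetical. It allocates a fresh instance of the same class with `NewInstance` and copies `InstanceSize` raw bytes into it. The result is a shallow copy: plain fields such as a memory stream's position become independent, but whatever the object points to, like the `TMemoryStream` data buffer, stays shared. For a `TFileStream` the shared field is the OS file handle, and FPC's `THandleStream` takes its position from that handle (via `FileSeek`), so a raw copy would not separate the threads' read positions at all.

```pascal
{$mode objfpc}{$H+}
uses
  Classes, SysUtils;

{ Steps 1-4 done literally (hypothetical helper). SHALLOW copy: pointer and
  handle fields (memory buffer, OS file handle) are shared, not duplicated. }
function RawShallowCopy(Src: TObject): TObject;
begin
  // Steps 1+2: a new block of InstanceSize bytes for the same class
  Result := Src.ClassType.NewInstance;
  // Step 3: copy the raw bytes, including the VMT pointer
  Move(Pointer(Src)^, Pointer(Result)^, Src.InstanceSize);
end;

{ Releases the raw copy WITHOUT running its destructor: the destructor
  would free the buffer/handle that still belongs to the original.
  Managed fields are not finalized either, since they are shared too. }
procedure ReleaseRawCopy(Twin: TObject);
begin
  FreeMem(Pointer(Twin));
end;
```

Because the copy can never be freed normally and only works while the original is alive, this approach is fragile; a real per-thread copy of the data (or a second file handle) avoids these traps.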

Second addition:

Just as an example, suppose the program performs brute-force attacks on encrypted streams (only an example; this is not the actual application):

Scenario: a 30 GB file on a CPU with 8 cores:

1st - TMemoryStream:

Create 8 TMemoryStreams and copy the entire contents of the file into each of them. That means 240 GB of RAM in use at the same time, so I consider this idea broken. It would also increase the processing time to the point where not using multithreading at all is faster: I would have to read the entire file into memory first, and only after it has loaded could the analysis begin. Broken!

 * A poor alternative is to copy the file gradually into the TMemoryStreams in batches of 100 MB per core (800 MB in total), so as not to fill up memory. Each thread examines only its 100 MB, then the memory is freed, until the whole file has been processed. The problem is that this requires Synchronize() inside the DLL, which does not work, as described in my question Synchronize () DLL freezes without errors and crashes.
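The batch plan above is mostly offset arithmetic. A small sketch with hypothetical helpers `LotCount` and `LotRange`, assuming 0-based lots and reading GB/MB as GiB/MiB; all sizes are `Int64`, because the offset of a late lot in a 30 GB file does not fit in a 32-bit `Integer`. For the scenario in the question (30 GiB in 100 MiB lots) there are 308 lots, the last one holding 20 MiB.

```pascal
{$mode objfpc}{$H+}
uses
  SysUtils;

{ Number of lots needed to cover TotalSize bytes in lots of LotSize bytes
  (hypothetical helper; rounds up so a partial last lot is counted). }
function LotCount(TotalSize, LotSize: Int64): Int64;
begin
  Result := (TotalSize + LotSize - 1) div LotSize;
end;

{ Byte offset and length of lot number Lot (0-based).
  Count is smaller for the last lot and 0 past the end of the data. }
procedure LotRange(Lot, LotSize, TotalSize: Int64; out Offset, Count: Int64);
begin
  Offset := Lot * LotSize;     // Int64 math: no overflow past 2 GB
  Count := TotalSize - Offset;
  if Count > LotSize then
    Count := LotSize;
  if Count < 0 then
    Count := 0;
end;
```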

2nd - TFileStream:

In my opinion this is worse. I receive a TStream, create 8 TFileStreams and copy all 30 GB into each of them. That is bad because it takes 240 GB of disk space, which is a lot even for a hard disk. The read and write time of the copy would make the multithreaded version slower than a single thread. Broken!

Conclusion: both approaches above require Synchronize() to queue the threads' reads of the file, so the threads do not really run simultaneously, even on a multicore CPU. I know that even if the file could be accessed simultaneously (by creating several TFileStreams on it directly), the operating system would still queue the threads' reads one at a time, because a hard disk cannot serve two reads at the same instant; this is a physical limitation of the HDD. However, the OS's queue management is much more effective and reduces the bottleneck far better than a manual implementation with Synchronize(). This is why I want to clone the TStream: it would leave all the work of managing the file access queue to the OS, without any intervention from me, and I know it will do it better than I would.
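For the file case, leaving the queuing to the OS can be sketched without copying any data: if the received stream is a `TFileStream`, each thread opens its own handle on the same file (using FPC's `TFileStream.FileName` property), so every handle has its own position and the OS schedules the reads. Other stream types fall back to an in-memory copy here. Assumptions: the hypothetical helper `OpenPrivateReader` runs in the calling thread before the workers start, and the caller opened the original file with a share mode that admits other readers (for example `fmShareDenyNone` or `fmShareDenyWrite`); otherwise the second open can fail on Windows.

```pascal
{$mode objfpc}{$H+}
uses
  Classes, SysUtils;

{ Hypothetical helper: gives one worker thread its own reader for Src.
  - TFileStream: a second handle on the same file, with its own position;
    the OS decides how the reads from all handles reach the disk.
  - anything else: a private in-memory copy. The caller frees the result. }
function OpenPrivateReader(Src: TStream): TStream;
var
  OldPos: Int64;
begin
  if Src is TFileStream then
    Result := TFileStream.Create(TFileStream(Src).FileName,
                                 fmOpenRead or fmShareDenyNone)
  else
  begin
    Result := TMemoryStream.Create;
    OldPos := Src.Position;
    try
      Src.Position := 0;
      Result.CopyFrom(Src, Src.Size);
      Result.Position := 0;
    except
      Result.Free;
      raise;
    end;
    Src.Position := OldPos;   // the shared source is left untouched
  end;
end;
```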

Example

In the example above, I want 8 threads to analyze the same stream simultaneously, each in a different way. The threads do not know what kind of stream they received: it may be a file stream, a stream from the Internet, or even a small TStringStream. The main program creates only one stream and passes it along with configuration parameters. A simple example:

type
  TModeForceBrute = (M1, M2, M3, M4, M5 {, ...});
  TModesFB = set of TModeForceBrute;

  TService = record
    stream: TStream;
    modes: array of TModesFB;
  end;

For example, it should be possible to analyze the stream with M1 only, with M2 only, or with both [M1, M2]. The composition of the TModesFB set changes the way the stream is analyzed. Each item of the "modes" array works as an entry in a task list and is processed by a different thread. An example of a task list (JSON representation):

{
  Stream: MyTstream,
  modes: [
    [M1, m5],
    [M1],
    [M5, m2],
    [M5, m2, m4, m3],
    [M1, m1, m3]
  ]
}

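Note: in the analyzer, [m1] + [m2] <> [m1, m2]; running [m1] and [m2] as two separate tasks does not give the same result as running [m1, m2] as one task.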
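The JSON task list above could be built in Pascal roughly as follows. This is a sketch: only `M1`..`M5` are declared (the question elides the rest), and `BuildExampleTask` is a hypothetical helper. Pascal identifiers are case-insensitive, so `m1` and `M1` are the same mode and `[M1, m1, m3]` is simply `[M1, M3]`. As plain sets `[M1] + [M2]` does equal `[M1, M2]`; keeping them as separate array entries is what lets the analyzer treat them as separate tasks.

```pascal
{$mode objfpc}{$H+}
uses
  Classes, SysUtils;

type
  // Sketch only: the real enumeration may declare more modes than M1..M5.
  TModeForceBrute = (M1, M2, M3, M4, M5);
  TModesFB = set of TModeForceBrute;

  TService = record
    stream: TStream;
    modes: array of TModesFB;   // one entry = one task = one thread
  end;

{ Hypothetical helper: fills a TService with the task list from the
  JSON example; each set stays a separate task on purpose. }
function BuildExampleTask(AStream: TStream): TService;
begin
  Result.stream := AStream;
  SetLength(Result.modes, 5);
  Result.modes[0] := [M1, M5];
  Result.modes[1] := [M1];
  Result.modes[2] := [M5, M2];
  Result.modes[3] := [M5, M2, M4, M3];
  Result.modes[4] := [M1, M3];   // [M1, m1, m3]: M1 and m1 are the same identifier
end;
```

Passing a record that contains a dynamic array between a program and a DLL generally requires both sides to share one memory manager (in FPC, typically by putting the `cmem` unit first in both `uses` clauses).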

In the program:

function analysis(Task: TService; maxCores: integer): TMyResultType; external 'mydll.dll';

In the DLL:

// Basic, simplified example; it may contain syntax or logic errors.
function analysis(Task: TService; maxCores: integer): TMyResultType; 
var 
  i, processors : integer;

begin
  processors := getCPUCount();

  if (maxCores < processors) and (maxCores > 0) then
    processors := maxCores;

  setlength (globalThreads, processors);

  for i := 0 to processors - 1 do
    // The number of modes is not necessarily equal to the number of processors.
    if i < length(Task.modes) then begin
      globalThreads[i] := TAnalusysThread.create(true, Task.stream, Task.modes[i]);
      globalThreads[i].start();
    end;

  [...]
end;
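One way to get the threads running without `Synchronize()` is sketched below, under stated assumptions: each worker receives a private `TMemoryStream` copy made in the calling thread before any thread starts, and results are read from a field after `WaitFor`, which blocks only the caller. `TCountThread` and `RunAll` are hypothetical, and counting one byte value stands in for the real analyzer; on Unix the `cthreads` unit is needed for threading. The memory cost is one copy per thread, so this only fits streams small enough to copy.

```pascal
{$mode objfpc}{$H+}
uses
  {$ifdef unix}cthreads,{$endif} // thread support on Unix
  Classes, SysUtils;

type
  TInt64Array = array of Int64;

  { Hypothetical worker: owns a private stream and counts the bytes equal
    to FTarget (a stand-in for the real analyzer). }
  TCountThread = class(TThread)
  private
    FStream: TStream;
    FTarget: Byte;
  protected
    procedure Execute; override;
  public
    Hits: Int64;               // read by the caller only after WaitFor
    constructor Create(AStream: TStream; ATarget: Byte);
    destructor Destroy; override;
  end;

constructor TCountThread.Create(AStream: TStream; ATarget: Byte);
begin
  FStream := AStream;          // the thread takes ownership of AStream
  FTarget := ATarget;
  inherited Create(True);      // suspended; RunAll calls Start
end;

destructor TCountThread.Destroy;
begin
  FStream.Free;
  inherited Destroy;
end;

procedure TCountThread.Execute;
var
  Buf: array[0..16383] of Byte;
  N, I: Integer;
begin
  FStream.Position := 0;
  repeat
    N := FStream.Read(Buf, SizeOf(Buf));
    for I := 0 to N - 1 do
      if Buf[I] = FTarget then
        Inc(Hits);
  until N <= 0;
end;

{ One thread per target byte, each on its own copy of Src. The copies are
  made here, before any thread starts, so Src is never read concurrently. }
function RunAll(Src: TStream; const Targets: array of Byte): TInt64Array;
var
  Threads: array of TCountThread;
  Priv: TMemoryStream;
  I: Integer;
begin
  SetLength(Threads, Length(Targets));
  SetLength(Result, Length(Targets));
  for I := 0 to High(Targets) do
  begin
    Priv := TMemoryStream.Create;
    Src.Position := 0;
    Priv.CopyFrom(Src, Src.Size);
    Threads[I] := TCountThread.Create(Priv, Targets[I]);
  end;
  for I := 0 to High(Threads) do
    Threads[I].Start;
  for I := 0 to High(Threads) do
  begin
    Threads[I].WaitFor;        // blocks the caller only; no Synchronize involved
    Result[I] := Threads[I].Hits;
    Threads[I].Free;
  end;
end;
```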

Note: with a single thread the program works fine, with no known errors.

I want each thread to handle one type of analysis, and I cannot use Synchronize() in the DLL. See the problem? Is there a proper, clean solution?


Answers (2)

guinalz

I'm answering my own question because it seems nobody has a really good solution. Perhaps because there is none!

So I adapted the ideas of Marco van de Voort and Ken White into a solution that works: a TMemoryStream that is loaded partially, in 50 MB batches, with a TRTLCriticalSection for synchronization.

The solution still has the drawbacks mentioned in the second addition, namely:

  1. Queuing access to the HDD is the responsibility of my program, not of the operating system;
  2. Each thread holds the same data in memory twice;
  3. Depending on the processor speed, a thread may analyze its 50 MB in memory quickly while loading the next batch is very slow. The threads would then effectively run one after another, losing the advantage of multithreading: every thread waits on the congested file access, as if there were only a single thread.

So I consider this a dirty solution, but for now it works!

Below is a simplified example, so this adaptation may contain obvious logic and/or syntax errors, but it is enough to demonstrate the idea.

Using the same example as in the question, instead of passing a stream to "analysis", a pointer to a method is passed. That method is responsible for reading the stream in synchronized 50 MB batches.
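The callback works because `TLotLoadStream` is a method pointer (`of object`): it stores the program's object together with the method, so the DLL can call back into it. A self-contained sketch, with a hypothetical `TLotSource` class in place of `TForm1`, the lot size as a parameter so it can be tried on tiny data, and the `id` parameter omitted because the reader never uses it. Seeking and copying happen inside one critical section, so no other thread can move the shared position in between.

```pascal
{$mode objfpc}{$H+}
uses
  {$ifdef unix}cthreads,{$endif}
  Classes, SysUtils;

type
  // Like the answer's TLotLoadStream, minus the unused id parameter.
  TLotLoadStream = function(var toStm: TMemoryStream; lot: Integer): Int64 of object;

  { Hypothetical owner of the shared stream, playing the role of TForm1. }
  TLotSource = class
  private
    FCS: TRTLCriticalSection;
    FStream: TStream;
    FLotSize: Int64;
  public
    constructor Create(AStream: TStream; ALotSize: Int64);
    destructor Destroy; override;
    function ReadLot(var toStm: TMemoryStream; lot: Integer): Int64;
  end;

constructor TLotSource.Create(AStream: TStream; ALotSize: Int64);
begin
  inherited Create;
  FStream := AStream;
  FLotSize := ALotSize;
  InitCriticalSection(FCS);
end;

destructor TLotSource.Destroy;
begin
  DoneCriticalSection(FCS);
  inherited Destroy;
end;

{ Copies lot number `lot` (0-based) into toStm and returns its size in
  bytes, or 0 once past the end of the stream. }
function TLotSource.ReadLot(var toStm: TMemoryStream; lot: Integer): Int64;
var
  Offset: Int64;
begin
  toStm.Clear;
  Offset := Int64(lot) * FLotSize;     // Int64 math: no overflow past 2 GB
  EnterCriticalSection(FCS);           // seek + copy must not interleave
  try
    Result := FStream.Size - Offset;
    if Result > FLotSize then
      Result := FLotSize;
    if Result > 0 then
    begin
      FStream.Position := Offset;
      toStm.CopyFrom(FStream, Result); // never 0 here: 0 would mean "whole stream"
    end
    else
      Result := 0;
  finally
    LeaveCriticalSection(FCS);
  end;
  toStm.Position := 0;
end;
```

A caller holds `Reader := @Owner.ReadLot;` and simply calls `Reader(Lot, I)` for I = 0, 1, 2, ... until it returns 0.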

In both the DLL and the program:

type
  TLotLoadStream = function (var toStm: TMemoryStream; lot: integer): int64 of object;

  TModeForceBrute = (M1, M2, M3, M4, M5 {, ...});
  TModesFB = set of TModeForceBrute;

  TService = record
    reader: TLotLoadStream; { changed here <<<<<<< }
    modes: array of TModesFB;
  end;

In the program:

type
{ another code here }
TForm1 = class(TForm)
  { another code here }

  CS    : TRTLCriticalSection;  
  stream: TFileStream;
  function MyReader(var toStm: TMemoryStream; lot: integer): int64;

  { another code here }
end;

function analysis(Task: TService; maxCores: integer): TMyResultType; external 'mydll.dll';

{ another code here }

implementation

{ another code here }

function TForm1.MyReader(var toStm: TMemoryStream; lot: integer): int64;
const
  lotSize = (1024*1024) * 50; // 50MB

var
  ler: int64;

begin
  {
    MUST BE PERFORMED BEFOREHAND, FOR EXAMPLE IN TForm1.Create():
    InitCriticalSection(self.CS);
  }

  toStm.Clear;

  { ENTER THE CRITICAL SECTION }
  EnterCriticalSection(self.CS);
  try
    { BYTES LEFT FROM THE START OF THIS LOT; int64(lot) avoids overflow past 2 GB }
    ler := self.stream.Size - int64(lot) * lotSize;
    if ler > lotSize then
      ler := lotSize;

    { COPY ONLY WHEN ler > 0: CopyFrom with a count of 0 copies the whole stream }
    if ler > 0 then begin
      self.stream.Seek(int64(lot) * lotSize, soBeginning);
      toStm.CopyFrom(self.stream, ler);
    end
    else
      ler := 0;
  finally
    { LEAVE THE CRITICAL SECTION }
    LeaveCriticalSection(self.CS);
  end;

  result := ler;
end;

In the DLL:

{ another code here }
// Basic, simplified example; it may contain syntax or logic errors.
function analysis(Task: TService; maxCores: integer): TMyResultType; 
var 
  i, processors : integer;

begin
  processors := getCPUCount();

  if (maxCores < processors) and (maxCores > 0) then
    processors := maxCores;

  setlength (globalThreads, processors);

  for i := 0 to processors - 1 do
    // The number of modes is not necessarily equal to the number of processors.
    if i < length(Task.modes) then begin
      globalThreads[i] := TAnalusysThread.create(true, Task.reader, Task.modes[i]);
      globalThreads[i].start();
    end;

  { another code here }
end;

In the DLL thread class:

type
{ another code here }
MyThreadAnalysis = class(TThread)
  { another code here }

  reader: TLotLoadStream;
  procedure Execute; override;

  { another code here }
end;

{ another code here }

implementation

{ another code here }

procedure MyThreadAnalysis.Execute;
var
  Stream: TMemoryStream;
  lot: integer;
  n: integer;

  { My analyzer is already written around a buffer and rewriting it would be too
    much work, so the data is read twice and held in memory twice, as I already
    mentioned in the question. }
  buf: array[1..$F000] of byte; // 60K

begin
  lot    := 0;
  Stream := TMemoryStream.Create;
  try
    self.reader(Stream, lot);

    while Stream.Size > 0 do begin
      Stream.Seek(0, soBeginning);

      { 2nd load into memory: buf }
      while (Stream.Position < Stream.Size) do begin
        n := Stream.Read(buf, sizeof(buf));

        { MY CODE HERE }
      end;

      inc(lot);
      self.reader(Stream, lot);
    end;
  finally
    Stream.Free;
  end;
end;

As you can see, this is a stopgap solution. I still hope to find a clean solution that lets me duplicate the stream object in such a way that access to the data is the operating system's responsibility and not my program's.

Marco van de Voort

Cloning a stream is code like this:

streamdest:=TMemoryStream.create;
streamsrc.position:=0;
streamdest.copyfrom(streamsrc, 0); // count 0 = copy the whole source stream
streamsrc.position:=0;
streamdest.position:=0;

However, doing this across DLL boundaries is hard, because the DLL has its own copy of the libraries and of their state. This is currently not recommended.
