Waxo

Reputation: 1976

Boost shared memory : vector of char *

I'm trying to store a char* in a Boost container, but it fails. My char* points to binary data in memory blocks of size 2048. This binary data is sound recorded with ALSA.
But when I save it in a shared-memory vector of strings, it gets altered somehow, and I can't figure out how to fix it.

Edit (and probably the answer):
ALSA sends void* buffers, so if I can create a shared-memory vector of void*, that may do the trick. So basically I need to create a vector of void*, where each void* has a fixed size (in this case: 2048). I think that boost::interprocess::basic_string is the problem.
End Edit

Here is the full explanation:

I'm trying to capture direct sound input with ALSA in one program and then use another program to write it to a file (or do any other processing on it).

I started with this question: Create a shared-memory vector of strings

Now I'm stuck; I don't know Boost very well. I've created a GitHub repository (https://github.com/Waxo/nodetest) with the full project. The ALSA control's listen-with-callback method just calls a method with a (char*, int) prototype.

After building the project, you can launch ./nodetest and then, once it has printed "Go", launch ./nodetest arg

#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/containers/vector.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <iostream>
#include <atomic>
#include <future>

#include <iostream>
#include <alsa_control.h>

using std::cout;
using std::endl;

typedef boost::interprocess::allocator<char, boost::interprocess::managed_shared_memory::segment_manager> CharAllocator;
typedef boost::interprocess::basic_string<char, std::char_traits<char>, CharAllocator> MyShmString;
typedef boost::interprocess::allocator<MyShmString, boost::interprocess::managed_shared_memory::segment_manager> StringAllocator;
typedef boost::interprocess::vector<MyShmString, StringAllocator> MyShmStringVector;


class lambda_class {
public:
    void lambda_callback(char *c, int rc) {
      this->sample_count_ += rc;
      this->output_file_.write(c, rc * 2);
    }

    lambda_class(std::string filename) {
      this->filename_ = filename;
      this->sample_count_ = 0;
      this->output_file_.open(this->filename_, std::ios::binary);
      write_header_wav(this->output_file_, 16000, 16, MONO, 10000);
    }

    ~lambda_class() {
      this->output_file_.close();
      this->output_file_.open(this->filename_,
                              std::ios::binary | std::ios::in);
      write_header_wav(this->output_file_, 16000, 16, MONO, this->sample_count_);
    }

private:
    std::string filename_;
    int sample_count_;
    std::ofstream output_file_;
    lambda_class(const lambda_class &a) = delete;
};

class input_class {
public:
    input_class() {
      boost::interprocess::shared_memory_object::remove("MySharedMemory");
      this->shm = new boost::interprocess::managed_shared_memory(boost::interprocess::create_only, "MySharedMemory",
                                                                 1000000);
      CharAllocator charallocator(this->shm->get_segment_manager());
      StringAllocator stringallocator(this->shm->get_segment_manager());
      this->myshmvector = shm->construct<MyShmStringVector>("myshmvector")(stringallocator);
    };

    ~input_class() {
      lambda_class *lc = new lambda_class("listener_vector.wav");
      char *c = (char *) malloc(2048);
      for (MyShmStringVector::iterator it = this->myshmvector->begin(); it != this->myshmvector->end(); it++) {
        strcpy(c, it->c_str());
        lc->lambda_callback(c, 2048);
      }
      delete lc;
      boost::interprocess::shared_memory_object::remove("MySharedMemory");
      this->shm->destroy_ptr(this->myshmvector);
    }

    void to_node(char *c, int rc) {
      CharAllocator charallocator(this->shm->get_segment_manager());
      StringAllocator stringallocator(this->shm->get_segment_manager());
      MyShmString mystring(charallocator);
      mystring = c;
      this->myshmvector->insert(this->myshmvector->begin(), mystring);
    }

private:
    boost::interprocess::managed_shared_memory *shm;
    MyShmStringVector *myshmvector;
};

void listener() {
  lambda_class *ctc = new lambda_class("writer.wav");
  char *c = (char *) malloc(2048);
  boost::interprocess::managed_shared_memory segment(boost::interprocess::open_only, "MySharedMemory");
  MyShmStringVector *myvector = segment.find<MyShmStringVector>("myshmvector").first;


  for (MyShmStringVector::iterator it = myvector->begin(); it != myvector->end(); it++) {
    strcpy(c, std::string(it->begin(), it->end()).c_str());
    ctc->lambda_callback(c, 2048);
  }

  delete ctc;
  return;
}

int main(int argc, char **argv) {
  alsa_control *ac = new alsa_control(16000, 2048, 16, MONO);

  if (argc == 1) {

    input_class *ic = new input_class();
    ac->listen_with_callback(std::bind(&input_class::to_node, ic, std::placeholders::_1, std::placeholders::_2),
                             "listener");
    sleep(5);
    ac->stop();
    cout << "Go" << endl;
    sleep(10);

    delete ic;
    delete ac;
  } else {
    auto th = std::async(std::launch::async, listener);
    th.get();
  }


  return 0;
}

I'm just trying to use my sounds in multiple processes, with a structure in which I can use and share them (I will create an organiser for all my programs). The char* can have a fixed size; as long as I can use it, it's all good.

Edit:
My problem is that the recorded sounds are invalid. I think that MyShmString mutates them into invalid binary data.

Upvotes: 0

Views: 1342

Answers (4)

sehe

Reputation: 393134

In addition to what everyone else has been saying,

  • you had an error in the samples -> byte count conversion. This led to half of the output being uninitialized data

    void lambda_callback(char *c, int rc) {
        this->sample_count_ += rc;
        this->output_file_.write(c, rc * 2);
    }
    

    becomes

    void lambda_callback(char *c, int rc) {
        this->sample_count_ += rc/2;
        this->output_file_.write(c, rc);
    }
    

    NOTE: see also rc*2 in to_node below

  • you inserted the strings at the front of the vector, which leads to chunks of audio being output in reverse order. Also, the assignment to mystring disregarded the rc information

    void to_node(char *c, int rc) {
        CharAllocator charallocator(this->shm->get_segment_manager());
        StringAllocator stringallocator(this->shm->get_segment_manager());
        MyShmString mystring(charallocator);
        mystring = c; // OOPS! this cuts off at any NUL char
        this->myshmvector->insert(this->myshmvector->begin(), mystring);
    }
    

    becomes

    void to_node(char *c, int rc) {
        myshmvector->emplace_back(c, rc*2, shm->get_segment_manager());
    }
    
  • instead of relying on the fixed buffer size, you should actually use the string size when iterating the shared vector. Besides, don't use strcpy for binary data:

    void listener() {
        lambda_class ctc("writer.wav");
    
        char *c = (char *) malloc(2048);
    
        boost::interprocess::managed_shared_memory segment(boost::interprocess::open_only, "MySharedMemory");
        MyShmStringVector *myvector = segment.find<MyShmStringVector>("myshmvector").first;
    
    
        for (MyShmStringVector::iterator it = myvector->begin(); it != myvector->end(); it++) {
            //strcpy(c, std::string(it->begin(), it->end()).c_str()); //HUHUH!?!
            strcpy(c, it->c_str());
            ctc.lambda_callback(c, 2048);
        }
    }
    

    becomes:

    void listener() {
        lambda_class ctc("writer.wav");
    
        char c[4096];
    
        boost::interprocess::managed_shared_memory segment(boost::interprocess::open_only, "MySharedMemory");
        MyShmStringVector *myvector = segment.find<MyShmStringVector>("myshmvector").first;
    
        for (MyShmStringVector::iterator it = myvector->begin(); it != myvector->end(); it++) {
            //strcpy(c, std::string(it->begin(), it->end()).c_str()); //HUHUH!?!
            assert(it->size()<=sizeof(c));
            memcpy(c, it->c_str(), it->size());
            ctc.lambda_callback(c, it->size());
        }
    }
    
  • Don't use malloc in C++. Those allocations were leaked

  • your test scenario is racy (but I assume you knew that and this was for simplicity?)

  • Instead of "magic" file re-open, just seekp:

    this->output_file_.close();
    this->output_file_.open(this->filename_, std::ios::binary | std::ios::in);
    

    becomes

    output_file_.seekp(0ul);
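
A minimal sketch of the seekp approach, for illustration only. The definition of write_header_wav is not shown in this thread, so the sketch uses a hypothetical 4-byte header (write_count_header) in its place; record_to_file and read_count_header are likewise made-up helper names. It assumes 16-bit mono samples, i.e. two bytes per sample.

```cpp
#include <cstdint>
#include <fstream>
#include <string>

// Hypothetical stand-in for write_header_wav: the "header" is just a
// 4-byte sample count in native byte order.
void write_count_header(std::ofstream& out, std::uint32_t sample_count) {
    out.write(reinterpret_cast<const char*>(&sample_count), sizeof(sample_count));
}

// Writes a placeholder header, then the payload, then seeks back to the
// start and overwrites the header with the real count. The stream stays
// open the whole time, so no close/re-open is needed.
std::uint32_t record_to_file(const std::string& path, const char* bytes,
                             std::uint32_t byte_count) {
    std::ofstream out(path, std::ios::binary);
    write_count_header(out, 0);                    // placeholder
    out.write(bytes, byte_count);                  // raw audio bytes
    const std::uint32_t samples = byte_count / 2;  // assumes 16-bit mono
    out.seekp(0);                                  // back to the header
    write_count_header(out, samples);              // overwrite in place
    return samples;
}

// Reads the 4-byte count back, to check the header was patched.
std::uint32_t read_count_header(const std::string& path) {
    std::ifstream in(path, std::ios::binary);
    std::uint32_t count = 0;
    in.read(reinterpret_cast<char*>(&count), sizeof(count));
    return count;
}
```

Overwriting through seekp only replaces the bytes at that position; the payload written after the placeholder is left untouched.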
    

Pull Request 1

Pull Request on GitHub

commit 660208ff3fe792112ccae70a61e8a2442a853664
Author: Seth Heeren
Date:   Mon Aug 17 00:13:31 2015 +0200

    Fixes, now

        ./nodetest & sleep 6; ./nodetest 1; fg; md5sum *.wav

    results in proper identical files, e.g.

        $ ./nodetest & sleep 6; ./nodetest 1; fg; md5sum *.wav
        [1] 12350
        Go
        ./nodetest
        ff083a6344d7037da9c4f6d730239f30  listener_vector.wav
        ff083a6344d7037da9c4f6d730239f30  listener.wav
        ff083a6344d7037da9c4f6d730239f30  writer.wav

commit 04a5dec3da322dab196bfe568f52db6d9ed68b43
Author: Seth Heeren
Date:   Sun Aug 16 23:30:40 2015 +0200

    we have repro

main.cc

#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/containers/vector.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
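#include <atomic>
#include <cassert>    // assert
#include <cstring>    // memcpy
#include <fstream>    // std::ofstream
#include <functional> // std::bind
#include <future>
#include <iostream>

#include <unistd.h>   // sleep
#include <alsa_control.h>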

using std::cout;
using std::endl;

typedef boost::interprocess::allocator<char, boost::interprocess::managed_shared_memory::segment_manager> CharAllocator;
typedef boost::interprocess::basic_string<char, std::char_traits<char>, CharAllocator> MyShmString;
typedef boost::interprocess::allocator<MyShmString, boost::interprocess::managed_shared_memory::segment_manager> StringAllocator;
typedef boost::interprocess::vector<MyShmString, StringAllocator> MyShmStringVector;


class lambda_class {
public:
    void lambda_callback(char *c, int rc) {
        this->sample_count_ += rc/2;
        this->output_file_.write(c, rc);
    }

    lambda_class(std::string filename) {
      this->filename_ = filename;
      this->sample_count_ = 0;
      this->output_file_.open(this->filename_, std::ios::binary);
      write_header_wav(this->output_file_, 16000, 16, MONO, 10000);
    }

    ~lambda_class() {
      /*
       *this->output_file_.close();
       *this->output_file_.open(this->filename_, std::ios::binary | std::ios::in);
       */
      output_file_.seekp(0ul);
      write_header_wav(this->output_file_, 16000, 16, MONO, this->sample_count_);
    }

private:
    std::string filename_;
    int sample_count_;
    std::ofstream output_file_;
    lambda_class(const lambda_class &a) = delete;
};

class input_class {
public:
    input_class() {
        boost::interprocess::shared_memory_object::remove("MySharedMemory");
        this->shm = new boost::interprocess::managed_shared_memory(boost::interprocess::create_only, "MySharedMemory",
                1000000);
        CharAllocator charallocator(this->shm->get_segment_manager());
        StringAllocator stringallocator(this->shm->get_segment_manager());
        this->myshmvector = shm->construct<MyShmStringVector>("myshmvector")(stringallocator);
    };

    ~input_class() {
        lambda_class lc("listener_vector.wav");
        char c[4096];
        for (MyShmStringVector::iterator it = this->myshmvector->begin(); it != this->myshmvector->end(); it++) {
            assert(it->size()<=sizeof(c));
            memcpy(c, it->c_str(), it->size());
            lc.lambda_callback(c, it->size());
        }
        boost::interprocess::shared_memory_object::remove("MySharedMemory");
        shm->destroy_ptr(this->myshmvector);
    }

    void to_node(char *c, int rc) {
        this->myshmvector->emplace_back(c, rc*2, shm->get_segment_manager());
    }

private:
    boost::interprocess::managed_shared_memory *shm;
    MyShmStringVector *myshmvector;
};

void listener() {
    lambda_class ctc("writer.wav");

    char c[4096];

    boost::interprocess::managed_shared_memory segment(boost::interprocess::open_only, "MySharedMemory");
    MyShmStringVector *myvector = segment.find<MyShmStringVector>("myshmvector").first;

    for (MyShmStringVector::iterator it = myvector->begin(); it != myvector->end(); it++) {
        //strcpy(c, std::string(it->begin(), it->end()).c_str()); //HUHUH!?!
        assert(it->size()<=sizeof(c));
        memcpy(c, it->c_str(), it->size());
        ctc.lambda_callback(c, it->size());
    }
}

int main(int argc, char **argv) {
    alsa_control ac(16000, 2048, 16, MONO);

    if (argc == 1) {
        input_class ic;
        ac.listen_with_callback(std::bind(&input_class::to_node, &ic, std::placeholders::_1, std::placeholders::_2), "listener");
        sleep(5);
        ac.stop();
        cout << "Go" << endl;
        sleep(10);
    } else {
        //    std::atomic<bool> done(false);
        auto th = std::async(std::launch::async, listener);
        //    done.store(true, std::memory_order_relaxed);
        th.get();
    }
    return 0;
}

Upvotes: 1

doqtor

Reputation: 8484

I've modified your sample code to use an interprocess vector of int8_t, which is one way to store binary data properly.

Certainly the output file was wrong because of this line: this->output_file_.write(c, rc * 2); If you want to write twice as much data, you need to call write twice:

this->output_file_.write(c, rc);
this->output_file_.write(c, rc);

Passing rc * 2 makes it write garbage data after the first 2048 bytes.

Another potential issue is this line: this->myshmvector->insert(this->myshmvector->begin(), mystring); Here you store your data in reverse order. Was that your intention? In my fixed example I changed it to store the data in the order it arrives.

The third potential issue could be zeroes in the binary data, as mentioned by @sashoalm.

Here is the part of the code that I changed:

typedef boost::interprocess::allocator<int8_t, boost::interprocess::managed_shared_memory::segment_manager>  ShmemAllocator;
typedef boost::interprocess::vector<int8_t, ShmemAllocator> MyVector;


class lambda_class {
public:
    void lambda_callback(int8_t *c, int rc) {
        this->sample_count_ += rc;
        //this->output_file_.write(c, rc * 2); // this is not going to work, to write twice as much data you need to call write() twice
        this->output_file_.write(reinterpret_cast<char*>(c), rc);
        //this->output_file_.write(reinterpret_cast<char*>(c), rc); // uncomment this line to write twice as much data
    }

    lambda_class(std::string filename) {
        this->filename_ = filename;
        this->sample_count_ = 0;
        this->output_file_.open(this->filename_, std::ios::binary);
        write_header_wav(this->output_file_, 16000, 16, MONO, 10000);
    }

    ~lambda_class() {
        this->output_file_.close();
        this->output_file_.open(this->filename_,
            std::ios::binary | std::ios::in);
        write_header_wav(this->output_file_, 16000, 16, MONO, this->sample_count_);
    }

private:
    std::string filename_;
    int sample_count_;
    std::ofstream output_file_;
    lambda_class(const lambda_class &a) = delete;
};

class input_class {
public:
    input_class() {
        boost::interprocess::shared_memory_object::remove("MySharedMemory");
        this->shm = new boost::interprocess::managed_shared_memory(boost::interprocess::create_only, "MySharedMemory",
            1000000);
        const ShmemAllocator alloc_inst(this->shm->get_segment_manager());
        this->myshmvector = shm->construct<MyVector>("myshmvector")(alloc_inst);
    };

    ~input_class() {
        lambda_class *lc = new lambda_class("listener_vector.wav");
        int8_t *c = (int8_t *)malloc(2048);
        for (MyVector::iterator it = this->myshmvector->begin(); it != this->myshmvector->end(); it+=2048) {
            memcpy(c, &*it, 2048);
            lc->lambda_callback(c, 2048);
        }
        delete lc;
        boost::interprocess::shared_memory_object::remove("MySharedMemory");
        this->shm->destroy_ptr(this->myshmvector);
    }

    void to_node(int8_t *c, int rc) {
        //this->myshmvector->insert(this->myshmvector->begin(), c, c+2048); // if your intention was to reverse the data then uncomment this line and comment out the line below
        this->myshmvector->insert(this->myshmvector->end(), c, c + 2048); // storing data in order as it comes
    }

private:
    boost::interprocess::managed_shared_memory *shm;
    MyVector *myshmvector;
};

void listener() {
    lambda_class *ctc = new lambda_class("writer.wav");
    int8_t *c = (int8_t *)malloc(2048);
    boost::interprocess::managed_shared_memory segment(boost::interprocess::open_only, "MySharedMemory");
    MyVector *myvector = segment.find<MyVector>("myshmvector").first;


    for (MyVector::iterator it = myvector->begin(); it != myvector->end(); it+=2048) {
        memcpy(c, &*it, 2048);
        ctc->lambda_callback(c, 2048);
    }

    delete ctc;
    return;
}
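
One caveat with the `it += 2048` loops above: they are only safe if the vector's size is always an exact multiple of 2048, otherwise the iterator steps past end(). A minimal sketch of a chunked walk that also handles a shorter final chunk, using a plain std::vector for illustration (for_each_chunk is a hypothetical helper, not part of Boost or of the project):

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

// Calls sink(pointer, length) once per chunk and returns the number of
// chunks visited. The last chunk may be shorter than chunk_size.
template <typename Sink>
std::size_t for_each_chunk(const std::vector<std::int8_t>& data,
                           std::size_t chunk_size, Sink sink) {
    if (chunk_size == 0) {
        return 0;  // avoid an endless loop on a zero chunk size
    }
    std::size_t chunks = 0;
    for (std::size_t offset = 0; offset < data.size(); offset += chunk_size) {
        // Never read past the end of the buffer.
        const std::size_t len = std::min(chunk_size, data.size() - offset);
        sink(data.data() + offset, len);
        ++chunks;
    }
    return chunks;
}
```

Indexing by offset instead of advancing an iterator keeps the loop condition valid even when the total size is not a multiple of the chunk size.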

Upvotes: 1

sashoalm

Reputation: 79527

I haven't examined your wall of code, but in your question I noticed you say:

I try to save an char* inside boost container

and then

My char* is binary data

and then

I think that the boost::interprocess::basic_string is the problem

Now, it would have helped if you had specified the exact type instead of just saying "boost container" (which container?), but if you're trying to store binary data in a string class: NOOOOOOO, that is not how you use a string. You store text in strings, not binary data. For binary data, use std::vector<char>.

In particular, if you use something resembling this anywhere in your code

string s = (char*) binary_data;

The string will be truncated at the first encountered zero!
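
A minimal sketch of that truncation with plain std::string, and two ways to keep every byte by passing the length explicitly. The helper names are made up for illustration:

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Treats the buffer as a C string: copying stops at the first zero byte,
// so this is wrong for binary data.
std::string copy_as_c_string(const char* data) {
    return std::string(data);
}

// Copies exactly `size` bytes, embedded zeros included.
std::string copy_with_length(const char* data, std::size_t size) {
    return std::string(data, size);
}

// Same idea with a byte vector, which makes the "not text" intent clear.
std::vector<char> copy_to_vector(const char* data, std::size_t size) {
    return std::vector<char>(data, data + size);
}
```

For a 6-byte buffer whose second byte is zero, copy_as_c_string keeps one byte, while the other two keep all six.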

Upvotes: 1

Vladislav Varslavans

Reputation: 2934

The question is not clear, so I would advise improving it.

Nevertheless, I see an error in your code: you allocate an array of 2048 elements

char *c = (char *) malloc(2048);

and then you write twice as much in lambda_callback()

 void lambda_callback(char *c, int rc) {
   this->sample_count_ += rc;
   this->output_file_.write(c, rc * 2);
 }

You need to pass rc instead of rc * 2 to write().
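
Where the factor of two comes from: with 16-bit mono audio, one sample is two bytes, so a count of samples and a count of bytes differ by that factor. A small sketch of the conversion, assuming rc is a count of 16-bit mono samples (as the 16 and MONO arguments in the question suggest); the helper names are hypothetical:

```cpp
#include <cstddef>

// Bytes needed to hold `samples` samples per channel.
constexpr std::size_t bytes_for_samples(std::size_t samples,
                                        unsigned bits_per_sample,
                                        unsigned channels) {
    return samples * (bits_per_sample / 8) * channels;
}

// Whole samples per channel contained in `bytes` bytes.
constexpr std::size_t samples_for_bytes(std::size_t bytes,
                                        unsigned bits_per_sample,
                                        unsigned channels) {
    return bytes / ((bits_per_sample / 8) * channels);
}
```

Whichever unit a function takes, the buffer passed to write() has to be at least as large as the byte count given to it; a 2048-byte buffer cannot back a 4096-byte write.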

Upvotes: -1
