Hani Goc
Hani Goc

Reputation: 2441

Segmentation fault multithreading C++ 11

Introduction

I have a vector entities containing 44 million names. I want to split it into 4 parts and process each part in parallel. Class Freebase contains the function loadData() which is used to split the vector and call function multiThread in order to do the processing.


threads.push_back(thread(&Freebase::multiThread, this, i, i + right, ref(data)));

Problem

cout <<"Entity " << entities[i] << endl; is showing the following results:

The last 2 outputs are wrong. The output should be:

This is causing a segmentation fault when the input is being sent to function returnValues. How can I solve it?


Source Code

#ifndef FREEBASE_H
#define FREEBASE_H

class Freebase
{
 public:
    Freebase(const std::string &, const std::string &, const std::string &, const std::string &);
    void loadData();
 private:
   std::string _serverURL;
   std::string _entities;
   std::string _xmlFile;
   void multiThread(int,int, std::vector<std::pair<std::string, std::string>> &);
   //private data members
   std::vector<std::string> entities;
};

#endif

#include "Freebase.h"
#include "queries/SparqlQuery.h"

Freebase::Freebase(const string & url, const string & e, const string & xmlFile, const string & tfidfDatabase):_serverURL(url), _entities(e), _xmlFile(xmlFile), _tfidfDatabase(tfidfDatabase)
{
  entities = loadEntities();
}

void Freebase::multiThread(int start, int end, vector<pair<string,string>> & data)
{
  string basekb = "PREFIX basekb:<http://rdf.basekb.com/ns/> ";
  for(int i = start; i < end; i++)
  {
     cout <<"Entity " << entities[i] << endl;
     vector<pair<string, string>> description = returnValues(basekb + "select ?description where {"+ entities[i] +" basekb:common.topic.description ?description. FILTER (lang(?description) = 'en') }");
     string desc = "";
     for(auto &d: description)
     {
       desc += d.first + " ";
     }
     data.push_back(make_pair(entities[i], desc));
  }
}


void Freebase::loadData()
{
  vector<pair<string, string>> data;
  vector<thread> threads;
  int Size = entities.size();
  //split database into 4 parts
  int p = 4;
  int right = round((double)Size / (double)p);
  int left = Size % p;
  float totalduration = 0;
  
  vector<pair<int, int>> coordinates;
  int counter = 0;
  for(int i = 0; i < Size; i += right)
  {

      if(i < Size - right)
      {
      threads.push_back(thread(&Freebase::multiThread, this, i, i + right, ref(data)));
      }
      else
      {
      threads.push_back(thread(&Freebase::multiThread, this, i, Size, ref(data)));
      }
      
  }//end outer for
  
   for(auto &t : threads)
   {
      t.join();
   }
   
}


vector<pair<string, string>>  Freebase::returnValues(const string & query)
{
  vector<pair<string, string>> data;
  SparqlQuery sparql(query, _serverURL);
  string result = sparql.retrieveInformations();
  istringstream str(result);
  string line;
  //skip first line
  getline(str,line);
  while(getline(str, line))
  {
    vector<string> values;
    line.erase(remove( line.begin(), line.end(), '\"' ), line.end());
    
    boost::split(values, line, boost::is_any_of("\t"));
    if(values.size() == 2)
    {
      pair<string,string> fact = make_pair(values[0], values[1]);
      data.push_back(fact);
    }
    else
    {
      data.push_back(make_pair(line, ""));
    }
  }
  
  return data;
}//end function

Upvotes: 1

Views: 4945

Answers (1)

Timo
Timo

Reputation: 2242

EDIT: Arnon Zilca is correct in his comments. You are writing to a single vector from multiple threads (in Freebase::multiThread()), a recipe for disaster. You can use a mutex as described below to protect the push_back operation.

For more info on thread safety on containers see Is std::vector or boost::vector thread safe?.

So:

mtx.lock();
data.push_back(make_pair(entities[i], desc));
mtx.unlock();

Another option is using the same strategy as you do in returnValues, creating a local vector in multiThread and only pushing the contents to the data vector when thread is done processing.

So:

void Freebase::multiThread(int start, int end, vector<pair<string,string>> & data)
{
  vector<pair<string,string>> threadResults;
  string basekb = "PREFIX basekb:<http://rdf.basekb.com/ns/> ";
  for(int i = start; i < end; i++)
  {
     cout <<"Entity " << entities[i] << endl;
     vector<pair<string, string>> description = returnValues(basekb + "select ?description where {"+ entities[i] +" basekb:common.topic.description ?description. FILTER (lang(?description) = 'en') }");
     string desc = "";
     for(auto &d: description)
     {
       desc += d.first + " ";
     }
     threadResults.push_back(make_pair(entities[i], desc));
  }
  mtx.lock()
  data.insert(data.end(), threadResults.begin(), threadResults.end());
  mtx.unlock()
}

Note: I would suggest using a different mutex than the one you use for the cout. The overall result vector data is a different resource than cout. So threads who want to use cout, should not have to wait for another thread to finish with data.

/EDIT

You could use a mutex around

cout <<"Entity " << entities[i] << endl;

That would prevent multiple threads using cout at "the same time". That way you can be sure that an entire message is printed by a thread before another thread gets to print a message. Note that this will impact your performance since threads will have to wait for the mutex to become available before they are allowed to print.

Note: Protecting the cout will only cleanup your output on the stream, it will not influence the behavior of the rest of the code, see above for that.

See http://www.cplusplus.com/reference/mutex/mutex/lock/ for an example.

// mutex::lock/unlock
#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <mutex>          // std::mutex

std::mutex mtx;           // mutex for critical section

void print_thread_id (int id) {
  // critical section (exclusive access to std::cout signaled by locking mtx):
  mtx.lock();
  std::cout << "thread #" << id << '\n';
  mtx.unlock();
}

int main ()
{
  std::thread threads[10];
  // spawn 10 threads:
  for (int i=0; i<10; ++i)
    threads[i] = std::thread(print_thread_id,i+1);

  for (auto& th : threads) th.join();

  return 0;
}

Upvotes: 3

Related Questions