Reputation: 2441
I have a vector entities
containing 44 million names. I want to split it into 4 parts and process each part in parallel. Class Freebase
contains the function loadData()
which is used to split the vector and call function multiThread
in order to do the processing.
loadEntities()
reads a text file containing the names. I didn't include its implementation because it isn't relevant here. loadData()
splits the vector entities
that was initialized in the constructor into 4 parts and adds every part to the vector<thread> threads
as follows: threads.push_back(thread(&Freebase::multiThread, this, i, i + right, ref(data)));
i
and i+right
are the indices used in the for loop of multiThread to loop through entities. returnValues
is a subfunction of multiThread
and is used to call an external function. cout <<"Entity " << entities[i] << endl;
is showing the following results:
The last 2 outputs are wrong. The output should be:
Entity name
not entity entity name
nor entity name name
This is causing a segmentation fault when the input is being sent to function returnValues
. How can I solve it?
#ifndef FREEBASE_H
#define FREEBASE_H
/// Holds a list of entity names and looks up a description for each of them
/// through SPARQL queries, spreading the lookups over several threads.
class Freebase
{
public:
    /// Stores the four configuration strings and loads the entity list.
    Freebase(const std::string &, const std::string &, const std::string &, const std::string &);

    /// Splits `entities` into ranges and processes each range on its own thread.
    void loadData();

private:
    // Configuration strings, declared in the same order as the constructor's
    // member-initializer list.
    std::string _serverURL;
    std::string _entities;
    std::string _xmlFile;
    std::string _tfidfDatabase;  // set by the constructor's initializer list

    /// Produces the value the constructor assigns to `entities`.
    /// NOTE(review): the definition is not shown alongside this class; the
    /// return type is inferred from that assignment - confirm.
    std::vector<std::string> loadEntities();

    /// Worker routine: processes entities[start, end) and appends the results
    /// to the vector passed by reference.
    void multiThread(int,int, std::vector<std::pair<std::string, std::string>> &);

    /// Runs one query and returns the parsed rows as (first, second) pairs.
    std::vector<std::pair<std::string, std::string>> returnValues(const std::string &);

    //private data members
    std::vector<std::string> entities;
};
#endif
#include "Freebase.h"

#include <mutex>

#include "queries/SparqlQuery.h"
// Keeps the four configuration strings, then fills `entities` with whatever
// loadEntities() returns.
Freebase::Freebase(const string & url,
                   const string & e,
                   const string & xmlFile,
                   const string & tfidfDatabase)
    : _serverURL(url)
    , _entities(e)
    , _xmlFile(xmlFile)
    , _tfidfDatabase(tfidfDatabase)
{
    // Done in the body, i.e. after the members listed above are initialised.
    this->entities = loadEntities();
}
void Freebase::multiThread(int start, int end, vector<pair<string,string>> & data)
{
string basekb = "PREFIX basekb:<http://rdf.basekb.com/ns/> ";
for(int i = start; i < end; i++)
{
cout <<"Entity " << entities[i] << endl;
vector<pair<string, string>> description = returnValues(basekb + "select ?description where {"+ entities[i] +" basekb:common.topic.description ?description. FILTER (lang(?description) = 'en') }");
string desc = "";
for(auto &d: description)
{
desc += d.first + " ";
}
data.push_back(make_pair(entities[i], desc));
}
}
// Splits `entities` into at most four contiguous ranges, runs multiThread() on
// each range in its own thread and waits for all of them to finish.
void Freebase::loadData()
{
    // Handed by reference to every worker.
    // NOTE(review): multiThread() appends to this vector, so it has to
    // serialise those appends when several workers run at once.
    vector<pair<string, string>> data;

    const int total = static_cast<int>(entities.size());

    //split database into 4 parts
    const int maxParts = 4;
    // Never start more workers than there are entities; this also keeps the
    // range size from becoming zero, which would stop the loop from advancing.
    const int parts = total < maxParts ? total : maxParts;
    if(parts == 0)
    {
        return;
    }

    const int chunk = total / parts;
    const int remainder = total % parts;

    vector<thread> threads;
    threads.reserve(parts);

    int begin = 0;
    for(int part = 0; part < parts; ++part)
    {
        // The first `remainder` ranges take one extra element, so the ranges
        // cover [0, total) exactly and their sizes differ by at most one.
        const int end = begin + chunk + (part < remainder ? 1 : 0);
        threads.push_back(thread(&Freebase::multiThread, this, begin, end, ref(data)));
        begin = end;
    }

    for(auto &t : threads)
    {
        t.join();
    }
}
// Sends `query` to the server at _serverURL and converts the textual reply into
// pairs: the first line of the reply is discarded, double quotes are removed
// from every following line, and each line is split on tabs. A line with
// exactly two fields becomes (field0, field1); any other line becomes
// (whole line, "").
vector<pair<string, string>> Freebase::returnValues(const string & query)
{
    SparqlQuery request(query, _serverURL);
    string response = request.retrieveInformations();

    istringstream rows(response);
    string row;
    vector<pair<string, string>> parsed;

    // The first line is read and thrown away.
    getline(rows, row);

    while(getline(rows, row))
    {
        // Strip every double quote before splitting.
        row.erase(remove(row.begin(), row.end(), '\"'), row.end());

        vector<string> fields;
        boost::split(fields, row, boost::is_any_of("\t"));

        if(fields.size() != 2)
        {
            // Not a two-column row: keep the cleaned line as-is.
            parsed.push_back(make_pair(row, ""));
            continue;
        }
        parsed.push_back(make_pair(fields[0], fields[1]));
    }
    return parsed;
}//end function
Upvotes: 1
Views: 4945
Reputation: 2242
EDIT:
Arnon Zilca is correct in his comments. You are writing to a single vector from multiple threads (in Freebase::multiThread()
), a recipe for disaster. You can use a mutex as described below to protect the push_back operation.
For more info on thread safety on containers see Is std::vector or boost::vector thread safe?.
So:
mtx.lock();
data.push_back(make_pair(entities[i], desc));
mtx.unlock();
Another option is to use the same strategy as you do in returnValues: create a local vector in multiThread and only push its contents to the data vector when the thread is done processing.
So:
void Freebase::multiThread(int start, int end, vector<pair<string,string>> & data)
{
vector<pair<string,string>> threadResults;
string basekb = "PREFIX basekb:<http://rdf.basekb.com/ns/> ";
for(int i = start; i < end; i++)
{
cout <<"Entity " << entities[i] << endl;
vector<pair<string, string>> description = returnValues(basekb + "select ?description where {"+ entities[i] +" basekb:common.topic.description ?description. FILTER (lang(?description) = 'en') }");
string desc = "";
for(auto &d: description)
{
desc += d.first + " ";
}
threadResults.push_back(make_pair(entities[i], desc));
}
mtx.lock()
data.insert(data.end(), threadResults.begin(), threadResults.end());
mtx.unlock()
}
Note: I would suggest using a different mutex than the one you use for cout. The overall result vector data is a different resource than cout, so threads that want to use cout should not have to wait for another thread to finish with data.
/EDIT
You could use a mutex around
cout <<"Entity " << entities[i] << endl;
That would prevent multiple threads using cout at "the same time". That way you can be sure that an entire message is printed by a thread before another thread gets to print a message. Note that this will impact your performance since threads will have to wait for the mutex to become available before they are allowed to print.
Note: Protecting cout will only clean up your output on the stream; it will not influence the behavior of the rest of the code. See above for that.
See http://www.cplusplus.com/reference/mutex/mutex/lock/ for an example.
// mutex::lock/unlock
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex
std::mutex mtx;           // mutex for critical section

// Writes "thread #<id>" and a newline to std::cout while holding `mtx`, so the
// whole message is emitted before another caller can print.
void print_thread_id (int id) {
  // critical section (exclusive access to std::cout signaled by locking mtx).
  // The guard unlocks on scope exit, so the mutex is released even when the
  // stream is configured to throw on failure.
  std::lock_guard<std::mutex> guard(mtx);
  std::cout << "thread #" << id << '\n';
}
// Starts ten threads that each call print_thread_id with ids 1..10, then waits
// for all of them before exiting.
int main ()
{
  constexpr int kThreadCount = 10;
  std::thread workers[kThreadCount];

  // spawn the workers, numbering them from 1:
  int next_id = 1;
  for (std::thread& worker : workers) {
    worker = std::thread(print_thread_id, next_id);
    ++next_id;
  }

  // wait for every worker to finish:
  for (std::thread& worker : workers) {
    worker.join();
  }
  return 0;
}
Upvotes: 3