Reputation: 89
I am new to multithreading and have used a thread pool to coordinate the interaction between two classes.
While compiling, I get an error that I am unable to understand.
The classes are as below:
InstrumentProcessor.h
1 #ifndef INSTRUMENTPROCESSOR_H
2 #define INSTRUMENTPROCESSOR_H
3
#include <atomic>
#include <queue>
#include <string>
#include <vector>

#include <boost/algorithm/string.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>

#include "Semaphore.h"
#include "Instrument.h"
13
14
15 class InstrumentProcessor
16 {
17 public:
18 InstrumentProcessor(unsigned int const numThreads);
19 ~InstrumentProcessor();
20 bool Start();
21 bool Stop();
22 void AddLine(const std::string& LineRead); // Producer
23
24 private:
25 void Process(); // Worker Function
26 void ProcessLine(std::string& line); // Consumer
27
28 private:
29 unsigned int const numThreads_;
30 boost::mutex ProcessorMutex_;
31 boost::thread_group ProcessorGroup_;
32 Semaphore taskCount_;
33 std::queue<std::string> Line_;
34 bool stop_;
35 std::vector<std::string> elements_;
36 std::vector<Instrument> Instruments_;
37 };
38
39 #endif /* INSTRUMENTPROCESSOR_H */
InstrumentProcessor.cpp
1 #include "InstrumentProcessor.h"
2
3 InstrumentProcessor::InstrumentProcessor(unsigned int const numThreads)
4 : numThreads_(numThreads), stop_(false)
5 {
6 Instruments_.reserve(25);
7 }
8
9 InstrumentProcessor::~InstrumentProcessor()
10 {
11
12 }
13
14 bool InstrumentProcessor::Start()
15 {
16 std::cout << "\nParallel Processing Started ...." << std::endl;
17 for(unsigned int i=0; i<numThreads_; i++)
18 {
19 ProcessorGroup_.create_thread(boost::bind(&InstrumentProcessor ::Process, this));
20 }
21
22 return true;
23 }
24
25 bool InstrumentProcessor::Stop()
26 {
27 stop_ = true;
28 ProcessorGroup_.join_all();
29 std::cout << "\n Parallel Processing Stopped ...." << std::endl;
30 return stop_;
31 }
32
33
34 void InstrumentProcessor::Process()
35 {
36 while(!stop_)
37 {
38 --taskCount_;
39 std::string currentLine;
40 {
41 boost::mutex::scoped_lock lock(ProcessorMutex_);
42 currentLine = Line_.front();
43 Line_.pop();
44 }
45 ProcessLine(currentLine);
46 }
47 }
48
49 void InstrumentProcessor::AddLine(const std::string& LineRead)
50 {
51 boost::mutex::scoped_lock lock(ProcessorMutex_);
52 Line_.push(LineRead);
53 ++taskCount_;
54 }
55
56 void InstrumentProcessor::ProcessLine(std::string& line)
57 {
58 boost::algorithm::split(elements_, line, boost::is_any_of("\t "), boos t::token_compress_on);
59 for(unsigned int i=1; i<elements_.size(); i++)
60 {
61 if (elements_[i].compare("nan") == 0)
62 continue;
63 else
64 {
65 double data = boost::lexical_cast<double>(elements_[i] );
66 Instruments_[i-1].CleanData(data); // This function makes a call to the Insrument Class member function
67 }
68 }
69 }
Instrument.h
1 #ifndef INSTRUMENT_H
2 #define INSTRUMENT_H
3
4 #include <iostream>
5 #include <boost/thread/mutex.hpp>
6 #include <cmath>
7
8 class Instrument
9 {
10 public:
11 Instrument();
12 ~Instrument();
13 bool PopulateData(double& data);
14 bool CleanData(double& data);
15 bool PrintData();
16
17 private:
18 bool getMeanAndStandardDeviation();
19
20 private:
21 double data_;
22 int N_;
23 double Mean_;
24 double M2_;
25 double stdDev_;
26 double InitialValue_;
27 bool Init_;
28 boost::mutex InstrumentMutex_;
29 };
30
31 #endif /* INSTRUMENT_H */
Instrument.cpp
1 #include "Instrument.h"
2
3 Instrument::Instrument()
4 : Init_(false), data_(0.0), Mean_(0.0), stdDev_(0.0), InitialValue_(0.0), N_(0 ), M2_(0.0)
5 {
6
7 }
8
9 Instrument::~Instrument()
10 {
11
12 }
13
14 bool Instrument::PopulateData(double& data)
15 {
16 data_ = data;
17 if(!Init_)
18 {
19 InitialValue_ = data_;
20 Init_ = true;
21 std::cout << "The initial value is: " << InitialValue_ << std: :endl;
22 }
23 return true;
24 }
25
26 /* Cleaning Part 2:
27 * Each data point will be represented as the % change from the initial value.
28 * This will then be added by 101 so as to get uniform positive value with the
29 * origin of the data shifted to init_origin + 101
30 */
31
32 bool Instrument::CleanData(double& data)
33 {
34
35 std::cout << "\nData begin inserted: " << data << std::endl;
36 boost::mutex::scoped_lock lock(InstrumentMutex_);
37 PopulateData(data);
38 data_ = ((data_-InitialValue_)/InitialValue_)*100;
39 data_ += 101;
40 getMeanAndStandardDeviation();
41 return true;
42 }
43
44 // Welford recurrence relation for tick based mean and standard deviation calc ulation
45 bool Instrument::getMeanAndStandardDeviation()
46 {
47 ++N_;
48 double delta = data_ - Mean_;
49 Mean_ += delta/N_;
50 M2_ += delta*(data_ - Mean_);
51
52 if(N_ >= 2)
53 {
54 double variance = M2_/(N_-1);
55 stdDev_ = std::sqrt(variance);
56 }
57 return true;
58 }
59
60 bool Instrument::PrintData()
61 {
62 std::cout << "\nMean: " << Mean_ << "Std Dev: " << stdDev_ << std::end l;
63 return true;
64 }
I am compiling the above with the following compilation line:
g++ -std=c++11 -I /usr/lib/x86_64-linux-gnu -lboost_thread -c InstrumentProcessor.cpp
which yields the error below.
In file included from /usr/include/c++/4.8/memory:64:0,
from /usr/include/boost/config/no_tr1/memory.hpp:21,
from /usr/include/boost/smart_ptr/shared_ptr.hpp:27,
from /usr/include/boost/shared_ptr.hpp:17,
from /usr/include/boost/date_time/time_clock.hpp:17,
from /usr/include/boost/thread/thread_time.hpp:9,
from /usr/include/boost/thread/lock_types.hpp:18,
from /usr/include/boost/thread/pthread/thread_data.hpp:12,
from /usr/include/boost/thread/thread_only.hpp:17,
from /usr/include/boost/thread/thread.hpp:12,
from InstrumentProcessor.h:4,
from InstrumentProcessor.cpp:1:
/usr/include/c++/4.8/bits/stl_construct.h: In instantiation of ‘void std::_Construct(_T1*, _Args&& ...) [with _T1 = Instrument; _Args = {Instrument}]’:
/usr/include/c++/4.8/bits/stl_uninitialized.h:75:53: required from ‘static _ForwardIterator std::__uninitialized_copy<_TrivialValueTypes>::__uninit_copy(_InputIterator, _InputIterator, _ForwardIterator) [with _InputIterator = std::move_iterator<Instrument*>; _ForwardIterator = Instrument*; bool _TrivialValueTypes = false]’
/usr/include/c++/4.8/bits/stl_uninitialized.h:117:41: required from ‘_ForwardIterator std::uninitialized_copy(_InputIterator, _InputIterator, _ForwardIterator) [with _InputIterator = std::move_iterator<Instrument*>; _ForwardIterator = Instrument*]’
/usr/include/c++/4.8/bits/stl_uninitialized.h:258:63: required from ‘_ForwardIterator std::__uninitialized_copy_a(_InputIterator, _InputIterator, _ForwardIterator, std::allocator<_Tp>&) [with _InputIterator = std::move_iterator<Instrument*>; _ForwardIterator = Instrument*; _Tp = Instrument]’
/usr/include/c++/4.8/bits/stl_vector.h:1142:29: required from ‘std::vector<_Tp, _Alloc>::pointer std::vector<_Tp, _Alloc>::_M_allocate_and_copy(std::vector<_Tp, _Alloc>::size_type, _ForwardIterator, _ForwardIterator) [with _ForwardIterator = std::move_iterator<Instrument*>; _Tp = Instrument; _Alloc = std::allocator<Instrument>; std::vector<_Tp, _Alloc>::pointer = Instrument*; std::vector<_Tp, _Alloc>::size_type = long unsigned int]’
/usr/include/c++/4.8/bits/vector.tcc:75:70: required from ‘void std::vector<_Tp, _Alloc>::reserve(std::vector<_Tp, _Alloc>::size_type) [with _Tp = Instrument; _Alloc = std::allocator<Instrument>; std::vector<_Tp, _Alloc>::size_type = long unsigned int]’
InstrumentProcessor.cpp:6:25: required from here
/usr/include/c++/4.8/bits/stl_construct.h:75:7: error: use of deleted function ‘Instrument::Instrument(const Instrument&)’
{ ::new(static_cast<void*>(__p)) _T1(std::forward<_Args>(__args)...); }
^
In file included from InstrumentProcessor.h:9:0,
from InstrumentProcessor.cpp:1:
Instrument.h:8:7: note: ‘Instrument::Instrument(const Instrument&)’ is implicitly deleted because the default definition would be ill-formed:
class Instrument
^
Instrument.h:8:7: error: use of deleted function ‘boost::mutex::mutex(const boost::mutex&)’
In file included from /usr/include/boost/thread/lock_guard.hpp:11:0,
from /usr/include/boost/thread/pthread/thread_data.hpp:11,
from /usr/include/boost/thread/thread_only.hpp:17,
from /usr/include/boost/thread/thread.hpp:12,
from InstrumentProcessor.h:4,
from InstrumentProcessor.cpp:1:
/usr/include/boost/thread/pthread/mutex.hpp:96:9: error: declared here
BOOST_THREAD_NO_COPYABLE(mutex)
^
Any help on this will be appreciated.
Upvotes: 0
Views: 572
Reputation: 66118
The implementation of vector::reserve()
requires either a move constructor or a copy constructor for the Instrument
class.
But neither of these constructors can be generated automatically for the class, because its member InstrumentMutex_
is neither copyable nor movable (mutexes can be neither copied nor moved).
You need to declare a move constructor for the Instrument
class manually to be able to use it in vector operations. Because of the mutex member, this won't be a true move constructor (a mutex cannot be moved), but simply default-initializing the new object's mutex is sufficient for the given usage.
Alternatively, instead of calling .reserve() after creating the vector Instruments_
, you can construct this vector with the required size:
// Sized construction: creates 25 default-constructed Instrument objects, so
// neither a copy nor a move constructor is required. The member is named
// Instruments_ in the class declaration.
InstrumentProcessor::InstrumentProcessor(unsigned int const numThreads)
    : numThreads_(numThreads), stop_(false), Instruments_(25)
{
}
This initialization requires only the default constructor of the Instrument
class, which is already declared.
The sized vector constructor differs from .reserve()
in that all of the vector's elements are actually constructed (as opposed to merely having storage allocated for them), which may not be suitable for your case. But if creating all the elements is OK, then this initialization should be preferred over the previous approach, because it avoids defining a move constructor without well-defined semantics (the mutex member is not moved but recreated).
Upvotes: 2