Reputation: 855
I want to develop a Filter Pipeline for my Application. The Pipeline should consist of any number of filters.
For the Filters i declare an abstract base class like this:
struct AbstractFilter {
virtual void execute(const std::string& message) = 0;
virtual ~AbstractFilter() = default;
}
Each Filter should inherit from this base class and implement the execute
Method.
Like so:
struct PrintMessage : public AbstractFilter {
void execute(const std::string& message) override {
std::cout << "Filter A " << message << '\n';
//hand over message to next Filter
}
}
struct Upper : public AbstractFilter {
void execute(const std::string& message) override {
std::string new_line;
for (char c : line)
new_line.push_back(std::toupper(c));
//hand over message to next Filter
}
}
struct WriteToFile : public AbstractFilter {
void execute(const std::string& message) override {
std::ofstream of{"test.txt"};
of << message;
of.close();
}
}
EDIT 1:
The Message should be send from one filter to the next in the Pipeline. If the pipeline for example is like this:
Upper -- PrintMessage -- WriteToFile
The Message should pass all the 3 Filters. (For example if Upper
finished his work the message should be send to PrintMessage
and so on)
In the example above if the Message Hello World
is send to the Pipeline the output should be:
Console:
HELLO WORLD
test.txt:
HELLO WORLD
EDIT 2:
The Filter only changes the content of the given Message. The Type is not changed. Every Filter should work with for example strings or a given class. The Message is only forwarded to one recipient.
My Question is now how to connect these Filters?
My First guess was to use Queues
. So every Filter gets an Input
and Output
Queue. For this i think every filter should run inside it's own Thread
and be notified if data is added to his Input
Queue. (The Output
Queue of for example FilterA is also the Input
Queue of FilterB)
My Second Guess was to use the Chain Of Responsibility Pattern and boost::signals2
So FilterB for example connects to the Signal of FilterA. FilterA calls these Filter when it finished it's work.
Which of the two solutions is the more flexible? Or is there even a better way to connect the Filters?
An additional Question is it also possible to run the whole Pipeline inside a Thread so that i can start multiple Pipelines? (In the Example have 3 of the FilterA-FilterB-FilterD Pipeline up and running?)
Upvotes: 0
Views: 1073
Reputation:
I believe the Chain of Responsibility pattern is simpler, allows for cleaner code and greater flexibility.
You do not need third-party libraries to implement it.
What you call filters are actually handlers. All handlers implement a common interface, defining a single method that could be named handle()
and could even take an object as parameter to share state. Each handler stores a pointer to the next handler. It may or may not call that method on it; in the latter case processing is halted, and it acts as a filter.
Running pipeline stages in parallel is more involved if some of them require the output of others as input. For different pipelines to run in parallel, each one would run on its own thread, and you could use a queue to pass inputs to it.
Upvotes: 2
Reputation: 409
I would procede in this way: create a List with all the implemented versions of the Abstract Filter. So, following your exmample, after reading the input file I will get a list with:
[0]:Upper
[1]:PrintMessage
[2]:WriteToFile
Then a single thread (or a thread poll if you need to process many string at time) waiting a string in an input queue. When a new string appears in the pool, the thread loops on the filter list and at the end posts the result in an output queue.
If you want to run it in parallel, you need to find a way to keep the order of the input strings anche nelle stringhe di output.
Upvotes: 1
Reputation: 556
I think AbstractFilter is not necessary and I'd suggest to use std::tuple to define a pipeline:
std::tuple<FilterA, FilterB> pipeline1;
std::tuple<FilterA, FilterB, FilterC ... > pipeline2;
To run a message through a pipeline do (using c++17):
template<typename Pipeline>
void run_in_pipeline(const std::string& message, Pipeline& pipeline){
std::apply([&message](auto&& ... filter) {
(filter.execute(message), ...);
}, pipeline);
}
If you care about performance and filters must be executed sequentially, I wouldn't suggest using multithreading or signal-slot patterns on a single pipeline. Consider instead run different pipelines on different threads if you are dealing with multithreading applications
Upvotes: 1