Osama Ahmad
Osama Ahmad

Reputation: 2096

What's wrong with this example of the Active Object pattern in C++?

#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <functional>
#include <any>
#include <condition_variable>

// Dispatch table: perform_function() indexes it with Message::code to find the
// handler to call. It starts empty and is filled by init_functions().
std::vector<std::function<void(std::any)>> functions;

// Message codes. NOTIFY and PRINT double as indices into `functions`;
// EXIT is negative and is checked by the worker loop before any dispatch.
enum FunctionCodes {
    EXIT   = -1,
    NOTIFY = 0,
    PRINT  = 1
};

// One unit of work sent to a worker: a code plus an arbitrary payload.
struct Message
{
    int code;       // a FunctionCodes value; used as an index by perform_function()
    std::any data;  // payload handed to the handler selected by `code`
};

void perform_function(const Message& message) {
    return functions[message.code](message.data);
}

namespace X {

    // Mutex-protected FIFO of Message objects kept in a doubly linked list.
    // push() appends at the tail; pop() blocks until a message is available and
    // removes the one at the head (the oldest).
    class MessageQueue
    {
        friend class Thread;

        // List node holding one queued message.
        class Node
        {
        public:
            Node(MessageQueue::Node *next, MessageQueue::Node *prev, const Message& message)
                : next(next), prev(prev), message(message) {}

        private:
            Node* next;   // neighbour towards the head (older message)
            Node* prev;   // neighbour towards the tail (newer message)
            Message message;
            friend class MessageQueue;
        };

        Node *head, *tail;           // head = oldest node, tail = newest node
        std::mutex mutex;            // guards head/tail and the node links
        std::condition_variable cv;  // signalled by push(), awaited by pop()

    public:

        // Adopts an existing node chain; the queue deletes those nodes itself.
        MessageQueue(Node *head, Node *tail) : head(head), tail(tail) {}
        MessageQueue() : MessageQueue(nullptr, nullptr) {}

        // Frees messages that were pushed but never popped (for example those
        // queued behind an EXIT message), which were leaked before.
        ~MessageQueue()
        {
            while (head) {
                Node* newer = head->prev;
                delete head;
                head = newer;
            }
            tail = nullptr;
        }

        // Appends a copy of `message` and wakes one thread waiting in pop().
        void push(const Message& message)
        {
            // Allocate and copy before locking to keep the critical section short.
            Node* fresh = new Node(nullptr, nullptr, message);
            {
                std::lock_guard<std::mutex> lock(mutex);
                fresh->next = tail;
                if (!tail) {
                    head = fresh;
                } else {
                    tail->prev = fresh;
                }
                tail = fresh;
            }
            // Notify after releasing the lock so the woken thread can take it at once.
            cv.notify_one();
        }

        // Blocks until the queue is non-empty, then removes and returns the
        // oldest message.
        Message pop()
        {
            std::unique_lock<std::mutex> lock(mutex);
            cv.wait(lock, [this]() { return head != nullptr; });
            Node* oldest = head;
            // The node is deleted below, so move the payload out instead of copying it.
            Message message = std::move(oldest->message);
            head = oldest->prev;
            if (head) {
                head->next = nullptr;
            } else {
                tail = nullptr;
            }
            delete oldest;
            return message;
        }
    };

    // Worker that owns a MessageQueue and a thread draining it. The thread
    // pops messages and dispatches each one through perform_function() until
    // it pops a message whose code is EXIT.
    class Thread
    {
    public:
        MessageQueue queue;

    private:
        // Must be declared after `queue`: non-static members are initialized in
        // declaration order, not in mem-initializer order. With the thread
        // declared first, the worker started and could call queue.pop() before
        // the queue (its mutex, condition variable and pointers) was constructed.
        std::thread thread;

    public:
        Thread() : queue(nullptr, nullptr),
            thread([this]() {
                while (true) {
                    auto message = queue.pop();
                    if (message.code == EXIT) {
                        break;
                    }
                    perform_function(message);
                }
            }) {}

        // Waits for the worker loop to finish; it only finishes after popping EXIT.
        void join() { thread.join(); }

        // Joins the worker if join() has not been called yet.
        ~Thread() { if (thread.joinable()) thread.join(); }
    };
}

// The two workers. Each X::Thread starts its worker thread in its constructor.
// `printer` receives PRINT messages from notify(); `notifier` receives NOTIFY
// messages from push_notify().
X::Thread printer;
X::Thread notifier;

// Writes `text` followed by a newline to std::cout. A function-local mutex
// serializes callers so whole lines are never interleaved.
void cout(const std::string& text) {
    static std::mutex cout_mutex;
    std::scoped_lock<std::mutex> line_lock(cout_mutex);
    std::cout << text << '\n';
}

void push_notify(std::any data) {
    notifier.queue.push({NOTIFY, std::move(data)} );
}

void notify(std::any data) {
    printer.queue.push({PRINT, std::move(data)});
}

// PRINT handler: extracts the std::string held in `data` and prints it via
// cout(). std::any_cast throws std::bad_any_cast if `data` holds another type.
void print(std::any data) {
    const std::string text = std::any_cast<std::string>(std::move(data));
    cout(text);
}

// Fills the dispatch table: one slot each for NOTIFY and PRINT.
void init_functions() {
    functions.resize(2);
    functions[PRINT] = &print;
    functions[NOTIFY] = &notify;
}

void exit_threads() {
    printer.queue.push({EXIT, {}});
    notifier.queue.push({EXIT, {}});
}

void join_threads() {
    printer.join();
    notifier.join();
}

// Reads a count followed by that many whitespace-separated words from stdin,
// sends each word to the notifier, then shuts both workers down.
int main()
{
    init_functions();

    // A failed read (non-numeric input, overflow, end of input) is treated as
    // "no words". The workers must still be told to exit below, otherwise their
    // destructors would block forever in join().
    int n = 0;
    if (!(std::cin >> n)) {
        n = 0;
    }

    // Counting up (rather than `while (n--)`) also makes a negative count a no-op.
    for (int i = 0; i < n; ++i) {
        // FIXME why doesn't it print correctly?..
        std::string text;
        if (!(std::cin >> text)) {
            break;  // input ended early; do not forward empty strings
        }
        push_notify(std::move(text));
    }

    exit_threads();
    join_threads();
}

Here, I have 3 threads. The main thread, a notifier thread, and a printer thread. This is redundant but is only for the example.

Here, the main thread pushes a message into the notifier's queue telling it to notify the printer to print. The notifier then notifies the printer by pushing a message into its queue telling it to print.

If I input 3 a b c all on one line, nothing is printed. But if I enter the values one at a time, it prints all of them except the last one.

Printing all but the last one is possibly due to the time delay between one input and the next.

Why is this happening?

Upvotes: 0

Views: 184

Answers (2)

Osama Ahmad
Osama Ahmad

Reputation: 2096

A quick fix:

#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <functional>
#include <any>
#include <condition_variable>

// Dispatch table: perform_function() indexes it with Message::code to find the
// handler to call. It starts empty and is filled by init_functions().
std::vector<std::function<void(std::any)>> functions;

// Message codes. NOTIFY and PRINT double as indices into `functions`;
// EXIT is negative and is checked by the worker loop before any dispatch.
enum FunctionCodes {
    EXIT   = -1,
    NOTIFY = 0,
    PRINT  = 1
};

// One unit of work sent to a worker: a code plus an arbitrary payload.
struct Message
{
    int code;       // a FunctionCodes value; used as an index by perform_function()
    std::any data;  // payload handed to the handler selected by `code`
};

void perform_function(const Message& message) {
    return functions[message.code](message.data);
}

namespace X {

    // Mutex-protected FIFO of Message objects kept in a doubly linked list.
    // push() appends at the tail; pop() blocks until a message is available and
    // removes the one at the head (the oldest).
    class MessageQueue
    {
        friend class Thread;

        // List node holding one queued message.
        class Node
        {
        public:
            Node(MessageQueue::Node *next, MessageQueue::Node *prev, const Message& message)
                : next(next), prev(prev), message(message) {}

        private:
            Node* next;   // neighbour towards the head (older message)
            Node* prev;   // neighbour towards the tail (newer message)
            Message message;
            friend class MessageQueue;
        };

        Node *head, *tail;           // head = oldest node, tail = newest node
        std::mutex mutex;            // guards head/tail and the node links
        std::condition_variable cv;  // signalled by push(), awaited by pop()

    public:

        // Adopts an existing node chain; the queue deletes those nodes itself.
        MessageQueue(Node *head, Node *tail) : head(head), tail(tail) {}
        MessageQueue() : MessageQueue(nullptr, nullptr) {}

        // Frees messages that were pushed but never popped, which were leaked
        // before.
        ~MessageQueue()
        {
            while (head) {
                Node* newer = head->prev;
                delete head;
                head = newer;
            }
            tail = nullptr;
        }

        // Appends a copy of `message` and wakes one thread waiting in pop().
        void push(const Message& message)
        {
            // Allocate and copy before locking to keep the critical section short.
            Node* fresh = new Node(nullptr, nullptr, message);
            {
                std::lock_guard<std::mutex> lock(mutex);
                fresh->next = tail;
                if (!tail) {
                    head = fresh;
                } else {
                    tail->prev = fresh;
                }
                tail = fresh;
            }
            // Notify after releasing the lock so the woken thread can take it at once.
            cv.notify_one();
        }

        // Blocks until the queue is non-empty, then removes and returns the
        // oldest message.
        Message pop()
        {
            std::unique_lock<std::mutex> lock(mutex);
            cv.wait(lock, [this]() { return head != nullptr; });
            Node* oldest = head;
            // The node is deleted below, so move the payload out instead of copying it.
            Message message = std::move(oldest->message);
            head = oldest->prev;
            if (head) {
                head->next = nullptr;
            } else {
                tail = nullptr;
            }
            delete oldest;
            return message;
        }
    };

    // Worker that owns a MessageQueue and a thread draining it. The thread
    // pops messages and dispatches each one through perform_function() until
    // it pops a message whose code is EXIT.
    class Thread
    {
    public:
        MessageQueue queue;

    private:
        // Must be declared after `queue`: non-static members are initialized in
        // declaration order, not in mem-initializer order. With the thread
        // declared first, the worker started and could call queue.pop() before
        // the queue (its mutex, condition variable and pointers) was constructed.
        std::thread thread;

    public:
        Thread() : queue(nullptr, nullptr),
            thread([this]() {
                while (true) {
                    auto message = queue.pop();
                    if (message.code == EXIT) {
                        break;
                    }
                    perform_function(message);
                }
            }) {}

        // Waits for the worker loop to finish; it only finishes after popping EXIT.
        void join() { thread.join(); }

        // Joins the worker if join() has not been called yet.
        ~Thread() { if (thread.joinable()) thread.join(); }
    };
}

// The two workers. Each X::Thread starts its worker thread in its constructor.
// `printer` receives PRINT messages from notify(); `notifier` receives NOTIFY
// messages from push_notify().
X::Thread printer;
X::Thread notifier;

// Writes `text` followed by a newline to std::cout. A function-local mutex
// serializes callers so whole lines are never interleaved.
void cout(const std::string& text) {
    static std::mutex cout_mutex;
    std::scoped_lock<std::mutex> line_lock(cout_mutex);
    std::cout << text << '\n';
}

void push_notify(std::any data) {
    notifier.queue.push({NOTIFY, std::move(data)} );
}

void notify(std::any data) {
    printer.queue.push({PRINT, std::move(data)});
}

// PRINT handler: extracts the std::string held in `data` and prints it via
// cout(). std::any_cast throws std::bad_any_cast if `data` holds another type.
void print(std::any data) {
    const std::string text = std::any_cast<std::string>(std::move(data));
    cout(text);
}

// Fills the dispatch table: one slot each for NOTIFY and PRINT.
void init_functions() {
    functions.resize(2);
    functions[PRINT] = &print;
    functions[NOTIFY] = &notify;
}

void exit_threads() {
    notifier.queue.push({EXIT, {}});
    notifier.join();
    printer.queue.push({EXIT, {}});
    printer.join();
}

// Reads a count followed by that many whitespace-separated words from stdin,
// sends each word to the notifier, then shuts both workers down.
int main()
{
    init_functions();

    // A failed read (non-numeric input, overflow, end of input) is treated as
    // "no words". The workers must still be told to exit below, otherwise their
    // destructors would block forever in join().
    int n = 0;
    if (!(std::cin >> n)) {
        n = 0;
    }

    // Counting up (rather than `while (n--)`) also makes a negative count a no-op.
    for (int i = 0; i < n; ++i) {
        std::string text;
        if (!(std::cin >> text)) {
            break;  // input ended early; do not forward empty strings
        }
        push_notify(std::move(text));
    }

    exit_threads();
}

The difference here is that I exit the notifier and join it first to ensure that there are no more messages to send, then exit and join the printer.

Also, an std::queue might be used:

#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <functional>
#include <any>
#include <condition_variable>
#include <queue>

// Dispatch table: perform_function() indexes it with Message::code to find the
// handler to call. It starts empty and is filled by init_functions().
std::vector<std::function<void(std::any)>> functions;

// Message codes. NOTIFY and PRINT double as indices into `functions`;
// EXIT is negative and is checked by the worker loop before any dispatch.
enum FunctionCodes {
    EXIT   = -1,
    NOTIFY = 0,
    PRINT  = 1
};

// One unit of work sent to a worker: a code plus an arbitrary payload.
struct Message
{
    int code;       // a FunctionCodes value; used as an index by perform_function()
    std::any data;  // payload handed to the handler selected by `code`
};

void perform_function(const Message& message) {
    return functions[message.code](message.data);
}

namespace X {
    
    // Worker that owns a message queue and the thread draining it. The thread
    // pops messages and dispatches each one through perform_function() until
    // it pops a message whose code is EXIT.
    class Thread
    {
        std::queue<Message> message_queue;
        std::condition_variable queue_cv;  // signalled by push(), awaited by pop()
        std::mutex queue_mutex;            // guards message_queue
        // Declared last: members are initialized in declaration order, so the
        // queue, condition variable and mutex exist before the thread starts.
        std::thread thread;

        // Blocks until a message is queued, then removes and returns the oldest one.
        Message pop()
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            queue_cv.wait(lock, [this]() { return !message_queue.empty(); });
            // The front element is discarded right after, so move it out
            // instead of copying its std::any payload.
            Message message = std::move(message_queue.front());
            message_queue.pop();
            return message;
        }

    public:
        Thread() :
            thread([this]() {
                while (true) {
                    auto message = pop();
                    if (message.code == EXIT) {
                        break;
                    }
                    perform_function(message);
                }
            }) {}

        // Waits for the worker loop to finish; it only finishes after popping EXIT.
        void join() { thread.join(); }

        // Joins the worker if join() has not been called yet.
        ~Thread() { if (thread.joinable()) thread.join(); }

        // Appends `message` to the queue and wakes the worker if it is waiting.
        void push(Message message)
        {
            {
                std::lock_guard<std::mutex> lock(queue_mutex);
                message_queue.push(std::move(message));
            }
            // Notify after releasing the lock so the worker can take it at once.
            queue_cv.notify_one();
        }
    };
}

// The two workers. Each X::Thread starts its worker thread in its constructor.
// `printer` receives PRINT messages from notify(); `notifier` receives NOTIFY
// messages from push_notify().
X::Thread printer;
X::Thread notifier;

// Writes `text` followed by a newline to std::cout. A function-local mutex
// serializes callers so whole lines are never interleaved.
void cout(const std::string& text) {
    static std::mutex cout_mutex;
    std::scoped_lock<std::mutex> line_lock(cout_mutex);
    std::cout << text << '\n';
}

void push_notify(std::any data) {
    notifier.push({NOTIFY, std::move(data)} );
}

void notify(std::any data) {
    printer.push({PRINT, std::move(data)});
}

// PRINT handler: extracts the std::string held in `data` and prints it via
// cout(). std::any_cast throws std::bad_any_cast if `data` holds another type.
void print(std::any data) {
    const std::string text = std::any_cast<std::string>(std::move(data));
    cout(text);
}

// Fills the dispatch table: one slot each for NOTIFY and PRINT.
void init_functions() {
    functions.resize(2);
    functions[PRINT] = &print;
    functions[NOTIFY] = &notify;
}

void exit_threads() {
    notifier.push({EXIT, {}});
    notifier.join();
    printer.push({EXIT, {}});
    printer.join();
}

// Reads a count followed by that many whitespace-separated words from stdin,
// sends each word to the notifier, then shuts both workers down.
int main()
{
    init_functions();

    // A failed read (non-numeric input, overflow, end of input) is treated as
    // "no words". The workers must still be told to exit below, otherwise their
    // destructors would block forever in join().
    int n = 0;
    if (!(std::cin >> n)) {
        n = 0;
    }

    // Counting up (rather than `while (n--)`) also makes a negative count a no-op.
    for (int i = 0; i < n; ++i) {
        std::string text;
        if (!(std::cin >> text)) {
            break;  // input ended early; do not forward empty strings
        }
        push_notify(std::move(text));
    }

    exit_threads();
}

Upvotes: 0

Kevin
Kevin

Reputation: 7324

In the normal case your main thread only communicates with the notifier, which then communicates with the printer. But for exit your main thread communicates directly with both the notifier and the printer.

This means that the EXIT message can be added to the printer's queue before the notifier has sent all the previous messages to it. So given these operations:

main sends 'a' to notifier
main sends 'b' to notifier
main sends 'c' to notifier
notifier sends 'a' to printer
notifier sends 'b' to printer
main sends EXIT to notifier
main sends EXIT to printer
notifier sends 'c' to printer

the printer queue looks like a, b, EXIT, c. It exits before printing c. It's worse when you enter all the input on a single line, since the main thread is able to send the EXIT to the printer before the notifier can send anything to it at all.

The solution is to have the exit_threads function only send a message to the notifier. When the notifier sees the EXIT message it should forward it to the printer and break. That ensures that all messages sent to the notifier are sent to the printer before the EXIT:

main sends 'a' to notifier
main sends 'b' to notifier
main sends 'c' to notifier
notifier sends 'a' to printer
notifier sends 'b' to printer
main sends EXIT to notifier
notifier sends 'c' to printer
notifier sends EXIT to printer

Upvotes: 2

Related Questions