himbeerHummus
himbeerHummus

Reputation: 45

Clean way to stop/terminating a thread waiting on stdin in C++

I have got a thread which is waiting for user input. If my main function terminates, I want to notify the user-input-thread to terminate, so that I can join that thread.

Is there a cleaner/better way than using interrupts?

Upvotes: 2

Views: 2279

Answers (4)

bazza
bazza

Reputation: 8444

Some general thoughts:

Actor Model

It's a good thing to block waiting for input from stdin. The same applies to the UDP socket you've hinted at.

Both stdin and a socket are streams, which you're probably breaking up into discrete messages. But the way to wait for data from file descriptors is select(), or epoll(), or similar (depending on your platform; note, Windows gives you a select that works only on sockets...).

Anyway, my point is that by heading towards use of select() or epoll(), you're heading towards an Actor Model architecture. The question really is, does one go for this whole-hog, across the entire application? I've often found that the easiest answer is "yes".

This is nice, because then your thread can select() on both stdin and, say, a pipe into which the main thread will write a "quit" message. When the thread spots that the pipe is ready to read, it reads it, sees the "quit" message, and cleanly closes down, all neat and tidy. The thread will only read stdin when select() says there's something to read.

The "everything is a file descriptor" approach taken by *nixes makes it very easy to encompass almost any conceivable data source in a select() or epoll().

Linux even makes signals available on a fd these days. It's far, far easier to read signals out of an fd and handle them synchronously in one's main loop, rather than asynchronously in a handler (that can't really do much).

Reactor vs Proactor

Actor Model architectures are Reactors. Your process / threads have a loop, with something like select() at the top of the loop, and you read input from whichever fd comes ready-to-read and process that input accordingly. (Other architectures that are Reactors includes Communicating Sequential Processes, as found in Rust and Go-lang, erlang too).

The alternative is Proactors, i.e. deciding proactively what to do in advance of any input turning up. This is firmly in the land of callbacks, futures, asyncs, etc. Your non-blocking cin answer above is proactor.

An awful lot of stuff these days is Proactor; Windows is, Boost ASIO is (because Windows is), and so on.

Try Not to Mix Proactor and Reactor

The thing is, one can get into an awful pickle trying to blend a Reactor architecture with a Proactor architecture. It's generally dreadful. So, my advise is, pick one (reactor or proactor), stick with it, and do not try to blend the two together to any significant extent.

This means thinking very carefully up front (e.g. am I having to do this on Windows, or will I ever, ever have to port it to Windows, etc).

My Preference

Generally I prefer Reactor. With Proactor one has to initiate things like socket reads in advance of knowing whether or not there is anything to read, leading to the exact problem you're trying to avoid (an input read function that is blocked and ain't ever going to unblock in certain circumstances, necessitating a dirty shutdown).

In a Reactor, the application isn't ever going call that read function until it knows there's something to read and the thread state is such that the read should happen (e.g. no "quit" command has been received).

Pro/Reactor Inequivalency

Another aspect is that it's possible to implement Proactor behaviours on top of a Reactor framework, but it's not really possible the other way round.

Good examples of this include ZMQ. ZMQ is fundamentally a Actor model architecture, a Reactor built around zmq_poll(). There are plenty of Windows bindings for ZMQ that present a Proactor-style facade on top.

ZMQ is only able to work on Windows for sockets (because Windows gives you a select() for sockets only), but will not work on Windows pipes (there's no select() for pipes). In Unix, pipes are supported by the ipc:// transport. The lack of pipes isn't too bad within a process on Windows, as the inproc:// transport does work.

Similarly, Boost ASIO is proactor simply because of the difficulty of implementing reactor on Windows whilst encompassing the full panoply of IPC like pipes, sockets and serial ports on Windows (something ZMQ chose not to do). On *nix, Boost ASIO presents a proactor front, but underneath the hood it's implemented using epoll()...

ZMQ

Speaking of ZMQ, if you are working on *nix I can strongly recommend using ZMQ as your IPC library. It does a fantastic job of making things like ipc, sockets really easy to use, and you can easily integrate things like waiting on ordinary file descriptors (like stdin) into a call to zmq_poll(). Further more a lot of other frameworks, e.g. GUI frameworks, allow you to have a fd as an input into their own event loops, and this can include a "something's happened" fd that ZMQ gives you. So it's relatively easy to incorporate ZMQ handling comms integrated into a gui app.

If you use it on Windows, it'll be impossible to integrate it with stdin in a true reactor style.

General History

When the cygwin guys came to implement select() for their library on Windows, they ran into the horror that was the impossibility of having a proper blocking select() that could wait for sockets, serial ports, stdin, pipes, etc. In the end they implemented it by having a thread per non-socket file descriptor, spinning in a loop testing the Windows device handle to see if anything had turned up. This was massively inefficient.

Boost ASIO came out proactor specifically because of the desire to have Boost work on Windows.

Windows has (apart from sockets) been unreformedly proactor since the very beginning, probably all the way down to the kernel and device drivers. However, the first version of WSL (WSL is currently quite popular) implemented a Linux system call shim; i.e. a Linux programme making a call to select() would end up making an NT kernel call to an equivalent function.

This means that pipes, at least inside WSL 1.0, would work in select(), yet it'd be an NT kernel call implementing it. Which means that some elements of reactor have made it into Windows somewhere, but hasn't been exposed at the Win32 / C,C++ level.

Being resolutely proactor makes Windows very reminiscent of ancient Unixes, which also couldn't do select().

Upvotes: 1

himbeerHummus
himbeerHummus

Reputation: 45

Accoring to this question I tried to build a non-blocking cin using std::async:

#include <iostream>
#include <future>
#include <thread>
#include <chrono>

static std::string getAnswer()
{    
    std::string answer;
    std::cout << "waiting on stdin" << std::endl;
    std::cin >> answer;
    return answer;
}

int main()
{

    std::chrono::seconds timeout(5);
    std::string answer = "default"; //default to maybe
    while(true) {
        std::cout << "new loop" << std::endl << std::flush;
        std::future<std::string> future = std::async(getAnswer);
        if (future.wait_for(timeout) == std::future_status::ready) {
            answer = future.get();
        }
        std::cout << "Input was: " << answer << std::endl;
    }

    exit(0);
}

What I get on stdout is:

new loop                                                                                                      
waiting on stdin                                                                                              
Input was: default // after 5 sec

The loop does not start again. Instead, when I enter something on the keyboard, the loop starts again. Could someone explain me this behaviour?

Upvotes: 0

Maxim Egorushkin
Maxim Egorushkin

Reputation: 136515

If my main function terminates, I want to notify the user-input-thread to terminate, so that I can join that thread.

When main function returns the C++ run-time invokes std::exit. std::exit terminates the entire process with all of its threads. std::exit can be called from any to terminate the entire process.

Upvotes: 0

sanitizedUser
sanitizedUser

Reputation: 2115

What about a task queue?

Pseudo code:

user-input-thread

void user_input_thread_function()
{
    Task* task;

    for(;;)
    {
        queue.wait_dequeue(task);

        if (!task)
        {
            // task is nullptr, a signal to stop gracefully
            break;
        }

        // main thread did not yet ended and sent a valid task
        // do something with task

        delete task;
    }

    // do necessary things before stopping
}

main thread

// queue should be visible to both threads

QueueType queue;

int main()
{
    thread user_input_thread(user_input_thread_function);

    for(;;)
    {
        queue.enqueue(new Task("data"));

        if (input exhausted)
            queue.enqueue(nullptr);
    }

    // join all threads at the end

    user_input_thread.join();

    // you might want to create similar communication with the UDP thread using another queue

    return 0;
}

For queue, you might want to use an existing solution. Here is one I recently used for 1-1 thread communication. GitHub Link It's licensed under Simplified BSD License so I think you will be fine.

Upvotes: 0

Related Questions