Synchronizing output to std::cout

I have an application wherein several threads write to std::cout and I was looking for a simple solution to prevent data being sent to std::cout from being garbled up.

For example, if I have 2 threads and both output:

std::cout << "Hello" << ' ' << "from" << ' ' << "thread" << ' ' << n << '\n';

I might see something like:

HelloHello from  fromthread  2
thread 1

What I would like to see is:

Hello from thread 2
Hello from thread 1

The order in which the lines are displayed is not very important, as long as they don't get intermixed.

I came up with the following fairly simple implementation:

class syncstream : public std::ostringstream {
public:
    using std::ostringstream::ostringstream;

    syncstream& operator<<(std::ostream&  (*pf)(std::ostream&) ) { pf(*this); return *this; }
    syncstream& operator<<(std::ios&      (*pf)(std::ios&)     ) { pf(*this); return *this; }
    syncstream& operator<<(std::ios_base& (*pf)(std::ios_base&)) { pf(*this); return *this; }

    template<typename T>
    syncstream& operator<<(T&& token) {
        static_cast<std::ostringstream&>(*this) << std::forward<T>(token);
        return *this;
    }
};

inline std::ostream& operator&&(std::ostream& s, const syncstream& g) { return s << g.str(); }
#define synced(stream) stream && syncstream()

Sorry about the macro.

So, now in my threads I can do:

synced(std::cout) << "Hello"  << ' ' << "from" << ' ' << "thread" << ' ' << n << '\n';

I wrote the above because of my initial misunderstanding of §27.4.1. But, surprisingly it works very well.

I wrote the following test case:

void proc(int n) {
    synced(std::cout) << "Hello" << ' ' << "world" << ' ' << "from" << ' ' << "thread" << ' ' << n << '\n';
}

int main() {
    std::vector<std::thread> threads;
    for(int n = 0; n < 1000; ++n) threads.push_back(std::thread(std::bind(proc, n)));

    for(std::thread& thread: threads) thread.join();
    return 0;
}

(full version here) and ran it with both g++ 4.8.3 and clang++ 3.5.1 (with libstdc++ and libc++) on my system.

Testing was done with a script, which runs the test case 1000 times generating 1 million output lines and then parses the output for any garbled lines.

I cannot make it not work (ie, produce garbled lines).

So my question is:

Why does the above implementation work?

Upvotes: 2

Views: 1221

Answers (2)

Walter
Walter

Reputation: 45484

This appears thread-safe in the sense of not producing garbled lines, provided each output ends with a new line. However, it does change the nature of the stream output, in particular with respect to flushing.

1 synced(std::cerr) will be buffered (into your syncstream), while std::cerr is never buffered.

2 there is no guarantee that

synced(std::cout) << "a=" << 128 << std::endl;

actually flushes the buffer of std::cout, since all std::cout gets is the string "a=128\n".

A stronger interpretation of thread-safe would be that the order of output reflects the order, if any, of output calls. That is if

synced(std::cout) << "a=" << 128 << std::endl;

on thread A is guaranteed (by means of locks for example) to preceed the same call on thread B, then the output of A should always preceed that of B. I don't think that your code achieves that.

Upvotes: 2

James Kanze
James Kanze

Reputation: 154047

With regards to thread safety: it's thread safe in the sense that it won't cause a data race. But only as long as the target is one of the standard stream objects (std::cout, etc.), and only as long as they remain synched with stdio. That's all the standard guarantees. And even then, you can still end up with interleaved characters.

I've had to deal with this problem a lot in the past. My solution has always been a wrapper class, with a pointer to the actual std::ostream, and a template:

template <typename T>
SynchedOutput& operator<<( T const& obj )
{
    if ( myStream != nullptr ) {
        (*myStream) << obj;
    }
    return *this;
}

The constructor of SynchedOutput then acquires a mutex lock, and the destructor frees it, so you can write:

SynchedOutput( stream, mutex ) << ...;

(In my case, I was returning the temporary from a function, and was doing so before C++11 and its move semantics, so my code was a bit more complicated; I had to support copy, and keep track of the count of the copies, so that I could unlock when the last one was destructed. Today, just implement move semantics, and no copy, if you want to return the instance from a function.))

The issue here is ensuring that everyone is using the same mutex. One possibility might be to have the constructor look up the mutex in an std::map indexed on the address of the stream object. This lookup requires a global lock, so you can even construct a new mutex if the stream object doesn't have one. The real issue is ensuring that the mutex is removed from the map when the stream is destructed.

Upvotes: 2

Related Questions