Reputation: 5301
Are concurrent appends to a file a supported feature?
I tested this with concurrent threads, each writing through its own fstream. The data is not corrupted, but some writes are lost: the file size is smaller than expected after the writes finish, yet the writes that do land never overlap.
If I instead coordinate the offsets myself, with each thread seeking its own fstream to a precomputed position before writing, no writes are lost (see the sketch after the sample code).
Here is the sample code:
#include <chrono>
#include <cstdio>
#include <fstream>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include "gtest/gtest.h"
using namespace std;
void append_concurrently(string filename, const int data_in_gb, const int num_threads, const char start_char,
bool stream_cache = true) {
const int offset = 1024;
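// per-thread record count, sized so the threads together write exactly data_in_gb GiB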
const long long num_records_each_thread = (data_in_gb * 1024 * ((1024 * 1024) / (num_threads * offset)));
{
auto write_file_fn = [&](int index) {
// each thread has its own handle
fstream file_handle(filename, fstream::app | fstream::binary);
if (!stream_cache) {
file_handle.rdbuf()->pubsetbuf(nullptr, 0); // no buffering in fstream
}
vector<char> data(offset, (char)(index + start_char));
for (long long i = 0; i < num_records_each_thread; ++i) {
file_handle.write(data.data(), offset);
if (!file_handle) {
std::cout << "File write failed: "
<< file_handle.fail() << " " << file_handle.bad() << " " << file_handle.eof() << std::endl;
break;
}
}
// file_handle.flush();
};
auto start_time = chrono::high_resolution_clock::now();
vector<thread> writer_threads;
for (int i = 0; i < num_threads; ++i) {
writer_threads.push_back(std::thread(write_file_fn, i));
}
for (int i = 0; i < num_threads; ++i) {
writer_threads[i].join();
}
auto end_time = chrono::high_resolution_clock::now();
std::cout << filename << " Data written : " << data_in_gb << " GB, " << num_threads << " threads "
<< ", cache " << (stream_cache ? "true " : "false ") << ", size " << offset << " bytes ";
std::cout << "Time taken: " << (end_time - start_time).count() / 1000 << " micro-secs" << std::endl;
}
{
ifstream file(filename, fstream::in | fstream::binary);
file.seekg(0, ios_base::end);
// This EXPECT_EQ FAILS as file size is smaller than EXPECTED
EXPECT_EQ(num_records_each_thread * num_threads * offset, file.tellg());
file.seekg(0, ios_base::beg);
EXPECT_TRUE(file);
char data[offset]{ 0 };
for (long long i = 0; i < (num_records_each_thread * num_threads); ++i) {
file.read(data, offset);
EXPECT_TRUE(file || file.eof()); // should be able to read until eof
char expected_char = data[0]; // should not have any interleaving of data.
bool same = true;
for (auto & c : data) {
same = same && (c == expected_char) && (c != 0);
}
EXPECT_TRUE(same); // THIS PASSES
if (!same) {
std::cout << "corruption detected !!!" << std::endl;
break;
}
if (file.eof()) { // THIS FAILS as file size is smaller
EXPECT_EQ(num_records_each_thread * num_threads, i + 1);
break;
}
}
}
}
TEST(fstream, file_concurrent_appends) {
string filename = "file6.log";
const int data_in_gb = 1;
{
// trunc file before write threads start.
{
fstream file(filename, fstream::in | fstream::out | fstream::trunc | fstream::binary);
}
append_concurrently(filename, data_in_gb, 4, 'B', false);
}
std::remove(filename.c_str());
}
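For reference, here is a minimal sketch of the coordinated-offset variant mentioned above (the names are illustrative, not part of the test): each thread opens its own fstream and seeks to a precomputed, non-overlapping region, so no two writes ever target the same byte range.
#include <fstream>
#include <string>
#include <thread>
#include <vector>
// assumes the file already exists (created/truncated beforehand, as in the test)
void write_at_coordinated_offsets(const std::string& filename, const int record_size,
                                  const int num_threads, const long long records_per_thread) {
    auto write_region_fn = [&](int index) {
        // in|out (no app): the stream writes wherever we position it
        std::fstream file_handle(filename, std::fstream::in | std::fstream::out | std::fstream::binary);
        // each thread owns the disjoint region [index * region, (index + 1) * region)
        const long long region = records_per_thread * static_cast<long long>(record_size);
        file_handle.seekp(static_cast<long long>(index) * region, std::ios_base::beg);
        std::vector<char> data(record_size, static_cast<char>('B' + index));
        for (long long i = 0; i < records_per_thread; ++i) {
            file_handle.write(data.data(), record_size);
        }
    };
    std::vector<std::thread> writer_threads;
    for (int i = 0; i < num_threads; ++i) {
        writer_threads.emplace_back(write_region_fn, i);
    }
    for (auto& t : writer_threads) {
        t.join();
    }
}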
Edit:
I changed the code so that a single fstream is shared by all threads. Now, with a 512-byte record size, I consistently see 8 writes (4 KB in total) lost.
const int offset = 512;
const long long num_records_each_thread = (data_in_gb * 1024 * ((1024 * 1024) / (num_threads * offset)));
fstream file_handle(filename, fstream::app | fstream::binary);
if (!stream_cache) {
file_handle.rdbuf()->pubsetbuf(nullptr, 0); // no buffering in fstream
}
The problem does not reproduce with a 4 KB record size; the 4 KB that goes missing matches one unflushed internal buffer (see Edit 2). Test output:
Running main() from gtest_main.cc
Note: Google Test filter = *file_conc*_*append*
[==========] Running 1 test from 1 test case.
[----------] Global test environment set-up.
[----------] 1 test from fstream
[ RUN ] fstream.file_concurrent_appends
file6.log Data written : 1 GB, 1 threads , cache true , size 512 bytes Time taken: 38069289 micro-secs
d:\projs\logpoc\tests\test.cpp(279): error: Expected: num_records_each_thread * num_threads * offset
Which is: 1073741824
To be equal to: file.tellg()
Which is: 1073737728
d:\projs\logpoc\tests\test.cpp(301): error: Expected: num_records_each_thread * num_threads
Which is: 2097152
To be equal to: i + 1
Which is: 2097145
Edit 2:
Closing file_handle after joining all threads flushes the data from the stream's internal buffer. This resolved the issue above.
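In code, assuming the shared file_handle from the first edit, the fix looks like this:
for (auto& t : writer_threads) {
    t.join();
}
file_handle.close(); // flush the stream's internal buffer; without this, the buffered tail never reaches the file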
Upvotes: 1
Views: 204
Reputation: 25261
According to §29.4.2 ¶7 of the official ISO C++20 standard, the functions provided by std::fstream are generally thread-safe.
However, if every thread has its own std::fstream object, then, as far as the C++ standard library is concerned, these are distinct streams, and no synchronization will take place. Only the operating system's kernel is aware that all of the file handles point to the same file, so any synchronization would have to be done by the kernel. But the kernel possibly isn't even aware that a write is supposed to go to the end of the file: depending on your platform, the kernel may receive only write requests for certain file positions. If the end of the file has meanwhile been moved by an append from another thread, the position of a thread's earlier write request may no longer be the end of the file.
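For contrast, on POSIX systems a descriptor opened with O_APPEND does tell the kernel that every write goes to the current end of file, and the kernel performs the repositioning and the write as one atomic step, even across separate descriptors. A POSIX-only sketch (not a drop-in replacement for the fstream code above):
#include <fcntl.h>
#include <unistd.h>
// Appends `count` copies of `record`; O_APPEND makes the kernel position
// each write() at the current end of file atomically.
void append_records_posix(const char* filename, const char* record,
                          size_t record_size, long long count) {
    int fd = open(filename, O_WRONLY | O_CREAT | O_APPEND, 0644);
    if (fd < 0) {
        return; // open failed
    }
    for (long long i = 0; i < count; ++i) {
        if (write(fd, record, record_size) != static_cast<ssize_t>(record_size)) {
            break; // short or failed write
        }
    }
    close(fd);
}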
According to the documentation for std::fstream::open, opening a file in append mode causes the stream to seek to the end of the file before every write. That behavior is exactly what you want, but, for the reasons stated above, it will probably only work if all threads share the same std::fstream object. In that case, the std::fstream object should be able to synchronize all writes; in particular, it should be able to perform the seek to the end of the file and the subsequent write atomically.
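A minimal sketch of that shared-stream arrangement (illustrative names; a std::mutex is added around each write as a conservative guard for implementations that do not serialize fstream member calls internally, and the stream is closed after joining, per the question's Edit 2):
#include <fstream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
void append_via_shared_stream(const std::string& filename, const int record_size,
                              const int num_threads, const long long records_per_thread) {
    // one stream shared by all threads; app mode seeks to end-of-file before every write
    std::fstream shared_handle(filename, std::fstream::app | std::fstream::binary);
    std::mutex write_mutex;
    auto write_fn = [&](int index) {
        std::vector<char> data(record_size, static_cast<char>('B' + index));
        for (long long i = 0; i < records_per_thread; ++i) {
            std::lock_guard<std::mutex> lock(write_mutex);
            shared_handle.write(data.data(), record_size);
        }
    };
    std::vector<std::thread> writer_threads;
    for (int i = 0; i < num_threads; ++i) {
        writer_threads.emplace_back(write_fn, i);
    }
    for (auto& t : writer_threads) {
        t.join();
    }
    shared_handle.close(); // flush the internal buffer (see the question's Edit 2)
}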
Upvotes: 2