abilash
abilash

Reputation: 897

Synchronizing output streams from different threads

I'm writing an app that writes some data to an XML file from different threads. I tried to synchronize the writes using an Event kernel object, but I get wrong data in the file. I get the following result:

<file path="somePath" />
<file path="somePath" <file path="somePath" /> />....

but I expect to get

<file path="somePath" />
<file path="somePath" />
<file path="somePath" />

See my pseudo code below. What is wrong with it?

// Thread entry point of the question's pseudo code: for every item that passes
// somePredicat1 it appends one <file .../> element to indexingtest.xml.
// The parameter p is not read anywhere in the visible body.
unsigned int WINAPI MyThread(void *p)
{
    // Every invocation (each thread, and each nested call below) opens its own
    // append-mode stream on the same file.
    std::wofstream outstr;
    outstr.open("indexingtest.xml", std::ios::app);
    do
    {
        if(somePredicat1)
        {
            // Blocks until hEvent is signaled.
            WaitForSingleObject(hEvent, INFINITE);
            outstr <<"<file path=\""<< sFileName << "\"\n";
            outstr <<"\tsize=\""<< fileSize << "\" />\n";           
            // Releases hMutex, although nothing in this function waits on it.
            ReleaseMutex(hMutex);
        }
         if(somePredicat3)
         {
             // Plain recursive call (no new thread); outstr above is still open.
             MyThread(sFileName);
         }
    }while(somePredicat2);
    outstr.close();
    // hSearch is not declared in this excerpt - presumably the find handle
    // that drives the predicates; confirm against the real code.
    FindClose( hSearch );
    return 0;
}

// Entry point of the pseudo code: creates the event, starts two MyThread
// workers, signals the event and then waits for the first worker only.
int _tmain(int argc, TCHAR *argv[])
{
    // Manual-reset event (2nd argument TRUE), initially non-signaled (3rd argument FALSE).
    hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
    // The mutex creation is commented out, so hMutex is not created here.
    //hMutex = CreateMutex(NULL, FALSE, 0);
    unsigned int ThreadID; // overwritten by each _beginthreadex call below
    HANDLE hThread1 = (HANDLE)_beginthreadex(NULL, 0, MyThread, L"D:\\*", 0, &ThreadID);
    HANDLE hThread2 = (HANDLE)_beginthreadex(NULL, 0, MyThread, L"C:\\*", 0, &ThreadID);
    // Signals the event; being manual-reset, it stays signaled because nothing resets it.
    SetEvent(hEvent);
    std::wcout << "\a" << std::endl;
    // Only hThread1 is waited for; hThread2 is not waited on before returning.
    WaitForSingleObject( hThread1, INFINITE );
    return 0;
}

More concrete code

/* Unnamed mutex, not initially owned; GetAllFilesImpl waits on it and releases
   it around each write to indexingtest.xml. */
HANDLE hMutex = CreateMutex(NULL,FALSE, 0);
/*
 * Recursively enumerates every entry matching the search pattern `folder`
 * (e.g. L"D:\\*"), appends each entry's path to the `res` array and, for
 * entries with a non-zero size, appends a <file .../> element to
 * indexingtest.xml while holding hMutex.
 *
 * folder     - search pattern handed to FindFirstFileW.
 * res        - array collected so far (NULL on the first call); grown through
 *              AddToArray.
 * pAllocated - bookkeeping forwarded to AddToArray.
 * pUsed      - bookkeeping forwarded to AddToArray.
 *
 * Returns the (possibly reallocated) array, or NULL when AddToArray or a
 * nested call fails.
 *
 * Changes from the original:
 *  - the search handle is closed on both early-return error paths;
 *  - the put position is moved with seekp(0, end) instead of passing the
 *    seekdir enum as an absolute offset, and the no-op seek before close()
 *    is dropped;
 *  - the size combines nFileSizeHigh and nFileSizeLow into an unsigned
 *    64-bit value, so files of 2 GiB and larger are reported correctly;
 *  - the unused locals creationDate and allocatedMemory are removed.
 */
wchar_t** GetAllFilesImpl( wchar_t const* folder, wchar_t** res, size_t* pAllocated, size_t* pUsed )
{
    WIN32_FIND_DATAW fileinfo;

    HANDLE hSearch = FindFirstFileW( folder, &fileinfo );
    if( hSearch == INVALID_HANDLE_VALUE )
        return res;   /* nothing matched (or access denied): keep what we have */

    do {
        wchar_t* sFileName, ** tmp, sTmp[ 1024 ];
        unsigned long long fileSize = 0;

        /* ignore ., .. */
        if( !wcscmp(fileinfo.cFileName, L".") ||
            !wcscmp(fileinfo.cFileName, L"..") )
            continue;

        sFileName = PathCreator( folder, fileinfo.cFileName );
        fileSize = ( static_cast<unsigned long long>(fileinfo.nFileSizeHigh) << 32 )
                 | fileinfo.nFileSizeLow;

        if(fileSize)
        {
            /* The whole open/write/close sequence runs under the mutex so
               that elements written by different threads cannot interleave. */
            WaitForSingleObject(hMutex, INFINITE);
            std::wofstream outstr;
            outstr.open("indexingtest.xml", std::ios::app);
            outstr.seekp(0, std::ios_base::end);
            outstr <<"<file path=\""<< sFileName << "\"\n";
            outstr <<"\tsize=\""<< fileSize << "\" />\n";
            outstr.close();
            wprintf( L"%s\n", sFileName);
            ReleaseMutex(hMutex);
        }

        tmp = AddToArray( res, pAllocated, pUsed, sFileName );
        if( !tmp ) {
            FindClose( hSearch );   /* do not leak the search handle on failure */
            return FreeAllFilesMemory(res), NULL;
        }
        res = tmp;

        if( fileinfo.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY ) {
            /* Descend into the directory using "<path>\*" as the new pattern. */
            wcscpy_s( sTmp, sFileName );
            wcscat_s( sTmp, L"\\*" );
            tmp = GetAllFilesImpl( sTmp, res, pAllocated, pUsed );
            if( !tmp ) {
                /* The nested call has already released the array. */
                FindClose( hSearch );
                return NULL;
            }
            res = tmp;
        }
    } while( FindNextFileW(hSearch, &fileinfo) );

    FindClose( hSearch );
    return res;
}

/*
 * Thread entry point: walks the search pattern passed in `folder` (a
 * wchar_t string) with GetAllFilesImpl, appends a terminating NULL entry to
 * the collected array, and rings the console bell when done.
 *
 * Returns 0 normally, or (unsigned)-1 when adding the terminator fails.
 *
 * NOTE(review): on success the collected array is neither returned nor
 * released here - confirm whether that is intentional.
 */
unsigned int WINAPI GetAllFiles( void* folder )
{
    size_t capacity = 0;
    size_t count = 0;

    wchar_t** files = GetAllFilesImpl( (wchar_t *)folder, NULL, &capacity, &count );
    if( files != NULL ) {
        /* a NULL string marks the end of the result */
        wchar_t** grown = AddToArray( files, &capacity, &count, NULL );
        if( grown == NULL ) {
            FreeAllFilesMemory( files );
            return -1;
        }
        files = grown;
    }

    std::wcout << "\a" << std::endl;
    return 0;
}
/*
 * Starts one GetAllFiles worker per search root, waits until all three have
 * finished, then releases the thread handles and prints "finished".
 *
 * Change from the original: the handles returned by _beginthreadex are now
 * closed after the wait; they are not closed automatically when the thread
 * ends.
 */
int _tmain(int argc, TCHAR *argv[])
{

    Sleep(1000);
    unsigned int ThreadID;
    HANDLE hThreads[3];
    hThreads[0] = (HANDLE)_beginthreadex(NULL, 0, GetAllFiles, L"D:\\*", 0, &ThreadID);
    hThreads[1] = (HANDLE)_beginthreadex(NULL, 0, GetAllFiles, L"C:\\Users\\Andrew\\Desktop\\*", 0, &ThreadID);
    hThreads[2] = (HANDLE)_beginthreadex(NULL, 0, GetAllFiles, L"E:\\*", 0, &ThreadID);
    /* bWaitAll = TRUE: returns only after every worker has terminated. */
    unsigned int dw = WaitForMultipleObjects(3, hThreads, TRUE, INFINITE);
    /* _beginthreadex returns 0 on failure, so skip handles that were never created. */
    for( int i = 0; i < 3; ++i ) {
        if( hThreads[i] )
            CloseHandle( hThreads[i] );
    }
    /* NOTE(review): hFile is not declared in this excerpt - confirm it exists
       and is still meant to be closed here. */
    CloseHandle(hFile);
    printf("finished\n");
    return 0;
}

Upvotes: 0

Views: 415

Answers (2)

Arno
Arno

Reputation: 5194

  1. You should wait for all threads before ending the code

    // One handle slot per worker thread.
    HANDLE aThread[2];
    
    ...
    
    aThread[0] =  (HANDLE)_beginthreadex(...
    aThread[1] =  (HANDLE)_beginthreadex(...
    
    // bWaitAll = TRUE: blocks until every handle in aThread is signaled, i.e.
    // all threads have ended. THREADCOUNT is presumably 2 (the array size).
    WaitForMultipleObjects(THREADCOUNT, aThread, TRUE, INFINITE);
    
  2. You're waiting for the event before you do your output. When done with the output you release the mutex. This does not make sense. You should wait for the mutex and for the event. Both threads get their wait released when the event is set, so the mutex does not do anything. When you put the event handle and the mutex handle into one array, you can use WaitForMultipleObjects for this purpose too:

    // hVarious[0] is the start event, hVarious[1] the mutex guarding the file.
    HANDLE hVarious[2];
    hVarious[0] = CreateEvent(NULL, TRUE, FALSE, NULL);
    // Note: this is a manual reset event.
    // Thus it stays set until explicitly reset
    
    hVarious[1] = CreateMutex(NULL, FALSE, 0);
    
    // and now start the threads:
    aThread[0] =  (HANDLE)_beginthreadex(...
    aThread[1] =  (HANDLE)_beginthreadex(...
    
    // and set the event (it lives in hVarious[0]; there is no separate hEvent here):
    SetEvent(hVarious[0]);
    
    
    // wait until both threads have ended
    WaitForMultipleObjects(2, aThread, TRUE, INFINITE);
    

    The thread then should look like:

    // Worker thread: for every item that passes somePredicat1, appends one
    // <file .../> element to indexingtest.xml while owning the mutex in
    // hVarious[1]. The parameter p is not read in this sketch.
    // Returns 0 when the loop ends.
    unsigned int WINAPI MyThread(void *p)
    {
      do
      {
        if(somePredicat1)
        {
          // wait for the mutex AND the event (bWaitAll = TRUE)
          WaitForMultipleObjects(2, hVarious, TRUE, INFINITE);
    
          // do the file stuff in the mutex protected part
          std::wofstream outstr;
          outstr.open("indexingtest.xml", std::ios::app);
    
          outstr <<"<file path=\""<< sFileName << "\"\n";
          outstr <<"\tsize=\""<< fileSize << "\" />\n";
    
          outstr.close();
          ReleaseMutex(hVarious[1]);
        }
      }while(somePredicat2);
    
      // Close the search handle once, after the loop. Closing it inside the
      // loop body would close it after the first write while the loop may
      // still run (hSearch presumably drives the predicates).
      FindClose( hSearch );
      return 0;
    }
    

Remember: The mutex is established to protect resources in concurrent applications.

I have no idea about somePredicat1 and somePredicat2. These conditions may also cause trouble when used in different threads. However, the faulty output you observed is caused by your wrong mutex usage.

Edit after comment:

    // Quoted from the question: MyThread calls itself here as an ordinary
    // function; no new thread is started.
    if(somePredicat3)
    {
         MyThread(sFileName);
    }

a. The thread is called by itself as a function without closing the file.

b. You should provide more details about what somePredicat3, somePredicat2, and somePredicat1 are for.

c. You'd have to protect the output file with some sort of exclusivity because it is used by more than one thread. You may also use a Critical Section Object to do so.

Upvotes: 0

Some programmer dude
Some programmer dude

Reputation: 409166

The big problem you have is that each thread is opening the file separately. Instead, open the file before you create the threads, and then use the mutex to synchronize the writes to the file.

In pseudo code:

// Single output stream shared by all threads; main() opens and closes it.
std::wofstream output_file;

// Thread body (pseudo code): each write is bracketed by lock_mutex() and
// unlock_mutex(), so only one thread runs do_output() at a time.
void my_thread()
{
    do
    {
        if (some_condition)
        {
            lock_mutex();
            do_output();
            unlock_mutex();
        }
    } while (condition);
}

// Pseudo code: open the shared file once, start the workers, close the file.
int main()
{
    // Open before any thread exists so every thread uses the same stream.
    output_file.open(...);

    create_thread();
    create_thread();

    // NOTE(review): as written, the file is closed right after the threads are
    // created; presumably the threads must be waited for first - the pseudo
    // code omits that step.
    output_file.close();
}

Upvotes: 2

Related Questions