0n1
0n1

Reputation: 33

Passing non-thread-safe objects through thread-safe containers

I have a thread-safe object queue which is designed to model a pipeline of work moving between a chain of threads. In some cases, I want to pass non-thread-safe objects (e.g., std::vectors, or other STL containers) as part of these work items.

Now, in the case you have a shared object between threads, there is an obvious problem of load/store ordering in ensuring object consistency. Since the thread-safe queue ensures that only one thread has ownership of the object there is no chance for multiple threads trying to modify or read the object at the same time.. the only possible problem I see is ensuring memory consistency in previously issued loads/stores on the object by previous owner threads.

The queue ensures thread safety by creating a lock_guard<...> on queue operations. Wouldn't memory consistency of the object being moved between threads be guaranteed since the memory fencing and synchronization would be taken care by the lock_guard?

Part of me wants to ensure I am only passing thread-safe objects between threads, but I feel like this case there should be no problem. Is this true?

Upvotes: 3

Views: 322

Answers (2)

Francis Cugler
Francis Cugler

Reputation: 7905

I use a slightly different method to block threads I do not know if this would help in any way or not but I don't mind sharing what I have to offer different opinions or approaches.

BlockThread.h

#ifndef BLOCK_THREAD_H
#define BLOCK_THREAD_H

namespace vmk {

class BlockThread sealed {
private:
    CRITICAL_SECTION* m_pCriticalSection;

public:
    explicit BlockThread( CRITICAL_SECTION& criticalSection );
    ~BlockThread();

private:
    BlockThread( const BlockThread& c ); // Not Implemented
    BlockThread& operator=( const BlockThread& c ); // Not Implemented
    // void swap( BlockThread& other ) throw();

}; // BlockThread

} // namespace vmk

#endif // BLOCK_THREAD_H

BlockThread.cpp

#include "stdafx.h"
#include "BlockThread.h"

namespace vmk {

// ----------------------------------------------------------------------------
// BlockThread()
BlockThread::BlockThread( CRITICAL_SECTION& criticalSection ) {
    m_pCriticalSection = &criticalSection;
    EnterCriticalSection( m_pCriticalSection );
} // BlockThread

// ----------------------------------------------------------------------------
// ~BlockThread()
BlockThread::~BlockThread() {
    LeaveCriticalSection( m_pCriticalSection );
} // ~BlockThread    

} // namespace vmk

VolatileLocker.h

#ifndef VOLATILE_LOCKER_H
#define VOLATILE_LOCKER_H

namespace vmk {

template <typename T>
class VolatileLocker {
private:
    T*                  m_pObject;
    CRITICAL_SECTION*   m_pCriticalSection;

public:
    VolatileLocker( volatile T& objectToLock, CRITICAL_SECTION& criticalSection );
    ~VolatileLocker();

    T* operator->();

private:
    VolatileLocker( const VolatileLocker& c ); // Not Implemented
    VolatileLocker& operator=( const VolatileLocker& c ); // Not Implemented

}; // VolatileLocker

#include "VolatileLocker.inl"

} // namespace vmk

#endif // VOLATILE_LOCKER_H

// reference: http://drdobbs.com/cpp/184403766

VolatileLocker.inl

// ----------------------------------------------------------------------------
// VolatileLocker()
// Locks A Volatile Variable So That It Can Be Used Across Multiple Threads Safely
template<typename T>
VolatileLocker<T>::VolatileLocker( volatile T& objectToLock, CRITICAL_SECTION& criticalSection ) :
m_pObject( const_cast<T*>( &objectToLock ) ),
m_pCriticalSection( &criticalSection ) {
    EnterCriticalSection( m_pCriticalSection );
} // VolatileLocker

// ----------------------------------------------------------------------------
// ~VolatileLocker()
template<typename T>
VolatileLocker<T>::~VolatileLocker() {
    LeaveCriticalSection( m_pCriticalSection );
} // ~VolatileLocker

// ----------------------------------------------------------------------------
// operator->()
// Allow The Locked Object To Be Used Like A Pointer
template <typename T>
T* VolatileLocker<T>::operator->() {
    return m_pObject;
} // operator->

Here is a class object that uses BlockThread - This class depends on other classes not shown here and I'll only include the parts of this class that uses the BlockThread Object.

AudioManager.cpp

#include "stdafx.h"
#include "AudioManager.h"

#include "AudioBuffer.h"
#include "AudioSource.h"
#include "BlockThread.h"
#include "Logger.h"

namespace vmk {

static AudioManager*        s_pAudioManager = nullptr;
static CRITICAL_SECTION     s_csChangeSources;

// ----------------------------------------------------------------------------
// AudioManager()
AudioManager::AudioManager() :
Singleton( Singleton::TYPE_AUDIO_MANAGER ) {
    InitializeCriticalSection( &s_csChangeSources );

    if ( !alutInit( NULL, NULL ) ) {
        std::ostringstream strStream;
        strStream << __FUNCTION__ << " ALUT error: " << alutGetErrorString( alutGetError() );
        throw ExceptionHandler( strStream );
    }

    alGetError(); // Clear Errors

    alListenerf( AL_GAIN, 1.0f ); // Master Volume
    alListener3f( AL_POSITION, 0.0f, 0.0f, 0.0f );
    alListener3f( AL_VELOCITY, 0.0f, 0.0f, 0.0f );

    float f6Orient[] = { 0.0f, 0.0f, -1.0f,   // Forward(X, Y, Z)
                         0.0f, 1.0f,  0.0f }; // Up(X,Y,Z)

    alListenerfv( AL_ORIENTATION, f6Orient );
    if ( alGetError() != AL_NO_ERROR ) {
        std::ostringstream strStream;
        strStream << __FUNCTION__ << " Failed to initialize Listener";
        throw ExceptionHandler( strStream );
    }

    s_pAudioManager = this;
} // AudioManager

// ----------------------------------------------------------------------------
// ~AudioManager()
AudioManager::~AudioManager() {
    s_pAudioManager = nullptr;

    m_mAudioSources.clear();
    m_lAudioBuffers.clear();

    alutExit();

    DeleteCriticalSection( &s_csChangeSources );
} // ~AudioManager

// ----------------------------------------------------------------------------
// get()
AudioManager* const AudioManager::get() {
    if ( nullptr == s_pAudioManager ) {
        throw ExceptionHandler( __FUNCTION__ + std::string( " failed, AudioManager has not been constructed yet" ) );
    }
    return s_pAudioManager;
} // get    

// ----------------------------------------------------------------------------
// Create A Sound Source Using The Passed In Filename. If The File Is Not
// Already Loaded Into A Memory Buffer, Then A New Buffer Is Created.
void AudioManager::createSource( SoundSource eSoundSource, const std::string& strFilename, bool bAttachToListener ) {
    BlockThread blockThread( s_csChangeSources );

    if ( !isAvailable( eSoundSource ) ) {
        return;
    }

    std::shared_ptr<AudioBuffer> pAudioBuffer = nullptr;

    // Check If This File Has Already Been Loaded Into A Buffer
    for ( ListAudioBuffers::iterator itBuffer = m_lAudioBuffers.begin(); itBuffer != m_lAudioBuffers.end(); ++itBuffer ) {
        if ( (*itBuffer)->isThisFile( strFilename ) ) {
            // The Requested File Is Already Loaded Into Memory
            pAudioBuffer = (*itBuffer);
            break;
        }
    }

    try {
        if ( nullptr == pAudioBuffer ) {
            // Need To Load The Desired File Into Memory
            pAudioBuffer.reset( new AudioBuffer( strFilename ) );

            // Store The Buffer
            m_lAudioBuffers.push_back( pAudioBuffer );
        }

        // Create New Source Attached To The Desired Audio Buffer
        m_mAudioSources[eSoundSource] = std::shared_ptr<AudioSource>( new AudioSource( eSoundSource, pAudioBuffer, bAttachToListener ) );

    } catch ( ... ) {
        std::ostringstream strStream;
        strStream << __FUNCTION__ << " failed for SoundSource(" << eSoundSource << ")";
        Logger::log( strStream, Logger::TYPE_ERROR );
    }
} // createSource

// ----------------------------------------------------------------------------
// Removes Source From Map And If Buffer Is No Longer Needed,
// It Will Also Be Deleted
void AudioManager::deleteSource( SoundSource eSoundSource ) {
    BlockThread blockThread( s_csChangeSources );

    MapAudioSources::iterator it = m_mAudioSources.find( eSoundSource );
    if ( it == m_mAudioSources.end() ) {
        std::ostringstream strStream;
        strStream << __FUNCTION__ << " could not find SoundSource(" << eSoundSource << ")";
        Logger::log( strStream, Logger::TYPE_ERROR );
        return; // Nothing To Delete
    }

    // Get bufferId And Delete Source
    unsigned uBufferId = it->second->getBufferId();
    m_mAudioSources.erase( it );

    // Find Buffer In List
    if ( uBufferId != INVALID_UNSIGNED ) {
        for ( ListAudioBuffers::iterator itBuffer = m_lAudioBuffers.begin(); itBuffer != m_lAudioBuffers.end(); ++itBuffer ) {
            if ( (*itBuffer)->getId() == uBufferId ) {
                if ( (*itBuffer)->getNumberSourcesAttached() < 1 ) {
                    // Buffer No Longer Needed
                    m_lAudioBuffers.erase( itBuffer );
                    return;
                } // If Buffer Not Needed
            } // If Found Buffer
        } // For All Buffers
    } // If Buffer Is Loaded
} // deleteSource

// ----------------------------------------------------------------------------
// getAudioObject()
AudioObject* AudioManager::getAudioObject( SoundSource eSoundSource ) const {
    BlockThread blockThread( s_csChangeSources );

    MapAudioSources::const_iterator it = m_mAudioSources.find( eSoundSource );
    if ( it != m_mAudioSources.cend() ) {
        return it->second.get();
    }

    std::ostringstream strStream;
    strStream << __FUNCTION__ << " SoundSource(" << eSoundSource << ") has not been found";
    Logger::log( strStream, Logger::TYPE_ERROR );
    return nullptr;
} // getAudioObject


} // namespace vmk

And the use of a VolatileLocker would be as follows: NOTE: I also have an OpenglThread class object not shown here to allow OpenGL to work with multiple threads.

Game.cpp

#include "Game.h"
#include "OpenglThread.h"
#include "VolatileLocker.h"

... other class object includes

namespace vmk {

static CRITICAL_SECTION s_criticalSection; 

// ----------------------------------------------------------------------------
// Game()
Game::Game() :
Engine( glm::uvec2( 3, 3 ) ),
m_maxSpeechArea( 250, INVALID_UNSIGNED ),
m_eLastSpeechSound( SS_INTRO_SHOT ),
m_eSoundSourceToPlay( SS_INTRO_SHOT ) {
    InitializeCriticalSection( &s_criticalSection );

    const glm::uvec2 gamePixelSize = m_pSettings->getGameSize();

    // (0,0) In Top Left Corner Not Bottom Left Which Is The Default
    m_m4Projection = glm::ortho( 0.0f, static_cast<float>( gamePixelSize.x ), static_cast<float>( gamePixelSize.y ), 0.0f, -10.0f, 10.0f );

    // Set Background Color
    glClearColor( 22.0f / 255.0f, 22.0f / 255.0f, 22.0f / 255.0f, 1.0f );

    // Turn Transparencies On
    glEnable( GL_BLEND );

    // Initialize Shaders
    std::string strVertexShader( "Shaders/gui.vert" );
    std::string strFragmentShader( "Shaders/gui.frag" );

    ShaderProgramSettings shaderProgramSettings( P_MAIN, strVertexShader, strFragmentShader );
    shaderProgramSettings.addVariable( ShaderAttribute( A_POSITION,             AT_FLOAT_VEC2 ), "inPosition" );
    shaderProgramSettings.addVariable( ShaderAttribute( A_COLOR,                AT_FLOAT_VEC4 ), "inColor" );
    shaderProgramSettings.addVariable( ShaderAttribute( A_TEXTURE_COORD0,       AT_FLOAT_VEC2 ), "inTextureCoord0" );

    shaderProgramSettings.addVariable( ShaderUniform( U_MVP_MATRIX,             UT_FLOAT_MAT4 ), "modelViewProjectionMatrix" );
    shaderProgramSettings.addVariable( ShaderUniform( U_TEXTURE0_SAMPLER_2D,    UT_SAMPLER_2D ), "texture0Sampler2d" ); 
    shaderProgramSettings.addVariable( ShaderUniform( U_USING_TEXTURE,          UT_BOOL ),       "usingTexture" );  
    shaderProgramSettings.addVariable( ShaderUniform( U_ALPHA,                  UT_FLOAT ),      "inAlpha" );

    m_pShaderManager->create( shaderProgramSettings );
    m_pShaderManager->enable( P_MAIN );

    m_pBatchManager.reset( new BatchManager( 10, 10000 ) );

    // Must Be Called Before Any GuiElements Are Loaded
    GuiElement::initialize();   

    // Load Game Logo - Title Screen
    m_pTitleScreen = new GuiScreen( std::string( "TitleScreen" ) );

    TextureFileReader titleTextureFileReader( "Assets/images/titleScreen.png" );
    m_titleTextureInfo =  titleTextureFileReader.getOrCreateTextureInfo( TextureInfo::FILTER_NONE, false, false );

    // Start Worker Thread
    _beginthread( loadAssets, 0, this );

    // Game Logo
    GuiLayoutAbsolute* pTitleCoverLayout = new GuiLayoutAbsolute( glm::ivec2(), Gui::LEFT, m_pSettings->getGameSize(), "title cover" );
    pTitleCoverLayout->setColor( glm::vec4( 0.0862745, 0.0862745, 0.0862745, 1.0 ) );
    m_pTitleScreen->addChild( pTitleCoverLayout );

    m_pTitleLayout = new GuiLayoutAbsolute( glm::ivec2( 0, 200 ), Gui::LEFT, glm::uvec2( 955, 400 ), "title" );
    m_pTitleLayout->setBackgroundImage( m_titleTextureInfo, glm::uvec2( 0, 359 ), glm::uvec2( 955, 400 ) );
    m_pTitleLayout->changePriority( 1 );
    m_pTitleScreen->addChild( m_pTitleLayout );

    // Flying Bullet
    m_pFlyingBulletLayout = new GuiLayoutAbsolute( glm::ivec2( 40, -100 ), "flying bullet" );
    m_pTitleLayout->addChild( m_pFlyingBulletLayout );

    // Intro Sound Effect
    m_pAudioManager->createSource( SS_INTRO_SHOT, "Assets/audio/introShot.ogg" );
    m_pAudioManager->play( SS_INTRO_SHOT );

    // Set Timer For How Long Title Screen Should Be Visible
    m_pAnimationManager->addFunction( 5.0, Animation::LINEAR, splashScreenUpdate, this, splashScreenDone, this );

    /*
    int debugLogging = m_pSettings->getDebugLogging() | Settings::DEBUG_RENDER;
    m_pSettings->setDebugLogging( debugLogging );
    */

} // Game

// ----------------------------------------------------------------------------
// ~Game()
Game::~Game() { 
    DeleteCriticalSection( &s_criticalSection );
} // ~Game


// ----------------------------------------------------------------------------
// splashScreenDone()
// Defined Outside Of Game But Is Declared As A Friend Function To Game And It Utilizes The VolatileLocker
void splashScreenDone( void* pParameter ) {
    Game* pGame = reinterpret_cast<Game*>( pParameter );
    if ( nullptr == pGame ) {
        throw ExceptionHandler( __FUNCTION__ + std::string( " Invalid pParameter passed in" ) );
    }

    VolatileLocker<GameState>( pGame->m_gameState, s_criticalSection )->timerDone();

    if ( VolatileLocker<GameState>( pGame->m_gameState, s_criticalSection )->is( GameState::PLAYING ) ) {
        pGame->m_pAudioManager->play( SS_CHOOSE_LETTER );
    }

} // splashScreenDone

// ----------------------------------------------------------------------------
// keyboardInput()
// A Member Function Of Game That Utilizes The VolatileLocker
void Game::keyboardInput( unsigned vkCode, bool isPressed ) {
    if ( isPressed || VolatileLocker<GameState>( m_gameState, s_criticalSection )->isSplashScreen() ) {
        // Wait For Splash Screen To Be Finished
        // Only React To Key Release Events
        return;
    }

    static unsigned lastKey = 0;

    if ( (VK_ESCAPE == lastKey && VK_ESCAPE == vkCode) ||
        (VK_ESCAPE == vkCode && m_bGameOver) ) {
        // TODO: Show Credits

        quitGame( nullptr, nullptr );
        return;
    }
    lastKey = vkCode;

    if ( m_bGameOver ) {
        if ( VK_SPACE == vkCode ) {
            restart();
        }
        return;
    }

    if ( VK_ESCAPE == vkCode ) {
        updateSpeech( "To qui the game, press ESC again.", COLOR_YELLOW );
        speak( SS_QUIT_GAME );

    } else if ( vkCode >= VK_KEYA && vkCode <= VK_KEYZ ) {
        // Play Sound
        m_pAudioManager->play( SS_GUN_SHOT );

        updatePuzzle( static_cast<char>( vkCode ) );

        // Show Gun Fire & Start Timer To Reset Graphics Back To Normal
        showGunFire( true );
        m_pAnimationManager->addTimer( 0.2, resetGunGraphics, this );

    } else {
        updateSpeech( "Choose a letter." );
        speak( SS_CHOOSE_LETTER );
    }

} // keyboardInput

} // namespace vmk

Now I know the OP mentioned about working in a Thread Pool or Queue, but I think the overall concept of being able to lock threads for safety reasons can be shown here as well. This may also serve as a guide to anyone who may read this.

Upvotes: 0

Lightness Races in Orbit
Lightness Races in Orbit

Reputation: 385174

The queue ensures thread safety by creating a lock_guard<...> on queue operations. Wouldn't memory consistency of the object being moved between threads be guaranteed since the memory fencing and synchronization would be taken care by the lock_guard?

Yes.

Part of me wants to ensure I am only passing thread-safe objects between threads, but I feel like this case there should be no problem. Is this true?

Yes.

This is essentially why volatile is inapplicable as a device for concurrent data access in C++; it doesn't solve race conditions and, once you've brought concurrency devices (e.g. mutexes) into the fray to fix that, those are also taking care of the memory consistency issue so there's simply nothing left for volatile to do.

Upvotes: 5

Related Questions