bourne
bourne

Reputation: 1245

recv call hangs after socket is closed by remote

I am doing socket programming and have issues figuring out how to terminate a blocking recv call when it is already waiting and the other side closes the socket. The code that I have is as follows.

My Client looks like below

class Client {
public:

    void connect();

    void disConnect();

    void addPayload(const std::string& payload);

    void setPort(unsigned int port);

private:
    void shutdown();

    void recvThread(unsigned int socketId);
    void sendThread(unsigned int socketId);


    std::thread m_sendthread;

    std::thread m_recvthread;

    unsigned int m_socketId;

    bool m_isShutdown;

    std::mutex m_mutex;

    std::condition_variable m_sendThreadWait;

    std::vector<std::string> m_payloadJsonList;

    unsigned int m_port;
}; 

Client.cpp looks like below

void Client::connect() {
    LOG(__func__);
    m_socketId = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (m_socketId < 0) {
        LOG(__func__).m("failed to create the connecting socket.");
        return;
    }

    struct sockaddr_in server_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    server_addr.sin_port = htons(m_port);

    if (::connect(m_socketId, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
        LOG(__func__).m("Unable to connect.");
        close(m_socketId);
        return;
    }

    LOG(__func__).m("Successfully connected.");

    m_recvthread = std::thread(&Client::recvThread, this, m_socketId);
    m_sendthread = std::thread(&Client::sendThread, this, m_socketId);
}

void Client::addPayload(const std::string& payload) {
    LOG(__func__);
    rapidjson::Document document;
    if (!json::parseJSON(payload, document)) {
        LOG(__func__).d("reason", "invalidJson");
        return;
    }
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_payloadJsonList.emplace_back(payload);
    }
}

void Client::recvThread(unsigned int socketId) {
    LOGLX(__func__).m("started.");
    bool joinThread = false;
    do {
        int requestLenBuffer = 0;  // Request size
        std::cout<<"1\n";
        ssize_t bytesRead = recv(socketId, &requestLenBuffer, sizeof(requestLenBuffer), 0);
        std::cout<<"2\n";
        if (0 == bytesRead) {
            {
                std::lock_guard<std::mutex> lock(m_mutex);
                joinThread = m_isShutdown;
            }
            LOG(__func__).m("connection closed by remote.");
            break;
        }

        if (bytesRead < 0) {
            LOG(__func__).m("recv failed reading request size.");
            return;
        }
        int requestInBytes = ntohl(requestLenBuffer);

        LOG(__func__).d("RequestSize", requestInBytes);

        ssize_t bytesRemaining = requestInBytes;
        ssize_t totalBytesRead = 0;
        std::vector<unsigned char> byteBuffer(bytesRemaining + 1);  // +1 for the NUL-terminator

        while (bytesRemaining > 0) {
            std::cout<<"3\n";
            ssize_t bytesRead = recv(socketId, byteBuffer.data() + totalBytesRead, bytesRemaining, 0);
            std::cout<<"4\n";
            if (0 == bytesRead) {
                {
                    std::lock_guard<std::mutex> lock(m_mutex);
                    joinThread = m_isShutdown;
                }
                LOG(__func__).m("connection closed by remote.");
                break;
            } else if (bytesRead < 0) {
                LOG(__func__).m("recv failed reading request.");
                joinThread = true;
                break;
            }
            totalBytesRead += bytesRead;
            bytesRemaining -= bytesRead;
        }

        if (!bytesRemaining && !joinThread) {
            byteBuffer[requestInBytes] = '\0';  // NUL-terminate the string
            LOG(__func__).d("Recieved Request: ", byteBuffer.data());
        }
    } while (!joinThread);
    LOG(__func__).m("completed");
}

void Client::sendThread(unsigned int socketId) {
    LOG(__func__).m("started.");
    std::vector<unsigned char> payloadBuffer;
    do {
        payloadBuffer.clear();
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            m_sendThreadWait.wait(lock, [this] { return m_payloadJsonList.size() || m_isShutdown; });
            if (m_isShutdown) {
                break;
            }
            std::string payload = m_payloadJsonList.back();
            std::vector<unsigned char> buffer(payload.begin(), payload.end());
            payloadBuffer = std::move(buffer);
            m_payloadJsonList.pop_back();
        }

        LOG(__func__).d("Sending Payload", payloadBuffer.data()).d("size", payloadBuffer.size());

        int requestLengthToSend = htonl(payloadBuffer.size());
        uint8_t bytesRemaining = payloadBuffer.size();
        uint8_t bytesAlreadySent = 0;
        send(socketId, &requestLengthToSend, sizeof(requestLengthToSend), 0);

        do {
            uint8_t bytes = send(socketId, &payloadBuffer[0] + bytesAlreadySent, bytesRemaining, 0);
            bytesAlreadySent += bytes;
            bytesRemaining -= bytes;

        } while (bytesRemaining > 0);

    } while (true);
    LOG(__func__).m("completed.");
}

void Client::disConnect() {
    LOG(__func__);
    shutdown();
}

void Client::setPort(unsigned int port) {
    LOG(__func__);
    m_port = port;
}

void Client::shutdown() {
    LOGLX(__func__);
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_isShutdown = true;
        m_sendThreadWait.notify_one();
    }

    close(m_socketId);
    //::shutdown(m_socketId, SHUT_RDWR);

    if (m_sendthread.joinable()) {
        m_sendthread.join();
    }

    if (m_recvthread.joinable()) {
        m_recvthread.join();
    }
}

My main class for client app looks like below

int main(int argc, char* argv[]) {
  
    //string jsonPayload = ...read payload;
    Client client;
    client.setPort(12345);
    client.addPayload(jsonPayload);
    client.connect();

    std::thread waitThread = std::thread([]() { std::this_thread::sleep_for(std::chrono::seconds(15)); });

    if (waitThread.joinable()) {
        waitThread.join();
    }
    client.disConnect();
    return 0;
}

my server class looks like below

server.h

class Server {
public:
    ~Server() = default;

    void listen() override;
    void disConnect() override;
    void setPort(unsigned int port) override;
    void shutdown() override;

protected:
    void acceptNewConnection();

    void sendThread();
    void recvThread();
    void acceptThread();

    
    Message processBuffer(std::vector<unsigned char>& byteBuffer);

    std::thread m_sendThread;
    std::thread m_recvThread;

    std::thread m_acceptThread;

    bool m_isShutdown;

    std::mutex m_mutex;

    int m_listenSocket;

    unsigned int m_port;

    unsigned int m_clientSocketId;

    std::condition_variable m_sendThreadWait;


    std::vector<Message> m_payloadList;

};

My server.cpp class looks like below

void Server::listen() {
    LOG(__func__);
    m_listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (m_listenSocket < 0) {
        LOG(__func__).m("Failed to open listen socket.");
        return;
    }

    int optval = 0;
    if (setsockopt(m_listenSocket, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
        LOG(__func__).m("Failed to set SO_REUSEADDR option.");
        close(m_listenSocket);
        return;
    }

    struct sockaddr_in server_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(m_port);

    if (bind(m_listenSocket, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
        LOG(__func__).m(" Failed to bind to port.");
        close(m_listenSocket);
        return;
    }

    int backlog = 1;  // We are only interested to pool one client.
    if (::listen(m_listenSocket, backlog) < 0) {
        LOG(__func__).m(" Failed to listen on the listen socket.");
        close(m_listenSocket);
        return;
    }

    LOG(__func__).m("SocketServer is listening.").d("port", m_port);

    m_acceptThread = std::thread(&Server::acceptThread, this);
}
void Server::disConnect() {
    LOG(__func__);
    shutdown();
}

void Server::setPort(unsigned int port) {
    LOG(__func__);
    m_port = port;
}

void Server::shutdown() {
    LOG(__func__);
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_isShutdown = true;
        m_sendThreadWait.notify_one();
    }

    close(m_listenSocket);
    close(m_clientSocketId);

    if (m_acceptThread.joinable()) {
        m_acceptThread.join();
    }

    if (m_recvThread.joinable()) {
        m_recvThread.join();
    }

    if (m_sendThread.joinable()) {
        m_sendThread.join();
    }
}

void SocketServer::acceptNewConnection() {
    LOG(__func__);
    socklen_t nlen = sizeof(struct sockaddr_in);
    try {
        int incomingSocketId = accept(m_listenSocket, NULL, &nlen);
        if (incomingSocketId < 0) {
            LOG(__func__).d("reason", "acceptConnectionFailed").m("Failed to accept the client connection");
            return;
        }

        m_clientSocketId = incomingSocketId;
        m_sendThread = std::thread(&Server::sendThread, this);
        m_recvThread = std::thread(&Server::recvThread, this);
    } catch (const std::exception& e) {
        LOG(__func__).d("reason", "acceptConnectionFailed").d("socket closed by remote.", e.what());
        return;
    }
}

void Server::sendThread() {
    LOG(__func__).m("started.");
    do {
        Message message;
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            m_sendThreadWait.wait(lock, [this] { return m_payloadList.size() || m_isShutdown; });
            if (m_isShutdown) {
                break;
            }
            message = m_payloadList.back();
            m_payloadList.pop_back();
        }

        //Process the message and convert to string
        std::string payload = message.toString();

        ACSDK_INFO(LX(__func__).d("Sending Payload", payload).d("size", payload.size()));

        std::vector<unsigned char> buffer(payload.begin(), payload.end());
        int responseLength = payload.size();
        int responseLength_after_htol = htonl(responseLength);
        uint8_t bytesRemaining = responseLength;
        uint8_t bytesAlreadySent = 0;
        send(m_clientSocketId, &responseLength_after_htol, sizeof(responseLength), 0);

        do {
            uint8_t bytes = send(m_clientSocketId, &buffer[0] + bytesAlreadySent, bytesRemaining, 0);
            bytesAlreadySent += bytes;
            bytesRemaining -= bytes;

        } while (bytesRemaining > 0);
    } while (true);
    LOG(__func__).m("completed.");
}

void Server::acceptThread() {
    LOG(__func__).m("started.");
    acceptNewConnection();
    LOG(__func__).m("completed");
}

void Server::recvThread() {
    LOG(__func__).m("started.");
    bool joinThread = false;
    do {
        int requestLenBuffer = 0;
        std::cout<<"1\n";
        ssize_t bytesRead = recv(m_clientSocketId, &requestLenBuffer, sizeof(requestLenBuffer), 0);
        std::cout<<"2\n";
        if (0 == bytesRead) {
            {
                std::lock_guard<std::mutex> lock(m_mutex);
                joinThread = m_isShutdown;
            }
            LOG(__func__).m("connection closed by remote.");
            break;
        }

        if (bytesRead < 0) {
            LOG(__func__).m("recv failed reading request size.");
            break;
        }
        int requestInBytes = ntohl(requestLenBuffer);

        LOG(__func__).d("RequestSize", requestInBytes);

        ssize_t bytesRemaining = requestInBytes;
        ssize_t totalBytesRead = 0;
        std::vector<unsigned char> byteBuffer(bytesRemaining + 1);  // +1 for the NUL-terminator

        while (bytesRemaining > 0) {
            std::cout<<"3\n";
            ssize_t bytesRead = recv(m_clientSocketId, byteBuffer.data() + totalBytesRead, bytesRemaining, 0);
            std::cout<<"4\n";
            if (0 == bytesRead) {
                {
                    std::lock_guard<std::mutex> lock(m_mutex);
                    joinThread = m_isShutdown;
                }
                LOG(__func__).m("connection closed by remote.");
                break;
            } else if (bytesRead < 0) {
                LOG(__func__).m("recv failed reading request.");
                joinThread = true;
                break;
            }
            totalBytesRead += bytesRead;
            bytesRemaining -= bytesRead;
        }

        if (!bytesRemaining && !joinThread) {
            byteBuffer[requestInBytes] = '\0';  // NUL-terminate the string
            LOG(__func__).d("Recieved Message: ", byteBuffer.data());

            /// Process the recieved Request/Message
        }
    } while (!joinThread);
    LOG(__func__).m("completed");
}

Main class for server app looks like below

int main(int argc, char* argv[]) {
    Server server;
    server.setPort(12345);
    server.listen();

    std::thread waitThread = std::thread([]() { std::this_thread::sleep_for(std::chrono::seconds(15)); });

    if (waitThread.joinable()) {
        waitThread.join();
    }

    server.disConnect();

    std::thread newWaitThread = std::thread([]() { std::this_thread::sleep_for(std::chrono::seconds(2)); });
    if (newWaitThread.joinable()) {
        newWaitThread.join();
    }
    return 0;
}

Output of Client application

...
Client:connect
Client:connect::Successfully connected.
Client:recvThread::started.
1
Client:sendThread::started.
Client:sendThread:Sending Payload={...},size=160
Client:disConnect
Client:shutdown
Client:sendThread::completed.

OutPut of server application

...
Server:listen
Server:listen::SocketServer is listening.:port=12345
Server:acceptThread::started.
Server:acceptNewConnection
Server:sendThread::started.
Server:acceptThread::completed
Server:recvThread::started.
1
2
Server:recvThread:RequestSize=160
3
4
Server:recvThread:Recieved Request: ={...}
1
Server:disConnect
Server:shutdown
Server:sendThread::completed.

So as can be seen RecvThread is not getting completed on both the application and is waiting on the recv blocking call even through I have closed the socket.

Things that I have tried/considered so far

  1. I tried to call ::shutdown(m_socketId, SHUT_RDWR); instead of close(m_socketId);, but nothing changed
  2. I read that this might be a known issue and to rewrite my class using non blocking calls like poll(), select() etc., but I would want that to be my last resort since I am happy with my code architecture and have already data transmitting over socket using my existing architecture.
  3. I also read about setting SO_RCVTIMEO on the listening thread and setting a timeout, but does that imply that even after the socket is closed the application will just hang there until we hit the SO_RCVTIMEO timeout and then recv call will proceed and I will get a chance to complete the recvThread.

-------UPDATE 1----------

updated orginal code so that recv is getting stored in variable of type ssize_t instead of uint8_t but still seeing that my recv is hanging even after i am closing the listening , connecting and the socket returned by accept in shutdown methods of my server and client classes.

---------UPDATE 2-------------

So i tried shutting down the socketId returned by accept

::shutdown(m_clientSocketId, SHUT_RDWR); in the shutdown of the SERVEr class but that did not made any differences to the behaviour and recv call still hangs.

i also tried setting SO_RCVTIMEO on the socketId returned by accept and it seems that the behaviour is that the socket loses even before shutdown occur, so i have the code as below

void Server::acceptNewConnection() {
    LOG(__func__);
    socklen_t nlen = sizeof(struct sockaddr_in);
    try {
        int incomingSocketId = accept(m_listenSocket, NULL, &nlen);
        if (incomingSocketId < 0) {
            LOG(__func__).m("Failed to accept the client connection");
            return;
        }

        m_clientSocketId = incomingSocketId;

        struct timeval tv;
        tv.tv_sec = 2;
        tv.tv_usec = 0;
        if (setsockopt(m_clientSocketId, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) {
            LOG(__func__).m("Failed to set SO_RCVTIMEO");
            return;
        }

        m_sendThread = std::thread(&Server::sendThread, this);
        m_recvThread = std::thread(&Server::recvThread, this);
    } catch (const std::exception& e) {
        LOG(__func__).d("socket closed by remote.", e.what());
        return;
    }
}

and the output from Server is as below which shows that after the 2sec timeout the recv thread is completing which is not the desired behaviour as i want recv call to come out only after the socket is closed and not complete the recv thread before hand, so this solution will not work

1
2
Server:recvThread::recv failed reading request size.
Server:recvThread::completed
Server:disConnect
Server:shutdown
Server:sendThread::completed.

Upvotes: 1

Views: 160

Answers (1)

Maximilian Gerhardt
Maximilian Gerhardt

Reputation: 5353

In Client::recvThread() as well as Server::recvThread() you do

        while (bytesRemaining > 0) {
            std::cout<<"3\n";
            uint8_t bytesRead = recv(socketId, byteBuffer.data() + totalBytesRead, bytesRemaining, 0);
            std::cout<<"4\n";
            if (0 == bytesRead) {
                {
                    std::lock_guard<std::mutex> lock(m_mutex);
                    joinThread = m_isShutdown;
                }
                LOG(__func__).m("connection closed by remote.");
                break;
            } else if (bytesRead < 0) {
                LOG(__func__).m("recv failed reading request.");
                joinThread = true;
                break;
            }
            totalBytesRead += bytesRead;
            bytesRemaining -= bytesRead;
        }

But in

uint8_t bytesRead = recv(..)

you use a uint8_t which is unsigned (value range 0 to 255). Thus if (bytesRead < 0) can absolutely never trigger. Your compiler should have warned you about this, maybe you ignored the warning.

The proper variable type to use for the return value of recv() is ssize_t, a signed size_t essentially, as the documentation says.

When the other side closes the TCP socket cleanly, any pending recv() should abort with a return value of 0, given the used "len" argument wasn't 0 to begin with.

       These calls return the number of bytes received, or -1 if an
       error occurred.  In the event of an error, errno is set to
       indicate the error.

       When a stream socket peer has performed an orderly shutdown, the
       return value will be 0 (the traditional "end-of-file" return).

Upvotes: 2

Related Questions