J.F.

Writing to a socket from different threads (a concurrent UDP server in Haskell)

EDIT:

I'm writing a UDP BitTorrent tracker in Haskell. The state lives in STM (two TVar Maps) inside my ServerState datatype, which is passed to runUDPServer, acceptConnections, handleConnection and handleRequestData. Clients will either request a "connection", announce, or scrape. Every message a client sends to the server should get a reply. (The protocol is described here: http://www.rasterbar.com/products/libtorrent/udp_tracker_protocol.html).

I will do some binary parsing, some processing in the IO monad (really just STM) and send a binary-encoded message back to the sender. Initially I thought I could run every request like this in its own thread, but I guess I could also fork a fixed number of threads and let them do the work instead. One problem with that might be that the whole server (all threads) could be blocked by n people sending UDP packets really slowly (but maybe that's not actually possible).
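A minimal sketch of the fixed-pool option, assuming the socket calls are passed in as plain IO actions so the pattern can be shown without a real socket. startWorkers is a hypothetical name; in the server the receive action would be something like S.recvFrom socket 2048 and the send action a wrapper around S.sendTo. Whether several threads may safely share one socket is the open question here, so this only shows the shape of the design.

```haskell
import Control.Concurrent
import Control.Monad (forever, replicateM)

-- Hypothetical helper: fork n workers that each loop forever over
-- receive -> handle -> reply. The receive/send actions are parameters
-- (assumed to wrap S.recvFrom / S.sendTo on the shared socket).
startWorkers
  :: Int                       -- number of worker threads
  -> IO (req, addr)            -- blocking receive
  -> (req -> addr -> IO resp)  -- request handler, e.g. handleRequestData serverState
  -> (resp -> addr -> IO ())   -- send the reply back to the sender
  -> IO [ThreadId]
startWorkers n recvReq handle sendResp =
  replicateM n $ forkIO $ forever $ do
    (req, addr) <- recvReq
    resp <- handle req addr
    sendResp resp addr
```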

To state my question more clearly: if I fork n threads that all run handleConnection simultaneously, will that mess with the socket somehow? And how could I spawn a new thread for each received packet?
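The thread-per-packet variant can be sketched the same way: one loop receives, and every datagram is handled in its own forkIO'd thread. GHC threads are lightweight, so one per request is usually affordable. serveThreadPerPacket is a hypothetical name, and the receive/send actions are again parameters standing in for the S.recvFrom / S.sendTo calls in the code below.

```haskell
import Control.Concurrent
import Control.Monad (forever, void)

-- Hypothetical helper: a single receive loop that forks one short-lived
-- thread per received packet to compute and send the reply.
serveThreadPerPacket
  :: IO (req, addr)            -- blocking receive (assumed to wrap S.recvFrom)
  -> (req -> addr -> IO resp)  -- request handler
  -> (resp -> addr -> IO ())   -- reply sender (assumed to wrap S.sendTo)
  -> IO ()
serveThreadPerPacket recvReq handle sendResp = forever $ do
  (req, addr) <- recvReq
  -- Only the receive happens on this thread; the work is done elsewhere.
  void $ forkIO $ do
    resp <- handle req addr
    sendResp resp addr
```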

I ask because when I fork a few threads that write to stdout, the output is garbled: a mixture of what the separate threads printed. With TCP, Network.accept provides a handle, so the individual threads don't really need to know about the listening socket, but with UDP I can't use accept. I wouldn't want to just assume that it's safe for multiple threads to write to a socket simultaneously.
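For output like stdout, where one logical message takes several writes, a common fix is to serialize the writers with a lock. A minimal sketch using an MVar () as the lock; newLock, withLock and logBlock are hypothetical names, and the sink parameter would be putStrLn for stdout:

```haskell
import Control.Concurrent
import Control.Monad (forM)

-- A lock is an MVar holding (): full means "free".
newLock :: IO (MVar ())
newLock = newMVar ()

-- withMVar takes the (), runs the action and puts the () back even if
-- the action throws, so at most one thread is inside at a time.
withLock :: MVar () -> IO a -> IO a
withLock lock act = withMVar lock (const act)

-- Emit several lines as one unit relative to other users of the lock.
logBlock :: MVar () -> (String -> IO ()) -> [String] -> IO ()
logBlock lock sink ls = withLock lock (mapM_ sink ls)
```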

{-# LANGUAGE OverloadedStrings #-}

import Control.Exception (bracket)
import qualified Data.ByteString.Char8 as BS
import qualified Network.Socket as S hiding (send, sendTo, recv, recvFrom)
import qualified Network.Socket.ByteString as S

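-- ServerState is defined elsewhere in my code (it holds the two TVar Maps).
runUDPServer :: ServerState -> S.ServiceName -> IO ()
runUDPServer serverState port =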
    S.withSocketsDo $ bracket (createSocket port) S.close (acceptConnections serverState)

    where
        createSocket port = do
            serverAddr <- fmap head $ S.getAddrInfo
                (Just (S.defaultHints {S.addrFlags = [S.AI_PASSIVE], S.addrSocketType = S.Datagram}))
                Nothing
                (Just port)

            socket <- S.socket (S.addrFamily serverAddr) S.Datagram S.defaultProtocol
            S.bind socket $ S.addrAddress serverAddr

            return socket

        acceptConnections serverState socket = do
            handleConnection serverState socket
            acceptConnections serverState socket  -- loop to handle the next packet

        handleConnection serverState socket = do
            (requestData, remoteAddress) <- S.recvFrom socket 2048
            responseData <- handleRequestData serverState requestData remoteAddress
            S.sendTo socket responseData remoteAddress

        handleRequestData :: ServerState -> BS.ByteString -> S.SockAddr -> IO BS.ByteString
        handleRequestData serverState requestData remoteAddress = do
            putStrLn "-----"

            putStrLn "Received UDP message"
            putStrLn $ "Address: " ++ show remoteAddress

            -- (left out code here)
            return "Dummy ByteString"

I would be very grateful for any tips, pointers etc.

Answers (2)

sclv

Personally I'd have a single thread for reads that just calls recv in a loop and tosses the requests onto a Chan, and another thread for writes that spools responses off a Chan. However, I do suspect, though I can't confirm, that you could have multiple threads doing this directly, as in your described architecture. The reason I think this may be OK is that IO is trafficked through GHC's IO manager, which handles multiplexing. While it doesn't contain the latest developments, I think the MIO paper covers the basics of how this is currently implemented.
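A sketch of this reader/writer design, assuming the socket calls are passed in as IO actions. runPipeline is a hypothetical name, and the pool of worker threads between the two Chans is an assumption about where handleRequestData would run. Only the reader calls the receive action and only the writer calls the send action:

```haskell
import Control.Concurrent
import Control.Monad (forever, replicateM_)

-- Hypothetical helper: reader -> request Chan -> workers -> response Chan -> writer.
runPipeline
  :: Int                       -- number of worker threads
  -> IO (req, addr)            -- receive (assumed to wrap S.recvFrom)
  -> (req -> addr -> IO resp)  -- request handler
  -> (resp -> addr -> IO ())   -- send (assumed to wrap S.sendTo)
  -> IO ()
runPipeline nWorkers recvReq handle sendResp = do
  requests  <- newChan
  responses <- newChan
  -- Reader: receive in a loop and queue each request with its sender.
  _ <- forkIO $ forever $ recvReq >>= writeChan requests
  -- Workers: turn requests into (response, address) pairs.
  replicateM_ nWorkers $ forkIO $ forever $ do
    (req, addr) <- readChan requests
    resp <- handle req addr
    writeChan responses (resp, addr)
  -- Writer: the calling thread spools responses off the Chan forever.
  forever $ do
    (resp, addr) <- readChan responses
    sendResp resp addr
```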

Christopher King

You need to learn some concurrency, bro!

The first thing you need to learn is forkIO :: IO () -> IO ThreadId. This is where all the concurrency starts. You give it an IO action, and it starts a thread to run that IO action, like magic, man! Everything, including that accept thing you were talking about, goes back to forkIO! Since Haskell data is immutable, it's perfectly safe to share between threads, and if you don't use locks, there's no problem (and deadlock is then impossible).
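To illustrate forkIO: it returns immediately, and when main returns the whole program exits, so you usually need a way to wait for a thread's result. A minimal sketch with a hypothetical spawn helper built on forkFinally and an MVar; the async package (async / wait) offers a more complete version of this idea:

```haskell
import Control.Concurrent
import Control.Exception (SomeException)

-- Hypothetical helper: run an action in a new thread and return an MVar
-- that is filled with its outcome when the thread ends. forkFinally
-- catches exceptions, so a failing thread still fills the MVar (with Left).
spawn :: IO a -> IO (MVar (Either SomeException a))
spawn act = do
  result <- newEmptyMVar
  _ <- forkFinally act (putMVar result)
  return result
```

Calling takeMVar on the returned MVar blocks until that thread has finished.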

The rest of learning concurrency in Haskell is learning how to use forkIO, the libraries built on forkIO, and some related primitives. First, read the Control.Concurrent documentation. Then read Parallel and Concurrent Programming in Haskell (free ebook). To get a feel for how the book applies to your situation, go to Chapter 12.

Haskell is much, much, much better at dealing with concurrency than any imperative and/or impure language that isn't specifically built for concurrency (cue downvotes from non-Haskellers). Embrace it.


Okay, for writing to the socket from several threads, you would use some sort of channel. An MVar would actually be sufficient (as long as you are writing faster than you are producing). See this. If you produce faster than you write, see this.
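A minimal sketch of the MVar approach, assuming a single writer thread owns the send action (for the server, something like a wrapper around S.sendTo socket); startWriter is a hypothetical name. putMVar blocks while the one-slot MVar is full, which is why this only works well when the writer keeps up with the producers:

```haskell
import Control.Concurrent
import Control.Monad (forever, void)

-- Hypothetical helper: start one writer thread that owns the send action
-- and return the function other threads use to hand it messages.
startWriter :: (msg -> IO ()) -> IO (msg -> IO ())
startWriter send = do
  slot <- newEmptyMVar
  -- The writer takes one message at a time and sends it.
  void $ forkIO $ forever $ takeMVar slot >>= send
  -- Producers call this; putMVar blocks while the slot is still full.
  return (putMVar slot)
```

For example, startWriter (\(bs, addr) -> () <$ S.sendTo socket bs addr) would give every handler thread a submit function, while only the writer thread touches the socket.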

The stm package has similar constructs (TMVar, TChan, and the bounded TBQueue).
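For example, a bounded TBQueue from stm gives a single writer thread a buffer, so producers only block once capacity messages are already waiting. This is a sketch assuming one writer thread owns the send action (startBoundedWriter is a hypothetical name, and capacity should be positive); it fits naturally if the server state already lives in STM:

```haskell
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad (forever, void)

-- Hypothetical helper: a single writer fed through a bounded STM queue.
startBoundedWriter :: Int -> (msg -> IO ()) -> IO (msg -> IO ())
startBoundedWriter capacity send = do
  -- fromIntegral covers stm versions taking Int or Natural here.
  queue <- newTBQueueIO (fromIntegral capacity)
  -- The writer drains the queue one message at a time.
  void $ forkIO $ forever $ atomically (readTBQueue queue) >>= send
  -- Producers block in writeTBQueue only while the queue is full.
  return (atomically . writeTBQueue queue)
```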
