Paul
Paul

Reputation: 382

Reading Lines Asynchronously in Haskell

I am trying to read lines without blocking using Async. I have figured out how to read one line, but I don't know the correct approach to make this continuous. Ideally I would like to have a generator that I could read the next line without blocking the main loop. How could I go about this?

module Main where

import Control.Concurrent
import Control.Concurrent.Async

-- New async reader
main :: IO ()
main = do
  getl <- async getLine
  mainLoop getl

-- Read stdin and echo forever
mainLoop :: Async String -> IO ()
mainLoop getl = do
  tryRead getl >>= tryPrint
  threadDelay $ 1000 * 1000
  mainLoop getl

-- Try to read stdin without blocking
tryRead :: Async String -> IO (Maybe String)
tryRead recvr = do
  res <- poll recvr
  case res of
    Nothing -> return Nothing
    Just (Right a) -> return $ Just a
    Just (Left e) -> error $ show e

-- Try to print string
tryPrint :: Maybe String -> IO ()
tryPrint (Just str) = print str
tryPrint Nothing = return ()

Output. What I assume is happening is that the Async getline is evaluated the first time, and then that value is returned every subsequent time the Async is polled.

hell
"hell"
"hell"
"hell"
"hell"
"hell"

Upvotes: 2

Views: 309

Answers (2)

Paul
Paul

Reputation: 382

A better solution using channel streams from unagi-chan.

module Main where

import Control.Monad
import Control.Concurrent hiding (newChan, writeChan)
import Control.Concurrent.Chan.Unagi.NoBlocking

main :: IO ()
main = do
  (inchan, outchan) <- newChan
  _ <- forkIO $ getStdin inchan
  [stream] <- streamChan 1 outchan
  mainLoop stream

mainLoop :: Stream String -> IO ()
mainLoop stream = do
  (str, stream') <- getNext stream
  forM_ str print
  threadDelay 1000
  mainLoop stream'

getStdin :: InChan String -> IO ()
getStdin chan = forever $ getLine >>= writeChan chan

getNext :: Stream String -> IO (Maybe String, Stream String)
getNext stream = do
  next <- tryReadNext stream
  case next of
    Next str stream' -> return (Just str, stream')
    Pending -> return (Nothing, stream)

Upvotes: 1

Paul
Paul

Reputation: 382

I hacked something together which seems to work, although its a little ugly. tryRead now returns the next async to poll, which is either the old async event or a new one. I would like to remove the necessity to pass the async event around, but I guess this will do for now.

module Main where

import Control.Concurrent
import Control.Concurrent.Async

-- New async reader
main :: IO ()
main = mainLoop Nothing

-- Read stdin and echo forever
mainLoop :: Maybe (Async String) -> IO ()
mainLoop getl = do
  (res, getl') <- tryRead getl
  tryPrint res
  threadDelay 1000
  mainLoop (Just getl')

-- Try to read stdin without blocking
tryRead :: Maybe (Async String) -> IO (Maybe String, Async String)
tryRead Nothing = do
  recvr <- async getLine
  return (Nothing, recvr)
tryRead (Just recvr) = do
  res <- poll recvr
  case res of
    Nothing -> return (Nothing, recvr)
    Just (Right a) -> do recvr' <- async getLine
                         return (Just a, recvr')
    Just (Left e) -> error $ show e

-- Try to print string
tryPrint :: Maybe String -> IO ()
tryPrint (Just str) = print str
tryPrint Nothing = return ()

Upvotes: 0

Related Questions