Reputation: 4326
I have code that uses a file handle to simulate a sink for the streaming ByteString from a source (AWS S3). If we want to use Network.WebSockets as the sink, would it suffice to swap LBS.writeFile in the code below with sendBinaryData (with a handle to the connection)?
{-# LANGUAGE OverloadedStrings,ScopedTypeVariables #-}
import qualified Aws
import qualified Aws.S3 as S3
import Data.Conduit (($$+-))
import qualified Data.Conduit.List as CL (mapM_)
import qualified Data.ByteString.Streaming.HTTP as SP
import qualified Data.ByteString.Lazy as LBS
import Streaming as S
import Streaming.Prelude as S hiding (show,print)
import Control.Concurrent.Async (async,waitCatch)
import Data.Text as T (Text)
data AwsConfig a = AwsConfig { _aws_cfg :: Aws.Configuration, _aws_s3cfg :: S3.S3Configuration a, _aws_httpmgr :: SP.Manager }
getObject :: AwsConfig Aws.NormalQuery -> T.Text -> T.Text -> IO Int
getObject cfg bucket key = do
  req <- waitCatch =<< async (runResourceT $ do
    {- Create a request object with S3.getObject and run the request with pureAws. -}
    S3.GetObjectResponse { S3.gorResponse = rsp, S3.gorMetadata = mdata } <-
      Aws.pureAws (_aws_cfg cfg) (_aws_s3cfg cfg) (_aws_httpmgr cfg) $
        S3.getObject bucket key
    {- Stream the response to a lazy bytestring -}
    liftIO $ LBS.writeFile "testaws" LBS.empty -- this will be replaced by content-length of the bytes
    let obj = (($$+- CL.mapM_ S.yield) . hoist lift ) (SP.responseBody rsp)
    S.mapM_ (liftIO . (LBS.appendFile "testaws") . LBS.fromStrict) obj
    return $ lookup "content-length" (S3.omUserMetadata mdata))
  case req of
    Left _  -> return 2 -- perhaps, we could use this to send an error message over websocket
    Right _ -> return 0
The source of confusion for me is how the termination of the stream is determined. For files, this is taken care of by the writeFile API. What about sendBinaryData? Does it handle termination in a similar way to writeFile? Or is it determined by the data parser on the client side?
Update
This question is about how to stream the data to a websocket handle (let us assume a handle has been provided), as we do with the file handle in the example above, not really about how to manage the handle within ResourceT. conduit does seem to take the mapM_ approach to sink data, so it seems that is indeed the way to go.
The termination question is because of this line of thought I have: if we have a function listening for data on the other side of a Websocket handle, then determining end of message seems to matter in streaming context. Given a function like below:
f :: LBS.ByteString -> a
if we do S.mapM_ to stream the data to the websocket handle, does it take care of adding some kind of end-of-stream marker so that f, listening on the other side, can stop processing the lazy bytestring? Otherwise f won't know when the message is complete.
Upvotes: 2
Views: 666
Reputation: 2909
Here are a few bits and pieces that may make things more intelligible. First, for the first little demo, revising your getObject, I use Streaming.ByteString.writeFile, which is in ResourceT anyway, to drop the detour by lazy bytestring.
{-# LANGUAGE OverloadedStrings,ScopedTypeVariables #-}
import qualified Aws
import qualified Aws.S3 as S3
import Data.Conduit
import qualified Data.Conduit.List as CL (mapM_)
import qualified Data.ByteString.Streaming.HTTP as HTTP
import qualified Data.ByteString.Streaming as SB
import qualified Data.ByteString.Streaming.Internal as SB
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import Streaming as S
import Streaming.Prelude as S hiding (show,print)
import Control.Concurrent.Async (async,waitCatch)
import Data.Text as T (Text)
import qualified Network.WebSockets as WebSockets
import Control.Monad.Trans.Resource
data AwsConfig a = AwsConfig { _aws_cfg :: Aws.Configuration
                             , _aws_s3cfg :: S3.S3Configuration a
                             , _aws_httpmgr :: HTTP.Manager }
getObject :: AwsConfig Aws.NormalQuery -> FilePath -> T.Text -> T.Text -> IO Int
getObject cfg file bucket key = do
  req <- waitCatch =<< async (runResourceT $ do
    S3.GetObjectResponse { S3.gorResponse = rsp, S3.gorMetadata = mdata } <-
      Aws.pureAws (_aws_cfg cfg) (_aws_s3cfg cfg) (_aws_httpmgr cfg) $
        S3.getObject bucket key
    let bytestream = do
          -- lookup "content-length" (S3.omUserMetadata mdata))
          SB.chunk B.empty -- this will be replaced by content-length
          hoist lift (HTTP.responseBody rsp) $$+- CL.mapM_ SB.chunk
    SB.writeFile file bytestream ) -- this is in ResourceT
  case req of
    Left _  -> return 2
    Right _ -> return 0
We can abstract from this more or less what you were doing with SB.writeFile:
getObjectAbstracted
  :: (SB.ByteString (ResourceT IO) () -> ResourceT IO b)
  -> AwsConfig Aws.NormalQuery -> S3.Bucket -> Text -> ResourceT IO b
getObjectAbstracted action cfg bucket key = do
  S3.GetObjectResponse { S3.gorResponse = rsp, S3.gorMetadata = mdata } <-
    Aws.pureAws (_aws_cfg cfg)
                (_aws_s3cfg cfg)
                (_aws_httpmgr cfg)
                (S3.getObject bucket key)
  action (hoist lift (HTTP.responseBody rsp) $$+- CL.mapM_ SB.chunk)
Here we need a little helper that is not included in the streaming-bytestring library:
mapMChunks_ :: Monad m => (B.ByteString -> m ()) -> SB.ByteString m r -> m r
mapMChunks_ act bytestream = do
  (_ S.:> r) <- SB.foldlChunksM (\_ bs -> act bs) (return ()) bytestream
  return r
and can proceed more or less as @haoformayor planned, using streaming-bytestring:
writeConnection :: MonadIO m => WebSockets.Connection -> SB.ByteString m r -> m r
writeConnection connection =
  mapMChunks_ (liftIO . WebSockets.sendBinaryData connection)
-- following `haoformayor`
connectWrite
  :: (MonadResource m, WebSockets.WebSocketsData a)
  => WebSockets.PendingConnection
  -> a                 -- closing message
  -> SB.ByteString m r -- stream from aws
  -> m r
connectWrite request closeMessage bytestream = do
  (releaseKey, connection) <- allocate (WebSockets.acceptRequest request)
                                       (`WebSockets.sendClose` closeMessage)
  writeConnection connection bytestream
getObjectWS :: WebSockets.WebSocketsData a
            => WebSockets.PendingConnection
            -> a
            -> AwsConfig Aws.NormalQuery
            -> S3.Bucket
            -> Text
            -> ResourceT IO ()
getObjectWS request closeMessage = getObjectAbstracted (connectWrite request closeMessage)
Of course none of this so far makes any use of the difference between conduit and streaming/streaming-bytestring.
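On the termination question: S.mapM_ (and the mapMChunks_ helper) only runs the given action once per chunk and adds nothing of its own, and as far as I understand each sendBinaryData call goes out as one complete websocket message. So the receiving side sees a sequence of messages and needs some agreed convention to know when the object is finished. The following is a minimal sketch of one such convention, an empty message as the end marker. The names sendWithEndMarker, receiveUntilEndMarker and demo are hypothetical, and the connection is simulated with an IORef queue so the example depends only on base and bytestring.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef, writeIORef)
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as BC

-- | Send every non-empty chunk as its own message, then one empty message
-- that acts as the (hypothetical) end-of-stream marker.
sendWithEndMarker :: Monad m => (B.ByteString -> m ()) -> [B.ByteString] -> m ()
sendWithEndMarker send chunks = do
  mapM_ send (filter (not . B.null) chunks)
  send B.empty

-- | Receiver side: keep pulling messages until the empty marker arrives.
receiveUntilEndMarker :: Monad m => m B.ByteString -> m [B.ByteString]
receiveUntilEndMarker recv = go []
  where
    go acc = do
      msg <- recv
      if B.null msg
        then return (reverse acc)
        else go (msg : acc)

-- | In-memory stand-in for a connection: an IORef holding queued messages.
demo :: IO [B.ByteString]
demo = do
  queue <- newIORef []
  let send bs = modifyIORef' queue (++ [bs])
      recv = do
        msgs <- readIORef queue
        case msgs of
          (m : rest) -> writeIORef queue rest >> return m
          []         -> return B.empty -- an empty queue is treated as end of stream
  sendWithEndMarker send (map BC.pack ["ab", "", "cd"])
  receiveUntilEndMarker recv
```

With a real connection, send would presumably be WebSockets.sendBinaryData connection and recv would be WebSockets.receiveData connection. Empty chunks are filtered out on the sending side so that they cannot be mistaken for the marker. Closing the connection once the stream is exhausted, as connectWrite does through sendClose, is another way to signal the end.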
Upvotes: 1
Reputation: 52067
(Caveat - code not tested.)
Your code is re-opening the output file and appending to it every time a packet of data comes in. Clearly a better solution is to use LBS.hPutStr to write to the file using an already open file handle.
That is, instead of:
S.mapM_ (liftIO . (LBS.appendFile "testaws") . LBS.fromStrict) obj
you want to use:
S.mapM_ (liftIO . (LBS.hPutStr h) . LBS.fromStrict) obj
Of course, this references the handle h, and where does that come from?
One solution is to pass it into getObject or otherwise create it before calling the body of getObject, e.g.:
getObject cfg bucket key = withFile "output" WriteMode $ \h -> do
  req <- ...
  ...
  S.mapM_ (liftIO . (LBS.hPutStr h) . LBS.fromStrict) obj
  ...
Or maybe you have to create it inside the runResourceT block; I'm not sure.
Update - See @haoformayor's answer for how to have ResourceT manage the file handle for you.
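To make the open-once idea concrete, here is a minimal, self-contained sketch that uses only base and bytestring. It leaves out the S3 and streaming parts entirely: a plain list of strict chunks stands in for the stream, and the names writeChunks and demo as well as the file name are hypothetical.

```haskell
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as BC
import System.IO (IOMode (WriteMode), withFile)

-- | Write every chunk through one handle that is opened once by withFile
-- and closed when the callback returns (or throws).
writeChunks :: FilePath -> [B.ByteString] -> IO ()
writeChunks path chunks =
  withFile path WriteMode $ \h ->
    mapM_ (B.hPut h) chunks

-- | Write three chunks, then read the whole file back strictly.
demo :: IO B.ByteString
demo = do
  let path = "chunks-demo.bin" -- hypothetical file name
  writeChunks path (map BC.pack ["abc", "def", "ghi"])
  B.readFile path
```

Note that withFile needs an IOMode argument, and that the handle is only valid inside the callback, so all writes have to happen there.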
Upvotes: 1
Reputation: 10238
You are correct in thinking that handles will require additional trickery. However, as you are already using the ResourceT monad transformer, this is delightfully simple to do with allocate. allocate allows you to create a handle in the resource monad and register a cleanup action (which in your case is just closing the connection).
ok <- runResourceT $ do
  (releaseKey, handle) <-
    allocate (WebSockets.acceptRequest request)
             (`WebSockets.sendClose` closeMessage)
  WebSockets.sendBinaryData handle payload
  return ok
  where
    request = ...
    closeMessage = ...
    payload = ... -- not called data, which is a reserved word in Haskell
    ok = ...
By using allocate, the handle is guaranteed to close by the time runResourceT returns ok.
I am not entirely sure that this is what you want, however. It seems to me that getObject should not know about how to accept and close WS connections; perhaps it should take a WS connection handle as an argument and then write to it. If you upgrade its return type to ResourceT then you could charge the caller of getObject with the responsibility of calling runResourceT and allocating WS handles and so forth. But hopefully the example above is enough to get you going on your way.
Upvotes: 2