Reputation: 322
I have the following code that try to read a possibly incomplete data (image data, for example) from a network stream using usual MaybeBuilder:
let image = maybe {
let pos = 2 //Initial position skips 2 bytes of packet ID
let! width, pos = readStreamAsInt 2 pos
let! height, pos = readStreamAsInt 2 pos
let! data, pos = readStream (width*height) pos
advanceInStream pos
return {width = width; height = height; pixels = data}
So, readStream[asInt] [numBytes] [offset] function returns Some [data] or None if data has not arrived yet in a NetworkStream. advanceInStream function is executed when whole network packet is read.
I wonder if there is some way to write some custom computation expression builder to hide pos passing from its user, since it's always the same - I read some data and position in stream and pass it to the next read function as a last parameter.
P.S. MaybeBuilder used:
type MaybeBuilder() =
member x.Bind(d,f) = Option.bind f d
member x.Return d = Some d
member x.ReturnFrom d = d
member x.Zero() = None
let maybe = new MaybeBuilder()
On second thought it seems I have to make pos mutable, because of possible "for" or "while" loops in reading. Simple let! works fine with pos Bind shadowing, but you can't hold onto immutability if you add reading in a loop, right? The task becomes trivial then.
Upvotes: 7
Views: 671
Reputation: 11577
@bytebuster is making good points of maintainability about custom computation expressions but I still thought I demonstrate how to combine the State
and Maybe
monad into one.
In "traditional" languages we have good support for composing values such as integers but we run into problems when developing parsers (Producing values from a binary stream is essentially parsing). For parsers we would like to compose simple parser functions into more complex parser functions but here "traditional" languages often lack good support.
In functional languages functions are as ordinary as values and since values can be composed obviously functions can be as well.
First let's define a StreamReader
function. A StreamReader
takes a StreamPosition
(stream + position) and produces an updated StreamPosition
and a StreamReaderResult
(the read value or a failure).
type StreamReader<'T> =
StreamReader of (StreamPosition -> StreamPosition*StreamReaderResult<'T>)
(This is the most important step.)
We like to be able to compose simple StreamReader
functions into more complex ones. A very important property we want to maintain is that the compose operation is "closed" under StreamReader
meaning that result of composition is a new StreamReader
which in turn can be composed endlessly.
In order to read an image we need to read the width & height, compute the product and read the bytes. Something like this:
let readImage =
reader {
let! width = readInt32
let! height = readInt32
let! bytes = readBytes (width*height)
return width, height, bytes
Because of composition being closed readImage
is a StreamReader<int*int*byte[]>
In order to be able to compose StreamReader
like above we need to define a computation expression but before we can do that we need to define the operation Return
and Bind
for StreamReader
. It turns out Yield
is good to have as well.
module StreamReader =
let Return v : StreamReader<'T> =
StreamReader <| fun sp ->
sp, (Success v)
let Bind (StreamReader t) (fu : 'T -> StreamReader<'U>) : StreamReader<'U> =
StreamReader <| fun sp ->
let tsp, tr = t sp
match tr with
| Success tv ->
let (StreamReader u) = fu tv
u tsp
| Failure tfs -> tsp, Failure tfs
let Yield (ft : unit -> StreamReader<'T>) : StreamReader<'T> =
StreamReader <| fun sp ->
let (StreamReader t) = ft ()
t sp
is trivial as the StreamReader
should return the given value and don't update the StreamPosition
is a bit more challenging but describes how to compose two StreamReader
functions into a new one. Bind
runs the first StreamReader
function and checks the result, if it's a failure it returns a failure otherwise it uses the StreamReader
result to compute the second StreamReader
and runs that on the update stream position.
just creates the StreamReader
function and runs it. Yield
is used by F# when building computation expressions.
Finally let's create the computation expression builder
type StreamReaderBuilder() =
member x.Return v = StreamReader.Return v
member x.Bind(t,fu) = StreamReader.Bind t fu
member x.Yield(ft) = StreamReader.Yield ft
let reader = StreamReaderBuilder ()
Now we built the basic framework for combining StreamReader
functions. In addition we would we need to define the primitive StreamReader
Full example:
open System
open System.IO
// The result of a stream reader operation is either
// Success of value
// Failure of list of failures
type StreamReaderResult<'T> =
| Success of 'T
| Failure of (string*StreamPosition) list
and StreamPosition =
Stream : byte[]
Position : int
member x.Remaining = max 0 (x.Stream.Length - x.Position)
member x.ReadBytes (size : int) : StreamPosition*StreamReaderResult<byte[]> =
if x.Remaining < size then
x, Failure ["EOS", x]
let nsp = StreamPosition.New x.Stream (x.Position + size)
nsp, Success (x.Stream.[x.Position..(x.Position + size - 1)])
member x.Read (converter : byte[]*int -> 'T) : StreamPosition*StreamReaderResult<'T> =
let size = sizeof<'T>
if x.Remaining < size then
x, Failure ["EOS", x]
let nsp = StreamPosition.New x.Stream (x.Position + size)
nsp, Success (converter (x.Stream, x.Position))
static member New s p = {Stream = s; Position = p;}
// Defining the StreamReader<'T> function is the most important decision
// In this case a stream reader is a function that takes a StreamPosition
// and produces a (potentially) new StreamPosition and a StreamReadeResult
type StreamReader<'T> = StreamReader of (StreamPosition -> StreamPosition*StreamReaderResult<'T>)
// Defining the StreamReader CE
module StreamReader =
let Return v : StreamReader<'T> =
StreamReader <| fun sp ->
sp, (Success v)
let Bind (StreamReader t) (fu : 'T -> StreamReader<'U>) : StreamReader<'U> =
StreamReader <| fun sp ->
let tsp, tr = t sp
match tr with
| Success tv ->
let (StreamReader u) = fu tv
u tsp
| Failure tfs -> tsp, Failure tfs
let Yield (ft : unit -> StreamReader<'T>) : StreamReader<'T> =
StreamReader <| fun sp ->
let (StreamReader t) = ft ()
t sp
type StreamReaderBuilder() =
member x.Return v = StreamReader.Return v
member x.Bind(t,fu) = StreamReader.Bind t fu
member x.Yield(ft) = StreamReader.Yield ft
let reader = StreamReaderBuilder ()
let read (StreamReader sr) (bytes : byte[]) (pos : int) : StreamReaderResult<'T> =
let sp = StreamPosition.New bytes pos
let _, sr = sr sp
// Defining various stream reader functions
let readValue (converter : byte[]*int -> 'T) : StreamReader<'T> =
StreamReader <| fun sp -> sp.Read converter
let readInt32 = readValue BitConverter.ToInt32
let readInt16 = readValue BitConverter.ToInt16
let readBytes size : StreamReader<byte[]> =
StreamReader <| fun sp ->
sp.ReadBytes size
let readImage =
reader {
let! width = readInt32
let! height = readInt32
let! bytes = readBytes (width*height)
return width, height, bytes
let main argv =
// Sample byte stream
let bytes = [|2;0;0;0;3;0;0;0;1;2;3;4;5;6|] |> byte
let result = read readImage bytes 0
printfn "%A" result
Upvotes: 4