Reputation: 3013
I would like to implement a function that returns a transducer that waits for a block of values to be "emitted".
The function I have in mind would have the following signature:
/**
* The `Process1` which awaits the next "effect" to occur and passes all values emitted by
* this effect to `rcv` to determine the next state.
*/
def receiveBlock[I, O](rcv: Vector[I] => Process1[I,O]): Process1[I,O] = ???
My understanding is that I could then use this function to implement the following function which I think would be quite useful:
/**
* Groups inputs into chunks of dynamic size based on the effects
* that produced the emitted values.
*
* @example {{{
* val numberTask = Task.delay(1)
* val listOfNumbersTask = Task.delay(List(5,6,7))
* val sample = Process.eval(numberTask) ++ Process(2,3,4) ++ Process.await(listOfNumbersTask)(xs => Process.emitAll(xs))
* sample.chunkByEffect.runLog.run should be List(Vector(1), Vector(2,3,4), Vector(5,6,7))
* }}}
*/
def chunkByEffect[I]: Process1[I, Vector[I]] =
  receiveBlock(vec => emit(vec) ++ chunkByEffect)
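Without scalaz-stream at hand, the intended grouping can be illustrated on plain data: tag each value with an identifier for the effect that produced it, and group runs of equal tags. This is a hypothetical sketch (chunkByTag is a made-up name, not part of any library):

```scala
// Group consecutive values that share the same "effect" tag, mirroring
// what chunkByEffect should do on a stream: one Vector per effect.
def chunkByTag[A, T](in: List[(T, A)]): List[Vector[A]] =
  in.foldRight(List.empty[(T, Vector[A])]) {
    // same tag as the group we are building: prepend into it
    case ((t, a), (t2, vec) :: rest) if t == t2 => (t, a +: vec) :: rest
    // tag changed (or first element): start a new group
    case ((t, a), acc) => (t, Vector(a)) :: acc
  }.map(_._2)
```

Run on values tagged with three distinct effects, this reproduces the grouping from the example above: List(Vector(1), Vector(2,3,4), Vector(5,6,7)).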
My ultimate objective (slightly simplified) is to implement the following function:
/**
* Transforms a stream of audio into a stream of text.
*/
def voiceRecognition(audio: Process[Task, Byte]): Process[Task, String]
The function makes an external call to a voice recognition service, so it is unreasonable to make a network call for every single Byte in the stream; I need to chunk bytes together before making a network call. I could make audio a Process[Task, ByteVector], but that would require the testing code to know the maximum chunk size that the function supports, and I would rather that be managed by the function itself. Also, when this function is used inside a service, that service will itself be receiving network calls carrying a given amount of audio, and I would like the chunkXXX
function to be smart about chunking so that it does not hold onto data that is already available.
Basically, the stream of audio coming from the network will have the form Process[Task, ByteVector] and will be translated into a Process[Task, Byte] by flatMap(Process.emitAll(_)). However, the test code will directly produce a Process[Task, Byte] and feed it into voiceRecognition. In theory, given the appropriate combinator, it should be possible to provide an implementation of voiceRecognition that does the right thing with both of these streams, and I think the chunkByEffect function described above is the key to that. I realize now that I would need the chunkByEffect function to take min and max parameters specifying the minimum and maximum chunk size, irrespective of the underlying Task producing the bytes.
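The min/max re-cutting rule can be sketched in plain Scala, independently of scalaz-stream (rechunk and cut are hypothetical names, and Vector[A] stands in for ByteVector): buffer incoming chunks, slice off max-sized pieces, and flush the remainder as soon as it reaches min elements, so that already-available data is not held back.

```scala
// Re-cut a sequence of chunks so each emitted chunk has between
// `min` and `max` elements (the final flush may be smaller than `min`).
def rechunk[A](ins: List[Vector[A]], min: Int, max: Int): List[Vector[A]] = {
  // Cut `max`-sized pieces off the buffer; emit the remainder too
  // if it already meets `min`, otherwise keep buffering it.
  def cut(acc: Vector[A]): (List[Vector[A]], Vector[A]) =
    if (acc.length > max) {
      val (out, rest) = cut(acc.drop(max))
      (acc.take(max) :: out, rest)
    } else if (acc.length >= min) (List(acc), Vector.empty)
    else (Nil, acc)

  val (out, leftover) = ins.foldLeft((List.empty[Vector[A]], Vector.empty[A])) {
    case ((emitted, acc), in) =>
      val (outs, rest) = cut(acc ++ in)
      (emitted ++ outs, rest)
  }
  if (leftover.isEmpty) out else out :+ leftover
}
```

With min = 1 and a large max, the original chunk boundaries pass through untouched, which is the "do not hold onto available data" behaviour described above.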
Upvotes: 3
Views: 72
Reputation: 3013
I guess the answer at this point is that this is really hard or impossible to accomplish in scalaz-stream proper. The new version of this library is called fs2, and it has first-class support for "chunking", which is basically what I was looking for here.
Upvotes: 0
Reputation: 864
You need to have your bytes separated somehow. I suggest working with some higher-level abstraction over the stream of Bytes, e.g. ByteVector.
Then you will perhaps have to write a manual process1, implemented similarly to process1.chunkBy, except that it operates on ByteVector, i.e.
def chunkBy(separator: ByteVector): Process1[ByteVector, ByteVector] = {
  def go(acc: ByteVector): Process1[ByteVector, ByteVector] =
    receive1Or[ByteVector, ByteVector](emit(acc)) { i =>
      // implement searching of separator in accumulated + new bytes
      ???
    }
  go(ByteVector.empty)
}
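The search hinted at by the ??? can be sketched on plain Vectors (splitOn is a hypothetical name; a real implementation would use ByteVector's own indexOfSlice and splitAt): split the accumulated buffer on every complete occurrence of the separator and carry the unfinished tail forward, which also handles a separator that straddles two incoming chunks, since the tail is re-searched once more bytes arrive.

```scala
// Split `buf` on every full occurrence of `separator`, returning the
// complete chunks and the leftover tail to carry into the next step.
def splitOn[A](buf: Vector[A], separator: Vector[A]): (List[Vector[A]], Vector[A]) = {
  val idx = buf.indexOfSlice(separator)
  if (idx < 0) (Nil, buf) // no full separator yet: keep everything buffered
  else {
    val (chunk, rest) = buf.splitAt(idx)
    val (more, remainder) = splitOn(rest.drop(separator.length), separator)
    (chunk :: more, remainder)
  }
}
```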
Then this will hook everything together:
val speech: Process[Task,ByteVector] = ???
def chunkByWhatever: Process1[ByteVector,ByteVector] = ???
val recognizer: Channel[Task,ByteVector,String] = ???
//this shall do the trick
speech.pipe(chunkByWhatever).through(recognizer)
Upvotes: 1