Reputation: 1412
I'm new to Akka Streams and I'm wondering how to implement some kind of mid-stream validation. Example:
FileIO
.fromPath(file)
.via(Framing.delimiter(...))
.map(_.utf8String)
.map(_.split("\t", -1))
.validate(arr => arr.length == 10) // or similar
...
I assumed this scenario is so common that there must be predefined functionality for validating a stream on the fly. However, I wasn't able to find anything about it. Am I on the wrong track here, and is validation something that should not be done this way in Akka Streams?
In my particular scenario, I'm processing a file line by line. If even a single line is invalid, it does not make sense to continue, and processing should be aborted.
Upvotes: 2
Views: 507
Reputation: 9023
I agree with @Stephen that takeWhile is what you need. You can use it with the inclusive flag set to true if you want the first failing element to be passed downstream as well.
Also, if you want to make your stream as expressive as possible, you can have the validation flow produce Either[ValidationError, String].
The example below is a bit clunky (I would prefer to use the GraphDSL and partition), but hopefully you get the idea.
val errorSink: Sink[TooManyElements, _] = ???
val sink: Sink[Array[String], _] = ???
FileIO
.fromPath(file)
.via(Framing.delimiter(...))
.map(_.utf8String.split("\t", -1))
.map{
case arr if arr.length > 10 ⇒ Left(TooManyElements(arr.length))
case arr ⇒ Right(arr)
}
.takeWhile(_.isRight, inclusive = true)
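// Unwrap the Left so the element type matches errorSink
.alsoTo(Flow[Either[TooManyElements, Array[String]]].collect { case Left(err) ⇒ err }.to(errorSink))
// Unwrap the Right so the element type matches sink
.collect { case Right(arr) ⇒ arr }
.to(sink)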
Upvotes: 2
Reputation: 26579
I'd probably create a type to represent the constraints. You can then do the assertions when creating instances of that type, and downstream stages know which constraints have already been applied.
Example:
object LineItem {
// Makes it possible to provide the validation before allocating the item
def apply(string: String): LineItem = {
require(string.length == 10)
new LineItem(string) // Call the companion-accessible constructor
}
}
// private[LineItem] makes sure that `new` only works from companion object
final case class LineItem private[LineItem](string: String)
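To illustrate how a validating constructor interacts with the stream, here is a minimal sketch. It assumes Akka 2.6 or later (where an implicit ActorSystem is enough to run a stream), and the Row type and FailFastDemo object are hypothetical names that are not part of the answer above. An in-memory Source stands in for FileIO and Framing. With the default supervision strategy, an exception thrown inside map fails the stream, so the first invalid line aborts processing.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical row type for this sketch: it can only be built from exactly 10 fields.
final class Row private (val fields: Vector[String])

object Row {
  def apply(fields: Array[String]): Row = {
    require(fields.length == 10, s"expected 10 fields, got ${fields.length}")
    new Row(fields.toVector)
  }
}

object FailFastDemo {
  // Parses tab-separated lines; the first invalid line fails the whole stream.
  def parse(lines: List[String])(implicit system: ActorSystem): Try[Seq[Row]] = {
    val rows = Source(lines)
      .map(_.split("\t", -1))
      .map(Row(_)) // require throws IllegalArgumentException on a bad line
      .runWith(Sink.seq)
    // Blocking is only for demonstration; the Future fails if the stream fails.
    Try(Await.result(rows, 5.seconds))
  }
}
```

In this sketch, parse returns a Failure wrapping the IllegalArgumentException as soon as a line with the wrong number of fields is reached, and no later line reaches the sink.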
Upvotes: 3
Reputation: 4296
You could use .takeWhile. This will process all elements before the first invalid item and none after it; the stream then completes normally rather than failing.
FileIO
.fromPath(file)
.via(Framing.delimiter(...))
.map(_.utf8String)
.map(_.split("\t", -1))
.takeWhile(arr => arr.length == 10)
...
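As a rough, self-contained sketch of this behaviour, the snippet below runs the same takeWhile check over in-memory lines instead of a file. It assumes Akka 2.6 or later (an implicit ActorSystem is used to run the stream), and TakeWhileDemo and validPrefix are hypothetical names chosen for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object TakeWhileDemo {
  // Keeps rows until the first one that does not have exactly 10 fields.
  def validPrefix(lines: List[String])(implicit system: ActorSystem): Seq[Array[String]] = {
    val rows = Source(lines)
      .map(_.split("\t", -1))
      .takeWhile(_.length == 10) // completes the stream at the first invalid row
      .runWith(Sink.seq)
    // Blocking is only for demonstration; the Future succeeds because no error is signalled.
    Await.result(rows, 5.seconds)
  }
}
```

Given a valid line, then an invalid one, then another valid one, validPrefix returns only the first row. Because the materialized Future completes successfully, the caller cannot tell a truncated run from a complete one; if the abort has to surface as an error, throwing an exception from a map stage is one option, since that fails the stream under the default supervision strategy.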
Upvotes: 2