ceran

Reputation: 1412

Akka Streams: validation of elements being streamed

I'm new to Akka Streams and I'm wondering how to implement some kind of mid-stream validation. Example:

FileIO
  .fromPath(file)
  .via(Framing.delimiter(...))
  .map(_.utf8String)
  .map(_.split("\t", -1))
  .validate(arr => arr.length == 10) // or similar
  ...

I assumed this scenario is common enough that there must be a predefined way to validate a stream on the fly, but I wasn't able to find anything about it. Am I on the wrong track, and is validation something that should not be done this way in Akka Streams?

In my particular scenario, I'm processing a file line by line. If even a single line is invalid, it does not make sense to continue, and processing should be aborted.

Upvotes: 2

Views: 507

Answers (3)

Stefano Bonetti

Reputation: 9023

I agree with @Stephen that takeWhile is what you need. You can use it with the inclusive flag set to true if you want the first failing element to be passed downstream as well.

Also, if you want to make your stream more expressive, you can have the validation flow produce Either[ValidationError, String].

The example below is a bit clunky (I would prefer to use the GraphDSL and a partition), but hopefully you get the idea.

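  val errorSink: Sink[TooManyElements, _] = ???
  val sink: Sink[Array[String], _] = ???

  FileIO
    .fromPath(file)
    .via(Framing.delimiter(...))
    .map(_.utf8String.split("\t", -1))
    .map[Either[TooManyElements, Array[String]]] {
      case arr if arr.length > 10 ⇒ Left(TooManyElements(arr.length))
      case arr                    ⇒ Right(arr)
    }
    // Stop after the first Left, but still emit it downstream
    .takeWhile(_.isRight, inclusive = true)
    // Unwrap the failure (if any) so it matches errorSink's element type
    .alsoTo(Flow[Either[TooManyElements, Array[String]]].collect { case Left(err) ⇒ err }.to(errorSink))
    // Unwrap the valid rows so they match sink's element type
    .collect { case Right(arr) ⇒ arr }
    .to(sink)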
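
The validation step itself can be pulled out into a plain function that returns an Either, which keeps the stream definition short and makes the rule testable without a running stream. This is only a sketch: ValidationError, WrongFieldCount and validateLine are hypothetical names (the TooManyElements type above would be one more case of the same trait), and the expected count of 10 is taken from the question.

```scala
object LineValidation {
  // Hypothetical error hierarchy, not part of Akka.
  sealed trait ValidationError
  final case class WrongFieldCount(expected: Int, actual: Int) extends ValidationError

  // Split a tab-separated line (keeping trailing empty fields) and check the field count.
  def validateLine(line: String, expected: Int = 10): Either[ValidationError, Array[String]] = {
    val fields = line.split("\t", -1)
    if (fields.length == expected) Right(fields)
    else Left(WrongFieldCount(expected, fields.length))
  }
}
```

In the pipeline above, this would replace the split and the pattern match with something like .map(_.utf8String).map(LineValidation.validateLine(_)).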

Upvotes: 2

Viktor Klang

Reputation: 26579

I'd probably create a type to represent the constraints. You can then run the assertions when creating instances of that type, and downstream code knows which constraints have already been applied.

Example:

object LineItem {
  // Makes it possible to provide the validation before allocating the item
  def apply(string: String): LineItem = {
    require(string.length == 10)
    new LineItem(string) // Call the companion-accessible constructor
  }
}
// private[LineItem] makes sure that `new` only works from companion object
final case class LineItem private[LineItem](string: String)
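
As a usage sketch, the throwing factory can be wrapped in Try when the caller wants to inspect the failure instead of letting the exception propagate. The wrapper object LineItemDemo and the parse helper are hypothetical names added for illustration, and the snippet assumes Scala 2.12.2 or later, where a user-defined apply in the companion replaces the generated one.

```scala
import scala.util.Try

object LineItemDemo {
  // Same idea as above: the constructor is only reachable from the companion.
  final case class LineItem private (string: String)

  object LineItem {
    def apply(string: String): LineItem = {
      // require throws IllegalArgumentException when the constraint is violated
      require(string.length == 10, s"expected 10 characters, got ${string.length}")
      new LineItem(string)
    }
  }

  // Capture the validation failure as a value instead of an exception.
  def parse(raw: String): Try[LineItem] = Try(LineItem(raw))
}
```

Used directly inside a stream stage such as .map(LineItem(_)), the exception thrown by require would, under the default supervision strategy, fail the stream at the first invalid element.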

Upvotes: 3

Stephen

Reputation: 4296

You could use .takeWhile. This processes all elements before the first invalid item and none after it. Note that the stream then completes normally rather than failing.

FileIO
  .fromPath(file)
  .via(Framing.delimiter(...))
  .map(_.utf8String)
  .map(_.split("\t", -1))
  .takeWhile(arr => arr.length == 10)
  ...
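
If the stream should fail rather than complete quietly when a line is invalid, one option is to throw from a map stage. The following is a minimal sketch, assuming Akka 2.6 or later (where an implicit ActorSystem supplies the materializer) and the default supervision strategy. InvalidLine, AbortOnInvalid and run are hypothetical names, and an in-memory Source stands in for the file.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object AbortOnInvalid {
  // Hypothetical exception describing the validation failure.
  final case class InvalidLine(fields: Int)
      extends RuntimeException(s"expected 10 fields, got $fields")

  // Returns Success with all rows, or Failure if any line is invalid.
  def run(lines: List[String])(implicit system: ActorSystem): Try[Seq[Array[String]]] = {
    val result = Source(lines)
      .map(_.split("\t", -1))
      // Throwing here fails the stream; later elements are not processed.
      .map(arr => if (arr.length == 10) arr else throw InvalidLine(arr.length))
      .runWith(Sink.seq)
    // Blocking is only for the sake of a compact example.
    Try(Await.result(result, 5.seconds))
  }
}
```

The materialized Future fails with InvalidLine, so the caller can tell an aborted run apart from a file that simply ended.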

Upvotes: 2
