user468587

Reputation: 5031

How to detect duplicated lines using Scala Akka Streams

We have a Scala application that reads lines from a text file and processes them with Akka Streams. For better performance we set the parallelism to 5. The requirement is: if multiple lines contain the same email, we keep only one of those lines, treat the others as duplicates, and raise an error for them. I tried using a Java ConcurrentHashMap to detect the duplication, but it didn't work. Here is my code:

allIdentifiers = new ConcurrentHashMap[String, Int]()   
Source(rows)
  .mapAsync(config.parallelism.value) {
    case (dataRow, index) => {

      val eventResendResult: EitherT[Future, NonEmptyList[ResendError], ResendResult] =
        for {

          cleanedRow <- EitherT.cond[Future](
            !allIdentifiers.containsKey(dataRow.lift(emailIndex)), {
              allIdentifiers.put(dataRow.lift(emailIndex),index)
              dataRow
            }, {
              NonEmptyList.of(
                DuplicatedError(
                  s"Duplicated record at row $index",
                  List(identifier)
                )
              )
            }
          )

          _ = logger.debug(
            LoggingMessage(
              requestId = RequestId(),
              message = s"allIdentifiers: $allIdentifiers"
            )
          )

          ... more process step ...
        } yield foldResponses(sent)

      eventResendResult
        .leftMap(errors => ResendResult(errors.toList, List.empty))
        .merge
    }
  }
  .runWith(Sink.reduce { (result1: ResendResult, result2: ResendResult) =>
    ResendResult(
      result1.errors ++ result2.errors,
      result1.results ++ result2.results
    )
  })

We have config.parallelism.value set to 5, which means at any moment up to 5 lines are processed concurrently. What I observed is that the check fails when duplicated lines are right next to each other, for example:

line 0 contains email1
line 1 contains email1
line 2 contains email2
line 3 contains email2
line 4 contains email3

From the log I can see the ConcurrentHashMap was populated with entries, but all lines passed the duplication check and moved on to the next processing step. So is Akka Stream's parallelism not the same thing as Java's multithreading? How can I detect duplicated lines in this case?

Upvotes: 2

Views: 479

Answers (1)

Alec

Reputation: 32319

The problem is in the following snippet:

cleanedRow <- EitherT.cond[Future](
  !allIdentifiers.containsKey(dataRow.lift(emailIndex)), {
    allIdentifiers.put(dataRow.lift(emailIndex),index)
    dataRow
  }, {
    NonEmptyList.of(
      DuplicatedError(
        s"Duplicated record at row $index",
        List(identifier)
      )
    )
  }
)

In particular: imagine two threads simultaneously processing an email which should be deduplicated. The following can happen, in this order:

  1. The first thread checks containsKey and finds the email is not in the map
  2. The second thread checks containsKey and also finds the email is not in the map
  3. The first thread adds the email to the map (based on the result from step 1.) and passes the email through
  4. The second thread adds the email to the map (based on the result from step 2.) and also passes the email through
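A minimal, self-contained sketch of that interleaving. The helper name checkThenPut is hypothetical, and the sleep only exists to widen the race window so the demo reproduces reliably:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val seen = new ConcurrentHashMap[String, Int]()

// Non-atomic check-then-act: each call may observe the map *before*
// the other call has had a chance to put its entry.
def checkThenPut(email: String, index: Int): Boolean =
  if (!seen.containsKey(email)) {
    Thread.sleep(10) // widen the race window for demonstration purposes
    seen.put(email, index)
    true // treated as "not a duplicate"
  } else false

val results = Await.result(
  Future.sequence(List(
    Future(checkThenPut("email1", 0)),
    Future(checkThenPut("email1", 1))
  )),
  5.seconds
)

// With the sleep in place both calls usually return true, i.e. the
// duplicate slips through even though the map itself is thread-safe.
println(results)
```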

In other words: you need to check the map for the key and update it atomically. This is such a common need that it is exactly what ConcurrentHashMap's put already does: it updates the value at the key and returns the previous value it replaced, if there was one.
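To see that return-value behaviour in isolation (using Integer rather than Int as the value type, so the null returned on a first insert is observable instead of being unboxed to 0):

```scala
import java.util.concurrent.ConcurrentHashMap

// Integer (not Int) as the value type, so the `null` that `put` returns
// for a previously absent key stays visible.
val m = new ConcurrentHashMap[String, Integer]()

val first  = m.put("email1", 0)  // no previous mapping: returns null
val second = m.put("email1", 1)  // previous mapping existed: returns 0

println(first)   // null
println(second)  // 0
```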

I'm not too familiar with the combinators in Cats, so the following might not be idiomatic. However, note how it inserts and checks for a previous value in one atomic step.

cleanedRow <- EitherT(Future.successful {
  // `put` atomically inserts and returns the previous value (null if none)
  val previous = allIdentifiers.put(dataRow.lift(emailIndex), index)
  Either.cond(
    previous == null, // pass only when the key was not seen before
    dataRow,
    NonEmptyList.of(
      DuplicatedError(
        s"Duplicated record at row $index",
        List(identifier)
      )
    )
  )
})
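A closely related alternative (not part of the original code, and isDuplicate is a hypothetical helper): putIfAbsent performs the same atomic insert-and-return-previous, but never overwrites an existing entry, so the map keeps the row index of the first occurrence rather than the last one.

```scala
import java.util.concurrent.ConcurrentHashMap

val ids = new ConcurrentHashMap[String, Integer]()

// putIfAbsent atomically inserts only when the key is absent and
// returns the previous value (null if there was none), so the first
// occurrence's index is preserved in the map.
def isDuplicate(email: String, index: Int): Boolean =
  ids.putIfAbsent(email, index) != null

println(isDuplicate("email1", 0)) // false: first occurrence passes
println(isDuplicate("email1", 1)) // true: second occurrence is flagged
println(ids.get("email1"))        // 0: index of the first occurrence kept
```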

Upvotes: 2
