Reputation: 5031
We have a Scala application that reads lines from a text file and processes them using Akka Streams. For better performance we set the parallelism to 5. The problem is that if multiple lines contain the same email, we should keep only one of them, treat the others as duplicates, and report an error. I tried to use a Java ConcurrentHashMap to detect duplicates, but it didn't work. Here is my code:
allIdentifiers = new ConcurrentHashMap[String, Int]()
Source(rows)
  .mapAsync(config.parallelism.value) {
    case (dataRow, index) => {
      val eventResendResult: EitherT[Future, NonEmptyList[ResendError], ResendResult] =
        for {
          cleanedRow <- EitherT.cond[Future](
            !allIdentifiers.containsKey(dataRow.lift(emailIndex)), {
              allIdentifiers.put(dataRow.lift(emailIndex),index)
              dataRow
            }, {
              NonEmptyList.of(
                DuplicatedError(
                  s"Duplicated record at row $index",
                  List(identifier)
                )
              )
            }
          )
          _ = logger.debug(
            LoggingMessage(
              requestId = RequestId(),
              message = s"allIdentifiers: $allIdentifiers"
            )
          )
          ... more process step ...
        } yield foldResponses(sent)
      eventResendResult
        .leftMap(errors => ResendResult(errors.toList, List.empty))
        .merge
    }
  }
  .runWith(Sink.reduce { (result1: ResendResult, result2: ResendResult) =>
    ResendResult(
      result1.errors ++ result2.errors,
      result1.results ++ result2.results
    )
  })
We have config.parallelism.value set to 5, which means that at any moment up to 5 lines are processed concurrently. What I observed is that if duplicated lines are right next to each other, the detection doesn't work. Example:
line 0 contains email1
line 1 contains email1
line 2 contains email2
line 3 contains email2
line 4 contains email3
From the log I can see that the ConcurrentHashMap was populated with entries, but all lines passed the duplicate check and moved on to the next processing step. So is Akka Streams' parallelism not the same thing as Java multithreading? How can I detect duplicated lines in this case?
Upvotes: 2
Views: 479
Reputation: 32319
The problem is in the following snippet:
cleanedRow <- EitherT.cond[Future](
  !allIdentifiers.containsKey(dataRow.lift(emailIndex)), {
    allIdentifiers.put(dataRow.lift(emailIndex),index)
    dataRow
  }, {
    NonEmptyList.of(
      DuplicatedError(
        s"Duplicated record at row $index",
        List(identifier)
      )
    )
  }
)
In particular: imagine two threads simultaneously processing rows with the same email, one of which should be rejected as a duplicate. The following can happen, in order:
1. Thread A calls containsKey and finds the email is not in the map
2. Thread B calls containsKey and finds the email is not in the map
3. Both threads then call put and carry on, so neither row is reported as a duplicate
In other words: you need to atomically check the map for the key and update it. This is a pretty common sort of thing to want, so it is exactly what ConcurrentHashMap's put does: it updates the value at the key and returns the previous value it replaced, or null if there was none.
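To make the atomic check-and-update concrete, here is a minimal sketch using only java.util.concurrent. The object name AtomicCheckDemo and the helper firstSighting are made up for illustration and are not part of the question's code. It also assumes the map's value type is java.lang.Integer rather than Scala's Int, so that a missing key comes back as null instead of being unboxed to 0.

```scala
import java.util.concurrent.ConcurrentHashMap

object AtomicCheckDemo {
  // Assumption: values are java.lang.Integer (a reference type), so that
  // "no previous value" is observable as null.
  val seen = new ConcurrentHashMap[String, Integer]()

  // Hypothetical helper: returns true only for the first caller that
  // registers a given email. put stores the new index and returns the
  // value it replaced in a single atomic step.
  def firstSighting(email: String, index: Int): Boolean =
    seen.put(email, Integer.valueOf(index)) == null
}
```

Because the check and the update happen inside the single put call, two threads cannot both observe "not present" for the same email.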
I'm not too familiar with the combinators in Cats, so the following might not be idiomatic. However, note how it inserts and checks for a previous value in one atomic step.
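cleanedRow <- EitherT(Future.successful {
  // put is atomic: it stores the new index and returns the previous value,
  // or null if the key was absent. This assumes the map's value type is a
  // reference type such as java.lang.Integer; with Scala's Int, null is unboxed to 0.
  val previous = allIdentifiers.put(dataRow.lift(emailIndex), index)
  Either.cond(
    previous == null, // no previous value: this is the first occurrence, keep the row
    dataRow,
    NonEmptyList.of(
      DuplicatedError(
        s"Duplicated record at row $index",
        List(identifier)
      )
    )
  )
})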
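As a rough way to check the idea without Akka or Cats, the sketch below runs the same kind of atomic check from several Futures at once, using only the standard library. DedupSketch, dedup and run are hypothetical names, and it swaps the map for ConcurrentHashMap.newKeySet, whose add returns false when the element was already present; that is a variation on the approach above, not the code from the question.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object DedupSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical helper: Right(index) for the first sighting of an email,
  // Left(message) for every later one. add is atomic on this concurrent set.
  def dedup(seen: java.util.Set[String], email: String, index: Int): Either[String, Int] =
    if (seen.add(email)) Right(index)
    else Left(s"Duplicated record at row $index")

  // Checks all rows concurrently and collects the results in input order.
  def run(emails: Vector[String]): Vector[Either[String, Int]] = {
    val seen = ConcurrentHashMap.newKeySet[String]()
    val futures = emails.zipWithIndex.map { case (email, i) =>
      Future(dedup(seen, email, i))
    }
    Await.result(Future.sequence(futures), 10.seconds)
  }
}
```

Calling DedupSketch.run(Vector("email1", "email1", "email2", "email2", "email3")) should accept exactly one row per distinct email. Which of two duplicate rows is accepted depends on scheduling, so if the first occurrence in file order must win, the check would need to run in a sequential stage before the parallel one.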
Upvotes: 2