Reputation: 8043
I have an actor-based system in which I am reading an external file sitting in an S3 bucket and sending each of the file's lines over to another actor that processes that particular line. What I am having trouble understanding is what happens when an exception is thrown while reading the file.
My code is as follows:
import akka.actor._

class FileWorker(processorWorker: ActorRef) extends Actor with ActorLogging {

  val fileUtils = new S3Utils()

  private def processFile(fileLocation: String): Unit = {
    try {
      fileUtils.getLinesFromLocation(fileLocation).foreach { r =>
        //Some processing happens for the line
      }
    } catch {
      case e: Exception =>
        log.error("Issue processing files from the following location %s".format(fileLocation))
    }
  }

  def receive = {
    case fileLocation: String =>
      processFile(fileLocation)
  }
}
In my S3Utils class I have defined the getLinesFromLocation method as follows:
def getLinesFromLocation(fileLocation: String): Iterator[String] = {
  try {
    for {
      fileEntry <- getFileInfo(root, fileLocation)
    } yield fileEntry
  } catch {
    case e: Exception =>
      logger.error("Issue with file location %s: %s".format(fileLocation, e.getStackTraceString))
      throw e
  }
}
The actual reading of the file happens in the private method getFileInfo:
private def getFileInfo(rootBucket: String, fileLocation: String): Iterator[String] = {
  implicit val codec = Codec(Codec.UTF8)
  codec.onMalformedInput(CodingErrorAction.IGNORE)
  codec.onUnmappableCharacter(CodingErrorAction.IGNORE)
  Source.fromInputStream(
    s3Client.getObject(rootBucket, fileLocation).getObjectContent()
  ).getLines
}
I have written the above pieces with the assumption that the underlying file sitting on S3 will not be cached anywhere and that I will simply be iterating through the individual lines in constant space as I process them. If there is an issue with reading a particular line, the iterator should move on without affecting the actor.
My first question would be: is my understanding of iterators correct? Am I actually reading the lines lazily from the underlying file system (in this case the S3 bucket) without putting pressure on memory or introducing any memory leaks?
The next question would be: if the iterator encounters an error while reading an individual entry, is the entire iteration killed, or does it move on to the next entry?
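To make the question concrete, here is a toy snippet (not my actual code) that captures what I am unsure about:
// Toy example only: does foreach keep going after an element throws?
val lines = Iterator("a", "b", "c")
try {
  lines.foreach { s =>
    if (s == "b") throw new RuntimeException("bad line")
    println(s)
  }
} catch {
  case e: Exception => println("caught: " + e.getMessage)
}
// Is "c" ever printed, or is the whole iteration abandoned at "b"?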
My last question would be, is my file-processing logic written correctly?
It would be great to get some insights into this.
Thanks
Upvotes: 0
Views: 276
Reputation: 3365
It looks like the Amazon S3 client has no async implementation, so we are stuck with pinned actors. Your implementation is correct, provided you allocate a thread per connection, do not block on the input, and do not use too many connections.
Important steps to take:
1) processFile should not block the current thread. Preferably it should delegate each line to another actor:
private def processFile(fileLocation: String): Unit = {
  ...
  fileUtils.getLinesFromLocation(fileLocation).foreach { r =>
    lineWorker ! FileLine(fileLocation, r)
  }
  ...
}
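For completeness, here is a minimal sketch of the receiving side; FileLine and LineWorker are just hypothetical names to illustrate the delegation, adapt them to your own model:
import akka.actor._

// Hypothetical message and worker carrying the per-line processing
case class FileLine(fileLocation: String, line: String)

class LineWorker extends Actor with ActorLogging {
  def receive = {
    case FileLine(fileLocation, line) =>
      // the "Some processing happens for the line" part of processFile goes here
      log.debug("processing a line from {}", fileLocation)
  }
}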
2) Make FileWorker a pinned actor:
# in application.conf:
my-pinned-dispatcher {
  executor = "thread-pool-executor"
  type = PinnedDispatcher
}

// in the code:
val fileWorker = context.actorOf(
  Props(classOf[FileWorker], lineWorker).withDispatcher("my-pinned-dispatcher"),
  "FileWorker")
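The pinned dispatcher gives FileWorker a dedicated thread of its own, so the blocking S3 read cannot starve the default dispatcher that the rest of your actors run on.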
if the iterator encounters an error while reading an individual entry, is the entire iteration killed?
Yes, the entire iteration will be killed, and the actor will take the next job from its mailbox. Note that because processFile wraps the iteration in a try/catch, the exception will not escape receive, so the actor itself survives and simply moves on to the next fileLocation message.
Upvotes: 2