Manas

Reputation: 519

Akka PersistentChannel does not delete message from journal upon confirm

I am writing code that uses a PersistentChannel to send a message to an actor that does some IO. On completion, the actor confirms the ConfirmablePersistent message. The documentation says that a PersistentChannel deletes a message once it is confirmed, but in my case the messages stay in the journal without being deleted. My requirement is that the persisted message be deleted from the journal as soon as I get a successful result for the IO or the deadline has passed.

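import akka.actor.{Actor, ActorLogging, ActorRef}
import akka.persistence.{Deliver, Persistent, PersistentChannel, PersistentChannelSettings}
import scala.concurrent.duration._

class IOWorker(config: Config, ref: ActorRef)
    extends Actor with ActorLogging {
  import IOWorker._

  // Channel that keeps each message persisted until the destination confirms it
  val channel = context.actorOf(PersistentChannel.props(
    PersistentChannelSettings(
      redeliverInterval = 1.minute,
      pendingConfirmationsMax = 1,
      pendingConfirmationsMin = 0)))
  val doIOActor = context.actorOf(DOIOActor(config))

  def receive = {
    case payload @ (msg, deadline) =>
      channel ! Deliver(Persistent(payload), doIOActor.path)
  }
}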

object DOIOActor {  
  def apply(config: Config) = Props(classOf[DOIOActor], config)
}
class DOIOActor(config: Config) extends Actor 
    with ActorLogging {

  def receive = {
    case p @ ConfirmablePersistent(payload, sequenceNr, redeliveries) =>
      payload match {
        case (msg, deadline: Deadline) =>
          deadline.hasTimeLeft match {
            case false => p.confirm()
            case true =>
              sender ! SAVED(msg)
              Try { DOIO } match {
                case Success(v) =>
                  sender ! SUCCESS(msg)
                  p.confirm()
                case Failure(doioException) =>
                  log.warning(s"Could not complete DOIO. $doioException")
                  throw doioException
              }
          }
      }
  }

  def DOIO(ftpClient: FTPClient, destination: String, file: AISData) = {    
    SOMEIOTASK match {
      case true => log.info(s"Storing file to $destination.")
      case false => 
        throw new Exception(s"Could not DOIO to destination $destination")
    }
  }
}

Upvotes: 0

Views: 535

Answers (1)

Roland Kuhn

Reputation: 15472

Deletions are performed asynchronously by most journal implementations, as discussed on the mailing list.
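
In practice this means a confirmed message can still be visible in the journal for a while after confirm() returns. The following is only a toy model of that timing in plain Scala, not the Akka journal API: ToyJournal, confirmAsync and the gate future are hypothetical names invented for illustration, and a real journal plugin decides for itself when the deletion actually runs.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Toy model only: NOT the Akka journal API. All names are hypothetical.
final class ToyJournal {
  private val entries = TrieMap.empty[Long, String]

  def write(seqNr: Long, payload: String): Unit = entries.put(seqNr, payload)

  def contains(seqNr: Long): Boolean = entries.contains(seqNr)

  // Returns immediately; the entry is removed only once `gate` completes,
  // standing in for a journal that processes deletions in the background.
  def confirmAsync(seqNr: Long, gate: Future[Unit]): Future[Unit] =
    gate.map { _ => entries.remove(seqNr); () }
}

object AsyncDeleteDemo {
  // Returns (visible right after confirm, visible after the deletion ran)
  def run(): (Boolean, Boolean) = {
    val journal = new ToyJournal
    journal.write(1L, "file-1")

    val gate = Promise[Unit]()
    val deleted = journal.confirmAsync(1L, gate.future)

    // The confirmation has been issued, but the deletion has not run yet.
    val visibleRightAfterConfirm = journal.contains(1L)

    gate.success(())                 // let the background deletion proceed
    Await.result(deleted, 5.seconds) // wait until it has actually happened

    (visibleRightAfterConfirm, journal.contains(1L))
  }
}
```

Checking the journal immediately after confirming is therefore not a reliable test of whether the channel will delete the message; the check has to happen after the journal has had time to process the deletion.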

Upvotes: 1
