blue-sky
blue-sky

Reputation: 53806

Message not being sent when Akka actor is killed using DeathWatch

I'm attempting to send a message when an actor is killed.

This is based on Akka deathwatch documentation : http://doc.akka.io/docs/akka/2.3.6/java/untyped-actors.html#deathwatch-java

In serviceActor I'm awaiting a "kill" message but I'm never actually sending this message. So to receive the message in ServiceActor I use :

else if (msg instanceof Terminated) {
        final Terminated t = (Terminated) msg;
        if (t.getActor() == child) {
            lastSender.tell(Msg.TERMINATED, getSelf());
        }
    } else {
        unhandled(msg);
    }

I've set the duration to 10 milliseconds :

Duration.create(10, TimeUnit.MILLISECONDS)

But the message Msg.TERMINATED is never received in onReceive method :

@Override
    public void onReceive(Object msg) {
        if (msg == ServiceActor.Msg.SUCCESS) {
            System.out.println("Success");
            getContext().stop(getSelf());
        } else if (msg == ServiceActor.Msg.TERMINATED) {
            System.out.println("Terminated");
        } else
            unhandled(msg);
    }

How can I send a message to HelloWorld when ServiceActor fails ?

Entire code :

package terminatetest;
import akka.Main;

public class Launcher {

    public static void main(String args[]) {

        String[] akkaArgsArray = new String[1];

        akkaArgsArray[0] = "terminatetest.HelloWorld";

        Main.main(akkaArgsArray);

    }

}

package terminatetest;


import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.UntypedActor;

public class HelloWorld extends UntypedActor {

    @Override
    public void preStart() {

        int counter = 0;

        akka.actor.ActorSystem system = getContext().system();

        final ActorRef greeter = getContext().actorOf(
                Props.create(ServiceActor.class), String.valueOf(counter));

        system.scheduler().scheduleOnce(
                Duration.create(10, TimeUnit.MILLISECONDS), new Runnable() {
                    public void run() {
                        greeter.tell(PoisonPill.getInstance(), getSelf());
                    }
                }, system.dispatcher());

        greeter.tell("http://www.google.com", getSelf());

        counter = counter + 1;
    }

    @Override
    public void onReceive(Object msg) {
        if (msg == ServiceActor.Msg.SUCCESS) {
            System.out.println("Success");
            getContext().stop(getSelf());
        } else if (msg == ServiceActor.Msg.TERMINATED) {
            System.out.println("Terminated");
        } else
            unhandled(msg);
    }
}

package terminatetest;

import static com.utils.PrintUtils.println;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;

public class ServiceActor extends UntypedActor {

    final ActorRef child = this.getContext().actorOf(Props.empty(), "child");
    {
        this.getContext().watch(child);
    }

    ActorRef lastSender = getContext().system().deadLetters();

    public static enum Msg {
        SUCCESS, FAIL, TERMINATED;
    }

    @Override
    public void onReceive(Object msg) {

        if (msg instanceof String) {
            String urlName = (String) msg;

            try {
                long startTime = System.currentTimeMillis();
                URL url = new URL(urlName);
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                conn.connect();

                BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
                StringBuilder out = new StringBuilder();
                String line;
                while ((line = reader.readLine()) != null) {
                    out.append(line);
                }
                System.out.println("Connection successful to " + url);
                System.out.println("Content is " + out);
                long endTime = System.currentTimeMillis();
                System.out.println("Total Time : " + (endTime - startTime) + " milliseconds");

            } catch (MalformedURLException mue) {
                println("URL Name " + urlName);
                System.out.println("MalformedURLException");
                System.out.println(mue.getMessage());
                mue.printStackTrace();
                getSender().tell(Msg.FAIL, getSelf());
            } catch (IOException ioe) {
                println("URL Name " + urlName);
                System.out.println("IOException");
                System.out.println(ioe.getMessage());
                ioe.printStackTrace();
                System.out.println("Now exiting");
                getSender().tell(Msg.FAIL, getSelf());
            }
        }

        else if (msg instanceof Terminated) {
                final Terminated t = (Terminated) msg;
                if (t.getActor() == child) {
                    lastSender.tell(Msg.TERMINATED, getSelf());
                }
            } else {
                unhandled(msg);
            }
    }

}

Update : I'm now initiating the poisonPill from the child actor itself using :

Update to ServiceActor :

if (urlName.equalsIgnoreCase("poisonPill")) {   
    this.getSelf().tell(PoisonPill.getInstance(), getSelf());
    getSender().tell(Msg.TERMINATED, getSelf());
}

Update to HelloWorld :

system.scheduler().scheduleOnce(
        Duration.create(10, TimeUnit.MILLISECONDS), new Runnable() {
            public void run() {
                greeter.tell("poisonPill", getSelf());
            }
        }, system.dispatcher());

This displays following output :

startTime : 1412777375414
Connection successful to http://www.google.com
Content is ....... (I'veremoved the content for brevity)
Total Time : 1268 milliseconds
Terminated

The poisonPill message is sent after 10 milliseconds and for this example the actor lives for 1268 milliseconds. So why is the actor not terminating when the poisonPill is sent ? Is this because the timings are so short ?

Updated code :

package terminatetest;


import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;

public class HelloWorld extends UntypedActor {

    @Override
    public void preStart() {

        int counter = 0;

        akka.actor.ActorSystem system = getContext().system();

        final ActorRef greeter = getContext().actorOf(
                Props.create(ServiceActor.class), String.valueOf(counter));

        system.scheduler().scheduleOnce(
                Duration.create(10, TimeUnit.MILLISECONDS), new Runnable() {
                    public void run() {
                        greeter.tell("poisonPill", getSelf());
                    }
                }, system.dispatcher());

        greeter.tell("http://www.google.com", getSelf());

        counter = counter + 1;
    }

    @Override
    public void onReceive(Object msg) {
        if (msg == ServiceActor.Msg.SUCCESS) {
            System.out.println("Success");
            getContext().stop(getSelf());
        } else if (msg == ServiceActor.Msg.TERMINATED) {
            System.out.println("Terminated");
        } else
            unhandled(msg);
    }
}


package terminatetest;

import static com.utils.PrintUtils.println;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;

import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.UntypedActor;

public class ServiceActor extends UntypedActor {

    ActorRef lastSender = getSender();

    public static enum Msg {
        SUCCESS, FAIL, TERMINATED;
    }

    @Override
    public void onReceive(Object msg) {

        if (msg instanceof String) {
            String urlName = (String) msg;

            if (urlName.equalsIgnoreCase("poisonPill")) {   
                this.getSelf().tell(PoisonPill.getInstance(), getSelf());
                getSender().tell(Msg.TERMINATED, getSelf());
            }

            else {

                try {
                    long startTime = System.currentTimeMillis();
                    System.out.println("startTime : "+startTime);
                    URL url = new URL(urlName);
                    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                    conn.connect();

                    BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
                    StringBuilder out = new StringBuilder();
                    String line;
                    while ((line = reader.readLine()) != null) {
                        out.append(line);
                    }
                    System.out.println("Connection successful to " + url);
                    System.out.println("Content is " + out);
                    long endTime = System.currentTimeMillis();
                    System.out.println("Total Time : " + (endTime - startTime) + " milliseconds");

                } catch (MalformedURLException mue) {
                    println("URL Name " + urlName);
                    System.out.println("MalformedURLException");
                    System.out.println(mue.getMessage());
                    mue.printStackTrace();
                    getSender().tell(Msg.FAIL, getSelf());
                } catch (IOException ioe) {
                    println("URL Name " + urlName);
                    System.out.println("IOException");
                    System.out.println(ioe.getMessage());
                    ioe.printStackTrace();
                    System.out.println("Now exiting");
                    getSender().tell(Msg.FAIL, getSelf());
                }
            }
        }
    }

}

Upvotes: 0

Views: 1531

Answers (1)

cmbaxter
cmbaxter

Reputation: 35443

I think your problem stems from the fact that you only set lastSender once, during construction of the ServiceActor, and you explicitly set it to deadletter. If you want to send a message back to the actor that sent you the String message, then you will need to set lastSender to that sender(). Failure to do so will result in your Msg.TERMINATED always going to deadletter.

EDIT

I see the real issue here now. In the HelloWorld actor, you are sending a PoisonPill to the ServiceActor. The ServiceActor will stop itself as a result, thus stopping the child ref too (as it's a child actor to ServiceActor). At this point, you would think the Terminated message would be delivered to ServiceActor because it explicitly watches child (and it probably does get delivered), but you've already sent a PoisonPill to ServiceActor so it will not process any messages received after that message (which would be the Terminate) so that's why the block:

else if (msg instanceof Terminated) {

is never hit in ServiceActor.

EDIT2

Your actor receives the request to hit google first and receives the "poisonPill" message second (10 milliseconds later). As an actor processes it's mailbox in order, the actor fully processes the request to hit google before it processes the message to stop itself. That's why the actor doesn't stop after 10 milliseconds. You can't stop an actor in the middle of what it's doing.

Upvotes: 1

Related Questions