gstackoverflow
gstackoverflow

Reputation: 36988

Akka official tutorial: Parent actor doesn't receive termination message when child is stopped with PoisonPill

I am reading an official Akka tutorial. I repeat the actions from the tutorial but I get another result.

My source code:

Device group:

package com.lightbend.akka.sample.iot;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.event.Logging;
import akka.event.LoggingAdapter;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class DeviceGroup extends AbstractActor {
    private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

    final String groupId;
    final Map<String, ActorRef> deviceIdToActor = new HashMap<>();
    final Map<ActorRef, String> actorToDeviceId = new HashMap<>();

    public DeviceGroup(String groupId) {
        this.groupId = groupId;
    }

    public static Props props(String groupId) {
        return Props.create(DeviceGroup.class, groupId);
    }


    @Override
    public void preStart() {
        log.info("DeviceGroup {} started", groupId);
    }

    @Override
    public void postStop() {
        log.info("DeviceGroup {} stopped", groupId);
    }

    private void onTrackDevice(Device.RequestTrackDevice trackMsg) {
        if (this.groupId.equals(trackMsg.groupId)) {
            ActorRef deviceActor = deviceIdToActor.get(trackMsg.deviceId);
            if (deviceActor != null) {
                deviceActor.forward(trackMsg, getContext());
            } else {
                log.info("Creating device actor for {}", trackMsg.deviceId);
                deviceActor = getContext().actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
                deviceIdToActor.put(trackMsg.deviceId, deviceActor);
                actorToDeviceId.put(deviceActor, trackMsg.deviceId);
                deviceActor.forward(trackMsg, getContext());
            }
        } else {
            log.warning(
                    "Ignoring TrackDevice request for {}. This actor is responsible for {}.",
                    groupId, this.groupId
            );
        }
    }

    private void onTerminated(Terminated t) {
        ActorRef deviceActor = t.getActor();
        String deviceId = actorToDeviceId.get(deviceActor);
        log.info("Device actor for {} has been terminated", deviceId);
        actorToDeviceId.remove(deviceActor);
        deviceIdToActor.remove(deviceId);
    }
    private void onDeviceList(RequestDeviceList r) {
        getSender().tell(new ReplyDeviceList(r.requestId, deviceIdToActor.keySet()), getSelf());
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(Device.RequestTrackDevice.class, this::onTrackDevice)
                .match(Terminated.class, this::onTerminated)
                .match(RequestDeviceList.class, this::onDeviceList)
                .build();
    }

    public static final class RequestDeviceList {
        final long requestId;

        public RequestDeviceList(long requestId) {
            this.requestId = requestId;
        }
    }

    public static final class ReplyDeviceList {
        final long requestId;
        final Set<String> ids;

        public ReplyDeviceList(long requestId, Set<String> ids) {
            this.requestId = requestId;
            this.ids = ids;
        }
    }
}

Device:

package com.lightbend.akka.sample.iot;

import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;

import java.util.Optional;

public class Device extends AbstractActor {
    private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

    final String groupId;

    final String deviceId;

    public Device(String groupId, String deviceId) {
        this.groupId = groupId;
        this.deviceId = deviceId;
    }

    public static Props props(String groupId, String deviceId) {
        return Props.create(Device.class, groupId, deviceId);
    }


    Optional<Double> lastTemperatureReading = Optional.empty();

    @Override
    public void preStart() {
        log.info("Device actor {}-{} started", groupId, deviceId);
    }

    @Override
    public void postStop() {
        log.info("Device actor {}-{} stopped", groupId, deviceId);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(RequestTrackDevice.class, r -> {
                    if (this.groupId.equals(r.groupId) && this.deviceId.equals(r.deviceId)) {
                        getSender().tell(new DeviceRegistered(), getSelf());
                    } else {
                        log.warning(
                                "Ignoring TrackDevice request for {}-{}.This actor is responsible for {}-{}.",
                                r.groupId, r.deviceId, this.groupId, this.deviceId
                        );
                    }
                })
                .match(ReadTemperature.class, r -> {
                    getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf());
                })
                .match(RecordTemperature.class, r -> {
                    log.info("Recorded temperature reading {} with {}", r.value, r.requestId);
                    lastTemperatureReading = Optional.of(r.value);
                    getSender().tell(new TemperatureRecorded(r.requestId), getSelf());
                })
                .build();
    }

    //temperature request
    public static final class ReadTemperature {
        long requestId;

        public ReadTemperature(long requestId) {
            this.requestId = requestId;
        }
    }

    //temperature response
    public static final class RespondTemperature {
        long requestId;
        Optional<Double> value;

        public RespondTemperature(long requestId, Optional<Double> value) {
            this.requestId = requestId;
            this.value = value;
        }
    }

    //==============
    public static final class RecordTemperature {
        final long requestId;
        final double value;

        public RecordTemperature(long requestId, double value) {
            this.requestId = requestId;
            this.value = value;
        }
    }

    public static final class TemperatureRecorded {
        final long requestId;

        public TemperatureRecorded(long requestId) {
            this.requestId = requestId;
        }
    }

    //===================
    public static final class RequestTrackDevice {
        public final String groupId;
        public final String deviceId;

        public RequestTrackDevice(String groupId, String deviceId) {
            this.groupId = groupId;
            this.deviceId = deviceId;
        }
    }

    public static final class DeviceRegistered {
    }
}

I try to run the following test:

 @Test
 public void testListActiveDevicesAfterOneShutsDown() {
     TestKit probe = new TestKit(system);
     ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));

     groupActor.tell(new Device.RequestTrackDevice("group", "device1"), probe.getRef());
     probe.expectMsgClass(Device.DeviceRegistered.class);
     ActorRef toShutDown = probe.getLastSender();

     groupActor.tell(new Device.RequestTrackDevice("group", "device2"), probe.getRef());
     probe.expectMsgClass(Device.DeviceRegistered.class);

     groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef());
     DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
     assertEquals(0L, reply.requestId);
     assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);

     probe.watch(toShutDown);
     toShutDown.tell(PoisonPill.getInstance(), ActorRef.noSender());
     probe.expectTerminated(toShutDown);

     // using awaitAssert to retry because it might take longer for the groupActor
     // to see the Terminated, that order is undefined
     probe.awaitAssert(Duration.fromNanos(10_000_000_000L),() -> {
         groupActor.tell(new DeviceGroup.RequestDeviceList(1L), probe.getRef());
         DeviceGroup.ReplyDeviceList r =
                 probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
         assertEquals(1L, r.requestId);
         assertEquals(Stream.of("device2").collect(Collectors.toSet()), r.ids);
         return null;
     });
 }

In the logs I see the following:

[INFO] [04/09/2018 17:56:31.976] [default-akka.actor.default-dispatcher-2] [akka://default/user/$a] DeviceGroup group started
[INFO] [04/09/2018 17:56:31.977] [default-akka.actor.default-dispatcher-2] [akka://default/user/$a] Creating device actor for device1
[INFO] [04/09/2018 17:56:31.979] [default-akka.actor.default-dispatcher-3] [akka://default/user/$a/device-device1] Device actor group-device1 started
[INFO] [04/09/2018 17:56:31.983] [default-akka.actor.default-dispatcher-3] [akka://default/user/$a] Creating device actor for device2
[INFO] [04/09/2018 17:56:31.983] [default-akka.actor.default-dispatcher-2] [akka://default/user/$a/device-device2] Device actor group-device2 started
[INFO] [04/09/2018 17:56:31.992] [default-akka.actor.default-dispatcher-2] [akka://default/user/$a/device-device1] Device actor group-device1 stopped

java.lang.AssertionError: 
Expected :[device2]
Actual   :[device1, device2]

I tried to debug it and found that the com.lightbend.akka.sample.iot.DeviceGroup#onTerminated method was not invoked.

What is wrong with that code?

Upvotes: 1

Views: 293

Answers (1)

Frederic A.
Frederic A.

Reputation: 3514

You don't watch the actor, so you don't get notified when it dies.

The linked tutorial contains the following line of code that you didn't include in your code above, this is most likely why your onTerminated is not called:

getContext().watch(deviceActor);

Upvotes: 3

Related Questions