Luuk D. Jansen

Reputation: 4506

Keeping an Akka Source.queue running while it waits for jobs

I need a simple mechanism where 'jobs' are put into, e.g., a BlockingQueue and then processed with a throttle of, e.g., 5 per second.

I was hoping to use Akka's Source.queue, but I don't grasp how to set it up so that the system keeps working when the queue is empty (a continual watch). As it stands, it just stops, which is how the example in the Akka documentation works. When I encountered this previously I ended up creating my own custom Thread, but in this case I need parallel processing.

What would be the pattern for creating, e.g., a 'continuous' actor that monitors a queue? I am using Java and the Play! Framework.

[EDIT]: Added my code, which seems to work now but feels clumsy. To explain: a request to contact a device with updated data comes into the Actor. The Actor checks whether there is already one in the 'requestQueue' (when something changes on the system, multiple update requests arrive at the same time). If not, it adds the request and then schedules a connection (after a short interval, so that multiple changes can be included in one communication). The connection is then put into the Source.queue.

private static final HashMap<Long, String> requestQueue = new HashMap<>();
private static final HashMap<Long, Date> lastDeviceConnection = new HashMap<>();

private final SourceQueueWithComplete<Device> sourceQueue;

private static final int timeBetweenConnectionsToDevice = 750;
private static final int timeToWaitBeforeConnecting = 250;

    @Inject
    ContactServer(ActorSystem system) {
        this.system = system;

        int bufferSize = 10;
        int elementsToProcess = 5;

        sourceQueue =
            Source.<Device>queue(bufferSize, OverflowStrategy.backpressure())
                .throttle(elementsToProcess, Duration.ofSeconds(1))
                .map(device -> {
                    String fromServer = null;
                    try {
                        String thisJson = requestQueue.get(device.getId());
                        requestQueue.remove(device.getId());
                        device.refresh();


                        Socket serviceSocket = new Socket(device.getIpAddress(), 7780);

                        // do communication

                        out.close();
                        in.close();
                        serviceSocket.close();

                        String data = encoding.decrypt(fromServer);

                        // do processing
                    } catch (Exception e) {
                        logger.info("Problem contacting the device (server initiative): " + device.getName(), e);
                        if (fromServer != null)
                            logger.warn("The return data of the last error was: " + fromServer);
                    }
                    return device;
                })
                .to(Sink.foreach(x -> logger.debug("Processed: " + x)))
                .run(system);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(Map.Entry.class, message -> {
                    final Device device = ((Map.Entry<Device, String>) message).getKey();
                    final String actionJson = ((Map.Entry<Device, String>) message).getValue();
                    try {

                        if (requestQueue.containsKey(device.getId())) {
                            if (actionJson != null && requestQueue.get(device.getId()) != null) {
                                requestQueue.put(device.getId(), requestQueue.get(device.getId()) + "," + actionJson);
                            } else if (actionJson != null) {
                                requestQueue.put(device.getId(), actionJson);
                            } else {
                                return;
                            }
                        } else {
                            requestQueue.put(device.getId(), actionJson);
                        }

                        long waitTime = timeToWaitBeforeConnecting;
                        if ((lastDeviceConnection.containsKey(device.getId()) && lastDeviceConnection.get(device.getId()).after(new Date(new Date().getTime() - timeBetweenConnectionsToDevice)))) {
                            // Remaining time until the minimum interval since the last connection has elapsed.
                            long timeBeforeNextDeviceConnection = lastDeviceConnection.get(device.getId()).getTime() + timeBetweenConnectionsToDevice - new Date().getTime();

                            if(timeBeforeNextDeviceConnection > waitTime) waitTime = timeBeforeNextDeviceConnection;
                        }

                        system.scheduler().scheduleOnce(scala.concurrent.duration.Duration.create(waitTime, TimeUnit.MILLISECONDS), () -> {
                            sourceQueue.offer(device);
                        },context().dispatcher());
                    } catch (Exception e) {
                        logger.info("Exception scheduling the device connection (server initiative): " + device.getName(), e);
                    } finally {
                        context().stop(self());
                    }
                }).build();
    }

Upvotes: 1

Views: 193

Answers (1)

artur

Reputation: 1760

I am not sure I fully understand your requirements, especially around timing, since you are using two mechanisms:

  1. using scheduling
  2. using throttling on the queue

I also see that you are aggregating actions for a particular deviceId in a HashMap. If the number of possible deviceIds is "reasonable", I think you can leverage the Akka Streams API to do those things for you.

I am assuming the following:

  1. You don't want to process more than 5 messages per second.
  2. You don't want to connect to a particular device to perform actions more often than once per second.
  3. You want to aggregate actions for the same device, as in the code snippet at the beginning of your handler.
  4. The number of possible device IDs is on the order of thousands rather than millions (see the groupBy documentation).

sourceQueue =
      Source.<Map.Entry<Device, String>>queue(bufferSize,
                                              OverflowStrategy.backpressure())
        .throttle(elementsToProcess, Duration.ofSeconds(1))
        .groupBy(1000, Map.Entry::getKey) //max 1000 devices
        .conflate(
          (aggregate, d) ->
            new AbstractMap.SimpleEntry<>(d.getKey(),
                                          aggregate.getValue() +
                                          "," + d.getValue()))
        .throttle(1, Duration.ofSeconds(1))
        .map(deviceEntry -> {
          //here goes the connection part, but DON'T use blocking IO, 
          //use separate threadpool with mapAsync instead
          ...
        })
        .async()
        .mergeSubstreams()
        .to(Sink.foreach(x -> logger.debug("Processed: " + x)))
        .run(system);
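
One detail to watch in the conflate step above: the handler in the question skips null action strings, whereas plain string concatenation would turn a null into the literal text "null". Below is a minimal, Akka-free sketch of a null-safe merge function. The names ActionMerge, joinActions and merge are made up for illustration, and it assumes that a null value simply means "no new action".

```java
import java.util.AbstractMap;
import java.util.Map;

// Hypothetical helper, not part of Akka or of the original code.
final class ActionMerge {

    /** Joins two action strings with a comma, ignoring a null on either side. */
    static String joinActions(String aggregate, String next) {
        if (aggregate == null) {
            return next;
        }
        if (next == null) {
            return aggregate;
        }
        return aggregate + "," + next;
    }

    /** Merges two entries for the same key, keeping the key of the newer entry. */
    static <K> Map.Entry<K, String> merge(Map.Entry<K, String> aggregate,
                                          Map.Entry<K, String> next) {
        return new AbstractMap.SimpleEntry<>(
                next.getKey(),
                joinActions(aggregate.getValue(), next.getValue()));
    }
}
```

If it fits your types, this could probably be passed to the stage as .conflate(ActionMerge::merge) in place of the inline lambda.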

Then in your receive you only call offer on the queue. You should also check the result of the offer() call to see whether it succeeded or was backpressured, in which case you need to back off, drop the item, or buffer it.
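
To illustrate the "check the result, then back off or drop" idea without pulling in Akka, here is a rough analogy using a plain java.util.concurrent.BlockingQueue, whose offer also reports whether the element was accepted. This is not the Akka API: OfferWithBackoff, Outcome and offerOrDrop are hypothetical names, and the attempt count and wait time are arbitrary.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper showing the pattern on a bounded BlockingQueue (not Akka).
final class OfferWithBackoff {

    enum Outcome { ENQUEUED, DROPPED }

    /**
     * Tries to enqueue an item, waiting up to waitMillis per attempt (the back-off),
     * and reports DROPPED if the queue is still full after maxAttempts.
     */
    static <T> Outcome offerOrDrop(BlockingQueue<T> queue, T item,
                                   int maxAttempts, long waitMillis) {
        try {
            for (int attempt = 0; attempt < maxAttempts; attempt++) {
                // offer returns false if no space became available within the timeout
                if (queue.offer(item, waitMillis, TimeUnit.MILLISECONDS)) {
                    return Outcome.ENQUEUED;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return Outcome.DROPPED;
    }
}
```

With the Akka queue the result arrives asynchronously (in the Java DSL, offer() returns a CompletionStage), so inside an actor you would react to that result rather than block the way this sketch does.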

Upvotes: 1
