hotzst

Reputation: 7526

Implications of weakly consistent ConcurrentSkipListSet

Using a ConcurrentSkipListSet, I have observed some weird behaviour that I suspect is caused by the weak consistency of the concurrent set.

The JavaDoc has this to say on that topic:

Most concurrent Collection implementations (including most Queues) also differ from the usual java.util conventions in that their Iterators and Spliterators provide weakly consistent rather than fast-fail traversal:

  • they may proceed concurrently with other operations
  • they will never throw ConcurrentModificationException
  • they are guaranteed to traverse elements as they existed upon construction exactly once, and may (but are not guaranteed to) reflect any modifications subsequent to construction.
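As a rough illustration of these guarantees, here is a minimal single-threaded sketch. The class and method names are made up for the example, and it only shows what the quoted text promises: the set can be modified while it is being iterated without a ConcurrentModificationException, and iterator.remove() removes the element last returned by next(). Whether the element added mid-iteration is seen by the running iterator is deliberately not relied on, since that is not guaranteed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical demo class, not part of the code in question.
class WeaklyConsistentDemo {

    // Iterates over {1, 2, 3}, adds 10 and removes 2 while the iteration is running.
    // Every element returned by next() is recorded in 'seen'.
    static ConcurrentSkipListSet<Integer> iterateAndModify(List<Integer> seen) {
        ConcurrentSkipListSet<Integer> set =
                new ConcurrentSkipListSet<>(Arrays.asList(1, 2, 3));
        for (Iterator<Integer> it = set.iterator(); it.hasNext();) {
            Integer value = it.next();
            seen.add(value);
            if (value == 1) {
                // Modifying the set directly during iteration does not throw.
                set.add(10);
            }
            if (value == 2) {
                // Removes the element that next() just returned, i.e. 2.
                it.remove();
            }
        }
        return set;
    }

    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        ConcurrentSkipListSet<Integer> set = iterateAndModify(seen);
        System.out.println("seen during iteration: " + seen);
        System.out.println("set afterwards: " + set);
    }
}
```

The elements that were present when the iterator was created are all visited, and the set afterwards no longer contains 2 but does contain 10.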

This is the code that I use:

private final ConcurrentSkipListSet<TimedTask> sortedEvents;

public TimedUpdatableTaskList(){
    Comparator<TimedTask> comparator = 
        (task1, task2) -> task1.getExecutionTime().compareTo(task2.getExecutionTime());
    sortedEvents = new ConcurrentSkipListSet<>(comparator);
}

public void add(TimedTask task) {
    log.trace("Add task {}", task);
    sortedEvents.add(task);
}

public void handleClockTick(ClockTick event) {
    LocalDateTime now = date.getCurrentDate();
    logContent("Task list BEFORE daily processing ("+now+")");
    for (Iterator<TimedTask> iterator = sortedEvents.iterator(); iterator.hasNext();) {
        TimedTask task = iterator.next();
        Preconditions.checkNotNull(task.getExecutionTime(),
                "The execution time of the task may not be null");
        if (task.getExecutionTime().isBefore(now)) {
            log.trace("BEFORE: Execute task {} scheduled for {} on {}",
                    task, task.getExecutionTime(), now);
            try {
                task.run();
                iterator.remove();
            } catch (Exception e) {
                log.error("Failed to execute timed task", e);
            }
            log.trace("AFTER: Execute task {} scheduled for {} on {}",
                    task, task.getExecutionTime(), now);
        }
        if (task.getExecutionTime().isAfter(now)) {
            break; // List is sorted
        }
    }
    logContent("Task list AFTER daily processing");
}

private void logContent(String prefix) {
    StringBuilder sb = new StringBuilder();
    sortedEvents.stream().forEach(task ->sb.append(task).append(" "));
    log.trace(prefix + ": "+sb.toString());
}

Occasionally I see log output like this:

2018-05-19 13:46:00,453 [pool-3-thread-1] TRACE ... - Add task AIRefitTask{ship=Mercurius, scheduled for: 1350-07-16T08:45}
2018-05-19 13:46:00,505 [pool-3-thread-5] TRACE ... - Task list BEFORE daily processing (1350-07-16T09:45): AIRefitTask{ship=Tidewalker, scheduled for: 1350-07-16T08:45} AIRepairTask{ship=Hackepeter, scheduled for: 1350-07-16T13:45} ch.sahits.game.openpatrician.engine.event.task.WeaponConstructionTask@680da167 ch.sahits.game.openpatrician.engine.player.DailyPlayerUpdater@6e22f1ba AIRepairTask{ship=St. Bonivatius, scheduled for: 1350-07-17T03:45} AIRepairTask{ship=Hackepeter, scheduled for: 1350-07-17T05:45} ch.sahits.game.openpatrician.engine.event.task.WeeklyLoanerCheckTask@47571ace 

These are two almost consecutive log lines. Please note that they are executed on different threads. The TimedTask entry that is added is not listed in the second log line.

Am I correct in my assumption that this is due to the weak consistency? If so, would this also imply that iterator.next() retrieves a different entry than iterator.remove() deletes?

What I am observing is that this added entry is never processed and does not show up in the concurrent set at any time.

What would be a good solution to avoid this? One idea is to create a copy of the set and iterate over that; it is acceptable for entries to be processed in a later iteration, as long as they are processed eventually. However, the question "Weakly consistent iterator by ConcurrentHashMap" suggests that the iteration already happens on a copy of the set, so this might not change anything.

EDIT: Sample implementation of a TimedTask:

class AIRefitTask extends TimedTask {

    private static final Logger LOGGER = LogManager.getLogger(AIRefitTask.class);

    private AsyncEventBus clientServerEventBus;

    private ShipWeaponsLocationFactory shipWeaponLocationFactory;

    private ShipService shipService;

    private final IShip ship;
    private final EShipUpgrade level;
    private final IShipyard shipyard;

    public AIRefitTask(LocalDateTime executionTime, IShip ship, EShipUpgrade upgrade, IShipyard shipyard) {
        super();
        setExecutionTime(executionTime);
        LOGGER.debug("Add AIRefitTask for {} to be done at {}", ship.getName(), executionTime);
        this.ship = ship;
        this.level = upgrade;
        this.shipyard = shipyard;
    }

    @Override
    public void run() {
        EShipUpgrade currentLevel = ship.getShipUpgradeLevel();
        while (currentLevel != level) {
            ship.upgrade();
            List<IWeaponSlot> oldWeaponSlots = ship.getWeaponSlots();
            List<IWeaponSlot> newWeaponSlots = shipWeaponsLocationFactory.getShipWeaponsLocation(ship.getShipType(), level);
            ship.setWeaponSlots(newWeaponSlots);
            for (IWeaponSlot slot : oldWeaponSlots) {
                if (slot.getWeapon().isPresent()) {
                    EWeapon weapon = (EWeapon) slot.getWeapon().get();
                    if (slot instanceof SecondaryLargeWeaponSlot) {
                        if (!shipService.isLargeWeapon(weapon)) { // ignore large weapons in secondary slots
                            shipService.placeWeapon(weapon, ship);
                        }
                    } else {
                        // Not secondary slot
                        shipService.placeWeapon(weapon, ship);
                    }
                }
            }
            currentLevel = ship.getShipUpgradeLevel();
        }
        ship.setAvailable(true);
        shipyard.removeCompletedUpgrade(ship);
        LOGGER.debug("Refitted ship {}", ship.getName());
        clientServerEventBus.post(new RefitFinishedEvent(ship));
    }

    @Override
    public String toString() {
        return "AIRefitTask{ship="+ship.getUuid()+", scheduled for: "+getExecutionTime()+"}";
    }
}

Upvotes: 0

Views: 182

Answers (1)

hotzst

Reputation: 7526

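As @BenManes pointed out in his comment, the issue is with the Comparator used. ConcurrentSkipListSet decides whether two elements are duplicates by the Comparator alone, so when the Comparator returns 0 for two tasks that are not equal, the set treats them as the same element and add() does not insert the second one. In effect, the Comparator should be consistent with equals, that is, it should consider the same fields as hashCode and equals. Use a Comparator implementation like this: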

public int compare(TimedTask task1, TimedTask task2) {
    int executionTimeBasedComparisonResult = task1.getExecutionTime().compareTo(task2.getExecutionTime());
    if (executionTimeBasedComparisonResult == 0) { // two execution times are equal
        return task1.getUuid().compareTo(task2.getUuid());
    }
    return executionTimeBasedComparisonResult;
}

With this implementation, tasks are ordered by execution time first; when two execution times are equal (the comparison yields 0), the UUID breaks the tie, so two distinct tasks never compare as equal.

For this use case, the order of tasks with the same execution time is not relevant.
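To make the effect visible in isolation, here is a minimal, self-contained sketch. The Task class is a hypothetical stand-in for TimedTask that only carries an execution time and a UUID, and the comparators are built with Comparator.comparing and thenComparing instead of a hand-written compare method; the ordering they define should match the one described above.

```java
import java.time.LocalDateTime;
import java.util.Comparator;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical demo class, not part of the original code base.
class TieBreakDemo {

    // Minimal stand-in for TimedTask: just the two fields the comparators need.
    static final class Task {
        private final LocalDateTime executionTime;
        private final UUID uuid = UUID.randomUUID();

        Task(LocalDateTime executionTime) {
            this.executionTime = executionTime;
        }

        LocalDateTime getExecutionTime() { return executionTime; }
        UUID getUuid() { return uuid; }
    }

    // The comparator from the question: returns 0 for any two tasks at the same time.
    static final Comparator<Task> TIME_ONLY =
            Comparator.comparing(Task::getExecutionTime);

    // Execution time first, UUID as tie-breaker.
    static final Comparator<Task> TIME_THEN_UUID =
            Comparator.comparing(Task::getExecutionTime)
                      .thenComparing(Task::getUuid);

    // Adds two distinct tasks with the same execution time and reports
    // whether the set accepted the second one.
    static boolean secondAddAccepted(Comparator<Task> comparator) {
        LocalDateTime when = LocalDateTime.of(1350, 7, 16, 8, 45);
        ConcurrentSkipListSet<Task> set = new ConcurrentSkipListSet<>(comparator);
        set.add(new Task(when));
        // add() returns false when an element comparing as equal is already present.
        return set.add(new Task(when));
    }

    public static void main(String[] args) {
        System.out.println("time only:      " + secondAddAccepted(TIME_ONLY));
        System.out.println("time then UUID: " + secondAddAccepted(TIME_THEN_UUID));
    }
}
```

Since add() reports a rejected element through its return value, logging that value in the add method of the task list is a cheap way to notice this kind of problem early.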

Upvotes: 0
