Hazelcast remove() on IQueue is skipping some elements

I have a Hazelcast client that puts a generic message class into an IQueue, and a Hazelcast member that consumes this generic message via a listener, does the logic, and removes the object from the queue. But it is not removing all the objects. In mancenter I can see that there are still items in the queue (for example, out of 100 objects in the queue it removes only around 80 of them), and I don't know why it is not removing some of the objects. Currently mancenter shows 12 items in the queue (from around 100 requests), but there shouldn't be any. The code still works and returns results. The only problem is that I can see in mancenter that these items keep accumulating in the queue until I stop the Hazelcast server.

My generic message class:

/**
 * Serializable message that pairs a non-null payload with a {@link MessageHeaders} instance.
 *
 * <p>Two messages are equal when both carry a non-null header key (see
 * {@link MessageHeaders#getKey()}) and those keys are equal. The payload does not take part
 * in equality, and {@link #hashCode()} is derived from the same key so that equal messages
 * always share a hash code.
 *
 * @param <T> the payload type
 */
public class GenericMessage<T> implements Message<T>, Serializable {

    private static final long serialVersionUID = -1927585972068115172L;

    private final T payload;

    private final MessageHeaders headers;

    /**
     * Creates a message with the given payload and an initially empty header map.
     *
     * @param payload the payload; must not be {@code null}
     */
    public GenericMessage(T payload) {
        Assert.notNull(payload, "payload must not be null");
        this.payload = payload;
        this.headers = new MessageHeaders(new HashMap<Object, Object>());
    }

    /** @return the headers attached to this message */
    @Override
    public MessageHeaders getHeaders() {
        return this.headers;
    }

    /** @return the payload passed to the constructor */
    @Override
    public T getPayload() {
        return this.payload;
    }

    @Override
    public String toString() {
        return "[Payload=" + this.payload + "][Headers=" + this.headers + "]";
    }

    /**
     * Hashes only the header key, mirroring {@link #equals(Object)}.
     *
     * @return the key's hash code, or {@code 0} when no key is set
     */
    @Override
    public int hashCode() {
        String key = this.headers.getKey();
        return (key != null) ? key.hashCode() : 0;
    }

    /**
     * Compares messages by header key.
     *
     * @param obj the object to compare with
     * @return {@code true} if {@code obj} is this instance, or is a {@code GenericMessage}
     *         whose non-null key equals this message's non-null key
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof GenericMessage<?>)) {
            return false;
        }
        String key = this.headers.getKey();
        if (key == null) {
            // A message without a key is only ever equal to itself.
            return false;
        }
        return key.equals(((GenericMessage<?>) obj).headers.getKey());
    }

}

MessageHeaders class:

/**
 * Serializable header container exposed as a read-only {@link Map} view.
 *
 * <p>The mutating {@code Map} methods throw {@link UnsupportedOperationException}; the only
 * supported mutation is {@link #setKey(String)}. The map passed to the constructor is used
 * directly as the backing store (it is not copied).
 */
public class MessageHeaders implements Map<Object, Object>, Serializable {

    private static final long serialVersionUID = 4469807275189880042L;

    protected Map<Object, Object> headers;

    /** Header name under which the message key is stored. */
    public static final String KEY = "key";

    /**
     * @param headers backing map for the headers; a new empty {@link HashMap} is used when
     *                {@code null}
     */
    public MessageHeaders(Map<Object, Object> headers) {
        this.headers = (headers != null) ? headers : new HashMap<>();
    }

    /**
     * Returns the header stored under {@code key}, checked against the requested type.
     *
     * @param key  the header name
     * @param type the expected value type
     * @param <T>  the expected value type
     * @return the header value, or {@code null} if no value is stored under {@code key}
     * @throws IllegalArgumentException if the stored value is not an instance of {@code type}
     */
    public <T> T get(Object key, Class<T> type) {
        Object value = this.headers.get(key);
        if (value == null) {
            return null;
        }
        if (!type.isInstance(value)) {
            throw new IllegalArgumentException("Incorrect type specified for header '"
                    + key
                    + "'. Expected ["
                    + type
                    + "] but actual type is ["
                    + value.getClass()
                    + "]");
        }
        return type.cast(value);
    }

    @Override
    public int hashCode() {
        return this.headers.hashCode();
    }

    /** Two instances are equal when their backing maps are equal. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj instanceof MessageHeaders) {
            return this.headers.equals(((MessageHeaders) obj).headers);
        }
        return false;
    }

    @Override
    public boolean containsKey(Object key) {
        return this.headers.containsKey(key);
    }

    @Override
    public boolean containsValue(Object value) {
        return this.headers.containsValue(value);
    }

    /** @return an unmodifiable view of the header entries */
    @Override
    public Set<Map.Entry<Object, Object>> entrySet() {
        return Collections.unmodifiableSet(this.headers.entrySet());
    }

    @Override
    public Object get(Object key) {
        return this.headers.get(key);
    }

    @Override
    public boolean isEmpty() {
        return this.headers.isEmpty();
    }

    /** @return an unmodifiable view of the header names */
    @Override
    public Set<Object> keySet() {
        return Collections.unmodifiableSet(this.headers.keySet());
    }

    @Override
    public int size() {
        return this.headers.size();
    }

    /** @return an unmodifiable view of the header values */
    @Override
    public Collection<Object> values() {
        return Collections.unmodifiableCollection(this.headers.values());
    }

    /** Unsupported. */
    @Override
    public Object put(Object key, Object value) {
        throw new UnsupportedOperationException("MessageHeaders is immutable.");
    }

    /** Unsupported. */
    @Override
    public void putAll(Map<? extends Object, ? extends Object> t) {
        throw new UnsupportedOperationException("MessageHeaders is immutable.");
    }

    /** Unsupported. */
    @Override
    public Object remove(Object key) {
        throw new UnsupportedOperationException("MessageHeaders is immutable.");
    }

    /** Unsupported. */
    @Override
    public void clear() {
        throw new UnsupportedOperationException("MessageHeaders is immutable.");
    }

    /**
     * Drops every entry that cannot be serialized and then writes the remaining state.
     *
     * <p>An entry is dropped when its value is not {@link Serializable} (this includes
     * {@code null} values) or when its non-null key is not {@link Serializable}. Entries are
     * removed using the original key object, so non-String keys are handled as well. Note
     * that the entries are removed from this instance's backing map, not just from the
     * serialized form.
     */
    private void writeObject(ObjectOutputStream out) throws IOException {
        List<Object> keysToRemove = new ArrayList<>();
        for (Map.Entry<Object, Object> entry : this.headers.entrySet()) {
            Object key = entry.getKey();
            boolean keyWritable = (key == null) || (key instanceof Serializable);
            if (!keyWritable || !(entry.getValue() instanceof Serializable)) {
                keysToRemove.add(key);
            }
        }
        for (Object key : keysToRemove) {
            this.headers.remove(key);
        }
        out.defaultWriteObject();
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
    }

    /**
     * @return the value stored under {@link #KEY}, or {@code null} if none is set
     * @throws IllegalArgumentException if the stored value is not a {@code String}
     */
    public String getKey() {
        return this.get(KEY, String.class);
    }

    /**
     * Stores {@code key} under {@link #KEY} in the backing map.
     *
     * @param key the message key
     */
    public void setKey(String key) {
        this.headers.put(KEY, key);
    }
}

Putting into queue implementation:

// Wrap a fresh User in a message and tag it with the key "123" via its headers.
GenericMessage<User> message = new GenericMessage<User>(new User());
message.getHeaders().setKey("123");
// Add the tagged message to the "user_queue" queue.
IQueue<Object> userQueue = hazelcastInstance.getQueue("user_queue");
userQueue.add(message);

Hazelcast listener configuration:

IQueue<Object> userQueue = hazelcastInstance.getQueue("user_queue");
    UserListener userListener = context.getBean(UserListener.class);
    userQueue.addItemListener(userListener, true);

Listener:

/**
 * Queue item listener that forwards every "item added" event to {@link UserService}.
 */
public class UserListener implements ItemListener<Object> {

    @Autowired
    private UserService service;

    /**
     * Hands the event for a newly added item to the service.
     *
     * @param event the event describing the added item
     */
    @Override
    public void itemAdded(ItemEvent<Object> event) {
        service.process(event);
    }

    /**
     * No-op. {@code ItemListener} also declares this callback, so it has to be implemented
     * for the class to compile; nothing needs to happen when an item leaves the queue.
     *
     * @param event the event describing the removed item
     */
    @Override
    public void itemRemoved(ItemEvent<Object> event) {
        // Intentionally empty.
    }

}

Service:

public class UserService {
    @Async("userTaskExecutor")
    public void process(ItemEvent<Object> item) {
    GenericMessage<User> message = (GenericMessage<User>) item.getItem();
    hazelcastInstance.getQueue("user_queue").remove(message);
}

Upvotes: 0

Views: 302

Answers (1)

After a lot of testing and debugging I found the problem. It turns out that the documentation of the remove(object) method is misleading. The documentation says that this method relies on the class's .equals() method, but it turns out that Hazelcast compares the serialized object against each serialized object in the queue. So I implemented a custom comparison:

    GenericMessage<?> incomeMessage = (GenericMessage<?>) object;
    // Look the queue up once and reuse the same handle for every call below.
    IQueue<Object> targetQueue = hazelcastInstance.getQueue(queueId);
    // First try the plain removal of the object as received.
    boolean removed = targetQueue.remove(object);
    if (!removed) {
        // Fallback: find the queued message that carries the same header key and remove
        // that instance instead of the one we were handed.
        String wantedKey = incomeMessage.getHeaders().getKey();
        if (wantedKey != null) { // without a key there is nothing to match on
            for (Object next : targetQueue) {
                // Skip anything in the queue that is not a GenericMessage.
                if (next instanceof GenericMessage<?>
                        && wantedKey.equals(((GenericMessage<?>) next).getHeaders().getKey())) {
                    object = next;
                    removed = targetQueue.remove(object);
                    break; // only the first match is removed
                }
            }
        }
    }

Upvotes: 2

Related Questions