Reputation: 345
I have hazelcast client that is putting generic message class into a iQueue and hazelcast member consume this generic message via Listener do the logic and remove the object from the queue. But it is not removing all the objects. On mancenter i can see that there are still items into the queue (not all for example from 100 objects in queue it is removing around 80 from them) and i don`t know why it is not removing some of the objects. Currently in mancenter it is showing 12 items into the queue (from arround 100 requests) but it shouldn't have any.Still the code is working and returning results. The only problem is that i can see in mancenter that this items are getting more and more into the queue until i stop the hazelcast server.
My generic message class:
public class GenericMessage<T> implements Message<T>, Serializable {
private static final long serialVersionUID = -1927585972068115172L;
private final T payload;
private MessageHeaders headers;
public GenericMessage(T payload) {
Assert.notNull(payload, "payload must not be null");
HashMap<Object, Object> headers = new HashMap<>();
this.headers = new MessageHeaders(headers);
this.payload = payload;
}
@Override
public MessageHeaders getHeaders() {
return this.headers;
}
@Override
public T getPayload() {
return this.payload;
}
@Override
public String toString() {
return "[Payload=" + this.payload + "][Headers=" + this.headers + "]";
}
@Override
public int hashCode() {
return this.headers.hashCode() * 23 + ObjectUtils.nullSafeHashCode(this.payload);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj != null && obj instanceof GenericMessage<?>) {
GenericMessage<?> other = (GenericMessage<?>) obj;
if (this.headers.getKey() != null && other.headers.getKey() != null) {
return this.headers.getKey().equals(other.headers.getKey());
} else {
return false;
}
}
return false;
}
}
MessageHeaders class:
public class MessageHeaders implements Map<Object, Object>, Serializable {
private static final long serialVersionUID = 4469807275189880042L;
protected Map<Object, Object> headers;
public static final String KEY = "key";
public MessageHeaders(Map<Object, Object> headers) {
this.headers = (headers != null) ? headers : new HashMap<>();
}
@SuppressWarnings("unchecked")
public <T> T get(Object key, Class<T> type) {
Object value = this.headers.get(key);
if (value == null) {
return null;
}
if (!type.isAssignableFrom(value.getClass())) {
throw new IllegalArgumentException("Incorrect type specified for header '"
+ key
+ "'. Expected ["
+ type
+ "] but actual type is ["
+ value.getClass()
+ "]");
}
return (T) value;
}
@Override
public int hashCode() {
return this.headers.hashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj != null && obj instanceof MessageHeaders) {
MessageHeaders other = (MessageHeaders) obj;
return this.headers.equals(other.headers);
}
return false;
}
@Override
public boolean containsKey(Object key) {
return this.headers.containsKey(key);
}
@Override
public boolean containsValue(Object value) {
return this.headers.containsValue(value);
}
@Override
public Set<Map.Entry<Object, Object>> entrySet() {
return Collections.unmodifiableSet(this.headers.entrySet());
}
@Override
public Object get(Object key) {
return this.headers.get(key);
}
@Override
public boolean isEmpty() {
return this.headers.isEmpty();
}
@Override
public Set<Object> keySet() {
return Collections.unmodifiableSet(this.headers.keySet());
}
@Override
public int size() {
return this.headers.size();
}
@Override
public Collection<Object> values() {
return Collections.unmodifiableCollection(this.headers.values());
}
@Override
public Object put(Object key, Object value) {
throw new UnsupportedOperationException("MessageHeaders is immutable.");
}
@Override
public void putAll(Map<? extends Object, ? extends Object> t) {
throw new UnsupportedOperationException("MessageHeaders is immutable.");
}
@Override
public Object remove(Object key) {
throw new UnsupportedOperationException("MessageHeaders is immutable.");
}
@Override
public void clear() {
throw new UnsupportedOperationException("MessageHeaders is immutable.");
}
private void writeObject(ObjectOutputStream out) throws IOException {
List<String> keysToRemove = new ArrayList<>();
for (Map.Entry<Object, Object> entry : this.headers.entrySet()) {
if (!(entry.getValue() instanceof Serializable)) {
keysToRemove.add(String.valueOf(entry.getKey()));
}
}
for (String key : keysToRemove) {
// if (logger.isInfoEnabled()) {
// logger.info("removing non-serializable header: " +
// key);
// }
this.headers.remove(key);
}
out.defaultWriteObject();
}
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
}
public String getKey() {
return this.get(KEY, String.class);
}
public void setKey(String key) {
this.headers.put(KEY, key);
}
}
Putting into queue implementation:
User user = new User();
GenericMessage<User> message = new GenericMessage<User>(user);
String key="123";
message.getHeaders().setKey(key);
IQueue<Object> queue = hazelcastInstance.getQueue("user_queue");
queue.add(message);
Hazelcast listener configuration:
IQueue<Object> userQueue = hazelcastInstance.getQueue("user_queue");
UserListener userListener = context.getBean(UserListener.class);
userQueue.addItemListener(userListener, true);
Listener:
public class UserListener implements ItemListener<Object> {
@Autowired
private UserService service;
@Override
public void itemAdded(ItemEvent<Object> arg0) {
service.process(arg0);
}
}
Service:
public class UserService {
@Async("userTaskExecutor")
public void process(ItemEvent<Object> item) {
GenericMessage<User> message = (GenericMessage<User>) item.getItem();
hazelcastInstance.getQueue("user_queue").remove(message);
}
Upvotes: 0
Views: 302
Reputation: 345
With a lot of testing and debugging i found the problem. It turns out that the documentation of the remove(object) method is misleading. In the documentation is says that this method rely on .equals() class method but it turns out that hazelcast compares the serialized object against each serialized object. So i implement a custom compare:
GenericMessage<?> incomeMessage = (GenericMessage<?>) object;
boolean removed = hazelcastInstance.getQueue(queueId).remove(object);
if (!removed) {
Iterator<Object> iterator = hazelcastInstance.getQueue(queueId).iterator();
while (iterator.hasNext()) {
Object next = iterator.next();
GenericMessage<?> message = (GenericMessage<?>) next;
if (incomeMessage.getHeaders().getKey()
.equals(message.getHeaders().getKey())) {
object = next;
removed = hazelcastInstance.getQueue(queueId).remove(object);
break;
}
}
}
Upvotes: 2