Reputation: 87
I am trying to group incoming UDP packets into 10-second buckets keyed by packet arrival time, but once old keys have been removed, the code always creates multiple keys within the same 10-second window.
public static void main(String[] args) {
    ConcurrentHashMap<Long, String> tenSecondBucket =
            new ConcurrentHashMap<Long, String>();
This thread writes to the hash map continuously. Before adding an entry, it checks whether the last key (a timestamp) is more than 10 seconds old; if it is, the thread creates a new entry, otherwise it updates the existing one.
    Thread writingThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(1);
                    if (tenSecondBucket.size() > 0) {
                        // getting last key
                        long lastKey = 0;
                        for (long keyValue : tenSecondBucket.keySet()) {
                            lastKey = keyValue;
                        }
                        if (System.currentTimeMillis() - lastKey > 10000) {
                            tenSecondBucket.put(System.currentTimeMillis(), "secondEntry");
                        } else {
                            tenSecondBucket.put(lastKey, "updatedEntry");
                        }
                    } else {
                        tenSecondBucket.put(System.currentTimeMillis(), "newEntry");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    });
    writingThread.start();
This thread removes keys that are more than 10 seconds old.
    Thread removingThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(4000);
                    if (tenSecondBucket.size() > 0) {
                        tenSecondBucket.keySet().stream().forEach(key -> {
                            if (System.currentTimeMillis() - key > 10000) {
                                tenSecondBucket.remove(key);
                            }
                        });
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    });
    removingThread.start();
This thread reads the map and prints the keys it currently holds.
    Thread readingThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(4000);
                    if (tenSecondBucket.size() > 0) {
                        tenSecondBucket.keySet().stream().forEach(key -> {
                            System.out.println("testing key which is timestamp " + key);
                        });
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    });
    readingThread.start();
}
Upvotes: 0
Views: 755
Reputation: 20467
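As Steve said in the comments, your approach to getting the last key is incorrect: ConcurrentHashMap does not iterate its keys in any defined order, so the key the loop ends on is effectively random rather than the most recent timestamp.
You also mentioned in the comments that this needs to be thread-safe for multiple writer threads.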
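To make the ordering problem concrete, here is a small sketch. The class name LastKeyDemo, the helper lastIterated and the sample timestamps are made up for illustration. It contrasts the loop from the question with two ways of getting the largest key: Collections.max over the key set, and ConcurrentSkipListMap.lastKey(), which is a different, sorted map and not what the question uses.

```java
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

class LastKeyDemo {
    // The question's approach: keep whichever key the iteration happens to visit last.
    // ConcurrentHashMap gives no ordering guarantee, so this need not be the newest key.
    static long lastIterated(ConcurrentHashMap<Long, String> map) {
        long last = 0;
        for (long key : map.keySet()) {
            last = key;
        }
        return last;
    }

    public static void main(String[] args) {
        ConcurrentHashMap<Long, String> hashMap = new ConcurrentHashMap<>();
        ConcurrentSkipListMap<Long, String> sortedMap = new ConcurrentSkipListMap<>();

        long base = 1_700_000_000_000L; // hypothetical starting timestamp
        for (long i = 0; i < 5; i++) {
            long timestamp = base + i * 10_001; // one key per 10-second bucket
            hashMap.put(timestamp, "entry" + i);
            sortedMap.put(timestamp, "entry" + i);
        }

        // Some key from the map, in whatever order the hash table yields.
        System.out.println("last iterated key: " + lastIterated(hashMap));
        // The numerically largest key, found by scanning all keys.
        System.out.println("max key:           " + Collections.max(hashMap.keySet()));
        // A sorted concurrent map can return its largest key directly.
        System.out.println("sorted lastKey():  " + sortedMap.lastKey());
    }
}
```

Either alternative finds the newest timestamp, but neither makes the check-then-put sequence atomic on its own, which matters once there are several writer threads.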
I'd try something like the following, using a shared AtomicLong to save the "last key" and updating it atomically with updateAndGet:
// Requires: import java.util.concurrent.atomic.AtomicLong;
// Shared by all writer threads; holds the key of the current 10-second bucket.
AtomicLong lastKey = new AtomicLong();
Thread writingThread = new Thread(() -> {
    while (true) {
        try {
            Thread.sleep(100);
            long now = System.currentTimeMillis();
            // Atomically start a new bucket if the current one is more than 10 seconds old.
            long localLastKey = lastKey.updateAndGet(oldValue -> oldValue < now - 10000 ? now : oldValue);
            if (localLastKey == now) {
                tenSecondBucket.put(now, "newEntry");
            } else {
                tenSecondBucket.put(localLastKey, "updatedEntry@" + now);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});
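To check the bucketing rule without waiting on the wall clock, the same updateAndGet expression can be pulled into a helper and fed synthetic timestamps. This is only a sketch: BucketKeyDemo, bucketKeyFor, BUCKET_MILLIS and the per-bucket packet counter are names invented here, not part of the code above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class BucketKeyDemo {
    static final long BUCKET_MILLIS = 10_000;

    // Returns the key of the bucket that a packet arriving at 'now' belongs to.
    // A new bucket keyed by 'now' is started only if the current bucket key
    // is more than BUCKET_MILLIS old; otherwise the current key is kept.
    static long bucketKeyFor(AtomicLong lastKey, long now) {
        return lastKey.updateAndGet(old -> old < now - BUCKET_MILLIS ? now : old);
    }

    public static void main(String[] args) {
        AtomicLong lastKey = new AtomicLong();
        ConcurrentHashMap<Long, Integer> packetsPerBucket = new ConcurrentHashMap<>();

        // Simulate one packet per second for 25 seconds with made-up timestamps.
        long start = 1_000_000L;
        for (long now = start; now <= start + 25_000; now += 1_000) {
            long key = bucketKeyFor(lastKey, now);
            packetsPerBucket.merge(key, 1, Integer::sum); // count packets per bucket
        }

        // Three buckets result: keys 1000000, 1011000 and 1022000.
        System.out.println(packetsPerBucket);
    }
}
```

Because the comparison is strict, a packet arriving exactly 10,000 ms after the bucket key still lands in the existing bucket; the next bucket starts with the first packet after that.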
Upvotes: 2