Reputation: 1272
I am using publish/subscribe for a project and want to solve a problem that I foresee.
We are going to have many publishers. Each data update causes an update message to be published on a specific topic. There will be many topics and many updates per topic (for example, 1000 updates per second on one topic and 2000 on another), and I don't want my subscribers to receive that many update messages per second.
I only care about delivering at most one update per topic every n seconds, and only if there is a new message.
Look at this example (n = 1 second):
[time: 0.00] *message (publish message as it is first message)
[time: 0.02] message (nop)
[time: 0.03] message (nop)
[time: 0.04] message (nop)
[time: 0.10] message (nop)
[time: 1.00] *(1 second after last publish) (publish message, as we had a message at 0.10)
[time: 1.22] message (nop)
[time: 2.00] *(1 second after last publish) (publish message, as we had a message at 1.22)
(NO UPDATE, as there was no new message since the last publish)
[time: 5.50] *message (publish message, as it is the first message in the last second)
[time: 5.60] message (nop)
[time: 6.50] *(1 second after last publish) (publish message, as we had a message at 5.60)
The infrastructure is Java; we have RabbitMQ, AppSync, and AWS services at hand.
The solutions proposed so far are:
Use a blocking thread that pauses after the first message and ignores subsequent messages on the same topic, with a timeout that fires a final message after n seconds before the thread terminates.
Save the timestamp of the message per topic key in a cache and ignore new messages while the key exists in the cache; run a set of workers that process the queue, fire the messages that are older than n seconds, and remove the saved key from the cache.
Fire the first message, save its timestamp in the cache for the topic, and don't fire any further message as long as the new message arrives less than n seconds later (we would lose the last message in this case).
Each solution has its pros and cons. I am looking for more brains to see what other options there are; maybe a queue system would help?
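The timeline in the example can be sketched as a small per-topic throttle that publishes the first message immediately and then at most one trailing message per interval. This is only a minimal sketch under stated assumptions: the class name TopicThrottler, the methods onMessage and onTick, and the publisher callback are hypothetical names introduced here, and timestamps are passed in explicitly so the logic can be checked without real timers.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical per-topic throttle: at most one publish per topic per interval. */
final class TopicThrottler {
    private static final class State {
        long lastPublishMillis; // time of the last publish on this topic
        String pending;         // latest suppressed message, or null if none
    }

    private final long intervalMillis;
    private final BiConsumer<String, String> publisher; // receives (topic, message)
    private final Map<String, State> states = new HashMap<>();

    TopicThrottler(long intervalMillis, BiConsumer<String, String> publisher) {
        this.intervalMillis = intervalMillis;
        this.publisher = publisher;
    }

    /** Called for every incoming update. */
    synchronized void onMessage(String topic, String message, long nowMillis) {
        State s = states.get(topic);
        if (s == null) {
            s = new State();
            states.put(topic, s);
            publish(topic, s, message, nowMillis); // first message on this topic
        } else if (nowMillis - s.lastPublishMillis >= intervalMillis) {
            publish(topic, s, message, nowMillis); // topic was quiet long enough
        } else {
            s.pending = message; // suppress, but remember the latest message
        }
    }

    /** Called periodically; flushes pending messages whose interval has elapsed. */
    synchronized void onTick(long nowMillis) {
        for (Map.Entry<String, State> e : states.entrySet()) {
            State s = e.getValue();
            if (s.pending != null && nowMillis - s.lastPublishMillis >= intervalMillis) {
                publish(e.getKey(), s, s.pending, nowMillis);
            }
        }
    }

    private void publish(String topic, State s, String message, long nowMillis) {
        s.lastPublishMillis = nowMillis;
        s.pending = null;
        publisher.accept(topic, message);
    }

    public static void main(String[] args) {
        // Replays part of the example timeline with n = 1 second.
        TopicThrottler t = new TopicThrottler(1000,
                (topic, msg) -> System.out.println(topic + " -> " + msg));
        t.onMessage("topic", "m@0.00", 0);
        t.onMessage("topic", "m@0.02", 20);
        t.onMessage("topic", "m@0.10", 100);
        t.onTick(1000);
        t.onMessage("topic", "m@1.22", 1220);
        t.onTick(2000);
    }
}
```

In a real service, onTick could be driven by a ScheduledExecutorService passing System.currentTimeMillis(). The trailing message is only as punctual as the tick period, so ticking more often than n (for example every 100 ms) keeps the delay small. Unlike the third option, the last message is not lost.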
Upvotes: 0
Views: 159
Reputation: 201
This can be achieved using reactive programming libraries, like Project Reactor.
You can receive a Flux from the topics, and throttle the flow of incoming messages using a combination of filter(), skip() and buffer() operations.
You can find more information on the specific operations available in the Project Reactor documentation.
Upvotes: 1
Reputation: 1745
Add a buffer for messages and trigger the actual push to the topic from a dedicated thread that runs at a fixed interval:
import java.util.LinkedList;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Publisher implements Runnable {
    private final int maxPendingMessages = 5000; // oldest messages are dropped beyond this
    private final int maxBulkMessages = 10;      // messages pushed per scheduled run
    private final LinkedList<String> messages = new LinkedList<>();
    private final Object lock = new Object();

    public static void main(String[] args) {
        // One scheduler thread is enough for a single periodic task.
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        Publisher p = new Publisher();
        ses.scheduleAtFixedRate(p, 0, 1000, TimeUnit.MILLISECONDS);
        for (int i = 0; i < 5000; i++) {
            p.sendMessage("test " + i);
        }
        // The scheduler keeps running until the process is stopped.
    }

    public void sendMessage(String msg) {
        Objects.requireNonNull(msg, "topic message should not be null!");
        // Check the size and modify the list under the same lock.
        synchronized (lock) {
            if (messages.size() >= maxPendingMessages) {
                messages.pollFirst(); // drop the oldest pending message
            }
            messages.add(msg);
        }
    }

    @Override
    public void run() {
        final int total;
        synchronized (lock) {
            total = Math.min(messages.size(), maxBulkMessages);
        }
        for (int count = 0; count < total; count++) {
            if (Thread.interrupted()) {
                break;
            }
            try {
                final String m;
                synchronized (lock) {
                    m = messages.pollFirst();
                }
                if (m == null) {
                    // don't try to send an empty (null) message
                    continue;
                }
                System.out.println("message was:" + m);
                // actual push to topic here
            } catch (Exception e) {
                // no need to process pending messages on delivery errors
                synchronized (lock) {
                    messages.clear();
                }
            }
        }
    }
}
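Since the question only needs the newest update per topic, the buffer could also hold a single slot per topic instead of a list. The following is a minimal sketch of that variant, not part of the code above: the class name LatestPerTopicBuffer and its methods offer and flush are hypothetical. Note that it only publishes on the scheduled flush, so the first message of a burst is delayed until the next run rather than sent immediately.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

/** Hypothetical conflating buffer: keeps only the newest message per topic. */
final class LatestPerTopicBuffer {
    private final ConcurrentHashMap<String, String> latest = new ConcurrentHashMap<>();

    /** Stores the message, overwriting any older pending message for the topic. */
    void offer(String topic, String message) {
        latest.put(topic, message);
    }

    /**
     * Publishes and removes the pending message of every topic.
     * Returns the number of messages handed to the publisher.
     */
    int flush(BiConsumer<String, String> publisher) {
        int flushed = 0;
        // ConcurrentHashMap iteration is weakly consistent, so removing
        // entries while iterating over the key set is safe.
        for (String topic : latest.keySet()) {
            String message = latest.remove(topic); // atomically take the slot
            if (message != null) {
                publisher.accept(topic, message); // actual push to topic here
                flushed++;
            }
        }
        return flushed;
    }
}
```

A scheduler could call flush once per n seconds, for example with scheduleAtFixedRate as in the code above. Topics without new messages have no entry in the map, so nothing is published for them.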
Upvotes: 2