So I'm receiving numerous messages from a stream - well, it's not a stream per se, it's really a method that's fired off when a message is received - and I would like to count the number of messages received in 10 seconds.
Here's what I have so far:
package com.example.demo;

import java.net.URI;

import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;

public class ExampleClient extends WebSocketClient {

    private float messagesRecievedCount;

    public ExampleClient(URI serverUri) {
        super(serverUri);
        System.out.println("Created object");
        setMessagesRecievedCount(0);
    }

    @Override
    public void onOpen(ServerHandshake serverHandshake) {
        System.out.println("Connection established!");
    }

    @Override
    public void onMessage(String s) {
        setMessagesRecievedCount(getMessagesRecievedCount() + 1);
    }

    public void getMessagesPerMinute(){
        float start = getMessagesRecievedCount();
        float end = 0;
        long ten = System.currentTimeMillis() + 1000;
        while(System.currentTimeMillis() < ten) {
            end = getMessagesRecievedCount();
        }
        System.out.println("start: "+start+" end: "+end+
                "Total messages: "+ (end-start)+"\n");
    }

    public float getMessagesRecievedCount() {
        return messagesRecievedCount;
    }

    public void setMessagesRecievedCount(float messagesRecievedCount) {
        this.messagesRecievedCount = messagesRecievedCount;
    }
}
I have an instance field, messagesRecievedCount, which keeps a running count of messages received from a websocket stream. Whenever a message is received, the onMessage() method is fired and updates the count. I want to count the number of messages received in 10 seconds (and extrapolate that to a minute), which is what getMessagesPerMinute() is for.
Obviously the way I'm doing it is not smart: it blocks, and the count after 10 seconds is the same as at the start (when it shouldn't be, since I've actually received 20 messages). I feel like I should be using threads, but I don't know how to go about it. What would you suggest? I'm really new to this and just tinkering around.
This is the main class from which I'm calling ExampleClient.java:
package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
import org.springframework.boot.web.servlet.server.ConfigurableServletWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.web.socket.WebSocketSession;

import java.net.URI;
import java.net.URISyntaxException;

@SpringBootApplication
public class DemoApplication {

    private WebSocketSession clientSession;

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    public DemoApplication () throws URISyntaxException {
        ExampleClient c = new ExampleClient( new URI( "wss://example.com/" ) );
        c.connect();
        c.getMessagesPerMinute();
    }
}
c.connect() establishes the connection, and onMessage() is triggered soon after!
Upvotes: 0
Views: 516
Your code actually runs for 1 second (1000 ms; I don't know whether this is a typo or a deliberate simplification). Another problem is that it calls end = getMessagesRecievedCount(); repeatedly inside a while loop, when you only need the starting and final values. One way to solve this is Thread.sleep() (if you never need to cancel the counting midway):
public void getMessagesPerMinute() {
    float start = getMessagesRecievedCount();
    try {
        // Block this thread for the whole 10-second window.
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the caller can react to it.
        Thread.currentThread().interrupt();
    }
    float end = getMessagesRecievedCount();
    System.out.println("start: " + start + " end: " + end
            + " Total messages: " + (end - start) + "\n");
}
As for the blocking: the important thing is that this code runs in a different thread than the one updating messagesRecievedCount (or doing whatever else you want to do in the meantime), so calling it from a new thread is probably the best solution. I'm not familiar with the framework you are using, so it may already use separate threads that suit this purpose better.
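A minimal sketch of that idea, under assumptions: RateProbe and measure are names made up for illustration (they are not part of Java-WebSocket), and the loop in main only simulates incoming messages. The counter is declared volatile because, without volatile or some other synchronization, the Java memory model does not guarantee that the measuring thread ever sees updates made by the receiving thread.

```java
// Sketch only: RateProbe and measure() are hypothetical names for illustration.
class RateProbe {
    // volatile makes writes from the receiving thread visible to the measuring thread.
    // The plain ++ is acceptable here only because a single thread increments the counter.
    private volatile long received;

    void onMessage(String message) {
        received++;
    }

    // Blocks the calling thread for windowMillis and returns how many
    // messages arrived during that window.
    long measure(long windowMillis) throws InterruptedException {
        long start = received;
        Thread.sleep(windowMillis);
        return received - start;
    }

    public static void main(String[] args) throws InterruptedException {
        RateProbe probe = new RateProbe();

        // The blocking measurement runs on its own thread.
        Thread measuring = new Thread(() -> {
            try {
                long count = probe.measure(10_000);
                System.out.println("Messages in 10 s: " + count
                        + " (about " + count * 6 + " per minute)");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "rate-probe");
        measuring.start(); // returns immediately, the caller is not blocked

        // Stand-in for the websocket callback: one message every 100 ms.
        for (int i = 0; i < 50; i++) {
            probe.onMessage("a");
            Thread.sleep(100);
        }
        measuring.join();
    }
}
```

With a real client, the thread would presumably be started right after connect(), so the constructor returns at once instead of waiting for the window to elapse.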
If you intend to do something more with messagesRecievedCount, some synchronization would be required, but for an estimate of the number of messages per minute this should be good enough.
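If you do need exact counts, one common option is to replace the float field with a java.util.concurrent.atomic.AtomicLong. The sketch below is an assumption about how that could look, not your actual client: MessageCounter and total() are hypothetical names. It also sidesteps a limitation of float, which can no longer represent every integer above 16,777,216 (2^24), so adding 1 eventually stops changing the value.

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: MessageCounter and total() are hypothetical names for illustration.
class MessageCounter {
    private final AtomicLong received = new AtomicLong();

    void onMessage(String message) {
        // Atomic read-modify-write: no increment is lost, even with several writers.
        received.incrementAndGet();
    }

    long total() {
        return received.get();
    }

    public static void main(String[] args) throws InterruptedException {
        MessageCounter counter = new MessageCounter();
        Runnable producer = () -> {
            for (int i = 0; i < 100_000; i++) {
                counter.onMessage("a");
            }
        };

        // Two threads increment concurrently.
        Thread first = new Thread(producer);
        Thread second = new Thread(producer);
        first.start();
        second.start();
        first.join();
        second.join();

        System.out.println(counter.total()); // prints 200000
    }
}
```

With a plain field and `count = count + 1`, the same two-thread run could lose increments, because the read and the write are separate steps.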
Here is some test code I used, which you can hopefully adapt to your case and play with to pinpoint the problem. The difference between start and end is fairly constant in this case, but the values are clearly updated. Making the ExampleClient instance public and static is a shortcut that should probably be avoided in real code.
public class Test{

    public static ExampleClient example=new ExampleClient();

    public static void main(String[] args){
        Thread a=new MessagesPerTenSecondFetcher();
        Thread b=new MessagesPerTenSecondFetcher();
        Thread c=new MessagesPerTenSecondFetcher();
        Thread d= new MessageProducer();
        a.start();
        d.start();
        b.start();
        try{
            Thread.sleep(2000);
        }
        catch(InterruptedException e){
            System.out.println("do something");
        }
        c.start();
    }
}

class ExampleClient {

    private float messagesRecievedCount;

    public void onMessage(String s) {
        setMessagesRecievedCount(getMessagesRecievedCount() + 1);
    }

    public void getMessagesPerMinute(){
        float start = getMessagesRecievedCount();
        float end = 0;
        try{
            Thread.sleep(10000);
        }
        catch(InterruptedException e){
            System.out.println("do something");
        }
        end = getMessagesRecievedCount();
        System.out.println("start: "+start+" end: "+end+
                "Total messages: "+ (end-start)+"\n");
    }

    public float getMessagesRecievedCount() {
        return messagesRecievedCount;
    }

    public void setMessagesRecievedCount(float messagesRecievedCount) {
        this.messagesRecievedCount = messagesRecievedCount;
    }
}

class MessagesPerTenSecondFetcher extends Thread{
    @Override
    public void run(){
        Test.example.getMessagesPerMinute();
    }
}

class MessageProducer extends Thread{
    @Override
    public void run(){
        for(int i =0; i<100;i++){
            Test.example.onMessage("a");
            try{
                Thread.sleep(130);
            }
            catch(InterruptedException e){
                System.out.println("do something");
            }
        }
    }
}
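If you would rather not park a thread in Thread.sleep() for every measurement, a different technique is to let a ScheduledExecutorService sample the counter at a fixed interval. This is a sketch under assumptions, not something taken from your framework: RateReporter, sampleDelta and perMinute are hypothetical names, and the loop in main stands in for the websocket callback (same 100 messages, 130 ms apart, as in the test above).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: RateReporter, sampleDelta() and perMinute() are hypothetical names.
class RateReporter {
    private final AtomicLong received = new AtomicLong();
    private long lastSnapshot; // meant to be touched by one sampling thread only

    void onMessage(String message) {
        received.incrementAndGet();
    }

    // Returns the number of messages received since the previous call.
    long sampleDelta() {
        long now = received.get();
        long delta = now - lastSnapshot;
        lastSnapshot = now;
        return delta;
    }

    // Extrapolates a count observed over windowSeconds to one minute.
    static long perMinute(long count, long windowSeconds) {
        return count * 60 / windowSeconds;
    }

    public static void main(String[] args) throws InterruptedException {
        RateReporter reporter = new RateReporter();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        long windowSeconds = 10;

        // Every 10 seconds, report the messages seen in the last window.
        scheduler.scheduleAtFixedRate(() -> {
            long count = reporter.sampleDelta();
            System.out.println(count + " messages in " + windowSeconds
                    + " s, about " + perMinute(count, windowSeconds) + " per minute");
        }, windowSeconds, windowSeconds, TimeUnit.SECONDS);

        // Stand-in for incoming websocket messages.
        for (int i = 0; i < 100; i++) {
            reporter.onMessage("a");
            Thread.sleep(130);
        }
        scheduler.shutdown(); // cancels further periodic runs
    }
}
```

For the 20 messages in 10 seconds mentioned in the question, perMinute(20, 10) gives 120.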
Upvotes: 1