Saturnian

Reputation: 1948

How to count incoming messages from a "stream" in Java?

So I'm receiving numerous messages from a stream - well, it's not a stream per se, it's really a method that's fired off when a message is received - and I would like to count the number of messages received in 10 seconds.

Here's what I have so far:

package com.example.demo;

import java.net.URI;

import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;

public class ExampleClient extends WebSocketClient {

    private float messagesRecievedCount;

    public ExampleClient(URI serverUri) {
        super(serverUri);
        System.out.println("Created object");
        setMessagesRecievedCount(0);
    }

    @Override
    public void onOpen(ServerHandshake serverHandshake) {
        System.out.println("Connection established!");
    }

    @Override
    public void onMessage(String s) {
        setMessagesRecievedCount(getMessagesRecievedCount() + 1);
    }

    public void getMessagesPerMinute(){
        float start = getMessagesRecievedCount();
        float end = 0;
        long ten = System.currentTimeMillis() + 1000;
        while(System.currentTimeMillis() < ten) {
            end = getMessagesRecievedCount();
        }
        System.out.println("start: "+start+" end: "+end+
                "Total messages: "+ (end-start)+"\n");
    }

    public float getMessagesRecievedCount() {
        return messagesRecievedCount;
    }

    public void setMessagesRecievedCount(float messagesRecievedCount) {
        this.messagesRecievedCount = messagesRecievedCount;
    }
}

I have a global variable messagesRecievedCount which keeps a running count of messages received from a websocket stream. Whenever a message is received the onMessage() method is fired and it updates the message count. I want to count the number of messages received in 10 seconds (and extrapolate it to a minute) - for which I have the getMessagesPerMinute().

Obviously the way I'm doing it is not smart - it's blocking and the count of messages after 10 seconds is the same (when actually it isn't, I've actually received 20 messages). I feel like I should be doing threads but I don't know how to go about it. What would you suggest? I'm really new to this and just tinkering around.

This is the main class where I'm calling ExampleClient.java from:

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
import org.springframework.boot.web.servlet.server.ConfigurableServletWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.web.socket.WebSocketSession;

import java.net.URI;
import java.net.URISyntaxException;

@SpringBootApplication
public class DemoApplication  {

    private WebSocketSession clientSession;

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    public DemoApplication () throws URISyntaxException {
        ExampleClient c = new ExampleClient( new URI( "wss://example.com/" ) );
        c.connect();
        c.getMessagesPerMinute();
    }
    
}

The c.connect() establishes the connection and the onMessage() is triggered soon after!

Upvotes: 0

Views: 516

Answers (1)

user13149581


Your code actually runs for 1 second (1000 ms; I don't know whether this is a typo or a deliberate simplification). Another problem is that it calls end = getMessagesRecievedCount(); repeatedly inside a while loop, when you only need the starting and final values. One way to solve this is to use Thread.sleep() (if you never need to cancel the counting midway):

    public void getMessagesPerMinute(){
        float start = getMessagesRecievedCount();
        float end = 0;
        try{
            Thread.sleep(10000);
        }
        catch(InterruptedException e){
            System.out.println("do something");
        }
        end = getMessagesRecievedCount();
        System.out.println("start: "+start+" end: "+end+
                "Total messages: "+ (end-start)+"\n");
    }

As for blocking, the important thing is that this code runs in a different thread than the one updating the value of messagesRecievedCount or doing any other work you want done in the meantime, so calling it from a new thread is probably the best solution. I'm not familiar with the framework you are using, so it may already be using different threads that better suit this purpose.
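To illustrate moving the measurement off the calling thread, here is a minimal sketch using a ScheduledExecutorService from the standard library. The class name RateSampler, the method sampleOnce, and the use of an AtomicLong as the counter are assumptions made for this example; they are not part of the question's code or of the WebSocket library.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

// Hypothetical helper: samples a shared counter over a time window
// without blocking the thread that asks for the sample.
class RateSampler {
    // Assumed to be incremented elsewhere, e.g. from onMessage().
    private final AtomicLong counter;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    RateSampler(AtomicLong counter) {
        this.counter = counter;
    }

    // Reads the counter now, schedules a second read after the window,
    // and hands the difference to the callback. Returns immediately.
    void sampleOnce(long window, TimeUnit unit, LongConsumer onResult) {
        long start = counter.get();
        scheduler.schedule(
                () -> onResult.accept(counter.get() - start),
                window, unit);
    }

    // The scheduler thread is not a daemon thread, so shut it down
    // when no more samples are needed.
    void shutdown() {
        scheduler.shutdown();
    }
}
```

With something like this, the constructor in the question could call sampler.sampleOnce(10, TimeUnit.SECONDS, n -> System.out.println((n * 6) + " messages per minute (estimated)")) and return right away, while the result is printed roughly ten seconds later.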

If you intend to do something more with the variable messagesRecievedCount, some synchronization would be required, but for an estimate of the number of messages per minute this should be good enough.
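If synchronization does become necessary, one common option is to replace the float field with a java.util.concurrent.atomic.AtomicLong, which makes each increment atomic and visible to other threads. The sketch below is only an illustration: MessageCounter, getCount and countDuring are hypothetical names standing in for the relevant parts of ExampleClient.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for ExampleClient, reduced to the counting logic.
class MessageCounter {
    // A whole-number counter that can be updated safely from several threads.
    private final AtomicLong messagesReceived = new AtomicLong();

    // Would be called from whichever thread delivers incoming messages.
    public void onMessage(String s) {
        messagesReceived.incrementAndGet();
    }

    public long getCount() {
        return messagesReceived.get();
    }

    // Blocks the calling thread for windowMillis and returns how many
    // messages arrived in that window. If interrupted, it restores the
    // interrupt flag and returns the count observed so far.
    public long countDuring(long windowMillis) {
        long start = messagesReceived.get();
        try {
            Thread.sleep(windowMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return messagesReceived.get() - start;
    }
}
```

Calling countDuring(10000) from a thread other than the one delivering messages and multiplying the result by 6 gives the per-minute estimate. A long also avoids the precision loss a float counter would eventually show at very large counts.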

Here is some test code I used that you can hopefully adapt to better suit your case and play with to pinpoint the problem. The difference is quite constant in this case, but the values are clearly updated. Making the ExampleClient instance public is a shortcut which should probably be avoided in the actual code.

public class Test{

    public static ExampleClient example=new ExampleClient();
    
    public static void main(String[] args){
        Thread a=new MessagesPerTenSecondFetcher();
        Thread b=new MessagesPerTenSecondFetcher();
        Thread c=new MessagesPerTenSecondFetcher();
        Thread d= new MessageProducer();
        a.start();
        d.start();
        b.start();
        try{
            Thread.sleep(2000);
        }
        catch(InterruptedException e){
            System.out.println("do something");
        }
        c.start();
    }
}

class ExampleClient {
    private float messagesRecievedCount;

    public void onMessage(String s) {
        setMessagesRecievedCount(getMessagesRecievedCount() + 1);
    }

    public void getMessagesPerMinute(){
        float start = getMessagesRecievedCount();
        float end = 0;
        try{
            Thread.sleep(10000);
        }
        catch(InterruptedException e){
            System.out.println("do something");
        }
        end = getMessagesRecievedCount();
        System.out.println("start: "+start+" end: "+end+
                "Total messages: "+ (end-start)+"\n");
    }

    public float getMessagesRecievedCount() {
        return messagesRecievedCount;
    }

    public void setMessagesRecievedCount(float messagesRecievedCount) {
        this.messagesRecievedCount = messagesRecievedCount;
    }
}

class MessagesPerTenSecondFetcher extends Thread{
    @Override
    public void run(){
        Test.example.getMessagesPerMinute();
    }
}

class MessageProducer extends Thread{
    @Override
    public void run(){
        for(int i =0; i<100;i++){
            Test.example.onMessage("a");
            try{
                Thread.sleep(130);
            }
            catch(InterruptedException e){
                System.out.println("do something");
            }
        }
    }
}

Upvotes: 1
