JME
JME

Reputation: 2363

Using events to pass information to another thread, not executing concurrently

In the following code I have a recurring timer that fetches a list of messages from an online queue. The goal of this code was to test that it is possible to have the recurring timer thread only fetch the messages and pass the results to the main thread for handling.

Instead what I see is that despite the event listener being in the main thread, all actions called after the event is raised, in the recurring timer thread, always finish after the events have been handled. I understand that it is possible for this to happen occasionally depending on which thread is on the cpu at any given time, but I should still see the print statements interwoven every now and again.

I have also tested this effect by adding approximately 50 messages to the queue and I still see the same results.

My code is below

public class Service implements NewWindowEventArgsListener
{

private final static String accessKey = 
        "secret";

private final static String secretKey = 
        "secret";

private final static String AWSSQSServiceUrl =
        "secret";


private boolean IsWindowing = false;
private ScheduledExecutorService _windowTimer;
private long SQSWindow = 60000;
private NewWindowEventArgs handler = new NewWindowEventArgs();
private static List<Message> messages = new ArrayList<Message>();


public void StartProcesses()
{
    if(this.IsWindowing)
    {
        return;
    }

    handler.addListener(this);

    this._windowTimer = Executors.newSingleThreadScheduledExecutor();

    Runnable task = new Runnable() {
        public void run() {
            WindowCallback();
        }
    };

    this._windowTimer.scheduleAtFixedRate(task,
            0, SQSWindow, TimeUnit.MILLISECONDS);

    IsWindowing = true;
}

private void WindowCallback()
{
    Date now = new Date();
    System.out.println("The service is running: " + now);

    int numberOfMessages = 0;
    ArrayList<String> attributes = new ArrayList<String>();
    AWSCredentials cred = new BasicAWSCredentials(accessKey, secretKey);
    ClientConfiguration config = new ClientConfiguration();
    config.setMaxErrorRetry(10);

    AmazonSQS client = new AmazonSQSClient(cred, config);

    client.setEndpoint(AWSSQSServiceUrl);

    System.out.println("Receiving messages from the Queue.\n");
    ReceiveMessageRequest receiveMessageRequest = 
            new ReceiveMessageRequest(AWSSQSServiceUrl);

    receiveMessageRequest.setMaxNumberOfMessages(10);

    GetQueueAttributesRequest numMessages = 
            new GetQueueAttributesRequest(AWSSQSServiceUrl); 

    attributes.add("ApproximateNumberOfMessages");
    numMessages.setAttributeNames(attributes);

    numberOfMessages = Integer.valueOf(
            (client.getQueueAttributes(numMessages)).getAttributes().
            get("ApproximateNumberOfMessages")).intValue();

    System.out.println("Expected number of Messages: " + numberOfMessages);

    do
    {
        messages.addAll(client.receiveMessage(receiveMessageRequest).
            getMessages());
    }while(messages.size() < numberOfMessages);

    System.out.println("Starting the printing of messages");

    if ( messages.size() > 0)
    {
        System.out.println("A message exists!");
        System.out.println();
        handler.NewWindowEvent(messages);
        System.out.println("//////////////////////////////////");
        System.out.println("\tEmptying message list");
        messages.clear();
        System.out.println("\tMessage list empty");
        System.out.println("//////////////////////////////////");
        System.out.println();
    }
}

@Override
public void JetstreamService_NewWindow(List<Message> messages) {
    System.out.println("Number of messages: " + messages.size() + "\n");

    ObjectMapper mapper = new ObjectMapper();

    try 
    {
        for (Message message : messages)
        {

            //System.out.println(message.getBody() + "\n");
            //byte[] bytes = DatatypeConverter.parseBase64Binary(message.getBody());

            //String messageBody = new String(bytes, "UTF-8");

            //System.out.println(messageBody + "\n");

            AmazonSNSMessage b;

            b = mapper.readValue(message.getBody(), AmazonSNSMessage.class);

            String subject = b.getSubject().trim().toLowerCase();
            System.out.println(subject);

            if (subject.equals("heartbeatevent"))
            {
                HeartbeatEvent heartbeat = new HeartbeatEvent();

                heartbeat.Deserialize(b.getMessage());

                System.out.println(heartbeat.getHeaderEventTime() + "\n");
            }

            else if(subject.equals("logicaldeviceaddedevent"))
            {
                LogicalDeviceAddedEvent logical = 
                        new LogicalDeviceAddedEvent();

                logical.Deserialize(b.getMessage());

                System.out.println(
                        logical.getLogicalDeviceAddedEventRegion() + "\n");
            }

            else if(subject.equals("logicaldeviceremovedevent"))
            {
                LogicalDeviceRemovedEvent logical = 
                        new LogicalDeviceRemovedEvent();

                logical.Deserialize(b.getMessage());

                System.out.println(
                        logical.getHeaderEventId());

                System.out.println(
                        logical.getHeaderEventTime() + "\n");
            }
        }
    } catch (JsonParseException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (JsonMappingException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }   
}

Can someone please explain why the messages are not being handled in the main thread, or offer and explanation on why the clear messages print statements always occur after all of the messages have been handled.

Upvotes: 0

Views: 105

Answers (1)

Pragmateek
Pragmateek

Reputation: 13374

Not sure I understand what you don't understand but I'll try to clarify what happens:

  • you start the timer and it starts to wait for the next schedule

  • when it's time for the timer callback to be executed a thread is chosen: not your main thread that is busy in a UI events loop or waiting or working, but another thread; e.g. in .Net a timer implementation uses a thread from the global threads pool

  • so your callback, i.e. WindowCallback, executes on this thread synchronously (seems like there is nothing asynchronous in the code)

  • it arrives at handler.NewWindowEvent(messages): here we must distinguish 2 orthogonal notions: the semantics, raising an event, and the way it is executed, like any method call, on the same thread, synchronously

  • so JetstreamService_NewWindow is executed, handles the messages and returns

  • execution resumes just after handler.NewWindowEvent and clears the messages list

Here is a schema (ala UML) that sums it up:

O represents the start of a method

X its ends

    timer  WindowCallback   NewWindowEvent  JetstreamService_NewWindow
    O
    |        
    +------->O
    |        |
    |        +---------------->O
    |        |                 |
    |        |                 +------------------>O
    |        |                 |                   |
    |        |                 +<------------------X
    |        |                 |
    |        +<----------------X
    |        |clear
    |<-------X
    X

Well, I'm definitely not an artist :(

To sum it up even more: starting from the timer ticks there is only one thread of execution that runs all your methods synchronously one after the other.

Hope this help

Upvotes: 1

Related Questions