S3Mi

Reputation: 488

ActiveMQ + Stomp, Reading one message but four of them get dequeued

When I read one message from ActiveMQ over STOMP, three or four messages are dequeued, and I don't know why.

Code for populating AMQ:

public function populate($queue, $c = 10) {

    for($i = 0; $i < $c; $i++) {
        $this->stomp->send($queue,' Random populated:'.rand(0, PHP_INT_MAX));
    }

}

[Screenshot: ActiveMQ queue after populating]
[Screenshot: ActiveMQ messages after populating]

Code for reading AMQ:

public function read($queue = null) {

    if(is_null($queue)) {
        if(!$this->isSubscribed()) {
            return false;
        }
    } else {
        $this->subscribe($queue);
    }

    return $this->stomp->readFrame();

}

Stomp readFrame() code:

public function readFrame ()
{
    if (!$this->hasFrameToRead()) {
        return false;
    }

    $rb = 1024;
    $data = '';
    $end = false;

    do {
        $read = fread($this->_socket, $rb);
        if ($read === false) {
            $this->_reconnect();
            return $this->readFrame();
        }
        $data .= $read;
        if (strpos($data, "\x00") !== false) {
            $end = true;
            $data = rtrim($data, "\n");
        }
        $len = strlen($data);
    } while ($len < 2 || $end == false);

    list ($header, $body) = explode("\n\n", $data, 2);
    $header = explode("\n", $header);
    $headers = array();
    $command = null;
    foreach ($header as $v) {
        if (isset($command)) {
            list ($name, $value) = explode(':', $v, 2);
            $headers[$name] = $value;
        } else {
            $command = $v;
        }
    }
    $frame = new StompFrame($command, $headers, trim($body));
    if (isset($frame->headers['transformation']) && $frame->headers['transformation'] == 'jms-map-json') {
        require_once 'Stomp/Message/Map.php';
        return new StompMessageMap($frame);
    } else {
        return $frame;
    }
    return $frame;
}

I'm 100% sure that the code is executed exactly once, but this is the result:

[Screenshot: ActiveMQ queue after reading one message]
[Screenshot: ActiveMQ messages after reading one message]

Var_dumped message:

object(StompFrame)[4]
public 'command' => string 'MESSAGE' (length=7)
public 'headers' => 
array (size=5)
  'message-id' => string 'ID:**********_-49723-1350635513276-2:1:-1:1:1' (length=45)
  'destination' => string '/queue/test' (length=11)
  'timestamp' => string '1350635842968' (length=13)
  'expires' => string '0' (length=1)
  'priority' => string '4' (length=1)
public 'body' => string 'Random populated:1859256320' (length=27)

Does anyone know what can be the cause of this behavior?

Upvotes: 1

Views: 2818

Answers (1)

Tim Bish

Reputation: 18356

I don't see your connect code, but I'm going to assume you are connecting using auto ack mode. In STOMP auto ack mode, messages are acknowledged as soon as they hit the wire. I'm also assuming you didn't change the prefetch size, so the broker sends you a batch of messages; as you read from the socket and more can be sent, more get dequeued. If you want finer-grained control over message consumption, use another ack mode such as client ack and ack each message as it arrives. You can also set the prefetch size for your subscription to reduce the number of messages that are batched to your client.

See this page for the AMQ STOMP configuration options. You may also want to look over the STOMP spec again.
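
To make the client ack and prefetch suggestions concrete, here is a minimal sketch of the raw STOMP frames involved. It assumes STOMP 1.0 framing (where ACK only needs a `message-id` header) and ActiveMQ's `activemq.prefetchSize` subscribe header. The helpers `buildStompFrame()` and `buildAckFrame()` are hypothetical names made up for this illustration, not part of the Stomp PHP library used in the question, which may expose its own way to pass these headers.

```php
<?php
// Hypothetical helper (not part of any library): serialises a STOMP frame as
// command line, header lines, a blank line, the body, and a NUL terminator.
function buildStompFrame(string $command, array $headers, string $body = ''): string
{
    $frame = $command . "\n";
    foreach ($headers as $name => $value) {
        $frame .= $name . ':' . $value . "\n";
    }
    return $frame . "\n" . $body . "\x00";
}

// Hypothetical helper: ACK frame for one received MESSAGE frame.
// Assumes STOMP 1.0, where ACK is identified by the message-id header alone.
function buildAckFrame(string $messageId): string
{
    return buildStompFrame('ACK', ['message-id' => $messageId]);
}

// SUBSCRIBE with client acknowledgement and a prefetch window of one message,
// so the broker should not dispatch the next message before the current one is acked.
$subscribe = buildStompFrame('SUBSCRIBE', [
    'destination'           => '/queue/test',
    'ack'                   => 'client',
    'activemq.prefetchSize' => '1',
]);
```

After reading a MESSAGE frame, the client would write `buildAckFrame($frame->headers['message-id'])` to the same socket. Until that ACK is sent, the message stays pending on the broker rather than being counted as dequeued.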

Upvotes: 3
