henninb
henninb

Reputation: 131

Stomp 1.2 ActiveMQ 5.15 - ACK received without a ack id for acknowledge

I have built a c function that should read from an ActiveMQ via stomp library. This code is reading an additional MESSAGE prior to the RECEIPT (which is what I would expect). Please advise on why my RECEIPT frame is not coming prior to a second MESSAGE frame. Also please note that I am wrapping this code with BEGIN and COMMIT frames which may or may not be best practice.

Thanks in advance.

int receive_from_queue_ack( MQDEF *mqdef, int transaction ) {
    apr_status_t rc;
    stomp_frame frame_write;
    stomp_frame* frame_read = NULL;
    char transaction_str[37];
    char* ack = NULL;
    char* protocol = NULL;
    char* receipt_id = NULL;
    char* message_id = NULL;
    char* message = NULL;
    char* seq = NULL;
    uuid_t uuid;
    char receipt_id_begin_uuid[37];
    char receipt_id_commit_uuid[37];
    char receipt_id_ack_uuid[37];
    char id_uuid[37];
    int receipt_flag = 1;

    uuid_generate_random(uuid);
    uuid_unparse_lower(uuid, transaction_str);

    uuid_generate_random(uuid);
    uuid_unparse_lower(uuid, receipt_id_begin_uuid);

    uuid_generate_random(uuid);
    uuid_unparse_lower(uuid, id_uuid);

    uuid_generate_random(uuid);
    uuid_unparse_lower(uuid, receipt_id_commit_uuid);


    uuid_generate_random(uuid);
    uuid_unparse_lower(uuid, receipt_id_ack_uuid);

    fprintf(stdout, "1) *** Transaction BEGIN - (receive_from_queue)\n");
    frame_write.command = "BEGIN";
    frame_write.headers = apr_hash_make((apr_pool_t *)mqdef->pool);
    apr_hash_set(frame_write.headers, "transaction", APR_HASH_KEY_STRING, transaction_str); 
    frame_write.body_length = -1;
    frame_write.body = NULL;
    rc = stomp_write(mqdef->amqConnection, &frame_write, (apr_pool_t *)mqdef->pool);
    if ( rc != APR_SUCCESS ) {
        return FAILURE;
    }

    printf("3) Subscribe begin\n");
    frame_write.command = "SUBSCRIBE";
    frame_write.headers = apr_hash_make((apr_pool_t *)mqdef->pool);
    apr_hash_set(frame_write.headers, "destination", APR_HASH_KEY_STRING, mqdef->queueName);
    apr_hash_set(frame_write.headers, "ack", APR_HASH_KEY_STRING, "client");
    apr_hash_set(frame_write.headers, "id", APR_HASH_KEY_STRING, "client-123");
    apr_hash_set(frame_write.headers, "activemq.prefetchSize", APR_HASH_KEY_STRING, "1");
    frame_write.body_length = -1;
    frame_write.body = NULL;
    rc = stomp_write(mqdef->amqConnection, &frame_write, (apr_pool_t *)mqdef->pool);
    if ( rc != APR_SUCCESS ) {
        return FAILURE;
    }
    frame_read = NULL;
    rc = stomp_read(mqdef->amqConnection, &frame_read, (apr_pool_t *)mqdef->pool);
    if ( rc != APR_SUCCESS ) {
        return FAILURE;
    }
    ack = NULL;
    if( frame_read != NULL && strncmp(frame_read->command, "MESSAGE", 7) == 0 ) {
        ack = (char*) apr_hash_get(frame_read->headers, "ack", APR_HASH_KEY_STRING);
        printf("5) ack=<%s>\n", ack);
        if( ack == NULL ) {
            printf("ack being null is a problem\n");
            exit(1);
        }
        message_id = (char*) apr_hash_get(frame_read->headers, "message-id", APR_HASH_KEY_STRING);
        printf("5) message_id=<%s>\n", message_id);
        printf("5) command=<%s>, body=<%s>\n", frame_read->command, frame_read->body);
    } else if( frame_read != NULL && strncmp(frame_read->command, "ERROR", 5) == 0 ) {
        message_id = (char*) apr_hash_get(frame_read->headers, "message-id", APR_HASH_KEY_STRING);
        printf("5) message_id=<%s>\n", message_id);
        printf("5) command=<%s>, body=<%s>\n", frame_read->command, frame_read->body);
        exit(1);
    } else {
        message_id = (char*) apr_hash_get(frame_read->headers, "message-id", APR_HASH_KEY_STRING);
        printf("5) message_id=<%s>\n", message_id);
        printf("5) command=<%s>, body=<%s>\n", frame_read->command, frame_read->body);
        printf("5) should never get here.\n");
        exit(1);
    }
    frame_write.command = "ACK";
    frame_write.headers = apr_hash_make((apr_pool_t *)mqdef->pool);
    apr_hash_set(frame_write.headers, "transaction", APR_HASH_KEY_STRING, transaction_str);
    apr_hash_set(frame_write.headers, "id", APR_HASH_KEY_STRING, ack);
    apr_hash_set(frame_write.headers, "receipt-id", APR_HASH_KEY_STRING, receipt_id_ack_uuid);
    printf("set id =<%s>\n", ack);
    frame_write.body_length = -1;
    frame_write.body = NULL;
    rc = stomp_write(mqdef->amqConnection, &frame_write, (apr_pool_t *)mqdef->pool);
    if ( rc != APR_SUCCESS ) {
        return FAILURE;
    }
    frame_read = NULL;
    rc = stomp_read(mqdef->amqConnection, &frame_read, (apr_pool_t *)mqdef->pool);
    if ( rc != APR_SUCCESS ) {
        return FAILURE;
    }
    if( frame_read != NULL && strncmp(frame_read->command, "RECEIPT", 7) == 0 ) {
        message_id = (char*) apr_hash_get(frame_read->headers, "message-id", APR_HASH_KEY_STRING);
        receipt_id = (char*) apr_hash_get(frame_read->headers, "receipt-id", APR_HASH_KEY_STRING);
        printf("7) receipt-id=<%s>\n", receipt_id);
        printf("7) message-id=<%s>\n", message_id);
        printf("7) command=<%s>, body=<%s>\n", frame_read->command, frame_read->body);
    } else if( frame_read != NULL && strncmp(frame_read->command, "ERROR", 5) == 0 ) {
        message_id = (char*) apr_hash_get(frame_read->headers, "message_id", APR_HASH_KEY_STRING);
        receipt_id = (char*) apr_hash_get(frame_read->headers, "receipt-id", APR_HASH_KEY_STRING);
        printf("7) receipt-id=<%s>\n", receipt_id);
        printf("7) message-id=<%s>\n", message_id);
        printf("7) command=<%s>, body=<%s>\n", frame_read->command, frame_read->body);
        exit(1);
    } else {
        message_id = (char*) apr_hash_get(frame_read->headers, "message_id", APR_HASH_KEY_STRING);
        printf("7) message-id=<%s>\n", message_id);
        printf("7) command=<%s>, body=<%s>\n", frame_read->command, frame_read->body);
        ack = (char*) apr_hash_get(frame_read->headers, "ack", APR_HASH_KEY_STRING);
        printf("5) ack=<%s>\n", ack);
        printf("this is a failure\n");
        frame_write.command = "ACK";
        frame_write.headers = apr_hash_make((apr_pool_t *)mqdef->pool);
        apr_hash_set(frame_write.headers, "transaction", APR_HASH_KEY_STRING, transaction_str);
        apr_hash_set(frame_write.headers, "id", APR_HASH_KEY_STRING, ack);
        apr_hash_set(frame_write.headers, "receipt-id", APR_HASH_KEY_STRING, receipt_id_ack_uuid);
        rc = stomp_write(mqdef->amqConnection, &frame_write, (apr_pool_t *)mqdef->pool);
        if ( rc != APR_SUCCESS ) {
            return FAILURE;
        }
    }

    frame_write.command = "COMMIT";
    frame_write.headers = apr_hash_make((apr_pool_t *)mqdef->pool);
    apr_hash_set(frame_write.headers, "transaction", APR_HASH_KEY_STRING, transaction_str);
    frame_write.body_length = -1;
    frame_write.body = NULL;
    rc = stomp_write(mqdef->amqConnection, &frame_write, (apr_pool_t *)mqdef->pool);
    if ( rc != APR_SUCCESS ) {
        return FAILURE;
    }
    fprintf(stdout, "12) *** Transaction Complete - (receive_from_queue)\n\n");
}

Upvotes: 0

Views: 1253

Answers (1)

Tim Bish
Tim Bish

Reputation: 18356

You are not using the correct STOMP header in the acknowledge to indicate the ACK ID so the broker is telling you that you did it wrong. The STOMP specification is a good place to start to learn what to put in each performative, in this case the header is "id" and not "message-id"

As usual with C code it is quite hard to read so reformating your code above with some comments or context would be helpful but so far I don't see where the client code takes the "ack" header from the incoming messages and transfers it to the actual 'ACK' frame so that could be the issue.

From the spec:

If the message is received from a subscription that requires explicit acknowledgment (either client or client-individual mode) then the MESSAGE frame MUST also contain an ack header with an arbitrary value. This header will be used to relate the message to a subsequent ACK or NACK frame.

Refer to the STOMP specification to learn more

Upvotes: 1

Related Questions