Dilshan
Dilshan

Reputation: 3001

RMQ RPC only returns response once

I have following RPC client implementation in one of my go services implemented, following the official tutorial

The usage is simple,

func DoSomethingWithRpc() {
 ...
 rpcResponse, err := rmq_helpers.SendRpcMessage(
    ts.publisher, // Just a struct that keep conn details
    data,
 )
 // Do something with `rpcResponse`
 ...
}

And this is the SendRpcMessage function. The issue is this only works once, If I trigger DoSomethingWithRpc() for second time, it get timeout ( due to the timeout logic in SendRpcMessage() otherwise just kept hanging their blocking the tread )

However on the 2nd SendRpcMessage call i can see my RPC server receive the event and it got correct CorrelationID and ReplyTo queue values. And I can see even the reply message in RMQ queue(from the rmq dashboard)

func SendRpcMessage(
    client *rmq.Client,
    body []byte,
) (res *amqp.Delivery, err error) {
    ch := client.GetChannel()

    if ch == nil {
        err = fmt.Errorf("Channel is nil")
        return
    }

    replyQueue, err := client.GetChannel().QueueDeclare(
        "",
        false,
        false,
        true,
        false,
        nil,
    )

    if err != nil {
        fmt.Println("Error declaring queue: ", err)
        return nil, err
    }

    msgs, err := client.GetChannel().Consume(
        replyQueue.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )

    if err != nil {
        fmt.Println("Error consuming messages: ", err)
        return nil, err
    }

    corrId := fmt.Sprintf("%d", time.Now().UnixNano())
    fmt.Println("Correlation ID: ", corrId, "ReplyTo: ", replyQueue.Name)

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    err = client.GetChannel().PublishWithContext(ctx,
        "",
        client.GetQueueName(),
        false,
        false,
        amqp.Publishing{
            ContentType:   "application/json",
            Body:          body,
            ReplyTo:       replyQueue.Name,
            CorrelationId: corrId,
        },
    )

    if err != nil {
        return nil, err
    }

    timeout := time.After(5 * time.Second)

    for {
        select {
        case d := <-msgs:
            if d.CorrelationId == corrId {
                // client.GetChannel().QueueDelete(replyQueue.Name, false, false, false)
                res = &d
                return  
            }
        case <-timeout:
            err = fmt.Errorf("Timeout")
            return
        }

    }
}

What am I doing wrong here ? Any help would be really appreciated.

Upvotes: 1

Views: 56

Answers (1)

Tinkerer
Tinkerer

Reputation: 1068

You are not re-initializing the timeout. Try replacing this:

    timeout := time.After(5 * time.Second)   // <--- move this below

    for {
        select {
        case d := <-msgs:
            if d.CorrelationId == corrId {
                // client.GetChannel().QueueDelete(replyQueue.Name, false, false, false)
                res = &d
                return  
            }
        case <-timeout:
            err = fmt.Errorf("Timeout")
            return
        }

    }

with this:

    for {
        select {
        case d := <-msgs:
            if d.CorrelationId == corrId {
                // client.GetChannel().QueueDelete(replyQueue.Name, false, false, false)
                res = &d
                return  
            }
        case <-time.After(5 * time.Second):
            err = fmt.Errorf("Timeout")
            return
        }
    }

Upvotes: 1

Related Questions