Reputation: 3001
I have the following RPC client implementation in one of my Go services, written by following the official tutorial.
The usage is simple:
func DoSomethingWithRpc() {
    ...
    rpcResponse, err := rmq_helpers.SendRpcMessage(
        ts.publisher, // Just a struct that keeps conn details
        data,
    )
    // Do something with `rpcResponse`
    ...
}
And this is the SendRpcMessage function. The issue is that it only works once. If I trigger DoSomethingWithRpc() a second time, it times out (because of the timeout logic in SendRpcMessage(); without that logic it just hangs there, blocking the thread).
However, on the second SendRpcMessage call I can see that my RPC server receives the event with the correct CorrelationId and ReplyTo queue values. I can even see the reply message in the RMQ queue (from the RMQ dashboard).
func SendRpcMessage(
    client *rmq.Client,
    body []byte,
) (res *amqp.Delivery, err error) {
    ch := client.GetChannel()
    if ch == nil {
        err = fmt.Errorf("Channel is nil")
        return
    }
    replyQueue, err := client.GetChannel().QueueDeclare(
        "",
        false,
        false,
        true,
        false,
        nil,
    )
    if err != nil {
        fmt.Println("Error declaring queue: ", err)
        return nil, err
    }
    msgs, err := client.GetChannel().Consume(
        replyQueue.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        fmt.Println("Error consuming messages: ", err)
        return nil, err
    }
    corrId := fmt.Sprintf("%d", time.Now().UnixNano())
    fmt.Println("Correlation ID: ", corrId, "ReplyTo: ", replyQueue.Name)
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    err = client.GetChannel().PublishWithContext(ctx,
        "",
        client.GetQueueName(),
        false,
        false,
        amqp.Publishing{
            ContentType:   "application/json",
            Body:          body,
            ReplyTo:       replyQueue.Name,
            CorrelationId: corrId,
        },
    )
    if err != nil {
        return nil, err
    }
    timeout := time.After(5 * time.Second)
    for {
        select {
        case d := <-msgs:
            if d.CorrelationId == corrId {
                // client.GetChannel().QueueDelete(replyQueue.Name, false, false, false)
                res = &d
                return
            }
        case <-timeout:
            err = fmt.Errorf("Timeout")
            return
        }
    }
}
What am I doing wrong here? Any help would be really appreciated.
Upvotes: 1
Views: 56
Reputation: 1068
You are not re-initializing the timeout: it is created once, before the loop, so it is not reset on each iteration. Try replacing this:
timeout := time.After(5 * time.Second) // <--- move this below
for {
    select {
    case d := <-msgs:
        if d.CorrelationId == corrId {
            // client.GetChannel().QueueDelete(replyQueue.Name, false, false, false)
            res = &d
            return
        }
    case <-timeout:
        err = fmt.Errorf("Timeout")
        return
    }
}
with this:
for {
    select {
    case d := <-msgs:
        if d.CorrelationId == corrId {
            // client.GetChannel().QueueDelete(replyQueue.Name, false, false, false)
            res = &d
            return
        }
    case <-time.After(5 * time.Second):
        err = fmt.Errorf("Timeout")
        return
    }
}
Upvotes: 1