Reputation: 6363
For the life of me I have been unable to get RPC with RabbitMQ working with temp replyto queues. Below is a simple example derived from this test. I see bunch of exceptions in my output window and the dlq fills up, but the message is never acknowledged.
namespace ConsoleApplication4
{
class Program
{
public static IMessageService CreateMqServer(int retryCount = 1)
{
return new RabbitMqServer { RetryCount = retryCount };
}
static void Main(string[] args)
{
using (var mqServer = CreateMqServer())
{
mqServer.RegisterHandler<HelloIntro>(m =>
new HelloIntroResponse { Result = "Hello, {0}!".Fmt(m.GetBody().Name) });
mqServer.Start();
}
Console.WriteLine("ConsoleAppplication4");
Console.ReadKey();
}
}
}
namespace ConsoleApplication5
{
class Program
{
public static IMessageService CreateMqServer(int retryCount = 1)
{
return new RabbitMqServer { RetryCount = retryCount };
}
static void Main(string[] args)
{
using (var mqServer = CreateMqServer())
{
using (var mqClient = mqServer.CreateMessageQueueClient())
{
var replyToMq = mqClient.GetTempQueueName();
mqClient.Publish(new Message<HelloIntro>(new HelloIntro { Name = "World" })
{
ReplyTo = replyToMq
});
IMessage<HelloIntroResponse> responseMsg = mqClient.Get<HelloIntroResponse>(replyToMq);
mqClient.Ack(responseMsg);
}
}
Console.WriteLine("ConsoleAppplication5");
Console.ReadKey();
}
}
}
First exception
RabbitMQ.Client.Exceptions.OperationInterruptedException occurred
_HResult=-2146233088
_message=The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=405, text="RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'mq:tmp:10dd20804ee546d6bf5a3512f66143ec' in vhost '/'", classId=50, methodId=20, cause=
HResult=-2146233088
IsTransient=false
Message=The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=405, text="RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'mq:tmp:10dd20804ee546d6bf5a3512f66143ec' in vhost '/'", classId=50, methodId=20, cause=
Source=RabbitMQ.Client
StackTrace:
at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply()
at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body)
at RabbitMQ.Client.Framing.Impl.v0_9_1.Model._Private_QueueBind(String queue, String exchange, String routingKey, Boolean nowait, IDictionary`2 arguments)
at RabbitMQ.Client.Impl.ModelBase.QueueBind(String queue, String exchange, String routingKey, IDictionary`2 arguments)
at RabbitMQ.Client.Impl.ModelBase.QueueBind(String queue, String exchange, String routingKey)
at ServiceStack.RabbitMq.RabbitMqExtensions.RegisterQueue(IModel channel, String queueName)
at ServiceStack.RabbitMq.RabbitMqExtensions.RegisterQueueByName(IModel channel, String queueName)
at ServiceStack.RabbitMq.RabbitMqProducer.PublishMessage(String exchange, String routingKey, IBasicProperties basicProperties, Byte[] body)
InnerException:
followed by this one
System.Threading.ThreadInterruptedException occurred
_HResult=-2146233063
_message=Thread was interrupted from a waiting state.
HResult=-2146233063
IsTransient=true
Message=Thread was interrupted from a waiting state.
Source=mscorlib
StackTrace:
at System.Threading.Monitor.ObjWait(Boolean exitContext, Int32 millisecondsTimeout, Object obj)
at System.Threading.Monitor.Wait(Object obj, Int32 millisecondsTimeout, Boolean exitContext)
InnerException:
Then it repeat for a number of times and hangs. This particular post seems to suggest that they were able to achieve some sort of success with ServerStack and RabbitMQ RPC, but before I start changing my code I'd like to know the reason that my code doesn't work.
Thank you, Stephen
Upvotes: 1
Views: 1382
Reputation: 143339
There was an issue with redeclaring an exclusive queue which is no longer being done in this commit.
There's also a new RabbitMqTest project showcasing a simple working Client/Server example communicating via 2 independent Console Applications.
This change is available from v4.0.34+ that's now on MyGet.
The ServiceStack.RabbitMq
package RabbitMq.Client
NuGet dependency has also been upgraded to v3.4.0.
Upvotes: 1
Reputation: 175
When your client call GetTempQueueName(), it creates an exclusive queue, which cannot be accessed from another connection (i.e. your server).
Therefore I created my own simple mq-client which does not use servicestack's mq client and only depends on rabbitmq's .net-library:
public class MqClient : IDisposable
{
ConnectionFactory factory = new ConnectionFactory()
{
HostName = "192.168.97.201",
UserName = "guest",
Password = "guest",
//VirtualHost = "test",
Port = AmqpTcpEndpoint.UseDefaultPort,
};
private IConnection connection;
private string exchangeName;
public MqClient(string defaultExchange)
{
this.exchangeName = defaultExchange;
this.connection = factory.CreateConnection();
}
public TResponse RpcCall<TResponse>(IReturn<TResponse> reqDto, string exchange = null)
{
using (var channel = connection.CreateModel())
{
string inq_queue_name = string.Format("mq:{0}.inq", reqDto.GetType().Name);
string responseQueueName = channel.QueueDeclare("",false,false,true,null).QueueName;
//string responseQueueName = channel.QueueDeclare().QueueName;
var props = channel.CreateBasicProperties();
props.ReplyTo = responseQueueName;
var message = ServiceStack.Text.JsonSerializer.SerializeToString(reqDto);
channel.BasicPublish(exchange ?? this.exchangeName, inq_queue_name, props, UTF8Encoding.UTF8.GetBytes(message));
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(responseQueueName, true, consumer);
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
//channel.BasicAck(ea.DeliveryTag, false);
string response = UTF8Encoding.UTF8.GetString(ea.Body);
string responseType = ea.BasicProperties.Type;
Console.WriteLine(" [x] New Message of Type '{1}' Received:{2}{0}", response, responseType, Environment.NewLine);
return ServiceStack.Text.JsonSerializer.DeserializeFromString<TResponse>(response);
}
}
~MqClient()
{
this.Dispose();
}
public void Dispose()
{
if (connection != null)
{
this.connection.Dispose();
this.connection = null;
}
}
}
It can be used like that:
using (var mqClient = new MqClient("mx.servicestack"))
{
var pingResponse = mqClient.RpcCall<PingResponse>(new Ping { });
}
Important: You've got to use servicestack version 4.0.32+.
Upvotes: 2