I want to use a MassTransit + RabbitMQ bus to schedule messages. I wrote two C# console applications: one creates messages and sends them to the scheduler, and the other consumes them.
The following code schedules messages on the bus: it sends one message per second to the scheduler, and the scheduler should then deliver each message to the consumer after a 10-second delay. My problem is that no message reaches the consumer, and nothing appears in the consumer queue in the RabbitMQ client. Where is my mistake?
Note: UseInMemoryScheduler works fine, but UseMessageScheduler doesn't.
Bus message creator:
class Program
{
    public static void Main(string[] args)
    {
        MainAsync(args).GetAwaiter().GetResult();
        Console.ReadKey();
    }
    static async Task MainAsync(string[] args)
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>
        {
            var host = rabbit.Host(new Uri("rabbitmq://localhost:5672"), settings =>
            {
                settings.Username("guest");
                settings.Password("guest");
            });
            //rabbit.UseInMemoryScheduler(); // This works
            rabbit.UseMessageScheduler(new Uri("rabbitmq://localhost/quartz")); // This doesn't work
        });
        busControl.Start();
        var sendEndpoint = await busControl.GetSendEndpoint(new Uri("rabbitmq://localhost/quartz"));
        for (int i = 0; i < 1000000; i++)
        {
            await sendEndpoint.ScheduleSend(new Uri("rabbitmq://localhost/publisher"),
                DateTime.Now.AddSeconds(10),
                new MessageCreated()
                {
                    Text = $"message {i}"
                });
            Thread.Sleep(1000);
        }
        Console.ReadKey();
        busControl.Stop();
    }
}
Message consumer:
class Program
{
    static void Main(string[] args)
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>
        {
            var host = rabbit.Host(new Uri("rabbitmq://localhost:5672"), settings =>
            {
                settings.Password("guest");
                settings.Username("guest");
            });
            rabbit.ReceiveEndpoint(host, "publisher", conf =>
            {
                conf.Consumer<Consumer>();
            });
        });
        busControl.Start();
        Console.ReadKey();
        busControl.Stop();
    }
}
public class Consumer : IConsumer<MessageCreated>
{
    public Task Consume(ConsumeContext<MessageCreated> context)
    {
        MessageCreated message = context.Message;
        Console.WriteLine(message.Text);
        context.Publish(new MessagePublished
        {
            Text = message.Text,
        });
        return Task.FromResult(context.Message);
    }
}
UPDATE: Based on @maldworth's answer, I changed my code to the following, but the problem is still not solved.
class Program
{
    public static void Main(string[] args)
    {
        MainAsync(args).GetAwaiter().GetResult();
        Console.ReadKey();
    }
    private static async Task<IScheduler> CreateSchedulerAsync()
    {
        var schedulerFactory = new StdSchedulerFactory();
        var scheduler = await schedulerFactory.GetScheduler();
        return scheduler;
    }
    static async Task MainAsync(string[] args)
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(async cfg =>
        {
            var host = cfg.Host(new Uri("rabbitmq://localhost:5672"), settings =>
            {
                settings.Password("guest");
                settings.Username("guest");
            });
            var scheduler = await CreateSchedulerAsync();
            cfg.ReceiveEndpoint("quartz", e =>
            {
                cfg.UseMessageScheduler(e.InputAddress);
                e.Consumer(() => new ScheduleMessageConsumer(scheduler));
                e.Consumer(() => new CancelScheduledMessageConsumer(scheduler));
            });
            cfg.ReceiveEndpoint(host, "publisher", conf =>
            {
                conf.Consumer<PublisherConsumer>();
            });
            cfg.ReceiveEndpoint(host, "subscriber", conf =>
            {
                conf.Consumer<SubscriberConsumer>();
            });
        });
        busControl.Start();
        for (int i = 0; i < 1000000; i++)
        {
            var text = $"message {i}";
            Console.WriteLine($"Schedule: {text}");
            await busControl.ScheduleSend(new Uri("rabbitmq://localhost/publisher"),
                DateTime.Now.AddSeconds(30),
                new ScheduleMessage()
                {
                    Text = text
                });
            Thread.Sleep(10000);
        }
        Console.ReadKey();
        busControl.Stop();
    }
}
public class PublisherConsumer : IConsumer<ScheduleMessage>
{
    public Task Consume(ConsumeContext<ScheduleMessage> context)
    {
        Console.WriteLine($"In Publisher: {context.Message.Text}");
        context.Publish(new PublishMessage
        {
            Text = context.Message.Text,
        });
        return Task.FromResult(context.Message);
    }
}
public class SubscriberConsumer : IConsumer<PublishMessage>
{
    public Task Consume(ConsumeContext<PublishMessage> context)
    {
        Console.WriteLine($"In Subscriber: {context.Message.Text}");
        return Task.FromResult(context.Message);
    }
}
And the App.config file content is:
<configSections>
  <section name="quartz" type="System.Configuration.NameValueSectionHandler, System, Version=1.0.5000.0,Culture=neutral, PublicKeyToken=b77a5c561934e089" />
</configSections>
<startup>
  <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.6.2" />
</startup>
<quartz>
  <add key="quartz.scheduler.instanceName" value="MassTransit-Quartz" />
  <add key="quartz.scheduler.instanceId" value="AUTO" />
  <add key="quartz.threadPool.type" value="Quartz.Simpl.SimpleThreadPool, Quartz" />
  <add key="quartz.threadPool.threadCount" value="4" />
  <add key="quartz.jobStore.misfireThreshold" value="60000" />
  <add key="quartz.serializer.type" value="binary" />
  <add key="quartz.jobStore.type" value="Quartz.Impl.AdoJobStore.JobStoreTX, Quartz" />
  <add key="quartz.jobStore.useProperties" value="false" />
  <add key="quartz.jobStore.driverDelegateType" value="Quartz.Impl.AdoJobStore.SqlServerDelegate, Quartz" />
  <add key="quartz.jobStore.clustered" value="true" />
  <add key="quartz.jobStore.tablePrefix" value="QRTZ_" />
  <add key="quartz.jobStore.dataSource" value="quartzDS" />
  <add key="quartz.dataSource.quartzDS.connectionString" value="Server=.;Database=QuartzDB;Integrated Security=SSPI" />
  <add key="quartz.dataSource.quartzDS.provider" value="SqlServer" />
</quartz>
Upvotes: 1
Views: 3100
Reputation: 547
You're almost there. After starting the bus, set the scheduler's job factory and start the scheduler:
busControl.Start();
scheduler.JobFactory = new MassTransitJobFactory(busControl);
scheduler.Start().Wait();
Console.ReadKey();
busControl.Stop();
Upvotes: 1
Reputation: 111
You don't mention whether you have a third consumer running (in a third console app). For scheduling to work with Quartz, you need a third consumer dedicated to Quartz. It has to be running, and in this case its receive endpoint listens on the "quartz" queue.
[Updated]
Here's a sample of the configuration you will want for the 3rd console app (the quartz service):
var scheduler = CreateScheduler();
configurator.ReceiveEndpoint("quartz", e =>
{
    configurator.UseMessageScheduler(e.InputAddress);
    e.Consumer(() => new ScheduleMessageConsumer(scheduler));
    e.Consumer(() => new CancelScheduledMessageConsumer(scheduler));
});
...
private static IScheduler CreateScheduler()
{
    ISchedulerFactory schedulerFactory = new StdSchedulerFactory();
    var scheduler = schedulerFactory.GetScheduler();
    return scheduler;
}
You will also need to configure Quartz to use a store (SQLite, MSSQL, or RAM if you want to test in memory). Please see the example configuration here.
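For a quick local test, the RAM option could look something like the App.config fragment below. This is only a minimal sketch, assuming the same quartz config section as in the question and Quartz.NET's built-in Quartz.Simpl.RAMJobStore. The instance name and thread count are simply carried over from the question, not required values.

```xml
<quartz>
  <add key="quartz.scheduler.instanceName" value="MassTransit-Quartz" />
  <add key="quartz.threadPool.type" value="Quartz.Simpl.SimpleThreadPool, Quartz" />
  <add key="quartz.threadPool.threadCount" value="4" />
  <!-- Assumption for local testing: keep jobs in memory instead of a database. -->
  <add key="quartz.jobStore.type" value="Quartz.Simpl.RAMJobStore, Quartz" />
</quartz>
```

With a RAM store there is no database to set up, but scheduled messages are lost whenever the Quartz service process stops, so this is best kept to testing.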
[/Updated]
[Updated2]
Somebody posted a similar question in the groups. Fortunately, they provided a sample GitHub repository covering a range of MT functionality, including a separate scheduler. Please take a look; it should have everything you need.
[/Updated2]
If you want to test without running a full-blown Quartz scheduler, you can use an InMemory scheduler. That is for testing purposes only, though; you shouldn't use it in production.
Also, in your first code snippet, you don't need to get the send endpoint for the scheduler:
var sendEndpoint = await busControl.GetSendEndpoint(new Uri("rabbitmq://localhost/quartz"));
That is because this line in the bus configuration:
rabbit.UseMessageScheduler(new Uri("rabbitmq://localhost/quartz"));
means that any time you call ScheduleSend (from a ConsumeContext, or from IBus/IBusControl), it always uses that /quartz address.
Lastly, you can then change this line:
await sendEndpoint.ScheduleSend(new Uri("rabbitmq://localhost/publisher"),
to busControl.ScheduleSend(...).
Upvotes: 0