Reputation: 731
I am using azure service bus topic. as i have large message so i am splitting large message and sending them in small- small messages with sessionid and split order. I want my receiver to have event driven architecture. as i have to receive all messages with same sessionid and have to aggregate them with proper split order.bellow is my code. but only first time i am getting message from bellow code. in second message it timeouts.
public class CRMESBListener : RoleEntryPoint
{
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);
public override void Run()
{
Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole is running");
try
{
DBMessageListener dbMessageListener = DBMessageListener.GetDBMessageListner();
dbMessageListener.Listen();
runCompleteEvent.WaitOne();
//this.RunAsync(this.cancellationTokenSource.Token).Wait();
}
finally
{
this.runCompleteEvent.Set();
}
}
public override bool OnStart()
{
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
// For information on handling configuration changes
// see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.
bool result = base.OnStart();
Bootstrapper.Init();
Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole has been started");
return result;
}
public override void OnStop()
{
Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole is stopping");
this.cancellationTokenSource.Cancel();
this.runCompleteEvent.WaitOne();
base.OnStop();
Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole has stopped");
}
private async Task RunAsync(CancellationToken cancellationToken)
{
// TODO: Replace the following with your own logic.
while (!cancellationToken.IsCancellationRequested)
{
Trace.TraceInformation("Working");
await Task.Delay(1000);
}
}
}
public class DBMessageListener
{
#region Member Variables
private static DBMessageListener dbMessageListner;
private static object lockObject = new object();
private TopicSubscribeClientWrapper accountTopicClient;
private NamespaceManager namespaceManager;
private OnMessageOptions eventDrivenMessagingOptions;
private int crmIntegrationUserID = Common.CrmCurrentUser.UserID;
#endregion Member Variables
#region Constructors
private DBMessageListener()
{
string subscriptionName = "AllMessages";
namespaceManager = new NamespaceManager(ConfigurationSettings.ServiceBus_Pub_Sub_ConnectionString);
if (!namespaceManager.SubscriptionExists(ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath, subscriptionName))
{
namespaceManager.CreateSubscription(ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath, subscriptionName);
}
accountTopicClient = new TopicSubscribeClientWrapper(ConfigurationSettings.ServiceBus_Pub_Sub_ConnectionString, ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath);
accountTopicClient.SubscriptionName = subscriptionName;
eventDrivenMessagingOptions = new OnMessageOptions
{
AutoComplete = true
};
eventDrivenMessagingOptions.ExceptionReceived += OnExceptionReceived;
eventDrivenMessagingOptions.MaxConcurrentCalls = 5;
}
#endregion Constructors
#region Methods
private async System.Threading.Tasks.Task OnMessageArrived(BrokeredMessage message)
{
if (message != null)
{
try
{
await ProcessDBMessage(message.GetBody<ServiceBusMessage>());
}
catch (Exception ex)
{
//log exception
}
}
}
private void OnExceptionReceived(object sender, ExceptionReceivedEventArgs e)
{
if (e != null && e.Exception != null)
{
}
}
private async System.Threading.Tasks.Task ProcessDBMessage(ServiceBusMessage message)
{
//process message
}
public static DBMessageListener GetDBMessageListner()
{
if (dbMessageListner == null)
{
lock (lockObject)
{
if (dbMessageListner == null)
{
dbMessageListner = new DBMessageListener();
}
}
}
return dbMessageListner;
}
public void Listen()
{
accountTopicClient.OnMessageAsync(async message => await OnMessageArrived(message), eventDrivenMessagingOptions);
}
#endregion Methods
}
public class TopicSubscribeClientWrapper : IServiceBusClientWrapper
{
#region Member Variables
private readonly string _connectionString;
private readonly string _topicName;
private readonly TopicClient _topicClient;
private SubscriptionClient _subscriptionClient;
#endregion Member Variables
#region Properties
public string SubscriptionName { get; set; }
#endregion Properties
#region Constructors
public TopicSubscribeClientWrapper(string connectionString, string topicName)
{
_connectionString = connectionString;
_topicName = topicName;
_topicClient = TopicClient.CreateFromConnectionString(connectionString, topicName);
}
#endregion Constructors
#region Event Handlers
public void OnMessageAsync(Func<BrokeredMessage, Task> onMessageCallback, OnMessageOptions onMessageOptions)
{
_subscriptionClient = SubscriptionClient.CreateFromConnectionString(_connectionString, _topicName, SubscriptionName);
// _subscriptionClient.OnMessageAsync(onMessageCallback, onMessageOptions);
MemoryStream largeMessageStream = new MemoryStream();
MessageSession session = _subscriptionClient.AcceptMessageSession();
while (true)
{
BrokeredMessage subMessage = session.Receive(TimeSpan.FromSeconds(5));
if (subMessage != null)
{
Stream subMessageStream = subMessage.GetBody<Stream>();
subMessageStream.CopyTo(largeMessageStream);
subMessage.Complete();
//Console.Write(".");
}
else
{
//Console.WriteLine("Done!");
break;
}
}
BrokeredMessage largeMessage = new BrokeredMessage(largeMessageStream, true);
var message = onMessageCallback.Method.GetParameters();
message.SetValue(largeMessage, 1);
_subscriptionClient.OnMessageAsync(onMessageCallback, onMessageOptions);
}
#endregion Event Handlers
#region Methods
public Task SendAsync(BrokeredMessage message)
{
return _topicClient.SendAsync(message);
}
public void Close()
{
if (_subscriptionClient != null)
{
_subscriptionClient.Close();
}
_topicClient.Close();
}
#endregion Methods
}
Upvotes: 0
Views: 300
Reputation: 26012
I would suggest to take a different route. Rather than trying to create a session of messages to pass a large message, use claim check pattern that is addressing specifically this issue - large attachments. Write you data to a storage blob and have the URI sent with the message. It would be much simpler to save/restore a blob, rather than trying to send the payload in chunks.Also, this way it's easier to monitor your system (one failed succeeded/failed message associated with one or more blobs) and you don't have to use sessions or anything special.
Upvotes: 1