Mahesh
Mahesh

Reputation: 731

Azure service bus topic receive message with session with event driven architecture model

I am using azure service bus topic. as i have large message so i am splitting large message and sending them in small- small messages with sessionid and split order. I want my receiver to have event driven architecture. as i have to receive all messages with same sessionid and have to aggregate them with proper split order.bellow is my code. but only first time i am getting message from bellow code. in second message it timeouts.

         public class CRMESBListener : RoleEntryPoint
            {
                private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
                private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);

            public override void Run()
            {
                Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole is running");
                try
                {
                    DBMessageListener dbMessageListener = DBMessageListener.GetDBMessageListner();
                    dbMessageListener.Listen();
                    runCompleteEvent.WaitOne();
                    //this.RunAsync(this.cancellationTokenSource.Token).Wait();
                }
                finally
                {
                    this.runCompleteEvent.Set();
                }
            }

            public override bool OnStart()
            {
                // Set the maximum number of concurrent connections
                ServicePointManager.DefaultConnectionLimit = 12;

                // For information on handling configuration changes
                // see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.


                bool result = base.OnStart();
                Bootstrapper.Init();

                Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole has been started");

                return result;
            }

            public override void OnStop()
            {
                Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole is stopping");

                this.cancellationTokenSource.Cancel();
                this.runCompleteEvent.WaitOne();

                base.OnStop();

                Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole has stopped");
            }

            private async Task RunAsync(CancellationToken cancellationToken)
            {
                // TODO: Replace the following with your own logic.
                while (!cancellationToken.IsCancellationRequested)
                {
                    Trace.TraceInformation("Working");
                    await Task.Delay(1000);
                }
            }
        }













      public class DBMessageListener
        {
            #region Member Variables

            private static DBMessageListener dbMessageListner;
            private static object lockObject = new object();
            private TopicSubscribeClientWrapper accountTopicClient;

            private NamespaceManager namespaceManager;
            private OnMessageOptions eventDrivenMessagingOptions;

            private int crmIntegrationUserID = Common.CrmCurrentUser.UserID;

            #endregion Member Variables

            #region Constructors

            private DBMessageListener()
            {
                string subscriptionName = "AllMessages";
                namespaceManager = new NamespaceManager(ConfigurationSettings.ServiceBus_Pub_Sub_ConnectionString);

                if (!namespaceManager.SubscriptionExists(ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath, subscriptionName))
                {
                    namespaceManager.CreateSubscription(ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath, subscriptionName);
                }
                accountTopicClient = new TopicSubscribeClientWrapper(ConfigurationSettings.ServiceBus_Pub_Sub_ConnectionString, ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath);
                accountTopicClient.SubscriptionName = subscriptionName;



                eventDrivenMessagingOptions = new OnMessageOptions
                {
                    AutoComplete = true
                };

                eventDrivenMessagingOptions.ExceptionReceived += OnExceptionReceived;
                eventDrivenMessagingOptions.MaxConcurrentCalls = 5;
            }

            #endregion Constructors

            #region Methods

            private async System.Threading.Tasks.Task OnMessageArrived(BrokeredMessage message)
            {
                if (message != null)
                {
                    try
                    {
                        await ProcessDBMessage(message.GetBody<ServiceBusMessage>());
                    }
                    catch (Exception ex)
                    {
                        //log exception
                    }
                }

            }

            private void OnExceptionReceived(object sender, ExceptionReceivedEventArgs e)
            {
                if (e != null && e.Exception != null)
                {

                }
            }

            private async System.Threading.Tasks.Task ProcessDBMessage(ServiceBusMessage message)
            {

    //process message           
            }

            public static DBMessageListener GetDBMessageListner()
            {
                if (dbMessageListner == null)
                {
                    lock (lockObject)
                    {
                        if (dbMessageListner == null)
                        {
                            dbMessageListner = new DBMessageListener();
                        }
                    }
                }

                return dbMessageListner;
            }

            public void Listen()
            {
                accountTopicClient.OnMessageAsync(async message => await OnMessageArrived(message), eventDrivenMessagingOptions);

            }

            #endregion Methods
        }  


 public class TopicSubscribeClientWrapper : IServiceBusClientWrapper
    {
        #region Member Variables

        private readonly string _connectionString;
        private readonly string _topicName;
        private readonly TopicClient _topicClient;
        private SubscriptionClient _subscriptionClient;

        #endregion Member Variables

        #region Properties

        public string SubscriptionName { get; set; }

        #endregion Properties

        #region Constructors

        public TopicSubscribeClientWrapper(string connectionString, string topicName)
        {
            _connectionString = connectionString;
            _topicName = topicName;
            _topicClient = TopicClient.CreateFromConnectionString(connectionString, topicName);
        }

        #endregion Constructors

        #region Event Handlers

        public void OnMessageAsync(Func<BrokeredMessage, Task> onMessageCallback, OnMessageOptions onMessageOptions)
        {

            _subscriptionClient = SubscriptionClient.CreateFromConnectionString(_connectionString, _topicName, SubscriptionName);

           // _subscriptionClient.OnMessageAsync(onMessageCallback, onMessageOptions);

            MemoryStream largeMessageStream = new MemoryStream();
            MessageSession session = _subscriptionClient.AcceptMessageSession();

            while (true)
            {
                BrokeredMessage subMessage = session.Receive(TimeSpan.FromSeconds(5));

                if (subMessage != null)
                {
                    Stream subMessageStream = subMessage.GetBody<Stream>();
                    subMessageStream.CopyTo(largeMessageStream);

                    subMessage.Complete();
                    //Console.Write(".");
                }
                else
                {
                    //Console.WriteLine("Done!");
                    break;
                }
            }

            BrokeredMessage largeMessage = new BrokeredMessage(largeMessageStream, true);
            var message = onMessageCallback.Method.GetParameters();
            message.SetValue(largeMessage, 1);
            _subscriptionClient.OnMessageAsync(onMessageCallback, onMessageOptions);

        }

        #endregion Event Handlers

        #region Methods

        public Task SendAsync(BrokeredMessage message)
        {
            return _topicClient.SendAsync(message);
        }

        public void Close()
        {
            if (_subscriptionClient != null)
            {
                _subscriptionClient.Close();
            }

            _topicClient.Close();
        }

        #endregion Methods
    }

Upvotes: 0

Views: 300

Answers (1)

Sean Feldman
Sean Feldman

Reputation: 26012

I would suggest to take a different route. Rather than trying to create a session of messages to pass a large message, use claim check pattern that is addressing specifically this issue - large attachments. Write you data to a storage blob and have the URI sent with the message. It would be much simpler to save/restore a blob, rather than trying to send the payload in chunks.Also, this way it's easier to monitor your system (one failed succeeded/failed message associated with one or more blobs) and you don't have to use sessions or anything special.

Upvotes: 1

Related Questions