Reputation: 82487
There are occasions that I need to group some of my messages together so that they can be processed in order by my worker processes.
But every single example that I find shows only how to do one message per conversation.
What I am looking for is an example that sends more than one message in a conversation AND a an example that shows how to process them too. (I may be sending them both, but I only seem to be able to get one back out.)
Upvotes: 0
Views: 991
Reputation: 4574
A conversation provides the boundary within which messages are processed in order - so you need to SEND all the messages in the group with the same ConversationId. The way I do this is with a utility table that stores the ConversationId when created so that each time a message is sent, it looks up the appropriate conversationid to send it on.
SELECT @conversationHandle = ConversationHandle FROM Qproc.SessionConversation
WHERE
FromService = @fromService
AND ToService = @toService
AND OnContract = @onContract
AND Terminated IS NULL
IF @conversationHandle IS NULL
BEGIN
BEGIN DIALOG CONVERSATION @conversationHandle
FROM SERVICE @fromService
TO SERVICE @toService
ON CONTRACT @onContract
WITH ENCRYPTION = OFF; --, LIFETIME = 60*60*24*100;
-- Store the ongoing conversation for further use
INSERT INTO QProc.SessionConversation ( FromService, ToService, OnContract,ConversationHandle)
VALUES( @fromService, @toService, @onContract, @conversationHandle )
END
-- Create the dialog timer, timeout is seconds; this will notify the ClientQueue if nothing has happened on the conversation
--in the timeout period
BEGIN CONVERSATION TIMER (@conversationHandle) TIMEOUT = 60*8;
SEND ON CONVERSATION @conversationHandle
MESSAGE TYPE [http://COMPANYNAME/AsyncTriggerRequestMesssage]
(@messageBody);
The reason you only see one message at the other end is to do with conversation group locking - you should read up on this to understand what is going on, but basically once a message processing procedure has seen a message, its view on the message queue is restricted to a single conversation. This won't be an issue once you reuse the same conversationID. Here is an example receive:
DECLARE @RecvReqDlgHandle UNIQUEIDENTIFIER;
DECLARE @RecvReqMsg VARCHAR(8000);
DECLARE @RecvReqMsgName sysname;
WHILE (1=1)
BEGIN
BEGIN TRANSACTION;
WAITFOR
( RECEIVE TOP(1)
@RecvReqDlgHandle = conversation_handle,
@RecvReqMsg = message_body,
@RecvReqMsgName = message_type_name
FROM QProc.AsyncTaskServiceQueue
), TIMEOUT 500;
IF @@ROWCOUNT=0
BEGIN
ROLLBACK TRANSACTION;
BREAK
END
IF @RecvReqMsgName = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
BEGIN
END CONVERSATION @RecvReqDlgHandle;
END
IF @RecvReqMsgName='http://COMPANYNAME/AsyncTriggerRequestMesssage'
BEGIN
DECLARE @BodyDoc XML;
SET @BodyDoc=CONVERT(XML, @RecvReqMsg) ;
EXEC QProc.AsyncTaskRunTask @BodyDoc;
END
COMMIT TRANSACTION;
END
FInally, you'll need to clean those conversations up once they're no longer used, something like this:
DECLARE @conversationHandle UNIQUEIDENTIFIER;
DECLARE @messageTypeName SYSNAME;
BEGIN TRANSACTION;
RECEIVE TOP(1)
@conversationHandle = conversation_handle,
@messageTypeName = message_type_name
FROM QProc.AsyncTaskClientQueue;
IF @conversationHandle IS NOT NULL
BEGIN
--If the DialogTimer message arrives, then there has been no activity on this conversation for a while (see timeout setting in [QProc].[DispatchAsyncTaskMessage])
--so we terminate gracefully and go home.
IF @messageTypeName = 'http://schemas.microsoft.com/SQL/ServiceBroker/DialogTimer'
OR @messageTypeName = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
BEGIN
END CONVERSATION @conversationHandle;
UPDATE Qproc.SessionConversation SET TERMINATED = getUtcDate() WHERE ConversationHandle = @conversationHandle;
END
END
COMMIT TRANSACTION;
Upvotes: 1