krabcore

Reputation: 999

NetMQ 4.0 multithreading

I am having problems with a multi-threaded server based on NetMQ 4.0. I tried to follow http://zguide.zeromq.org/cs:mtserver, but there is no context object in NetMQ 4.0.

I tried:

for (var i = 0; i < workerCount; ++i)
{
    new Thread(() => Worker(connStr.Value)).Start();
}

//...
private void Worker(string connStr)
{
    using (var socket = new DealerSocket(connStr))
    {
        while (true)
        {
            var msg = socket.ReceiveMultipartMessage();
            //...
        }
    }
}

but I get this error:

NetMQ.TerminatingException: CheckContextTerminated

and yes, it is terminated.

How can I create a context in NetMQ 4.0, or how else can I build a multi-threaded server with NetMQ 4.0?

Upvotes: 1

Views: 1924

Answers (2)

krabcore

Reputation: 999

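Correct solution: bind a RouterSocket for the clients and a DealerSocket on inproc://workers, connect one ResponseSocket per worker thread to that endpoint, and let a Proxy forward messages between the two: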

using (var clients = new RouterSocket(connStr.Value))
using (var workers = new DealerSocket())
{
    workers.Bind("inproc://workers");
    for (var i = 0; i < workerCount; i++)
    {
        new Thread(Worker).Start();
    }
    // Forward messages between clients and workers.
    // Start() blocks the calling thread until the proxy is stopped.
    var prx = new Proxy(clients, workers);
    prx.Start();
}

private void Worker()
{
    using (var socket = new ResponseSocket())
    {
        socket.Connect("inproc://workers");
        while (true)
        {
            //...
        }
    }
}

Upvotes: 1

VMAtm

Reputation: 28366

If you are using .NET 4.0 or higher, creating threads by hand is outdated and shouldn't be done this way: if your workerCount is high and you provide no scheduling logic, performance could degrade significantly instead of improving.

Here is what you can do instead, using the TPL:

  1. You can easily replace your worker threads with LongRunning tasks.
  2. You should probably introduce a CancellationToken so that your workers can be stopped correctly.

So your code could be something like this:

// Field in your class
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

using (var clients = new RouterSocket(connStr.Value))
using (var workers = new DealerSocket())
{
    workers.Bind("inproc://workers");
    for (var i = 0; i < workerCount; ++i)
    {
        Task.Factory.StartNew(Worker
            , cancellationTokenSource.Token
            , TaskCreationOptions.LongRunning
            , TaskScheduler.Default);
    }
    var prx = new Proxy(clients, workers);
    prx.Start();
}

private void Worker()
{
    using (var socket = new ResponseSocket())
    {
        socket.Connect("inproc://workers");
        while (!cancellationTokenSource.Token.IsCancellationRequested)
        {
             //...
        }
        // Cancel the task and exit
        cancellationTokenSource.Token.ThrowIfCancellationRequested();
    }
}

To simplify this, you can pass the CancellationToken as a parameter to your Worker method.
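Passing the token as a parameter could look roughly like the sketch below. It is only an illustration: the class name WorkerSketch, the helper StartWorker, the 500 ms timeout and the reply text are made up for this example, and it assumes the dealer socket is already bound to inproc://workers as in the code above. The receive uses a timeout so that the loop gets a chance to re-check the token instead of blocking forever.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;

static class WorkerSketch
{
    // The worker gets the token as an argument instead of reading a shared field.
    static void Worker(CancellationToken token)
    {
        using (var socket = new ResponseSocket())
        {
            socket.Connect("inproc://workers");
            while (!token.IsCancellationRequested)
            {
                string request;
                // Wait a limited time for a request, then loop to re-check the token.
                // The 500 ms value is an arbitrary choice for this sketch.
                if (!socket.TryReceiveFrameString(TimeSpan.FromMilliseconds(500), out request))
                {
                    continue;
                }

                // Placeholder for the real work; a ResponseSocket must reply
                // to each request before it can receive the next one.
                socket.SendFrame("reply to " + request);
            }
        }
    }

    // Hypothetical helper: starts one long-running worker bound to the given token.
    public static Task StartWorker(CancellationToken token)
    {
        return Task.Factory.StartNew(
            () => Worker(token),
            token,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }
}
```

With this shape, the server loop would call WorkerSketch.StartWorker(cancellationTokenSource.Token) once per worker, and calling cancellationTokenSource.Cancel() lets each worker leave its loop and dispose its socket after at most one timeout interval.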

Upvotes: 1
