Inspirited_Coder

Reputation: 31

ZeroMQ PUSH/PULL communication doesn't work via IPC, but it works via TCP

This is an exercise from the book "Node.js 8 the Right Way". You can see it below:

https://pp.userapi.com/c850628/v850628437/164aec/sFdn3icXb7Y.jpg

https://pp.userapi.com/c850628/v850628437/164ae4/4x02I_it_v8.jpg

That's my solution:

'use strict';
const zmq = require('zeromq');
const cluster = require('cluster');

const push = zmq.socket('push');
const pull = zmq.socket('pull');

const cores_num = require('os').cpus().length;
let workers_num = 0;

push.bind('tcp://127.0.0.1:9998');
pull.bind('tcp://127.0.0.1:9999');
// push.bind('ipc://push.ipc');
// pull.bind('ipc://pull.ipc');

if (cluster.isMaster) {
  for (let j = 0; j < cores_num; j++) {
    cluster.fork();
  }

  cluster.on('online', (worker) => {
    console.log(`Worker with pid ${worker.process.pid} is created!`);
  });

  pull.on('message', (data) => {
    const response = JSON.parse(data.toString());

    if (response.type === 'ready') {
      if (workers_num >= 0 && workers_num < 3) {
        workers_num++;

        if (workers_num == 3) {
          console.log('Ready!');

          for (let i = 0; i < 10; i++) {
            push.send(JSON.stringify({
              type: 'job',
              data: `This message has id ${i}`
            }));
          }
        }
      }
    } else if (response.type === 'result') {
      console.log(data.toString());
    }
  });
} else {
  const worker_push = zmq.socket('push');
  const worker_pull = zmq.socket('pull');

  worker_pull.connect('tcp://127.0.0.1:9998');
  worker_push.connect('tcp://127.0.0.1:9999');
  // worker_pull.connect('ipc://push.ipc');
  // worker_push.connect('ipc://pull.ipc');

  worker_push.send(JSON.stringify({
    type: 'ready'
  }));

  worker_pull.on('message', data => {
    const request = JSON.parse(data);

    if (request.type === 'job') {
      console.log(`Process ${process.pid} got message ${request.data}`);
      worker_push.send(JSON.stringify({
        type: 'result',
        data: `This message is a response from process ${process.pid}`,
        time: Date.now()
      }));
    }
  });
}

As you can see, it works only when PUSH/PULL sockets and workers communicate via TCP, but I want to know the reason why it doesn't work via IPC.

Update (ref.: Condition 4 below, "pathname must be writable"): [image]

I hope you can help me find the problem.

Upvotes: 2

Views: 1591

Answers (2)

jamesdillonharvey

Reputation: 1042

A few things:

Your IPC path is incorrect:

You have ipc://push.ipc (two slashes), but you really need ipc:///push.ipc. The protocol is ipc://, and after it comes the file path, /push.ipc.

File permissions:

Does your process have permission to write to the root directory? Unless you are running as root, I would think not.

I would change the path to something like /tmp/push.ipc, which on most systems is writable by all users.

In which case your url should be:

ipc:///tmp/push.ipc
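
A minimal sketch of building such an endpoint in Node, assuming the OS temp directory is writable by your user. The helper name ipcEndpoint is made up for illustration and is not part of zeromq; it only uses Node's built-in os and path modules:

```javascript
'use strict';
const os = require('os');
const path = require('path');

// Hypothetical helper: builds an absolute ipc:// endpoint inside the OS temp directory.
function ipcEndpoint(name) {
  // os.tmpdir() is an absolute path on Unix-like systems (commonly /tmp),
  // so the result has three slashes after "ipc:".
  return `ipc://${path.join(os.tmpdir(), name)}`;
}

const pushEndpoint = ipcEndpoint('push.ipc');
const pullEndpoint = ipcEndpoint('pull.ipc');
console.log(pushEndpoint, pullEndpoint);
```

Computing each endpoint once and passing the same string to both bind() and connect() also keeps the two sides from drifting apart.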

Forking

I don't use Node at all, but based on my knowledge of forking in other languages, I think the whole program is run again in each new process.

In that case, isn't each worker trying to bind() again, since the socket creation/bind code is outside the if (cluster.isMaster) { block?

I think it should look like this:

if (cluster.isMaster) {

  const push = zmq.socket('push');
  const pull = zmq.socket('pull');
  push.bind('ipc://push.ipc');
  pull.bind('ipc://pull.ipc');
  // ...
}
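
To make the idea concrete, here is a rough sketch of the split, not a tested fix for the question. The function name setupSockets and the endpoints object are made up for illustration. The zmq module is passed in as an argument, so the sketch only assumes it offers the socket(), bind() and connect() calls already used in the question:

```javascript
'use strict';

// Sketch: bind only in the master, connect only in the workers.
// `zmq` is passed in; with the API used in the question it would be require('zeromq').
function setupSockets(zmq, isMaster, endpoints) {
  const push = zmq.socket('push');
  const pull = zmq.socket('pull');

  if (isMaster) {
    // Exactly one process owns (binds) the two endpoints.
    push.bind(endpoints.jobs);
    pull.bind(endpoints.results);
  } else {
    // Workers pull jobs from the master's PUSH socket
    // and push results to the master's PULL socket.
    pull.connect(endpoints.jobs);
    push.connect(endpoints.results);
  }
  return { push, pull };
}
```

In the question's program this would presumably be called once per process, e.g. setupSockets(require('zeromq'), cluster.isMaster, { jobs: 'ipc:///tmp/push.ipc', results: 'ipc:///tmp/pull.ipc' }), instead of binding at the top level where every forked worker runs the same code.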

Upvotes: 1

user3666197

Reputation: 1

I want to know the reason why it doesn't work via IPC.

There are a few conditions for using the ipc:// transport-class with the ZeroMQ Scalable Formal Communication Archetypes and getting the sockets .bind()/.connect()-ed:

1) The inter-process transport passes messages between local processes using a system-dependent IPC mechanism. The inter-process transport is currently only implemented on operating systems that provide UNIX domain sockets.

2) Both the .bind()-side and the .connect()-side need to meet at some feasible address:

          push.bind(    'ipc://push.sock'     ); // will never meet its counterparty
 // ------------------(--|||://^v^v^v^v^v^v^v )
   worker_pull.connect( 'ipc:///tmp/push.sock'); //         if used other ipc://-address

3) If any second process binds to an endpoint already bound by a process, this will succeed and the first process will lose its binding. In this behaviour, the ipc:// transport-class is not consistent with the tcp:// or inproc:// transport-classes.

4) The endpoint-address pathname must be writable by the process. When the endpoint starts with /, e.g., ipc:///pathname, this will be an absolute pathname. If the endpoint specifies a directory that does not exist, the bind shall fail.

5) When the endpoint-address pathname starts with @, the abstract namespace shall be used. The abstract namespace is independent of the filesystem and if a process attempts to bind an endpoint already bound by a process, it will fail. See unix(7) for details.

6) ipc:// transport-class pathnames have a maximum size that depends on the operating system. On Linux, the maximum is 113 characters including the "ipc://" prefix (107 characters for the real path name).

7) When a wild-card * endpoint specification was used in zmq_bind(), the caller should use the real endpoint obtained from the ZMQ_LAST_ENDPOINT socket option to unbind this endpoint from a socket using zmq_unbind().
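
Conditions 4) and 6) can be checked before calling .bind(). The following is only a sketch under stated assumptions: checkIpcEndpoint is a made-up helper, not a ZeroMQ API, and the 107-byte limit is the Linux figure quoted in condition 6), so other systems may differ. It uses only Node's built-in fs and path modules:

```javascript
'use strict';
const fs = require('fs');
const path = require('path');

const PREFIX = 'ipc://';
const MAX_PATH_BYTES = 107; // Linux figure from condition 6); an assumption elsewhere

// Hypothetical pre-flight check: returns a list of problems (empty if none were found).
function checkIpcEndpoint(endpoint) {
  if (!endpoint.startsWith(PREFIX)) {
    return [`endpoint must start with ${PREFIX}`];
  }
  const problems = [];
  const pathname = endpoint.slice(PREFIX.length);

  if (Buffer.byteLength(pathname) > MAX_PATH_BYTES) {
    problems.push(`pathname is longer than ${MAX_PATH_BYTES} bytes`);
  }
  if (pathname.startsWith('@')) {
    return problems; // abstract namespace: independent of the filesystem
  }
  // A relative pathname is resolved against the current working directory.
  const dir = path.dirname(path.resolve(pathname));
  try {
    fs.accessSync(dir, fs.constants.W_OK); // throws if dir is missing or not writable
  } catch (err) {
    problems.push(`directory ${dir} is missing or not writable`);
  }
  return problems;
}
```

For example, checkIpcEndpoint('ipc://push.ipc') checks the current working directory of the process, which is where a relative ipc:// pathname would end up, while checkIpcEndpoint('ipc:///push.ipc') checks the root directory.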

Upvotes: 1
