Jesper

Reputation: 383

Receive multipart messages in C with ZeroMQ

I am writing a data recording system for a device that publishes its data as multipart messages over a ZeroMQ socket.

My program will need to be written in C.

I managed to write a Python script that works exactly as it should. It looks like this:

import zmq
import time
import numpy as np

_HEARTBEAT_PREFIX =     b'ray.heartbeat\0'  
_RAW_DATA_PREFIX =      b'ray.rawdata\0'

context = zmq.Context.instance()

time.sleep(5)

socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.LINGER, 0)
socket.setsockopt(zmq.SUBSCRIBE, b'')
time.sleep(5)
socket.connect("tcp://localhost:50290")
time.sleep(5)
go = True
while go:

    if socket.poll(1000):

        zframes = socket.recv_multipart(flags=zmq.NOBLOCK, copy=False)

        if (zframes[0].buffer.tobytes() == _HEARTBEAT_PREFIX):
            print ('Heartbeat')
        elif (zframes[0].buffer.tobytes() == _RAW_DATA_PREFIX):
            dataToWrite = np.frombuffer(zframes[1].buffer, dtype='i2')
            print ('Raw data: ')
            # and so on ...

This is the first time I have worked with ZMQ, and porting this Python code to C is a real challenge. The C code I have written so far runs, but does not work properly:

#include "zhelpers.h"

int main (int argc, char *argv [])
{

  void *context   = zmq_ctx_new();
  void *subscriber = zmq_socket(context,ZMQ_SUB);

  int rc = zmq_connect(subscriber,"tcp://localhost:50290");
  assert(rc == 0);
  rc = zmq_setsockopt(subscriber,ZMQ_SUBSCRIBE, "", 0);
  assert(rc == 0); 

  while (1) 
  {
    zmq_msg_t message;
    zmq_msg_init (&message);
    zmq_msg_recv (&message, subscriber, 0);

    //  Process the message frame
    int size = zmq_msg_size(&message);
    char *string = malloc(size + 1);
    memcpy(string, zmq_msg_data(&message), size);
    string[size] = 0;

    //  Read the "more" flag before the message is closed
    int more = zmq_msg_more (&message);
    zmq_msg_close (&message);

    printf("Message[%d]: %s\n", size, string);
    free(string);

    if (!more)
        break;      //  Last message frame
  }
  return 0;
}

This code reports the correct size of the incoming data, but when I print the data it shows up as unreadable characters (symbols and so on). As in Python, I need to get the incoming data in a form that I can process further.

Upvotes: 2

Views: 2482

Answers (1)

Jesper

Reputation: 383

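I found out that the second frame does not contain text at all. It holds the raw bytes of the numpy array, i.e. 16-bit signed integers (dtype 'i2' in the Python script), and those bytes were very difficult for the C program to interpret when it printed them as a string.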
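To see what such a frame really contains, it helps to print the bytes in hex instead of using %s. The following is only a minimal sketch: hex_dump is a hypothetical helper of my own, not part of ZeroMQ or zhelpers.h, and it formats into a caller-supplied buffer.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical helper (not a ZeroMQ function): format the bytes of a frame
   as "xx xx xx ..." into out. Stops early if out is too small.
   Returns the number of bytes that were formatted. */
size_t hex_dump(const unsigned char *buf, size_t len,
                char *out, size_t out_size)
{
    assert(buf != NULL && out != NULL && out_size > 0);
    size_t pos = 0;
    size_t i;
    out[0] = '\0';
    /* Each byte needs at most 3 characters plus the terminating NUL. */
    for (i = 0; i < len && pos + 4 <= out_size; i++) {
        pos += (size_t) snprintf(out + pos, out_size - pos,
                                 i ? " %02x" : "%02x", (unsigned) buf[i]);
    }
    return i;
}
```

Assuming little-endian samples, a value such as 1 arrives as the two bytes 01 00, so printf with %s stops at the first zero byte or prints whatever non-text bytes come before it.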

By manipulating the received data I was able to extract the correct values:

#include "zhelpers.h"

int main (int argc, char *argv [])
{
  void *context    = zmq_ctx_new();
  void *subscriber = zmq_socket(context, ZMQ_SUB);

  int rc = zmq_connect(subscriber, "tcp://localhost:50290");
  assert(rc == 0);

  //  Subscribe to every topic that starts with "ray."
  //  (strlen, not sizeof: sizeof(filter) is the size of the pointer)
  const char *filter = "ray.";
  rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, filter, strlen(filter));
  assert(rc == 0);

  int nextFrame = 0;    //  1 when the next frame carries the raw samples

  while (1)
  {
    zmq_msg_t message;
    zmq_msg_init (&message);
    int size = zmq_msg_recv (&message, subscriber, 0);
    if (size < 0)
    {
      zmq_msg_close (&message);
      break;            //  Receive failed
    }

    //  Copy the frame and terminate it so it can be compared as a string
    char *string = malloc(size + 1);
    memcpy(string, zmq_msg_data(&message), size);
    string[size] = 0;

    //  Read the "more" flag before the message is closed
    int more = zmq_msg_more (&message);
    zmq_msg_close (&message);

    if (nextFrame)
    {
      //  Payload frame: 16-bit signed integers, low byte first
      printf("Raw data: ");
      for (int i = 0; i + 1 < size; i += 2)
        printf("%d ", (int16_t) ((string[i] & 0xff) + ((string[i+1] & 0xff) << 8)));
      printf("\n");
      nextFrame = 0;
    }
    else if (strcmp(string, "ray.rawdata") == 0)
    {
      //  Prefix frame: the payload follows only if more frames are pending
      printf("Prefix: %s\n", string);
      nextFrame = more;
    }

    free(string);
  }

  zmq_close(subscriber);
  zmq_ctx_destroy(context);
  return 0;
}

Note that this is only a problem when receiving numpy arrays from Python. As long as the frame is a heartbeat string or some other text, there are no problems.
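If the samples are needed as numbers rather than printed, the byte handling can be pulled out into a small function. This is a minimal sketch under the same assumption as above, namely that the sender stores each sample low byte first (little-endian). The name decode_i16_le and its parameters are my own and are not part of ZeroMQ.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical helper (not a ZeroMQ function): decode a frame of
   little-endian 16-bit signed samples into out[].
   Returns the number of samples written (at most max_out). */
size_t decode_i16_le(const unsigned char *buf, size_t len,
                     int16_t *out, size_t max_out)
{
    assert(buf != NULL && out != NULL);
    size_t n = len / 2;                 /* a trailing odd byte is ignored */
    if (n > max_out)
        n = max_out;
    for (size_t i = 0; i < n; i++) {
        /* Combine low byte and high byte into an unsigned 16-bit value. */
        uint16_t raw = (uint16_t) (buf[2 * i] | (buf[2 * i + 1] << 8));
        /* Map 0x8000..0xffff onto the negative range explicitly. */
        out[i] = raw >= 0x8000 ? (int16_t) ((int32_t) raw - 0x10000)
                               : (int16_t) raw;
    }
    return n;
}
```

In the receive loop this could be called with zmq_msg_data(&message) and zmq_msg_size(&message) before the message is closed, which also avoids the extra malloc and memcpy for the payload frame.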

Upvotes: 1
