Daniel Bulla
Daniel Bulla

Reputation: 53

Zero-copy receive and message size

Is there a way to query the size of the received message, when using zero-copy for receiving?

With this I (try to) achieve zero-copy:

zmq_recv(sock, buf, sizeof(buf), 0);

I also tried:

zmq_msg_t msg;
zmq_msg_init_data (&msg, buf, sizeof(buf), nullptr, NULL);
zmq_msg_recv(&msg, sock, 0);
size_t len = zmq_msg_size(&msg);

This returns correct size, but does not fill buf. I think zmq_msg_init_data is not intended for use with zmq_msg_recv and the message rebuilds on receive.

Upvotes: 4

Views: 3508

Answers (2)

Yessy
Yessy

Reputation: 1352

if you use the JeroMQ (Pure Java for ZeroMQ), you may achieve the zero-copy on the receive side by ZMQ.Socket.setMsgAllocator

Generaly speaking, we just care the zero copy for large object, so you can set a custom message allocator for example:

class MappedMemoryMsgAllocator implements MsgAllocator {

static final MsgAllocatorHeap heap = new MsgAllocatorHeap();
private final int threshold;

public MappedMemoryMsgAllocator(int threshold) {
    this.threshold = threshold;
}

public MappedMemoryMsgAllocator() {
    this(1024 * 1024 * 4);
}

@Override
public Msg allocate(int size) {
    if ((threshold > 0) && (size > threshold)) {
        try {
            return new Msg(UtilsJNI.nativeMapMemory(size, UtilsJNI.PROT_WRITE, UtilsJNI.MAP_PRIVATE));
        } catch (IOException e) {
            Log.d(TAG, "allocate:: fail to mmap", e);
            return null;
        }
    } else {
        return heap.allocate(size);
    }
}

public static void forceFreeMsgArray(zmq.Msg[] msgs) {
    if (msgs != null) {
        for (zmq.Msg msg : msgs) {
            final ByteBuffer buf = msg.buf();
            if (buf.isDirect()) {
                try {
                    UtilsJNI.nativeUnmapMemory(buf);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

native C++ code:

extern "C"
JNIEXPORT jobject
UtilsJNI_nativeMapMemory(JNIEnv *env, jobject clazz, jlong size,
                                              jint protection, jint mapFlags) {
    void *pAddr = mmap64(NULL, size, protection, mapFlags|MAP_ANONYMOUS, -1, 0);
    if (pAddr == NULL) {
        env->ThrowNew(env->FindClass("java/io/IOException"), "fail to mmap");
        return NULL;
    } else {
        return env->NewDirectByteBuffer(pAddr, size);
    }
}

extern "C"
JNIEXPORT void
UtilsJNI_nativeUnmapMemory(JNIEnv *env, jobject clazz, jobject buffer) {
    if (munmap(env->GetDirectBufferAddress(buffer), env->GetDirectBufferCapacity(buffer)) < 0) {
        env->ThrowNew(env->FindClass("java/io/IOException"), "fail to munmap");
    }
} 

Upvotes: 0

rveerd
rveerd

Reputation: 4006

Quoting the guide on zero-copy:

There is no way to do zero-copy on receive: ZeroMQ delivers you a buffer that you can store as long as you wish, but it will not write data directly into application buffers.

Zero-copy is for sending only, not for receiving.

Oh and zmq_recv returns the number of bytes received.

Upvotes: 6

Related Questions