Reputation: 5
I created two programs to send and receive a video feed using ZeroMQ. However, the receiving program always gets stuck on the .recv() method.
I have used two ZeroMQ libraries for this program: the native zmq and the derived imagezmq. imagezmq is used for sending and receiving frame data from the video, while the native zmq library is used for sending and receiving the time at which the image was sent.
The imagezmq part works fine. The program only gets stuck on the zmq part.
Following are my two programs:
FinalCam.py
import struct
import time
import imutils
import imagezmq
import cv2
import zmq
import socket
import pickle
# image sending
sender = imagezmq.ImageSender(connect_to='tcp://localhost:5555')
hostName = socket.gethostname()  # send RPi hostname with each image
vid_dir = "/root/redis-opencv-videostream/vtest.avi"
cap = cv2.VideoCapture(vid_dir)  # init the camera
context = zmq.Context()  # setup for sending time
socket = context.socket(zmq.PUB)
socket.connect("tcp://localhost:6666")
while True:  # send images as stream until Ctrl-C
    ret, frame = cap.read()
    frame = imutils.resize(frame, width=400)  # resize without compression
    captureTime = time.time()
    sender.send_image(hostName, frame)
    print(captureTime)
    captureTime = struct.pack('d', captureTime)
    #msg = pickle.dumps(captureTime)
    print("message primed")
    socket.send(captureTime)
    print("time sent")
which generated this output :
1591824603.5772414
message primed
time sent
FinalRecieve.py
import cv2
import imagezmq
import time
import zmq
import struct
FRAMES = 5
image_hub = imagezmq.ImageHub()  # image socket
context = zmq.Context()  # time socket
socket = context.socket(zmq.SUB)
socket.bind("tcp://*:6666")
while True:  # show streamed images until Ctrl-C
    loopTime = time.time()
    for i in range(0, FRAMES):
        hostName, frame = image_hub.recv_image()
        image_hub.send_reply(b'OK')
        print("recieved image, waiting for time")
        captureTime = socket.recv()
        print("meow")
        print(captureTime)
    finishTime = time.time()
    fpsTime = finishTime - loopTime
    fps = FRAMES / fpsTime
    print(fps)
which generated this output :
received image, waiting for time
Upvotes: 0
Views: 2539
Reputation: 1
"The imagezmq part works fine. The program only gets stuck on the zmq part."
ZeroMQ is smart enough (since v2.0+) that it does not force anyone to be rigid about which side calls .bind() and which calls .connect(). One access point may freely .bind() some and .connect() other transport-class channels for 1:N communication topologies as liked and needed (in other words, the "reversed" .bind()/.connect() is always possible).
Not so the ImageZMQ spin-off module. It makes a set of hard-wired choices that lie outside your domain of control (and hijacking the class-internal attributes { ImageHub.zmq_socket | ImageSender.zmq_socket } was hardly what the ImageZMQ authors and co-authors of its architecture had in mind).
Given that imagezmq hard-codes these choices internally (the same class instances .bind() in one mode and .connect() in the other), and that this part was declared as working, let's focus on using it to its (published) maximum.
Based on the above, compose your working parts so as to send the time "inside" the working scheme:
PAYLOAD_MASK = "{0:}|||{1:}"
...
while ...
    sender.send_image( PAYLOAD_MASK.format( time.time(), hostName ),
                       frame
                       )
The receiving side can then decode the message accordingly, without any need to repair the struct.pack() problems. (In distributed computing, and all the more in ZeroMQ-interconnected worlds, one never knows what the remote platform is, let alone what byte ordering (endianness) will be assumed "there", so struct.pack() shall always explicitly declare the byte order in the format string. Always. If not sure, one may design a self-detecting message that salvages such naive uses, and test and adjust the local format string for .unpack() to handle either case of an implicitly blind .pack() sender. That goes beyond the scope of this post, yet all RPi / non-RPi platform projects are prone to this (only-)assumed-"same"-endianness caveat.)
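A minimal sketch of both ideas, assuming Python 3 on both ends: the header built with PAYLOAD_MASK is split again on the receiving side, and, if a binary timestamp is still wanted, the byte order is pinned in the format string. The helper names (encode_header, decode_header, pack_time, unpack_time) are made up for this illustration and are not part of imagezmq or zmq.

```python
import struct
import time

PAYLOAD_MASK = "{0:}|||{1:}"   # same mask as in the sender snippet
SEPARATOR = "|||"
TIME_FORMAT = "<d"             # "<" pins little-endian, "d" is an 8-byte double


def encode_header(host_name, sent_at=None):
    """Build the text header 'timestamp|||hostName' (hypothetical helper)."""
    if sent_at is None:
        sent_at = time.time()
    return PAYLOAD_MASK.format(sent_at, host_name)


def decode_header(header):
    """Split a header produced by encode_header() into (timestamp, hostName)."""
    stamp, _, host_name = header.partition(SEPARATOR)
    return float(stamp), host_name


def pack_time(sent_at):
    """Pack a timestamp with an explicit byte order instead of the native one."""
    return struct.pack(TIME_FORMAT, sent_at)


def unpack_time(raw):
    """Inverse of pack_time(); both sides must agree on TIME_FORMAT."""
    (sent_at,) = struct.unpack(TIME_FORMAT, raw)
    return sent_at
```

On the receiving side this would be applied to the text part returned by image_hub.recv_image(), e.g. sent_at, hostName = decode_header(text), so no second socket is needed for the time at all.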
If you are using pyzmq version 18.., note that it is not listed in the ImageZMQ compatibility list, so the use case is doomed to crash or cause hidden troubles there.
So one ought rather to start with zmq.pyzmq_version() checks (a professional policy in configuration management for controlled environments, isn't it?) and catch and resolve any non-matching cases.
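Such a start-up check could look roughly like the sketch below. The helpers version_tuple and is_tested are hypothetical names, and the set of tested major versions is a placeholder that has to be filled in from the compatibility list of the imagezmq release actually in use; no real compatibility data is encoded here.

```python
def version_tuple(version):
    """Turn '18.1.0' into (18, 1, 0); trailing tags such as 'b1' or 'dev0' are dropped."""
    parts = []
    for piece in version.split("."):
        digits = ""
        for char in piece:
            if not char.isdigit():
                break
            digits += char
        if not digits:
            break  # a piece like 'dev0' ends the numeric part
        parts.append(int(digits))
    return tuple(parts)


def is_tested(installed, tested_majors):
    """True when the major version of `installed` is in the caller's tested set."""
    parsed = version_tuple(installed)
    return bool(parsed) and parsed[0] in tested_majors
```

At program start one would call it as is_tested(zmq.pyzmq_version(), TESTED_MAJORS), with TESTED_MAJORS being your own placeholder set, and refuse to run (or at least log a warning) when it returns False.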
Why the zmq part did not work? Hard to say. Without the full code, one may just guess. My candidate would be a wrong or missing termination, with zmq.LINGER not explicitly set to zero, which leaves hung-up Context() instance(s) that block the actual resources until reboot. A proper use of the ZeroMQ tools reflects these methods of defensive programming (LINGER, CONFLATE, MAXMSGSIZE, IMMEDIATE, white-listing and many other defensive .setsockopt() and Context() parametrisation options), because distributed computing is complex and may break in many places outside of your domain of control or line of sight.
So, do not hesitate to become a defensively-programming designer if your architected systems are to become robust and self-healing.
aLocalCONTEXT = zmq.Context( nIOthreads )
aSUBsocket    = aLocalCONTEXT.socket( zmq.SUB )
try:
    aSUBsocket.bind( "tcp://*:6666" )
except:
    ...
aSUBsocket.setsockopt( zmq.LINGER, 0 )
aSUBsocket.setsockopt( zmq.SUBSCRIBE, b"" )
aSUBsocket.setsockopt( zmq.MAXMSGSIZE, ...)
...
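One more defensive option that speaks directly to a blocked .recv() is a receive timeout. The sketch below assumes pyzmq is installed; open_sub and recv_or_none are illustrative names, and the one-second default is an arbitrary choice. It binds the SUB side like the snippet above, though the same options apply to a connecting socket.

```python
import zmq


def open_sub(context, endpoint, timeout_ms=1000):
    """Create a SUB socket that cannot hang forever in recv() or at shutdown."""
    sub = context.socket(zmq.SUB)
    sub.setsockopt(zmq.LINGER, 0)             # close() discards pending data at once
    sub.setsockopt(zmq.RCVTIMEO, timeout_ms)  # recv() raises zmq.Again after this delay
    sub.setsockopt(zmq.SUBSCRIBE, b"")        # empty prefix: accept every message
    sub.bind(endpoint)
    return sub


def recv_or_none(sub):
    """Return the next message, or None when the receive timeout expires."""
    try:
        return sub.recv()
    except zmq.Again:
        return None
```

With this in place the receiving loop can print a warning and carry on with the next frame when recv_or_none() returns None, instead of freezing on the first missing timestamp.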
Best re-read the native ZeroMQ API documentation about tuning the other ISO-OSI-L2/L3 related parameters for best performance and the safest distributed-computing strategies. It is indeed worth the time to do this for each new API update, as has been the case all the way since v2.1+.
Look at how the .send_reply() method is unprotected from crashing for non-REQ-REP instances of its own ImageHub class.
Why?
The so far unprotected call to the .send_reply() method will crash the application for any ImageHub instance that was initialised in its non-default REQ_REP = False mode (i.e. on the SUB side of the available PUB/SUB mode).
aNonDefaultImageHUB_instance.send( "this will crash" )
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/ms/anaconda2/lib/python2.7/site-packages/zmq/sugar/socket.py", line 395, in send
    return super(Socket, self).send(data, flags=flags, copy=copy, track=track)
  File "zmq/backend/cython/socket.pyx", line 725, in zmq.backend.cython.socket.Socket.send
  File "zmq/backend/cython/socket.pyx", line 772, in zmq.backend.cython.socket.Socket.send
  File "zmq/backend/cython/socket.pyx", line 247, in zmq.backend.cython.socket._send_copy
  File "zmq/backend/cython/socket.pyx", line 242, in zmq.backend.cython.socket._send_copy
  File "zmq/backend/cython/checkrc.pxd", line 25, in zmq.backend.cython.checkrc._check_rc
zmq.error.ZMQError: Operation not supported
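Until the library guards this itself, the call can be protected on the application side. The wrapper below is only a sketch: ReplyGuard is a hypothetical class, and it relies on the caller passing in the same REQ_REP value the hub was created with, rather than on any internal attribute of ImageHub.

```python
class ReplyGuard:
    """Wrap a hub-like object and reply only when it runs in REQ/REP mode.

    `hub` is anything with a send_reply() method. `req_rep` must mirror the
    REQ_REP flag the hub was created with; tracking it here is an assumption
    of this sketch, not something the library does for you.
    """

    def __init__(self, hub, req_rep=True):
        self.hub = hub
        self.req_rep = req_rep

    def send_reply(self, message=b"OK"):
        """Send the reply if the mode allows it; return True when it was sent."""
        if not self.req_rep:
            return False  # a SUB socket cannot send, so skip instead of raising
        self.hub.send_reply(message)
        return True
```

The receiving loop would then call guard.send_reply(b'OK') instead of image_hub.send_reply(b'OK'), and the same loop body works in both modes.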
Upvotes: 0
Reputation: 395
Here’s a couple of things to try to get the native zmq parts working:
Use the .connect() method for SUB sockets:
socket.connect("tcp://localhost:6666")
And the .bind() method for your PUB sockets:
socket.bind("tcp://*:6666")
It’s explained here in the guide that connect should be used to create an outgoing connection from a socket. The sibling doc for .bind() explains that it’s for accepting connections.
Also try setting socket options: socket.setsockopt(zmq.SUBSCRIBE, b"")
It is described here in the guide that SUB sockets initially filter out all messages, so that’d explain why you’re not receiving anything. The example above provides an empty filter, which accepts all incoming messages. (Under Python 3 the filter has to be a bytes object, hence b"".)
It’s important to note that with PUB and SUB based distribution, the SUB might not necessarily receive every message, due to the timing of its connection or network conditions. E.g. everything sent from the publisher before the subscriber connects isn’t receivable.
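Putting the three points together, a minimal single-process sketch could look like this, assuming pyzmq is installed. The function name roundtrip, the 100 ms poll interval and the five-second deadline are choices made for this illustration only. The publisher keeps re-sending because, as noted above, anything published before the subscription is in place is simply dropped.

```python
import struct
import time

import zmq


def roundtrip(endpoint="tcp://127.0.0.1:6666", timeout_s=5.0):
    """Publish timestamps until the subscriber receives one; return it (or None)."""
    context = zmq.Context()
    pub = context.socket(zmq.PUB)
    sub = context.socket(zmq.SUB)
    try:
        pub.setsockopt(zmq.LINGER, 0)
        sub.setsockopt(zmq.LINGER, 0)
        pub.bind(endpoint)                  # PUB side accepts connections
        sub.setsockopt(zmq.SUBSCRIBE, b"")  # empty filter: receive everything
        sub.connect(endpoint)               # SUB side makes the outgoing connection
        deadline = time.time() + timeout_s
        while time.time() < deadline:
            # Early messages may be lost while the subscription is still being set up.
            pub.send(struct.pack("<d", time.time()))
            if sub.poll(100):               # wait up to 100 ms for a message
                (sent_at,) = struct.unpack("<d", sub.recv())
                return sent_at
        return None
    finally:
        pub.close()
        sub.close()
        context.term()
```

In the two programs from the question, the PUB half would live in FinalCam.py and the SUB half in FinalRecieve.py; calling roundtrip() on one machine is just a quick way to confirm that the socket setup itself delivers messages.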
Upvotes: 2