Reputation: 9
I have been working on a barcode recognition project for weeks. I was asked to use GigE cameras to recognize the barcodes on a PCB, and I chose Python for the job. So far, I've finished the recognition of barcodes from a picture with OpenCV. The problem is how to connect to a GigE camera and grab a photo from my program. Unfortunately, I found that OpenCV doesn't support GigE cameras, so I had to choose Halcon instead. However, even though I can use HDevelop to connect and capture an image, I can find no way to link it to my Python program, since a Halcon program can only be exported as C# or C++.
By the way, I tried to use pythonnet and IronPython, but I don't know how I could use them to execute a C# script (a .cs file).
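From what I understand, pythonnet cannot run a .cs source file directly; it only loads compiled .NET assemblies, so I would first have to compile the exported C# code (together with the HALCON .NET assembly it references) into a DLL and then do something like the sketch below. The assembly, class, and method names here are just placeholders, not the real export.
import sys
import clr  # pythonnet

sys.path.append(r"C:\path\to\exported\build")  # folder containing the compiled DLL
clr.AddReference("HalconGrab")                 # hypothetical assembly name (HalconGrab.dll)
from HalconGrab import Grabber                 # hypothetical namespace and class from the export

grabber = Grabber()
grabber.GrabImage(r"C:\temp\frame.png")        # hypothetical method that grabs and saves a frame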
Upvotes: 0
Views: 7776
Reputation: 11
I did something similar to the previous poster but noticed that for some reason returning the frame does not work.
def saving_acquisition():
...
image_python = image_processed.get_numpy_3D()
return image_python
img = saving_acquisition()
# This results in an unknown C error.
cv2.imshow('image', img)
If you create a copy of the ndarray and return it, it works for some reason.
def saving_acquisition():
...
image_python = image_processed.get_numpy_3D()
return image_python.copy()
img = saving_acquisition()
# This works.
cv2.imshow('image', img)
I would greatly appreciate it if anyone could enlighten me on why this is the case.
Upvotes: 1
Reputation: 41
I had the exact same struggle connecting a GigE camera with Python. Thankfully, I found a library called Harvesters. Using Harvesters and OpenCV, you can capture images from GigE cameras and process them. You can see their documentation here.
To connect to a camera using Harvesters, you first need to install the library:
pip install harvesters
After this you will need a GenTL producer that grabs the images. You will use whichever producer's .cti file you download. Personally, I use Matrix Vision's mvIMPACT Acquire, as it is free and not vendor-locked. See more info on it here: CTI FILE INFORMATION
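As a quick sanity check, you can verify that the producer's .cti file actually sees your camera by printing Harvester's device list before connecting. This is just a small sketch using the same calls as the snippet below:
from harvesters.core import Harvester

h = Harvester()
h.add_file('path/to/foo.cti')  # the producer's .cti file
h.update()
print(h.device_info_list)      # your GigE camera should show up in this list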
After doing those prerequisites you can connect to a camera by doing the following:
from harvesters.core import Harvester
h = Harvester()
h.add_file('path/to/foo.cti')
h.update()
ia = h.create(0) # Connect to first camera in device_info_list
ia.start()
with ia.fetch_buffer() as buffer:
component = buffer.payload.components[0]
_2d = component.data.reshape(component.height,component.width, int(component.num_components_per_pixel))
# Do any processing on the image data here...
ia.stop()
ia.destroy()
h.reset()
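If you want to hand a captured frame to OpenCV (for example for the barcode detection mentioned in the question), a minimal sketch could look like this, reusing the ia from above while acquisition is still running (i.e. between ia.start() and ia.stop()). The .copy() is there to detach the frame from the driver buffer, which is requeued when the with-block exits:
import cv2

with ia.fetch_buffer() as buffer:
    component = buffer.payload.components[0]
    frame = component.data.reshape(
        component.height, component.width,
        int(component.num_components_per_pixel)
    ).copy()  # copy the data out; the buffer is requeued when the with-block exits

# depending on the camera's pixel format you may need cv2.cvtColor() here
cv2.imshow('frame', frame)
cv2.waitKey(0)
cv2.destroyAllWindows()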
Upvotes: 1
Reputation: 21
I was struggling a lot with this, but I found this method by accident. I have an IDS industrial vision camera (IDS GV-5860-CP) which has a supported Python library. The IDS Peak IPL SDK has an extension to convert the image to a NumPy 3D array.
My code connects to the camera and accesses the camera's data stream. This data stream fills a buffer with data that is converted to an image. The conversion needs a known RGB pixel format: the data is written as RGB values shaped into arrays, and those arrays can be turned into a NumPy 3D array. This array is accessible to OpenCV and can be shown as an image.
Most GigE Vision cameras work with buffers. Be cautious, because buffers can cause delay. If the acquired buffer is converted to an image (not written to disk; writing an image takes a lot of processing power), the converted image only needs to be changed into a NumPy 3D array to get a frame that can be shown in the OpenCV window.
This is my code with the IDS industrial camera; hopefully it can help with your own project.
My code:
import numpy as np
import cv2
import sys
from ids_peak import ids_peak as peak
from ids_peak_ipl import ids_peak_ipl as ipl
from ids_peak import ids_peak_ipl_extension
m_device = None
m_dataStream = None
m_node_map_remote_device = None
out = None
def open_camera():
print("connection- camera")
global m_device, m_node_map_remote_device
try:
# Create instance of the device manager
device_manager = peak.DeviceManager.Instance()
# Update the device manager
device_manager.Update()
# Return if no device was found
if device_manager.Devices().empty():
return False
# open the first openable device in the device manager's device list
device_count = device_manager.Devices().size()
for i in range(device_count):
if device_manager.Devices()[i].IsOpenable():
m_device = device_manager.Devices()[i].OpenDevice(peak.DeviceAccessType_Control)
# Get NodeMap of the RemoteDevice for all accesses to the GenICam NodeMap tree
m_node_map_remote_device = m_device.RemoteDevice().NodeMaps()[0]
min_frame_rate = 0
max_frame_rate = 50
inc_frame_rate = 0
# Get frame rate range. All values in fps.
min_frame_rate = m_node_map_remote_device.FindNode("AcquisitionFrameRate").Minimum()
max_frame_rate = m_node_map_remote_device.FindNode("AcquisitionFrameRate").Maximum()
if m_node_map_remote_device.FindNode("AcquisitionFrameRate").HasConstantIncrement():
inc_frame_rate = m_node_map_remote_device.FindNode("AcquisitionFrameRate").Increment()
else:
# If there is no increment, it might be useful to choose a suitable increment for a GUI control element (e.g. a slider)
inc_frame_rate = 0.1
# Get the current frame rate
frame_rate = m_node_map_remote_device.FindNode("AcquisitionFrameRate").Value()
# Set frame rate to maximum
m_node_map_remote_device.FindNode("AcquisitionFrameRate").SetValue(max_frame_rate)
return True
except Exception as e:
# ...
str_error = str(e)
print("Error by connection camera")
return False
def prepare_acquisition():
print("opening stream")
global m_dataStream
try:
data_streams = m_device.DataStreams()
if data_streams.empty():
print("no stream possible")
# no data streams available
return False
m_dataStream = m_device.DataStreams()[0].OpenDataStream()
print("open stream")
return True
except Exception as e:
# ...
str_error = str(e)
print("Error by prep acquisition")
return False
def set_roi(x, y, width, height):
print("setting ROI")
try:
# Get the minimum ROI and set it. After that there are no size restrictions anymore
x_min = m_node_map_remote_device.FindNode("OffsetX").Minimum()
y_min = m_node_map_remote_device.FindNode("OffsetY").Minimum()
w_min = m_node_map_remote_device.FindNode("Width").Minimum()
h_min = m_node_map_remote_device.FindNode("Height").Minimum()
m_node_map_remote_device.FindNode("OffsetX").SetValue(x_min)
m_node_map_remote_device.FindNode("OffsetY").SetValue(y_min)
m_node_map_remote_device.FindNode("Width").SetValue(w_min)
m_node_map_remote_device.FindNode("Height").SetValue(h_min)
# Get the maximum ROI values
x_max = m_node_map_remote_device.FindNode("OffsetX").Maximum()
y_max = m_node_map_remote_device.FindNode("OffsetY").Maximum()
w_max = m_node_map_remote_device.FindNode("Width").Maximum()
h_max = m_node_map_remote_device.FindNode("Height").Maximum()
if (x < x_min) or (y < y_min) or (x > x_max) or (y > y_max):
print("Error x and y values")
return False
elif (width < w_min) or (height < h_min) or ((x + width) > w_max) or ((y + height) > h_max):
print("Error width and height")
return False
else:
# Now, set final AOI
m_node_map_remote_device.FindNode("OffsetX").SetValue(x)
m_node_map_remote_device.FindNode("OffsetY").SetValue(y)
m_node_map_remote_device.FindNode("Width").SetValue(width)
m_node_map_remote_device.FindNode("Height").SetValue(height)
return True
except Exception as e:
# ...
str_error = str(e)
print("Error by setting ROI")
print(str_error)
return False
def alloc_and_announce_buffers():
print("allocating buffers")
try:
if m_dataStream:
# Flush queue and prepare all buffers for revoking
m_dataStream.Flush(peak.DataStreamFlushMode_DiscardAll)
# Clear all old buffers
for buffer in m_dataStream.AnnouncedBuffers():
m_dataStream.RevokeBuffer(buffer)
payload_size = m_node_map_remote_device.FindNode("PayloadSize").Value()
# Get number of minimum required buffers
num_buffers_min_required = m_dataStream.NumBuffersAnnouncedMinRequired()
# Alloc buffers
for count in range(num_buffers_min_required):
buffer = m_dataStream.AllocAndAnnounceBuffer(payload_size)
m_dataStream.QueueBuffer(buffer)
return True
except Exception as e:
# ...
str_error = str(e)
print("Error by allocating buffers")
print(str_error)
return False
def start_acquisition():
print("Start acquisition")
try:
m_dataStream.StartAcquisition(peak.AcquisitionStartMode_Default, peak.DataStream.INFINITE_NUMBER)
m_node_map_remote_device.FindNode("TLParamsLocked").SetValue(1)
m_node_map_remote_device.FindNode("AcquisitionStart").Execute()
return True
except Exception as e:
# ...
str_error = str(e)
print(str_error)
return False
def saving_acquisition():
    global out  # use the module-level 'out' so main() can release it afterwards
    fourcc = cv2.VideoWriter_fourcc('W','M','V','2')
    out = cv2.VideoWriter("video.avi", fourcc, 50, (1936, 1096))  # the file needs an extension so OpenCV can pick a container
while True:
try:
# Get buffer from device's DataStream. Wait 5000 ms. The buffer is automatically locked until it is queued again.
buffer = m_dataStream.WaitForFinishedBuffer(5000)
image = ids_peak_ipl_extension.BufferToImage(buffer)
# Create IDS peak IPL image for debayering and convert it to RGBa8 format
image_processed = image.ConvertTo(ipl.PixelFormatName_BGR8)
# Queue buffer again
m_dataStream.QueueBuffer(buffer)
image_python = image_processed.get_numpy_3D()
frame = image_python
out.write(frame)
cv2.imshow('videoview',frame)
key = cv2.waitKey(1)
if key == ord('q'):
break
except Exception as e:
# ...
str_error = str(e)
print("Error by saving acquisition")
print(str_error)
return False
def main():
# initialize library
peak.Library.Initialize()
if not open_camera():
# error
sys.exit(-1)
if not prepare_acquisition():
# error
sys.exit(-2)
if not alloc_and_announce_buffers():
# error
sys.exit(-3)
if not start_acquisition():
# error
sys.exit(-4)
if not saving_acquisition():
out.release()
cv2.destroyAllWindows()
print("oke")
# error
peak.Library.Close()
print('executed')
sys.exit(0)
if __name__ == '__main__':
main()
Upvotes: 1