Liu Alex
Liu Alex

Reputation: 9

How can I use Python to read and capture images from a GIGE camera?

I have been working on a codebar recognition project for weeks,. I was asked to use GIGE cameras to recognize the code bars from a PCB and I choosed to use python for the job. So far, I've finished the recognition of codebars from a picture with Opencv. The problem is how to connect to a GIGE camera and grab a photo with My program. Unfortunately, I found Opencv doesn't support GIGE camera so I had to choose Halcon instead. However, even though I can use HDevelop to connect and capture the image, I find no solution to link it to my Python program as Halcon program can only be exported as C# or C++

btw, I tried to use pythonnet and ironPython, but I don't how could I use them to execute a C# script(.cs file)

Upvotes: 0

Views: 7776

Answers (3)

Daeroin
Daeroin

Reputation: 11

I did something similar to the previous poster but noticed that for some reason returning the frame does not work.

def saving_acquisition():  
  ...
  image_python = image_processed.get_numpy_3D()
  return image_python

img = saving_acquisition()

# This results in an unknown C error.
cv2.imshow('image',img )

If you create a copy of the ndarray and return it, it works for some reason.

def saving_acquisition():  
  ...
  image_python = image_processed.get_numpy_3D()
  return image_python.copy()

img = saving_acquisition()

# This works.
cv2.imshow('image',img )

Anyone who would be willing to enlighten me on why this is the case would be greatly appreciated.

Upvotes: 1

baileyhelfer
baileyhelfer

Reputation: 41

I had the same exact struggle of connecting a GigE camera with Python. Thankfully I found this library called harvesters. Using harvesters and OpenCV you can capture images from GigE cameras and process them. You can see their documentation Here.

To connect to a camera using Harvesters you first need to install the library:

pip install harvesters

After this you will need a GenTL producer that grabs the images. You will use whatever producers .CTI file you download. Personally I use Matrix Vision mvAquire as it is free and does not have a vendor lock. See more info on it here CTI FILE INFORMATION

After doing those prerequisites you can connect to a camera by doing the following:

from harvesters.core import Harvester
h = Harvester()
h.add_file('path/to/foo.cti')
h.update()
ia = h.create(0) # Connect to first camera in device_info_list

ia.start()
with ia.fetch_buffer() as buffer:
            component = buffer.payload.components[0]
            _2d = component.data.reshape(component.height,component.width, int(component.num_components_per_pixel))
            # Do any processing on the image data here...

ia.stop()
ia.destory()
h.reset()

Upvotes: 1

MaixmNlandu
MaixmNlandu

Reputation: 21

I was struggling a lot with this, but I found this method by accident. I have an IDS industrial vision camera (IDS GV-5860-CP) which has a supported Python library. The IDS Peak IPL SDK has an extension to convert the image to a NumPy 3D array.

My code makes connection with the camera and accesses the datastream of the Camera. This datastream fills the buffer with data that is converted to an image. This conversion needs to be known RGB formats. That data is written in an RGB format that is shaped in arrays. Those arrays can be turn in to a NumPy 3D array. This array is accessible for OpenCV and can be showed as an image.

Most of the Gige Vision camera's work with buffers. Be cautious because buffers can cause delay. If the acquired buffer is converted to an image (NOT WRITTEN, WRITING AN IMAGE TAKES A LOT OF PROCESING POWER), the converted image only needs to be changed in a NumPy 3D array to acquire your image that can be shown in the OpenCV window.

This is my code with the IDS industrial Camera, hopefully it can help by your own project.

My code:

import numpy as np 
import cv2
import sys

from ids_peak import ids_peak as peak
from ids_peak_ipl import ids_peak_ipl as ipl
from ids_peak import ids_peak_ipl_extension






m_device = None
m_dataStream = None
m_node_map_remote_device = None
out = None


def open_camera():
  print("connection- camera")
  global m_device, m_node_map_remote_device
  try:
      # Create instance of the device manager
    device_manager = peak.DeviceManager.Instance()
 
      # Update the device manager
    device_manager.Update()
 
      # Return if no device was found
    if device_manager.Devices().empty():
      return False
 
      # open the first openable device in the device manager's device list
    device_count = device_manager.Devices().size()
    for i in range(device_count):
        if device_manager.Devices()[i].IsOpenable():
            m_device = device_manager.Devices()[i].OpenDevice(peak.DeviceAccessType_Control)
 
              # Get NodeMap of the RemoteDevice for all accesses to the GenICam NodeMap tree
            m_node_map_remote_device = m_device.RemoteDevice().NodeMaps()[0]
            min_frame_rate = 0
            max_frame_rate = 50
            inc_frame_rate = 0

            
            # Get frame rate range. All values in fps.
            min_frame_rate = m_node_map_remote_device.FindNode("AcquisitionFrameRate").Minimum()
            max_frame_rate = m_node_map_remote_device.FindNode("AcquisitionFrameRate").Maximum()
            
            if m_node_map_remote_device.FindNode("AcquisitionFrameRate").HasConstantIncrement():
                inc_frame_rate = m_node_map_remote_device.FindNode("AcquisitionFrameRate").Increment()
            else:
                # If there is no increment, it might be useful to choose a suitable increment for a GUI control element (e.g. a slider)
                inc_frame_rate = 0.1
            
            # Get the current frame rate
            frame_rate = m_node_map_remote_device.FindNode("AcquisitionFrameRate").Value()
            
            # Set frame rate to maximum
            m_node_map_remote_device.FindNode("AcquisitionFrameRate").SetValue(max_frame_rate)
        return True
  except Exception as e:
      # ...
    str_error = str(e)
    print("Error by connection camera")
    return False
 
 
def prepare_acquisition():
  print("opening stream")
  global m_dataStream

  try:
    data_streams = m_device.DataStreams()
    if data_streams.empty():
      print("no stream possible")
      # no data streams available
      return False
 
    m_dataStream = m_device.DataStreams()[0].OpenDataStream()
    print("open stream")
 
    return True
  except Exception as e:
      # ...
      str_error = str(e)
      print("Error by prep acquisition")
      return False
 
 
def set_roi(x, y, width, height):
  print("setting ROI")
  try:
      # Get the minimum ROI and set it. After that there are no size restrictions anymore
    x_min = m_node_map_remote_device.FindNode("OffsetX").Minimum()
    y_min = m_node_map_remote_device.FindNode("OffsetY").Minimum()
    w_min = m_node_map_remote_device.FindNode("Width").Minimum()
    h_min = m_node_map_remote_device.FindNode("Height").Minimum()
 
    m_node_map_remote_device.FindNode("OffsetX").SetValue(x_min)
    m_node_map_remote_device.FindNode("OffsetY").SetValue(y_min)
    m_node_map_remote_device.FindNode("Width").SetValue(w_min)
    m_node_map_remote_device.FindNode("Height").SetValue(h_min)
 
      # Get the maximum ROI values
    x_max = m_node_map_remote_device.FindNode("OffsetX").Maximum()
    y_max = m_node_map_remote_device.FindNode("OffsetY").Maximum()
    w_max = m_node_map_remote_device.FindNode("Width").Maximum()
    h_max = m_node_map_remote_device.FindNode("Height").Maximum()
 
    if (x < x_min) or (y < y_min) or (x > x_max) or (y > y_max):
      print("Error x and y values")
      return False
    elif (width < w_min) or (height < h_min) or ((x + width) > w_max) or ((y + height) > h_max):
      print("Error width and height")
      return False
    else:
          # Now, set final AOI
        m_node_map_remote_device.FindNode("OffsetX").SetValue(x)
        m_node_map_remote_device.FindNode("OffsetY").SetValue(y)
        m_node_map_remote_device.FindNode("Width").SetValue(width)
        m_node_map_remote_device.FindNode("Height").SetValue(height)
 
        return True
  except Exception as e:
      # ...
       str_error = str(e)
       print("Error by setting ROI")
       print(str_error)
       return False
 
 
def alloc_and_announce_buffers():
  print("allocating buffers")
  try:
    if m_dataStream:
          # Flush queue and prepare all buffers for revoking
        m_dataStream.Flush(peak.DataStreamFlushMode_DiscardAll)
 
          # Clear all old buffers
        for buffer in m_dataStream.AnnouncedBuffers():
            m_dataStream.RevokeBuffer(buffer)
 
        payload_size = m_node_map_remote_device.FindNode("PayloadSize").Value()
 
          # Get number of minimum required buffers
        num_buffers_min_required = m_dataStream.NumBuffersAnnouncedMinRequired()
 
          # Alloc buffers
        for count in range(num_buffers_min_required):
            buffer = m_dataStream.AllocAndAnnounceBuffer(payload_size)
            m_dataStream.QueueBuffer(buffer)
 
        return True
  except Exception as e:
      # ...
    str_error = str(e)
    print("Error by allocating buffers")
    print(str_error)
    return False
 
 
def start_acquisition():
  print("Start acquisition")

  try:
    m_dataStream.StartAcquisition(peak.AcquisitionStartMode_Default, peak.DataStream.INFINITE_NUMBER)
    m_node_map_remote_device.FindNode("TLParamsLocked").SetValue(1)
    m_node_map_remote_device.FindNode("AcquisitionStart").Execute()
       
    return True
  except Exception as e:
      # ...
      str_error = str(e)
      print(str_error)
      return False

def saving_acquisition():  
  fourcc = cv2.VideoWriter_fourcc('W','M','V','2')
  out = cv2.VideoWriter( "video", fourcc, 50, (1936,  1096))
  while True:
    try:
      
      # Get buffer from device's DataStream. Wait 5000 ms. The buffer is automatically locked until it is queued again.
      buffer = m_dataStream.WaitForFinishedBuffer(5000)

      image = ids_peak_ipl_extension.BufferToImage(buffer)
        
      # Create IDS peak IPL image for debayering and convert it to RGBa8 format
            
      image_processed = image.ConvertTo(ipl.PixelFormatName_BGR8)
      # Queue buffer again
      m_dataStream.QueueBuffer(buffer)
        
      image_python = image_processed.get_numpy_3D()

      frame = image_python
    
      out.write(frame)
      cv2.imshow('videoview',frame)
      
      key = cv2.waitKey(1)
      if key == ord('q'):
        break

      
    except Exception as e:
      # ...
      str_error = str(e)
      print("Error by saving acquisition")
      print(str_error)
      return False

 
def main():
  
  # initialize library
  peak.Library.Initialize()
 
  if not open_camera():
    # error
    sys.exit(-1)
 
  if not prepare_acquisition():
    # error
    sys.exit(-2)
 
  if not alloc_and_announce_buffers():
    # error
    sys.exit(-3)
 
  if not start_acquisition():
    # error
    sys.exit(-4)

  if not saving_acquisition():
    out.release()
    cv2.destroyAllWindows()
    print("oke")
    # error
 
  peak.Library.Close()
  print('executed')
  sys.exit(0)
 
if __name__ == '__main__':
  main()

Upvotes: 1

Related Questions