Danish Ajaib

Reputation: 93

How to decode image data from camera stream?

I am connected to a HikVision camera stream and can isolate the picture data, but I cannot convert that data into an image. If I save the binary data to an image file, the file does not display any image. If I open it with Pillow, it reports that it cannot identify an image.

Here is the image data: https://drive.google.com/file/d/1z1TfPb88Ng5qkXWZZ9QfrZR4H5fh43pg/view?usp=sharing

The text is too long for Stack Overflow or Pastebin.

This is how I am getting the data from the stream:

with requests.get(url, auth=auth, headers=headers, stream=True) as response:
        response_text = ''
        license_plate = ''
        direction = ''
        color = ''
        is_picture = False
        picture_data = b''
    
        for line in response.iter_lines():
            if not line:
                continue
    
            line_str = str(line)
    
            if 'Content-Type: image/jpeg' in line_str:
                is_picture = True
                picture_data = b''
            elif is_picture:
                picture_data += line
    
            response_text += line.decode('utf-8', errors='ignore')
            
            if '--boundary' not in response_text:
                continue
            
    
            if is_picture:
                with open('picture_data.txt', 'wb') as f:
                    f.write(picture_data)
                print(picture_data)
                print('picture saved')
                is_picture = False
                # picture_data = b''
            response_text = response_text.rstrip('--boundary')
            split_text = response_text.split('<EventNotificationAlert')
            if len(split_text) > 1:
                notification_alert = '<EventNotificationAlert' + split_text[1]
                event_type = '<eventType></eventType>'
                event_type = re.search('<eventType>(.*?)</eventType>', notification_alert).group(1)
                if event_type == 'ANPR':
                    with open('test.xml', 'a') as f:
                        f.writelines(notification_alert)
                else:
                    print('not anpr')
            else:
                print("'<EventNotificationAlert' not found in response_text")
                
            response_text = ''

And this is how I am trying to decode the picture_data:

bytes = picture_data
a = bytes.find(b'\xff\xd8')
b = bytes.find(b'\xff\xd9')
if a != -1 and b != -1:
    jpg = bytes[a:b+8]
    bytes = bytes[b+2:]
    i = Image.open(BytesIO(jpg))
    buffered = BytesIO()
    i.save(buffered, format="JPEG")
    byte_im = buffered.getvalue()

This does not work, the PIL library is not able to identify any image.

I am able to get an image from the camera directly using:

def get_picture_direct(self, registration):
    url = f'http://{self.stream_url}/ISAPI/Streaming/channels/101/picture'
    auth = HTTPDigestAuth(self.stream_username, self.stream_password)
    response = requests.get(url, auth=auth)
    jpg = response.content
    i = Image.open(BytesIO(jpg))
    buffered = BytesIO()
    i.save(buffered, format="JPEG")
    byte_im = buffered.getvalue()
    url=self.upload_image_to_firestore(vehicle_registration=registration, bucket_name=f'{self.storage_bucket}', data=byte_im)
    return url

The event stream from the camera sends ANPR event notifications, and an image comes with each ANPR notification. If I use the direct method after the notification arrives, the image is captured too late and misses the vehicle's number plate.

How do I convert this byte data to an image correctly?

Any help is greatly appreciated.

Upvotes: 0

Views: 686

Answers (1)

Paul Peter

Reputation: 41

This is sample code for receiving ANPR alarms from a Hikvision ANPR camera in listening mode and saving the plate number and the plate image with Python:

from http.server import BaseHTTPRequestHandler, HTTPServer
import xml.etree.ElementTree as ET
import re
import csv
import os

class ANPRRequestHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Read the body of the request
        content_length = int(self.headers['Content-Length'])
        post_data = self.rfile.read(content_length)

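        # Split the payload into parts. The payload normally ends with the closing
        # delimiter "--boundary--", so the last split element still holds a real part.
        parts = post_data.split(b'--boundary\r\n')[1:]  # Exclude only the empty preamble

        for part in parts:
            if b'\r\n\r\n' not in part:
                continue  # No header/body separator, nothing to process
            headers, body = part.split(b'\r\n\r\n', 1)
            # Drop the closing delimiter (if present) and the surrounding CRLFs
            body = body.split(b'\r\n--boundary--', 1)[0].strip()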

            # Process XML part
            if b'anpr.xml' in headers:
                self.process_xml(body)

            # Process image part
            elif b'.jpg' in headers:
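                # Keep only the base name so a crafted filename cannot escape the working directory
                filename = os.path.basename(re.findall(rb'filename="([^"]+)"', headers)[0].decode())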
                self.save_image(body, filename)

        # Send response
        self.send_response(200)
        self.end_headers()
        self.wfile.write("Received and processed ANPR event".encode('utf-8'))

    def append_to_csv(self, license_plate):
        # CSV file path
        csv_file = 'license_plates.csv'

        # Check if the CSV file exists
        file_exists = os.path.isfile(csv_file)

        # Open the file in append mode
        with open(csv_file, mode='a', newline='') as file:
            writer = csv.writer(file)

            # If the file does not exist, write the header
            if not file_exists:
                writer.writerow(['License Plate'])

            # Write the license plate data
            writer.writerow([license_plate])

    def process_xml(self, xml_data):
        # Parse XML data
        #print(xml_data)
        try:
            root = ET.fromstring(xml_data)

            # Find the license plate element; the XPath must include
            # the Hikvision namespace prefix to match.
            namespaces = {'ns': 'http://www.hikvision.com/ver20/XMLSchema'}
            license_plate_element = root.find('.//ns:ANPR/ns:licensePlate', namespaces)

            if license_plate_element is not None and license_plate_element.text:
                license_plate = license_plate_element.text.strip()
                print(f"License Plate: {license_plate}")
                self.append_to_csv(license_plate)
            else:
                print("License plate not found in XML.")
        except ET.ParseError as e:
            print(f"Error parsing XML: {e}")


    def save_image(self, image_data, image_name):
        # Save the image
        with open(image_name, 'wb') as image_file:
            image_file.write(image_data)
        print(f"Saved image {image_name}")

def run(server_class=HTTPServer, handler_class=ANPRRequestHandler, port=8888):
    server_address = ('', port)
    httpd = server_class(server_address, handler_class)
    print(f"Starting httpd on port {port}...")
    httpd.serve_forever()

if __name__ == '__main__':
    run()
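
The handler above hard-codes the boundary string. A minimal sketch of a more general splitter follows, assuming the camera sends a standard multipart/form-data body like the one in the Wireshark capture. The helper names `boundary_from_content_type`, `split_multipart` and `looks_like_jpeg` are illustrative only and are not part of any Hikvision or standard-library API:

```python
import re

JPEG_SOI = b"\xff\xd8"  # JPEG start-of-image marker
JPEG_EOI = b"\xff\xd9"  # JPEG end-of-image marker


def boundary_from_content_type(content_type):
    """Return the multipart boundary as bytes, or None if the header has none."""
    match = re.search(r'boundary="?([^";]+)"?', content_type)
    return match.group(1).encode() if match else None


def split_multipart(payload, boundary):
    """Yield (headers, body) byte pairs for each part of a multipart payload."""
    # A delimiter is CRLF + "--" + boundary; prefix a CRLF so the first one matches too.
    delimiter = b"\r\n--" + boundary
    for chunk in (b"\r\n" + payload).split(delimiter)[1:]:
        if chunk.startswith(b"--"):
            break  # closing delimiter "--boundary--" reached
        headers, separator, body = chunk.lstrip(b"\r\n").partition(b"\r\n\r\n")
        if separator:
            # The body is returned untouched, so binary image data stays intact.
            yield headers, body


def looks_like_jpeg(data):
    """Cheap sanity check: JPEG data starts with SOI and ends with EOI."""
    return data.startswith(JPEG_SOI) and data.endswith(JPEG_EOI)
```

Inside `do_POST`, this could be called as `split_multipart(post_data, boundary_from_content_type(self.headers['Content-Type']))`. Unlike `body.strip()`, it does not trim whitespace bytes from the image data, and `looks_like_jpeg` can be checked before writing the file.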

And this is the Wireshark data:
POST / HTTP/1.1
Content-Type: multipart/form-data; boundary=boundary
Host: 10.199.13.29:8888
Connection: close
Content-Length: 525605

--boundary
Content-Disposition: form-data; name="anpr.xml"; filename="anpr.xml"
Content-Type: text/xml
Content-Length: 2744

<?xml version="1.0" encoding="UTF-8"?>
<EventNotificationAlert version="2.0" xmlns="http://www.hikvision.com/ver20/XMLSchema">
<ipAddress>10.199.27.128</ipAddress>
<portNo>8888</portNo>
<protocol>HTTP</protocol>
<macAddress>d4:e8:53:bc:4c:02</macAddress>
<channelID>1</channelID>
<dateTime>2024-01-19T11:17:19+08:00</dateTime>
<activePostCount>1</activePostCount>
<eventType>ANPR</eventType>
<eventState>active</eventState>
<eventDescription>ANPR</eventDescription>
<channelName>Camera 01</channelName>
<ANPR>
<country>3</country>
<licensePlate>PECT600</licensePlate>
<line>2</line>
<direction>reverse</direction>
<confidenceLevel>97</confidenceLevel>
<plateType>unknown</plateType>
<plateColor>unknown</plateColor>
<licenseBright>0</licenseBright>
<dangmark>no</dangmark>
<twoWheelVehicle>no</twoWheelVehicle>
<threeWheelVehicle>no</threeWheelVehicle>
<plateCharBelieve>99,99,99,99,99,99,99</plateCharBelieve>
<vehicleType>vehicle</vehicleType>
<detectDir>8</detectDir>
<detectType>0</detectType>
<alarmDataType>0</alarmDataType>
<vehicleInfo>
<index>8050</index>
<colorDepth>2</colorDepth>
<color>white</color>
<length>0</length>
<vehicleLogoRecog>1036</vehicleLogoRecog>
<vehileSubLogoRecog>0</vehileSubLogoRecog>
<vehileModel>0</vehileModel>
</vehicleInfo>
<pictureInfoList>
<pictureInfo>
<fileName>licensePlatePicture.jpg</fileName>
<type>licensePlatePicture</type>
<dataType>0</dataType>
<picRecogMode>1</picRecogMode>
<absTime>20240119111719654</absTime>
<pId>2024011911172148100XqTdZBLMwoCkk</pId>
</pictureInfo>
<pictureInfo>
<fileName>detectionPicture.jpg</fileName>
<type>detectionPicture</type>
<dataType>0</dataType>
<picRecogMode>1</picRecogMode>
<absTime>20240119111719654</absTime>
<plateRect>
<X>691</X>
<Y>397</Y>
<width>59</width>
<height>34</height>
</plateRect>
<pId>2024011911172148200AcB8K3CS9B2h5</pId>
</pictureInfo>
</pictureInfoList>
<originalLicensePlate>PECT600</originalLicensePlate>
<CRIndex>3</CRIndex>
<vehicleListName>otherList</vehicleListName>
<plateSize>0</plateSize>
</ANPR>
<UUID>2024011911172144900FlERIduqg06eKsYuvLX072NugIxaHw22mHG5D5KBmWqX</UUID>
<picNum>2</picNum>
<monitoringSiteID></monitoringSiteID>
<isDataRetransmission>false</isDataRetransmission>
<DeviceGPSInfo>
<longitudeType>E</longitudeType>
<latitudeType>N</latitudeType>
<Longitude>
<degree>111</degree>
<minute>0</minute>
<sec>0.000000</sec>
</Longitude>
<Latitude>
<degree>0</degree>
<minute>22</minute>
<sec>0.000000</sec>
</Latitude>
</DeviceGPSInfo>
<detectionBackgroundImageResolution>
<height>1568</height>
<width>2560</width>
</detectionBackgroundImageResolution>
<countryAbbreviation>NON</countryAbbreviation>
</EventNotificationAlert>

--boundary
Content-Disposition: form-data; name="licensePlatePicture.jpg"; filename="2024011911172148100XqTdZBLMwoCkk.jpg"
Content-Type: image/jpeg
Content-Length: 5940

......JFIF..
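
For the pull-mode stream in the question, a likely cause of the unreadable image is `response.iter_lines()`: it splits on line-ending bytes and discards them, so every 0x0A or 0x0D byte inside the JPEG is lost when the lines are concatenated again. A minimal sketch of reading raw chunks instead follows, assuming each part carries a `Content-Length` header as in the capture above and that the delimiter is `--boundary`. The function name `iter_parts` is hypothetical:

```python
import re

LENGTH_RE = re.compile(rb"Content-Length:\s*(\d+)", re.IGNORECASE)


def iter_parts(chunks, boundary=b"--boundary"):
    """Yield (headers, body) for each complete part found in raw byte chunks."""
    buffer = b""
    for chunk in chunks:
        buffer += chunk
        while True:
            start = buffer.find(boundary)
            if start == -1:
                break  # no delimiter yet, wait for more data
            header_end = buffer.find(b"\r\n\r\n", start)
            if header_end == -1:
                break  # part headers are still incomplete
            headers = buffer[start:header_end]
            match = LENGTH_RE.search(headers)
            if match is None:
                # Assumption violated (no Content-Length): skip this header block
                buffer = buffer[header_end + 4:]
                continue
            body_start = header_end + 4
            body_end = body_start + int(match.group(1))
            if len(buffer) < body_end:
                break  # body is still incomplete
            # Slice by length so the binary body is never split on line endings
            yield headers, buffer[body_start:body_end]
            buffer = buffer[body_end:]
```

With requests, the chunks could come from `response.iter_content(chunk_size=8192)`. A body whose headers contain `image/jpeg` can then be passed to `Image.open(BytesIO(body))` directly.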

Upvotes: 0
