Sugata Bagchi
Sugata Bagchi

Reputation: 143

Python: xmlsec.VerificationError: Signature is invalid Error, even after receiving HTTP 200 and successful response

I have the following code -

import os

import requests
import urllib3
from zeep import Client, Settings
from zeep.transports import Transport
from requests import Session
from requests_pkcs12 import Pkcs12Adapter
from zeep.wsse.signature import BinarySignature
import random
import logging.config
from pathlib import Path
from tempfile import NamedTemporaryFile
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption
from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates

# USE THE MOST VERBOSE LOGGING LEVEL
logging.config.dictConfig({
'version': 1,
'formatters': {
'verbose': {
'format': '%(name)s: %(message)s'
}
},
'handlers': {
'console': {
'level': 'DEBUG',
'class': 'logging.StreamHandler',
'formatter': 'verbose',
},
},
'loggers': {
'zeep.transports': {
'level': 'DEBUG',
'propagate': True,
'handlers': ['console'],
},
}
})


# Source: https://gist.github.com/erikbern/756b1d8df2d1487497d29b90e81f8068
@contextlib.contextmanager
def pfx_to_pem(pfx_path, pfx_password):
''' Decrypts the .pfx file to be used with requests. '''
pfx = Path(pfx_path).read_bytes()
private_key, main_cert, add_certs = load_key_and_certificates(pfx, pfx_password.encode('utf-8'), None)

with NamedTemporaryFile(suffix='.pem', delete=False) as t_pem:
  with open(t_pem.name, 'wb') as pem_file:
    pem_file.write(private_key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()))
    pem_file.write(main_cert.public_bytes(Encoding.PEM))
    for ca in add_certs:
      pem_file.write(ca.public_bytes(Encoding.PEM))
  yield t_pem.name


def generate_nonce(length=15):
"""Generate pseudorandom number."""
return ''.join([str(random.randint(0, 9)) for i in range(length)])


# CERTIFICATES PATHS
api_p12_key = os.path.join('C:\\ALL\\ERCOT\\API Outplan OSI TCC MOTE.p12')
api_certificate = os.path.join('C:\\ALL\\ERCOT\\OSITCC.crt')
api_pfx_key = os.path.join('C:\\ALL\\ERCOT\\API Outplan OSI TCC MOTE.pfx')

# SETUP
wsdl_file = os.path.join('C:\\ALL\\ERCOT\\Nodal.wsdl')

#wsdl_file = "https://testmisapi.ercot.com/2007-08/Nodal/eEDS/EWS/?WSDL"
api_base_url = "https://testmisapi.ercot.com"
session = requests.Session()
session.mount(api_base_url,
Pkcs12Adapter(pkcs12_filename=api_p12_key, pkcs12_password='AEP'))
session.verify = False

transport = Transport(session=session)
settings = Settings(forbid_entities=False)

# CREATE CLIENT
print("Creating client.")
with pfx_to_pem(pfx_path=api_pfx_key, pfx_password='AEP') as pem_fle:
client = Client(wsdl_file, settings=settings, transport=transport,
wsse=BinarySignature(pem_fle, api_certificate))

print("Making request.")
request_data = {
"Header": {
"Verb": "get",
"Noun": "SystemStatus",
"ReplayDetection": {
"Nonce": generate_nonce(),
"Created": "2022-09-15T15:39:00-06:00"},
"Revision": "1",
"Source": "source",
"UserID": "user",
},
}
print("Call URL")
print(client.service.MarketInfo(**request_data))

When I execute this code, I am getting a successful HTTP 200 response from the target server. But after that the log prints out several errors -

  File "C:\ALL\Python 3.9\PythonDev\lib\site-packages\zeep\wsse\signature.py", line 330, in _verify_envelope_with_key
    ctx.verify(signature)
xmlsec.VerificationError: Signature is invalid.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\ALL\Python 3.10\PythonDev\ERCOT_API.py", line 102, in <module>
    print(client.service.MarketInfo(**request_data))
  File "C:\ALL\Python 3.9\PythonDev\lib\site-packages\zeep\proxy.py", line 46, in __call__
    return self._proxy._binding.send(
  File "C:\ALL\Python 3.9\PythonDev\lib\site-packages\zeep\wsdl\bindings\soap.py", line 135, in send
    return self.process_reply(client, operation_obj, response)
  File "C:\ALL\Python 3.9\PythonDev\lib\site-packages\zeep\wsdl\bindings\soap.py", line 219, in process_reply
    client.wsse.verify(doc)
  File "C:\ALL\Python 3.9\PythonDev\lib\site-packages\zeep\wsse\signature.py", line 73, in verify
    _verify_envelope_with_key(envelope, key)
  File "C:\ALL\Python 3.9\PythonDev\lib\site-packages\zeep\wsse\signature.py", line 334, in _verify_envelope_with_key
    raise SignatureVerificationFailed()
zeep.exceptions.SignatureVerificationFailed

Process finished with exit code 1

Is the code trying to validate the signature for the resoponse received from the target soap server? Could anyone please help why I am getting this error and how to suppress it?

Thanks Sugata

Upvotes: 1

Views: 965

Answers (1)

Sugata Bagchi
Sugata Bagchi

Reputation: 143

I have implemented this -https://github.com/mvantellingen/python-zeep/issues/996 and modified the code as below -

from zeep.wsse.signature import BinarySignature
from zeep.wsse import utils
from datetime import datetime, timedelta
import contextlib
import os
import requests
from requests_pkcs12 import Pkcs12Adapter
from zeep.transports import Transport
from zeep import Client, Settings
from pathlib import Path
from tempfile import NamedTemporaryFile
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption
from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates
import random
import logging.config


# USE THE MOST VERBOSE LOGGING LEVEL
logging.config.dictConfig({
 'version': 1,
 'formatters': {
 'verbose': {
 'format': '%(name)s: %(message)s'
 }
 },
 'handlers': {
 'console': {
 'level': 'DEBUG',
 'class': 'logging.StreamHandler',
 'formatter': 'verbose',
 },
 },
 'loggers': {
 'zeep.transports': {
 'level': 'DEBUG',
 'propagate': True,
 'handlers': ['console'],
 },
 }
})

class BinarySignatureTimestamp(BinarySignature):
    def apply(self, envelope, headers):
        security = utils.get_security_header(envelope)

        created = datetime.utcnow()
        expired = created + timedelta(seconds=1 * 60)

        timestamp = utils.WSU('Timestamp')
        timestamp.append(utils.WSU('Created', created.replace(microsecond=0).isoformat()+'Z'))
        timestamp.append(utils.WSU('Expires', expired.replace(microsecond=0).isoformat()+'Z'))

        security.append(timestamp)

        super().apply(envelope, headers)
        return envelope, headers

    def verify(self, envelope):
        return envelope

@contextlib.contextmanager
def pfx_to_pem(pfx_path, pfx_password):
 ''' Decrypts the .pfx file to be used with requests. '''
 pfx = Path(pfx_path).read_bytes()
 private_key, main_cert, add_certs = load_key_and_certificates(pfx, pfx_password.encode('utf-8'), None)

 with NamedTemporaryFile(suffix='.pem', delete=False) as t_pem:
   with open(t_pem.name, 'wb') as pem_file:
     pem_file.write(private_key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()))
     pem_file.write(main_cert.public_bytes(Encoding.PEM))
     for ca in add_certs:
       pem_file.write(ca.public_bytes(Encoding.PEM))
   yield t_pem.name


def generate_nonce(length=15):
 """Generate pseudorandom number."""
 return ''.join([str(random.randint(0, 9)) for i in range(length)])


# CERTIFICATES PATHS

api_p12_key = os.path.join('C:\\ALL\\ERCOT\\API Outplan OSI TCC MOTE.p12')
api_certificate = os.path.join('C:\\ALL\\ERCOT\\OSITCC.crt')
api_pfx_key = os.path.join('C:\\ALL\\ERCOT\\API Outplan OSI TCC MOTE.pfx')

# SETUP
wsdl_file = os.path.join('C:\\ALL\\ERCOT\\Nodal.wsdl')

api_base_url = "https://testmisapi.ercot.com"
session = requests.Session()
session.mount(api_base_url,
 Pkcs12Adapter(pkcs12_filename=api_p12_key, pkcs12_password='AEP'))
session.verify = None

transport = Transport(session=session)
settings = Settings(forbid_entities=False)

# CREATE CLIENT
print("Creating client.")
with pfx_to_pem(pfx_path=api_pfx_key, pfx_password='AEP') as pem_fle:
   client = Client(wsdl=wsdl_file, settings=settings, transport=transport,
   wsse=BinarySignatureTimestamp(pem_fle, api_certificate, "AEP"))

print("Making request.")
request_data = {
 "Header": {
 "Verb": "get",
 "Noun": "SystemStatus",
 "ReplayDetection": {
 "Nonce": generate_nonce(),
 "Created": "2022-09-15T19:56:00-06:00"},
 "Revision": "1",
 "Source": "source",
 "UserID": "user",
 },
 }

print("Call URL")
print(client.service.MarketInfo(**request_data))

Upvotes: 3

Related Questions