Anselmo

Reputation: 75

Python requests close connection

I have a Python request that fetches a token, and I then use this token to call another API URL. After that I want to close the current session, but I have not been able to. The code looks like this:

import requests

url = 'https://10.10.20.21/tron/api/v1/tokens'
payload = {'grant_type': 'password', 'username': 'login', 'password': 'test@2021'}
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
response = requests.request("POST", url, headers=headers, data=payload, verify=False)

If I print the response text and the request headers, I see:

print(f'text: {response.text}')
print(f'request.headers: {response.request.headers}')

text: "Maximum number of active sessions reached"
request.headers: {'User-Agent': 'python-requests/2.18.4', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'Connection': 'keep-alive', 'Content-Length': '57', 'Content-Type': 'application/x-www-form-urlencoded'}

So I would like to close the current session/connection, but even after I run response.close() the connection is still alive:

response.close()

I want to close this session so that I can open another one.

Upvotes: 0

Views: 2304

Answers (2)

M Ch

Reputation: 21

I ran into the same situation with the ASUS Aura REST API, where the REST API server crashes after a while (possibly because it runs out of ports, though I'm not sure). It seems that, despite using a requests.Session, I also had to add the header {'Connection': 'close'} to the session. Now only pending connections are visible with netstat -aon | findstr 27339. This fixes the issue, and the code currently looks like this:

def aura_command(self, command):
    aura_session = requests.Session()
    try:
        aura_session.verify = False
        aura_session.headers.update({'Connection':'close'})
        self.aura_connection(command, aura_session)
    except requests.exceptions.ConnectionError:
        print('init_aura error')
    finally:
        print('closing session')
        aura_session.close()

def aura_connection(self, command, aura_session):
    if command == 'init':
        aura_session.post(self.aura_sdk_url, json=self.aura_init)
    elif command == 'stealth':
        aura_session.put(self.aura_sdk_url + '/AuraDevice', json=self.aura_stealth)
    elif command == 'intruder':
        aura_session.put(self.aura_sdk_url + '/AuraDevice', json=self.aura_intruder)
    elif command == 'windows':
        aura_session.put(self.aura_sdk_url + '/AuraDevice', json=self.aura_windows)
    elif command == 'deinit':
        aura_session.delete(self.aura_sdk_url, json=self.aura_init)

Upvotes: 2

Sparrow1029

Reputation: 642

I would suggest looking into requests.Session objects. They might give you the functionality you're looking for.

from requests import Session

url = 'https://10.10.20.21/tron/api/v1/tokens'
payload = {'grant_type': 'password', 'username': 'login', 'password': 'test@2021'}
headers = {'Content-Type': 'application/x-www-form-urlencoded'}

sess = Session()
sess.verify = False
sess.headers.update(headers)

response = sess.post(url, data=payload)

sess.close()
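
As a minimal sketch, the Session approach can be combined with the 'Connection': 'close' header from the other answer, using the session as a context manager so it is closed even if the request raises. The helper names make_closing_session and request_token are hypothetical, introduced only for illustration. The last part prepares a request without sending it, just to confirm which Connection header would go out.

```python
import requests


def make_closing_session():
    """Return a Session that asks the server to close the socket after each response.

    Hypothetical helper, not part of requests itself.
    """
    sess = requests.Session()
    sess.verify = False  # mirrors the code above; only sensible for self-signed test hosts
    sess.headers.update({'Connection': 'close'})  # overrides the default keep-alive
    return sess


def request_token(url, payload):
    """POST the credentials and return the response; the session is closed on exit."""
    with make_closing_session() as sess:
        return sess.post(url, data=payload)


# Offline check: prepare (but do not send) a request to inspect the merged headers.
with make_closing_session() as sess:
    prepared = sess.prepare_request(
        requests.Request('POST', 'https://10.10.20.21/tron/api/v1/tokens',
                         data={'grant_type': 'password'})
    )
    print(prepared.headers['Connection'])
```

This only affects the TCP connection. If the server counts login sessions (tokens) separately, which the "Maximum number of active sessions reached" message may indicate, it probably also needs its own logout or token-revocation call; that endpoint is not shown in the question, so it is left out here.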

Upvotes: 0
