Reputation: 807
I'm trying to use Python's new asyncio library to send asynchronous HTTP requests. I want to wait a few milliseconds (the timeout
variable) before sending each request, but still send them all asynchronously rather than waiting for a response after each request.
I'm doing something like the following:
@asyncio.coroutine
def handle_line(self, line, destination):
    print("Inside! line {} destination {}".format(line, destination))
    response = yield from aiohttp.request('POST', destination, data=line,
                                          headers=tester.headers)
    print(response.status)
    return (yield from response.read())

@asyncio.coroutine
def send_data(self, filename, timeout):
    destination='foo'
    logging.log(logging.DEBUG, 'sending_data')
    with open(filename) as log_file:
        for line in log_file:
            try:
                json_event = json.loads(line)
            except ValueError as e:
                print("Error parsing json event")
            time.sleep(timeout)
            yield from asyncio.async(self.handle_line(json.dumps(json_event), destination))

loop=asyncio.get_event_loop().run_until_complete(send_data('foo.txt', 1))
The output that I am getting (by printing the 200 responses) looks like this code is running synchronously. What am I doing wrong?
Upvotes: 3
Views: 2367
Reputation: 94961
There are a couple of issues here:
You should use asyncio.sleep, not time.sleep, because the latter will block the event loop.
You shouldn't be using yield from after the asyncio.async(self.handle_line(...)) call, because that will make the script block until the self.handle_line coroutine is complete, which means you don't end up doing anything concurrently; you process each line, wait for the processing to complete, then move on to the next line. Instead, you should run all the asyncio.async calls without waiting, save the Task objects returned to a list, and then use asyncio.wait to wait for them all to complete once you've started them all.
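To see the difference between the two patterns without any network access, here is a minimal sketch written for current Python (3.7+), where asyncio.async is spelled asyncio.ensure_future and yield from is spelled await. The names fake_request, one_at_a_time, all_at_once and timed are made up for this illustration; fake_request just sleeps in place of a real HTTP call.

```python
import asyncio
import time


async def fake_request(i, delay=0.1):
    # Hypothetical stand-in for an HTTP request. asyncio.sleep hands control
    # back to the event loop, so other tasks can run in the meantime.
    await asyncio.sleep(delay)
    return i


async def one_at_a_time(n):
    # Awaiting each task right after creating it serializes the work:
    # the next request is not started until the previous one has finished.
    results = []
    for i in range(n):
        results.append(await asyncio.ensure_future(fake_request(i)))
    return results


async def all_at_once(n):
    # Schedule every task first, keep the Task objects in a list,
    # then wait for all of them together.
    tasks = [asyncio.ensure_future(fake_request(i)) for i in range(n)]
    await asyncio.wait(tasks)
    return [t.result() for t in tasks]


def timed(coro):
    # Run a coroutine to completion and report how long it took.
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start
```

Calling timed(one_at_a_time(5)) should take roughly five times the per-request delay, while timed(all_at_once(5)) should take roughly one delay, because the sleeps overlap.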
Putting that all together:
import asyncio
import json
import logging

import aiohttp

@asyncio.coroutine
def handle_line(self, line, destination):
    print("Inside! line {} destination {}".format(line, destination))
    response = yield from aiohttp.request('POST', destination, data=line,
                                          headers=tester.headers)
    print(response.status)
    return (yield from response.read())

@asyncio.coroutine
def send_data(self, filename, timeout):
    destination='foo'
    logging.log(logging.DEBUG, 'sending_data')
    tasks = []
    with open(filename) as log_file:
        for line in log_file:
            try:
                json_event = json.loads(line)
            except ValueError as e:
                print("Error parsing json event")
                continue  # skip this line; json_event was not set for it
            # Non-blocking pause: already-scheduled tasks keep running meanwhile.
            yield from asyncio.sleep(timeout)
            # Schedule the request without waiting for it to finish.
            # (asyncio.async was renamed asyncio.ensure_future in Python 3.4.4.)
            tasks.append(asyncio.async(
                self.handle_line(json.dumps(json_event), destination)))
    # Wait for every scheduled request to complete.
    yield from asyncio.wait(tasks)

asyncio.get_event_loop().run_until_complete(send_data('foo.txt', 1))
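Note that the code above uses the pre-3.5 generator-based syntax. On current Python it will not even parse, because async became a reserved keyword in 3.7, and @asyncio.coroutine was removed in 3.11. What follows is a rough port to async/await, offered as a sketch rather than a drop-in replacement. To keep it runnable without a server, the HTTP call is passed in as a post argument, which is an assumption of this example and not part of the original code; with aiohttp it would typically wrap a ClientSession.post call. fake_post is likewise a made-up stand-in, and the function takes an iterable of lines instead of a filename.

```python
import asyncio
import json


async def handle_line(post, line, destination):
    # `post` is a hypothetical coroutine function taking (destination, data)
    # and returning the response body.
    return await post(destination, line)


async def send_data(post, lines, delay, destination="foo"):
    tasks = []
    for line in lines:
        try:
            json_event = json.loads(line)
        except ValueError:
            print("Error parsing json event")
            continue  # nothing to send for an unparsable line
        # Pause without blocking the event loop.
        await asyncio.sleep(delay)
        # Schedule the request now; collect the Task to wait on later.
        tasks.append(asyncio.ensure_future(
            handle_line(post, json.dumps(json_event), destination)))
    if not tasks:
        return []  # asyncio.wait raises ValueError on an empty collection
    await asyncio.wait(tasks)
    return [t.result() for t in tasks]


async def fake_post(destination, data):
    # Made-up stand-in for a real HTTP POST: sleeps, then echoes its input.
    await asyncio.sleep(0.05)
    return (destination, data)
```

It can be tried with something like asyncio.run(send_data(fake_post, ['{"a": 1}', 'not json'], 0.01)), which skips the second line and returns the echoed result for the first.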
Upvotes: 6