Reputation: 21934
I understand the basic idea of an event loop. There is a central loop listening to a set of file descriptors, and when one of them is ready for reading or writing, the corresponding callback is executed.
We can use coroutines instead of callbacks, as they can be paused and resumed. However, this means that there has to be some communication protocol between the coroutine and the event loop to make things work, right?
I wrote a simple echo server with coroutines that yield the fd along with the action they are interested in, like `yield fd, 'read'`, `yield fd, 'write'`, etc., and then the event loop registers the fd with `select` accordingly. The callback just resumes the coroutine. It works fine, and I have added the code below.
Now I am just trying to understand how `await` actually works. It doesn't seem to yield fds and the corresponding action like my example code; instead, it gives you a `Future` object. So what exactly happens under the hood? How is it communicating with the event loop?
My guess is that `await asyncio.sleep(1)` would be executed like this:
1. `asyncio.sleep(1)` creates a `Future` object.
2. It registers a timer, for example with `timerfd_create`, with a callback to complete the `Future`.
3. `await` will yield the `Future` object to the event loop which is executing it.
4. The event loop sets the `Future` object's callback function to just resume the coroutine.

I mean, I can make use of a `Future` like this. But is this what is actually happening? Can someone help me understand this a little better?
PS: `timerfd_create` was just taken as an example because I couldn't understand how timers can be implemented in an event loop. For this question's purpose, network fds would be fine too. If someone can explain how the timer is implemented, that would be nice too!
Here is my implementation of a simple Echo Server using co-routines:

"""
Tasks are just generators or coroutines
"""
import socket
import selectors

select = selectors.DefaultSelector()
tasks_to_complete = []


def create_server(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    hostname = socket.gethostname()
    s.bind((hostname, port))
    s.listen(5)
    print("Starting server on %s at port %s" % (hostname, port))
    return s


def handle_clients(s):
    # Accept loop: wait until the listening socket is readable, then accept.
    while True:
        print("yielding for read on server %s" % id(s))
        yield (s, 'read')
        c, a = s.accept()
        t = handle_client(c)
        print("appending a client handler")
        tasks_to_complete.append(t)


def handle_client(c):
    # Echo loop: read a chunk, then write it back once the socket is writable.
    while True:
        print("yielding for read client %s" % id(c))
        yield (c, 'read')
        data = c.recv(1024)
        if len(data) == 0:
            c.close()  # release the client socket when the peer disconnects
            return "Connection Closed"
        print("yielding for write on client %s" % id(c))
        yield (c, 'write')
        c.sendall(data)  # sendall avoids silently dropping a partial write


def run(tasks_to_complete):
    def context_callback(fd, t):
        # Bind fd and task now so that each registration resumes its own task.
        def callback():
            select.unregister(fd)
            tasks_to_complete.append(t)
        return callback

    while True:
        # Run every ready task until it yields its next (fd, event) request.
        while tasks_to_complete:
            t = tasks_to_complete.pop(0)
            try:
                fd, event = t.send(None)
            except StopIteration as e:
                print(e.value)
                continue
            if event == 'read':
                event = selectors.EVENT_READ
            elif event == 'write':
                event = selectors.EVENT_WRITE
            select.register(fd, event, context_callback(fd, t))
        # Block until at least one registered fd is ready, then wake its task.
        events = select.select()
        for key, mask in events:
            callback = key.data
            callback()


tasks_to_complete.append(handle_clients(create_server(9000)))
run(tasks_to_complete)
Upvotes: 3
Views: 960
Reputation: 155670
> I wrote a simple Echo Server with co-routines, which would yield the fd along with the interested action like this `yield fd, 'read'`, `yield fd, 'write'` etc and then the Event Loop would register the `select` accordingly.
This is similar to how Dave Beazley's curio works. To learn more about the concept, see this lecture where he builds an event loop from the very basics. (He uses the pre-3.5 `yield from` syntax, but it works exactly the same as `await`.)
As you discovered, asyncio works a bit differently, although the principle is still similar.
> Now I am just trying to understand how `await` actually works. It doesn't seem to yield fds, and string corresponding to the action like the above example, instead it gives you a `Future` object. So what exactly happens under-the-hood? How is it communicating with the Event Loop?
The short version is that blocking coroutines use a global variable (through `asyncio.get_event_loop()`) to fetch the event loop. The event loop has methods that schedule callbacks to be invoked when an interesting event occurs. `asyncio.sleep` calls `loop.call_later` to ensure its resumption when the timeout elapses.
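To make this concrete, here is a minimal sketch of a sleep built from a `Future` and `loop.call_later`. The names `my_sleep` and `_set_result_if_pending` are hypothetical and exist only for illustration; this is not asyncio's own source. It fetches the loop with `asyncio.get_running_loop()`, which is the usual call from inside a coroutine in current Python versions.

```python
import asyncio


def _set_result_if_pending(future, result):
    # Hypothetical helper: complete the future unless it was already cancelled.
    if not future.done():
        future.set_result(result)


async def my_sleep(delay, result=None):
    """Illustrative stand-in for asyncio.sleep, not the real implementation."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # Ask the loop to complete the future once `delay` seconds have passed.
    handle = loop.call_later(delay, _set_result_if_pending, future, result)
    try:
        # Suspend here; the Task driving this coroutine is resumed
        # when the future gets its result.
        return await future
    finally:
        # If the coroutine was cancelled first, drop the pending timer.
        handle.cancel()


async def main():
    loop = asyncio.get_running_loop()
    start = loop.time()
    value = await my_sleep(0.05, "done")
    return value, loop.time() - start
```

Running `asyncio.run(main())` returns the string passed to `my_sleep` together with the elapsed time, which should be roughly the requested delay. Note that the coroutine never yields an fd or a timer request itself; it only registers a callback with the loop and awaits the future.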
The `Future` that is yielded is just a convenient way for the event loop to be notified of the result once it's ready, so that it can correctly resume the `Task` (coroutine driven by the event loop) that was awaiting the blocking operation, while also handling exceptions and cancellation. See `Task.__step` for the gory details.
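The hand-off between a yielded future and the task that awaits it can be sketched without asyncio at all. `MiniFuture`, `MiniTask`, and the `ready` list below are hypothetical names for illustration; the real `Task.__step` additionally deals with exceptions, cancellation, and scheduling through the loop's `call_soon`.

```python
class MiniFuture:
    """Hypothetical stand-in for a Future: a result slot plus done-callbacks."""

    def __init__(self):
        self.result = None
        self.done = False
        self._callbacks = []

    def add_done_callback(self, fn):
        if self.done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        self.done = True
        for fn in self._callbacks:
            fn(self)

    def __await__(self):
        if not self.done:
            # Hand this future to whoever is driving the coroutine.
            yield self
        return self.result


class MiniTask:
    """Hypothetical stand-in for a Task: drives one coroutine step by step."""

    def __init__(self, coro, ready):
        self.coro = coro
        self.ready = ready  # the loop's queue of callables to run
        self.result = None
        ready.append(self.step)

    def step(self):
        try:
            # Run the coroutine until it awaits a future that is not done.
            future = self.coro.send(None)
        except StopIteration as exc:
            self.result = exc.value
        else:
            # Resume this task once the future it is waiting on completes.
            future.add_done_callback(lambda fut: self.ready.append(self.step))


async def waiter(future):
    value = await future
    return value * 2


def demo():
    ready = []
    future = MiniFuture()
    task = MiniTask(waiter(future), ready)
    # Pretend an I/O or timer event completes the future a bit later.
    ready.append(lambda: future.set_result(21))
    while ready:
        ready.pop(0)()
    return task.result
```

Here `demo()` returns 42: the first `step` receives the pending future from `send(None)`, the simulated event completes it, and the done-callback queues a second `step` that lets the coroutine finish.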
> `timerfd_create` was just taken as an example because I couldn't understand how timers can be implemented in an Event Loop.
Timers are implemented by having the event loop keep track of both file descriptors and timeouts. It issues a `select` whose timeout is set so that the call returns no later than the moment the earliest timer is due. Dave's lecture linked above demonstrates the concept succinctly.
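A rough sketch of that idea, using a heap of deadlines and the selector's timeout argument, might look as follows. `TimerLoop` and its methods are hypothetical names for illustration, not asyncio's API, and the demo registers an idle socket pair only so that the selector has something to watch.

```python
import heapq
import selectors
import socket
import time


class TimerLoop:
    """Hypothetical minimal loop: fd callbacks plus timers via the select timeout."""

    def __init__(self):
        self.selector = selectors.DefaultSelector()
        self.timers = []  # heap of (deadline, sequence number, callback)
        self._seq = 0

    def call_later(self, delay, callback):
        # The sequence number breaks ties so callbacks are never compared.
        self._seq += 1
        heapq.heappush(self.timers, (time.monotonic() + delay, self._seq, callback))

    def run(self):
        while self.timers:
            # Wait for I/O, but no longer than until the earliest timer is due.
            timeout = max(0.0, self.timers[0][0] - time.monotonic())
            for key, mask in self.selector.select(timeout):
                key.data()
            # Fire every timer whose deadline has passed.
            now = time.monotonic()
            while self.timers and self.timers[0][0] <= now:
                _, _, callback = heapq.heappop(self.timers)
                callback()


def demo():
    loop = TimerLoop()
    a, b = socket.socketpair()  # idle fd so select has something to watch
    loop.selector.register(a, selectors.EVENT_READ, lambda: None)
    fired = []
    loop.call_later(0.02, lambda: fired.append("second"))
    loop.call_later(0.01, lambda: fired.append("first"))
    loop.run()
    loop.selector.unregister(a)
    a.close()
    b.close()
    return fired
```

Calling `demo()` returns the labels in deadline order, even though the later timer was scheduled first. No timer fd is involved: the kernel is only asked to wait for at most the time remaining until the next deadline.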
Upvotes: 2