Hannes

Reputation: 316

Python3 asyncio: Process tasks from dict and store result in dict

I'm trying to learn asyncio. I have a list of sensors that should be queried. Each sensor takes about 1 second to query, so asyncio seems like the right tool for this. The set of sensors may change dynamically, and the asyncio code should adapt to that.

I already have the following code, but I don't know how to store the fetched values in a result dict. Since the consumer only stores the value in a dict, which is a very fast operation, this could also be done in the producer, so there would be no need for a consumer at all. But I guess the asyncio paradigm requires a consumer.

Perhaps I'm overcomplicating this, and a much simpler approach with less code could be used here? Please also see the comments in the code for further questions and suggestions.

#!/usr/bin/python3

import asyncio
import random

sensors={ #This list changes often
"sensor1" : "http://abc.example.org",
"sensor2" : "http://outside.example.org",
"temperature" : "http://xe.example.com",
"outdoor" : "http://anywhere.example.org"
}

results = dict() #Result from sensor query should go here

async def queryAll(sensors):
    q = asyncio.Queue()
    queries = [asyncio.create_task(querySensor(sensorname,q)) for sensorname in sensors]
    process = [asyncio.create_task(storeValues(sensorname, q)) for sensorname in sensors] #Since value is only stored to dict, one consumer should be sufficient, or no consumer at all, since producer could also store variable into dict
    await asyncio.gather(*queries)
    await q.join()
    for c in process:
        c.cancel()

async def querySensor(sensorname: str, q: asyncio.Queue):
    res = str(random.randint(0,100))
    resString = "Result for " + sensorname + " is " + res
    await q.put(resString)

async def storeValues(sensorname: str, q: asyncio.Queue):
    while True:
        res = await q.get()
        print("Value: ", res)
        q.task_done()

if __name__ == "__main__":
    asyncio.run(queryAll(sensors))
    for result in results: #Now results should be in results
        print(result, "measured:", results[result])

Solution

Thanks for both answers. Resulting code is now:

#!/usr/bin/python3

import asyncio
import random

sensors={ #This list changes often
"sensor1" : "http://abc.example.org",
"sensor2" : "http://outside.example.org",
"temperature" : "http://xe.example.com",
"outdoor" : "http://anywhere.example.org"
}

results = dict() #Result from sensor query should go here

async def queryAll(sensors):
    queries = [asyncio.create_task(querySensor(sensorname)) for sensorname in sensors]
    await asyncio.gather(*queries)

async def querySensor(sensorname: str):
    res = str(random.randint(0,100))
    resString = "Result for " + sensorname + " is " + res
    results[sensorname] = resString

if __name__ == "__main__":
    asyncio.run(queryAll(sensors))
    for result in results: #Now results should be in results
        print(result, "measured:", results[result])

There is some more complex code behind fetching the actual value, so this was only an example. I wanted to hide that additional layer of code.

Both answers are very valuable to me. To support a new user's reputation, I am accepting Sreejith's answer to mark this question as solved.

Upvotes: 1

Views: 980

Answers (2)

Sreejith

Reputation: 84

It's not clear what exactly you're trying to achieve. If you only want to update the dictionary, the code could be much simpler. Let me know if you were expecting anything else.

import asyncio
import random

sensors={ #This list changes often
"sensor1" : "http://abc.example.org",
"sensor2" : "http://outside.example.org",
"temperature" : "http://xe.example.com",
"outdoor" : "http://anywhere.example.org"
}

results = dict() #Result from sensor query should go here

async def queryAll(sensors):
    queries = [asyncio.create_task(querySensor(sensorname, results)) for sensorname in sensors]
    await asyncio.gather(*queries)

async def querySensor(sensorname: str, q: dict):
    res = str(random.randint(0, 100))
    resString = "Result for " + sensorname + " is " + res
    q[sensorname] = resString

if __name__ == "__main__":
    asyncio.run(queryAll(sensors))
    print(results)
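
As a variation on the code above, each coroutine could return its value instead of writing to a shared dict, because asyncio.gather returns its results in the same order as the awaitables it was given. This is only a minimal sketch: the snake_case names and the short asyncio.sleep standing in for the real sensor read are assumptions, not part of the original code.

```python
import asyncio
import random


async def query_sensor(name: str) -> str:
    # Assumed placeholder for the real (awaitable) sensor read.
    await asyncio.sleep(0.01)
    return f"Result for {name} is {random.randint(0, 100)}"


async def query_all(names) -> dict:
    names = list(names)
    # gather() preserves the order of its arguments, so zip() pairs
    # every name with the value produced for it.
    values = await asyncio.gather(*(query_sensor(n) for n in names))
    return dict(zip(names, values))


results = asyncio.run(query_all(["sensor1", "sensor2"]))
print(results)
```

With this shape there is no module-level state to mutate, which may be easier to reason about when the set of sensors changes between runs.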

Upvotes: 1

jsbueno

Reputation: 110311

The main problem with this code is that you never actually store the sensor values in your result dict. If the code in storeValues included the line results.setdefault(sensorname, []).append(res), you'd see your results already. (The dictionary .setdefault method is a utility that creates a value in the dictionary if the key does not exist, and returns that value or the existing one: therefore we create an empty list on the first call for each sensor, and keep appending to it.)
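
A minimal sketch of that idea, keeping the queue from the question. One assumption differs from the original code: the producer puts a (name, value) tuple on the queue, so that a single consumer knows which key each reading belongs to. The snake_case function names and the random stand-in value are illustrative only.

```python
import asyncio
import random

results = {}  # sensor name -> list of readings


async def query_sensor(name: str, q: asyncio.Queue) -> None:
    # Assumed stand-in for the real sensor read.
    value = random.randint(0, 100)
    # Send the name along with the value so the consumer knows the key.
    await q.put((name, value))


async def store_values(q: asyncio.Queue) -> None:
    while True:
        name, value = await q.get()
        # Creates the list on the first reading, then appends to it.
        results.setdefault(name, []).append(value)
        q.task_done()


async def query_all(names) -> None:
    q = asyncio.Queue()
    consumer = asyncio.create_task(store_values(q))
    await asyncio.gather(*(query_sensor(n, q) for n in names))
    await q.join()  # wait until every queued item was processed
    consumer.cancel()
    # Let the cancellation finish before returning.
    await asyncio.gather(consumer, return_exceptions=True)


asyncio.run(query_all(["sensor1", "temperature"]))
print(results)
```

One consumer is enough here, since storing a value in a dict never waits on anything.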

But, as you noted, there is no need for a separate producer/consumer pattern in this code (whatever code consumes the "results" dict is actually the consumer).

...
from aiohttp_requests import requests


results = dict() #Result from sensor query should go here

async def queryAll(sensors):
    queries = [asyncio.create_task(querySensor(sensorname)) for sensorname in sensors]
    await asyncio.gather(*queries)


async def querySensor(sensorname: str):
    res = str(random.randint(0,100))
    # Important: when writing the actual call that reads the sensor,
    # use an async expression and await it.
    response = await requests.get(sensors[sensorname], ...)
    text = await response.text()
    resString = f"Result for {sensorname} is {text}"
    results.setdefault(sensorname, []).append(resString)

if __name__ == "__main__":
    asyncio.run(queryAll(sensors))
    for result in results: #Now results should be in results
        print(result, "measured:", results[result])

    

This example uses the aiohttp-requests package: https://pypi.org/project/aiohttp-requests/
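
To see why awaiting inside querySensor matters, here is a stdlib-only sketch that can be run without any third-party package. It assumes asyncio.sleep as a stand-in for the slow sensor read and uses a shortened, made-up delay (DELAY) instead of the roughly one second mentioned in the question; the snake_case names are also illustrative.

```python
import asyncio
import time

sensors = {
    "sensor1": "http://abc.example.org",
    "sensor2": "http://outside.example.org",
    "temperature": "http://xe.example.com",
    "outdoor": "http://anywhere.example.org",
}
results = {}
DELAY = 0.2  # assumed stand-in for the ~1 s sensor read


async def query_sensor(name: str) -> None:
    # Awaiting here hands control back to the event loop, so the
    # other sensor queries can make progress in the meantime.
    await asyncio.sleep(DELAY)
    results[name] = f"Result for {name} from {sensors[name]}"


async def query_all() -> float:
    start = time.perf_counter()
    await asyncio.gather(*(query_sensor(n) for n in sensors))
    return time.perf_counter() - start


elapsed = asyncio.run(query_all())
print(f"{len(results)} sensors queried in {elapsed:.2f}s")
```

Because all queries wait concurrently, the total time should stay close to a single DELAY rather than growing with the number of sensors. A blocking call such as time.sleep in place of the awaited one would run the queries one after another.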

Upvotes: 1
