Reputation: 18438
I'm experiencing a rather annoying behavior with Celery's group feature.
I periodically need to check the IPs a bunch of hosts resolve to, just to make sure those IPs haven't changed. In order to do that, I have a dictionary with the <hostname, IPs> pairs I need to verify. For instance:
REQUIRED_HOSTS = {
    'google.com': {'173.194.46.64', '173.194.46.70', '173.194.46.71'},
    'stackoverflow.com': {'198.252.206.16'}
}
So the only thing to do is periodically iterate over REQUIRED_HOSTS.keys(), resolve each name and see if any of the IPs it resolves to differs from what I have recorded (nothing fancy so far).
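To make the intent concrete, a plain sequential version of that check (no Celery, just dnspython; find_mismatched_hosts is only an illustrative name, and the set symmetric difference shows which IPs changed) would look something like:
import dns.resolver

def find_mismatched_hosts(required_hosts):
    # Resolve every hostname and collect the ones whose current IP set
    # differs from the recorded one.
    resolver = dns.resolver.Resolver()
    mismatched = []
    for hostname, expected_ips in required_hosts.items():
        actual_ips = {answer.address for answer in resolver.query(hostname)}
        if actual_ips != expected_ips:
            # expected_ips ^ actual_ips is the set of IPs that appeared
            # or disappeared since the last check
            mismatched.append((hostname, expected_ips ^ actual_ips))
    return mismatched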
In order to improve the efficiency a bit, each name is resolved in parallel. I created a subtask for that (it resolves using dnspython):
@my_tasks.task
def resolve_hostname(hostname, resolver=None):
    """ This subtask resolves the 'hostname' to its IP addresses. It's
    intended to be used in the 'compare_required_ips' function to resolve
    names in parallel """
    if resolver is None:
        resolver = dns.resolver.Resolver()
        resolver.nameservers = ['8.8.8.8', '4.2.2.2'] + resolver.nameservers
    try:
        return (hostname,
                {hst.address for hst in resolver.query(hostname)})
    except Exception as e:
        logger.exception("Got %s when trying to resolve hostname=%s"
                         % (type(e), hostname))
        raise e
Now, the method that queries all the hostnames and spawns subtasks is the following:
@my_tasks.task
def compare_required_ips():
    """ This method verifies that the IPs haven't changed. """
    retval = []
    resolver = dns.resolver.Resolver()
    resolver.nameservers = ['8.8.8.8', '4.2.2.2'] + resolver.nameservers
    retrieved_hosts = dict.fromkeys(required_hosts.REQUIRED_HOSTS.keys())
    logger.info("Going to compare IPs for %s hostnames=%s"
                % (len(required_hosts.REQUIRED_HOSTS.keys()),
                   required_hosts.REQUIRED_HOSTS.keys()))
    ip_subtasks = group(
        [resolve_hostname.s(hostname, resolver=resolver)
         for hostname in required_hosts.REQUIRED_HOSTS.keys()]
    )()
    for hostname, ips in ip_subtasks.get(timeout=90):
        retrieved_hosts[hostname] = ips
    for hostname in required_hosts.REQUIRED_HOSTS:
        if (required_hosts.REQUIRED_HOSTS[hostname]
                != retrieved_hosts[hostname]):
            retval.append(hostname)
            logger.error(
                "IP resolution mismatch. hostname=%s resolve_target=%s"
                ", resolve_actual=%s (mismatch=%s)"
                % (hostname,
                   required_hosts.REQUIRED_HOSTS[hostname],
                   retrieved_hosts[hostname],
                   (required_hosts.REQUIRED_HOSTS[hostname]
                    ^ retrieved_hosts[hostname]))
            )
    return retval
Again, fairly easy... Just walk the REQUIRED_HOSTS keys (i.e. the hostnames), spawn a subtask to resolve each of them and then collect the results with a 90-second timeout (which happens in the line for hostname, ips in ip_subtasks.get(timeout=90)).
Now, the nuisance is that all the subtasks except one finish successfully within that 90-second window. Then the parent task (compare_required_ips) fails because of the timeout=90, and the moment the parent has failed, that last subtask finishes successfully. I have tried increasing and decreasing the timeout, and that one subtask always takes exactly as long as whatever timeout I specify in the group creation, making the main task report a failure.
I have also run the name resolution manually (not as Celery tasks, just with regular threading) and it resolves in milliseconds, every time, in every test I try. I don't think it's an issue with the dns.resolver.Resolver() class. Everything points to this class resolving blazingly fast, but the subtask, or the group, or... someone in Celery doesn't notice it (for one of the subtasks only, though).
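For reference, the manual threaded check I mean is roughly like this (a quick sketch, not the exact script I used; it assumes REQUIRED_HOSTS is in scope):
import threading
import time

import dns.resolver

def timed_resolve(hostname):
    # Resolve a single hostname with dnspython and print how long it took.
    resolver = dns.resolver.Resolver()
    start = time.time()
    ips = {answer.address for answer in resolver.query(hostname)}
    print("%s -> %s in %.3f s" % (hostname, ips, time.time() - start))

threads = [threading.Thread(target=timed_resolve, args=(hostname,))
           for hostname in REQUIRED_HOSTS]
for t in threads:
    t.start()
for t in threads:
    t.join()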
I am using celery==3.1.9, celery-with-redis==3.0 and flower==0.6.0 to monitor.
Any help, hint or thing to test will be very appreciated.
Upvotes: 1
Views: 613
Reputation: 29594
One problem might be a deadlock caused by launching synchronous subtasks. compare_required_ips is a Celery task. Inside this task you are waiting for a group of resolve_hostname tasks to complete, which is really inefficient.
So you have to change this
ip_subtasks = group(
    [resolve_hostname.s(hostname, resolver=resolver)
     for hostname in required_hosts.REQUIRED_HOSTS.keys()]
)()
to
ip_subtasks = group(
    [resolve_hostname.s(hostname, resolver=resolver)
     for hostname in required_hosts.REQUIRED_HOSTS.keys()]
).delay()
which launches all your tasks asynchronously, thereby avoiding the deadlock.
Also, you shouldn't call ip_subtasks.get() inside the compare_required_ips task (even if the subtasks take only a nanosecond). You have to write a new function for that (a callback) or use the Celery task_success signal.
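For example, one way to restructure it is with a chord, so the comparison runs as a callback once all the resolve_hostname subtasks have finished, and nothing ever blocks on .get() inside a task. This is only a rough sketch; compare_resolved_ips and schedule_ip_check are hypothetical names, not something from your code:
from celery import chord

@my_tasks.task
def compare_resolved_ips(results):
    # 'results' is the list of (hostname, ips) pairs returned by the
    # resolve_hostname subtasks; report every hostname whose IPs changed.
    return [hostname for hostname, ips in results
            if required_hosts.REQUIRED_HOSTS[hostname] != set(ips)]

def schedule_ip_check():
    # The chord header runs the subtasks asynchronously; the callback is
    # invoked with all their results once every subtask has finished.
    return chord(
        resolve_hostname.s(hostname)
        for hostname in required_hosts.REQUIRED_HOSTS
    )(compare_resolved_ips.s())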
Upvotes: 2