Savir

Celery: One subtask in group always times out

I'm experiencing a rather annoying behavior with Celery's group feature.

I periodically need to check the IPs that a bunch of hosts resolve to, just to make sure those IPs haven't changed. To do that, I have a dictionary mapping each hostname to the set of IPs I need to verify. For instance:

REQUIRED_HOSTS = {
    'google.com': {'173.194.46.64', '173.194.46.70', '173.194.46.71'},
    'stackoverflow.com': {'198.252.206.16'}
}

So all I need to do is periodically iterate over REQUIRED_HOSTS.keys(), resolve each name, and check whether any of the IPs it resolves to differs from what I have recorded. (Not much of a brainer here.)
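
The comparison step on its own is plain set logic. This is a minimal sketch, independent of Celery and DNS: find_ip_mismatches is a hypothetical helper name, and the "resolved" mapping is made-up data for illustration only. It uses the same != and ^ (symmetric difference) operators as the task further down.

```python
REQUIRED_HOSTS = {
    'google.com': {'173.194.46.64', '173.194.46.70', '173.194.46.71'},
    'stackoverflow.com': {'198.252.206.16'},
}


def find_ip_mismatches(required, resolved):
    """Return {hostname: symmetric difference} for every host whose resolved
    IP set differs from the recorded one. A host missing from `resolved`
    is treated as resolving to no addresses."""
    mismatches = {}
    for hostname, expected in required.items():
        actual = resolved.get(hostname, set())
        if actual != expected:
            # IPs that are in exactly one of the two sets
            mismatches[hostname] = expected ^ actual
    return mismatches


# Hypothetical resolution results, for illustration only.
resolved = {
    'google.com': {'173.194.46.64', '173.194.46.70', '173.194.46.71'},
    'stackoverflow.com': {'198.252.206.17'},
}
print(find_ip_mismatches(REQUIRED_HOSTS, resolved))
```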

To improve efficiency a bit, each name is resolved in parallel, so I created a subtask for that (it resolves using dnspython):

import dns.resolver
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)


@my_tasks.task
def resolve_hostname(hostname, resolver=None):
    """ This subtask resolves the 'hostname' to its IP addresses. It's
    intended to be used in the 'compare_required_ips' function to resolve
    names in parallel """
    if resolver is None:
        resolver = dns.resolver.Resolver()
        # Prepend two public nameservers as separate list entries
        resolver.nameservers = ['8.8.8.8', '4.2.2.2'] + resolver.nameservers

    try:
        return (hostname,
                {hst.address for hst in resolver.query(hostname)})
    except Exception as e:
        logger.exception("Got %s when trying to resolve hostname=%s"
                         % (type(e), hostname))
        raise  # re-raise, keeping the original traceback
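
When building the nameserver list, the two addresses must be separate list elements: writing ['8.8.8.8' + '4.2.2.2'] applies string concatenation inside the list literal and yields one malformed entry instead of two nameservers. This quick check uses only the standard library; is_ip is a hypothetical helper.

```python
import ipaddress

buggy = ['8.8.8.8' + '4.2.2.2']  # string concatenation inside the list
fixed = ['8.8.8.8', '4.2.2.2']   # two separate entries


def is_ip(text):
    """True if `text` parses as an IPv4 or IPv6 address."""
    try:
        ipaddress.ip_address(text)
        return True
    except ValueError:
        return False


print(buggy)                      # ['8.8.8.84.2.2.2']
print([is_ip(s) for s in buggy])  # [False]
print([is_ip(s) for s in fixed])  # [True, True]
```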

Now, the task that spawns one subtask per hostname and collects their results is the following:

from celery import group


@my_tasks.task
def compare_required_ips():
    """ This method verifies that the IPs haven't changed. """
    retval = []
    resolver = dns.resolver.Resolver()
    resolver.nameservers = ['8.8.8.8', '4.2.2.2'] + resolver.nameservers
    retrieved_hosts = dict.fromkeys(required_hosts.REQUIRED_HOSTS.keys())
    logger.info("Going to compare IPs for %s hostnames=%s"
                % (len(required_hosts.REQUIRED_HOSTS.keys()),
                   required_hosts.REQUIRED_HOSTS.keys()))
    ip_subtasks = group(
        [resolve_hostname.s(hostname, resolver=resolver)
         for hostname in required_hosts.REQUIRED_HOSTS.keys()]
    )()
    for hostname, ips in ip_subtasks.get(timeout=90):
        retrieved_hosts[hostname] = ips

    for hostname in required_hosts.REQUIRED_HOSTS:
        if (required_hosts.REQUIRED_HOSTS[hostname]
                != retrieved_hosts[hostname]):
            retval.append(hostname)
            logger.error(
                "IP resolution mismatch. hostname=%s resolve_target=%s"
                ", resolve_actual=%s (mismatch=%s)"
                % (hostname,
                   required_hosts.REQUIRED_HOSTS[hostname],
                   retrieved_hosts[hostname],
                   (required_hosts.REQUIRED_HOSTS[hostname]
                    ^ retrieved_hosts[hostname]))
            )
    return retval

Again, fairly simple: walk the REQUIRED_HOSTS keys (a.k.a. hostnames), spawn a subtask to resolve each of them, and then collect the results with a 90-second timeout (that happens in the line for hostname, ips in ip_subtasks.get(timeout=90)).

The nuisance is that all the subtasks except one finish successfully within that 90-second window. The parent task (compare_required_ips) then fails because of the timeout=90, and as soon as that happens the remaining subtask finishes successfully (immediately after the parent has failed). I have tried increasing and decreasing the timeout, and that one subtask always takes whatever timeout I pass when collecting the group's results, so the main task reports a failure.
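
A single ip_subtasks.get(timeout=90) is all-or-nothing: it raises once the deadline passes if any member is unfinished, without saying which one. One way to narrow this down is to wait up to the deadline and then list the members still pending. The sketch below is a standard-library analogue using concurrent.futures, not Celery; fake_resolve and the host slow.example are hypothetical. In Celery itself, the GroupResult returned by the group call keeps its members in .results, each an AsyncResult with .id and .ready(), which could be inspected the same way after a timeout.

```python
import concurrent.futures
import time


def fake_resolve(hostname):
    # Hypothetical stand-in for resolve_hostname; 'slow.example' stalls.
    if hostname == 'slow.example':
        time.sleep(1.0)
    return hostname, {'192.0.2.1'}


def collect_with_deadline(hostnames, resolve_fn, timeout):
    """Wait up to `timeout` seconds for all lookups.

    Returns (results, pending): results maps each finished hostname to its
    IPs, pending lists the hostnames that had not finished in time."""
    with concurrent.futures.ThreadPoolExecutor(
            max_workers=len(hostnames)) as pool:
        futures = {pool.submit(resolve_fn, h): h for h in hostnames}
        done, not_done = concurrent.futures.wait(futures, timeout=timeout)
        results = dict(f.result() for f in done)
        pending = sorted(futures[f] for f in not_done)
    # Leaving the `with` block still waits for the slow lookups to end.
    return results, pending


results, pending = collect_with_deadline(
    ['a.example', 'slow.example'], fake_resolve, timeout=0.3)
print(pending)  # ['slow.example']
```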

I have also run the name resolution manually (not as Celery tasks, but using regular threading), and it resolves in milliseconds, every time, in every test I have tried. I don't think it's an issue with the dns.resolver.Resolver() class: everything suggests that it resolves blazingly fast, but the subtask, or the group, or... someone in Celery doesn't know about it (for only one of the subtasks, though).
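
A threaded baseline of the kind described above can also record per-host latency, which makes "resolves in milliseconds" easy to compare against the Celery run. This is a sketch under assumptions: time_resolutions and dnspython_resolve are hypothetical names, dnspython is imported lazily so the harness itself only needs the standard library, and query() mirrors the question (dnspython 2.x prefers resolve()).

```python
import time
from concurrent.futures import ThreadPoolExecutor


def dnspython_resolve(hostname):
    # Requires dnspython; imported here so the harness runs without it.
    import dns.resolver
    resolver = dns.resolver.Resolver()
    resolver.nameservers = ['8.8.8.8', '4.2.2.2'] + resolver.nameservers
    return {rr.address for rr in resolver.query(hostname)}


def time_resolutions(hostnames, resolve_fn, max_workers=8):
    """Resolve every hostname in a thread pool.

    Returns {hostname: (ips, elapsed_seconds)}."""
    def timed(hostname):
        start = time.perf_counter()
        ips = resolve_fn(hostname)
        return hostname, (ips, time.perf_counter() - start)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return dict(pool.map(timed, hostnames))
```

For example, time_resolutions(list(REQUIRED_HOSTS), dnspython_resolve) would report each host's addresses together with how long its lookup took.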

I am using celery==3.1.9, celery-with-redis==3.0 and flower==0.6.0 to monitor.

Any help, hint, or thing to test would be very much appreciated.

Answers (1)

Chillar Anand

One possible problem is a deadlock caused by launching synchronous subtasks. compare_required_ips is itself a Celery task, and inside it you wait for a group of resolve_hostname tasks to complete, which is very inefficient.
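
The deadlock risk of waiting on subtasks from inside a task can be reproduced without Celery. In this standard-library sketch (an analogue, not Celery itself), a thread pool with a single worker stands in for a worker whose only free slot is occupied by the parent task, and the hypothetical resolve_stub stands in for resolve_hostname. The parent submits a subtask to the same pool and blocks on it, so the subtask cannot start until the parent's timeout expires; it then completes immediately, which resembles the symptom in the question.

```python
import concurrent.futures


def resolve_stub(hostname):
    # Hypothetical stand-in for resolve_hostname: returns immediately.
    return hostname, {"198.51.100.1"}


def parent_task(pool, timeout):
    # Like compare_required_ips: submit a subtask to the same pool this
    # function is running on, then block on its result.
    child = pool.submit(resolve_stub, "stackoverflow.com")
    try:
        return "ok", child.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        # The only worker thread is busy running parent_task, so the child
        # cannot start until this function returns.
        return "timed out", child


with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    status, child = pool.submit(parent_task, pool, 0.5).result()
    # Once parent_task has returned, the freed worker runs the child at once.
    child_result = child.result(timeout=5)

print(status)        # timed out
print(child_result)  # ('stackoverflow.com', {'198.51.100.1'})
```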

So change this:

ip_subtasks = group(
    [resolve_hostname.s(hostname, resolver=resolver)
     for hostname in required_hosts.REQUIRED_HOSTS.keys()]
)()

to:

ip_subtasks = group(
    [resolve_hostname.s(hostname, resolver=resolver)
     for hostname in required_hosts.REQUIRED_HOSTS.keys()]
).delay()

which launches all your tasks asynchronously, thereby avoiding the deadlock.

Also, you shouldn't call ip_subtasks.get() inside the compare_required_ips task (even if each subtask takes only a nanosecond). Write a separate function for that, or use Celery's task_success signal.
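
Another way to avoid blocking on .get(), different from the two options named above, is a Celery chord: the header group runs, and a callback task receives the list of their return values once all have finished. The sketch keeps the Celery wiring in comments (it assumes the tasks live on the question's my_tasks app, that a result backend is configured, and report_ip_mismatches is a hypothetical task name); the runnable part is the pure comparison function such a callback could call, ip_mismatches_from_results, also a hypothetical name.

```python
# Hypothetical Celery wiring (not executed here), assuming the tasks live on
# the same `my_tasks` app as in the question and a result backend is set:
#
#   from celery import chord
#
#   @my_tasks.task
#   def report_ip_mismatches(results):
#       return ip_mismatches_from_results(
#           results, required_hosts.REQUIRED_HOSTS)
#
#   chord(resolve_hostname.s(h) for h in required_hosts.REQUIRED_HOSTS)(
#       report_ip_mismatches.s())


def ip_mismatches_from_results(results, required):
    """Compare a list of (hostname, ips) pairs, the shape each
    resolve_hostname call returns, against the recorded IP sets.

    Returns the sorted hostnames whose addresses differ. `ips` is passed
    through set() so lists are accepted as well as sets; hosts that have
    no result at all are reported as mismatches."""
    retrieved = {hostname: set(ips) for hostname, ips in results}
    return sorted(hostname for hostname, expected in required.items()
                  if retrieved.get(hostname) != set(expected))
```

Because the callback only runs after every header task has finished, no worker slot sits idle waiting on its own subtasks.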
