Henk

Reputation: 145

Custom Dask traversable object

I use a custom dictionary-like object to store the results of a Dask graph. However, when I pass that object to dask.compute, its children are not computed.

Is it possible to change the custom object so that Dask can traverse it and compute its children?

An example:

import dask
from dask import delayed
from collections import defaultdict
print('Dask version', dask.__version__)

Dictionary1 = {}
Dictionary1['a'] = delayed(sum)([2,3])

print('Native Dict', dask.compute(Dictionary1) )

Dictionary2 = defaultdict(defaultdict)
Dictionary2['a'] = delayed(sum)([2,3])

print('Custom Dict', dask.compute(Dictionary2) )

The resulting output:

Dask version 0.19.2
Native Dict ({'a': 5},)
Custom Dict (defaultdict(<class 'collections.defaultdict'>, {'a': Delayed('sum-212db0df-1c14-4314-9a56-2eb87ef58abe')}),)

EDIT: Solution based on MRocklin's answer

import dask
import dask.threaded
from dask import delayed
from collections import defaultdict
from dask.base import DaskMethodsMixin

class DefaultDictDict(defaultdict, DaskMethodsMixin):
  def __init__(self, *args):  ## Define an infinitely nested dict.
    defaultdict.__init__(self, DefaultDictDict, *args)

  def __dask_graph__(self):
    ## NOTE: Errors in this function are silent and disable the collections interface.
    ## Each value's dask attribute is already a graph that includes the value's own key.
    graph = dict()
    self._keys = []
    for x in self.values():
      if not hasattr(x, 'dask'):  ## Use a dummy delayed to convert other objects to graphs.
        x = delayed(lambda data: data)(x)
      graph.update(x.dask)
      self._keys.append(x.key)
    return graph

  def __dask_keys__(self):
    ## Relies on __dask_graph__ having run first, because that method fills self._keys.
    return self._keys

  __dask_scheduler__ = staticmethod(dask.threaded.get)

  def __dask_postcompute__(self):
    def Reconstruct(results):
      return DefaultDictDict(zip(self.keys(), results))
    return Reconstruct, ()

Dictionary3 = DefaultDictDict()
Dictionary3['b']['c'] = delayed(sum)([2,3])
print('Collections Dict', dask.compute(Dictionary3)[0] )

Result:

Collections Dict defaultdict(<class '__main__.DefaultDictDict'>, {'b': defaultdict(<class '__main__.DefaultDictDict'>, {'c': 5})})

(It still shows defaultdict, because __repr__ wasn't overridden properly)
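One possible way to get a repr that names the subclass is to override __repr__ directly. The sketch below uses a hypothetical NestedDict class as a stand-in for DefaultDictDict and leaves out the Dask methods, so it only illustrates the repr part:

```python
from collections import defaultdict


class NestedDict(defaultdict):
    """Hypothetical stand-in for DefaultDictDict, without the Dask methods."""

    def __init__(self, *args):
        # Missing keys create another NestedDict, giving arbitrary nesting.
        super().__init__(NestedDict, *args)

    def __repr__(self):
        # Show the subclass name and the stored items only,
        # hiding the default_factory that defaultdict normally prints.
        return f"{type(self).__name__}({dict(self)!r})"


nested = NestedDict()
nested["b"]["c"] = 5
print(nested)
```

The same __repr__ method could be added to DefaultDictDict unchanged, since it only relies on the type name and the stored items.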

Upvotes: 2

Views: 165

Answers (1)

MRocklin

Reputation: 57251

Currently Dask only traverses through standard core Python collections (dicts, lists, ...). This behavior is not extensible as of 2018-10-07.

However, you can make your own Dask collections, which essentially just pass on the graph and keys of their members. See http://docs.dask.org/en/latest/custom-collections.html
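As a rough illustration of "passing on the graph and keys of the members", here is a minimal sketch of a flat custom collection. The class name DelayedMapping and its constructor are made up for this example, and it assumes every value is a Delayed object; the __dask_*__ methods are the ones described on the custom collections page linked above.

```python
import dask
import dask.threaded
from dask import delayed
from dask.base import DaskMethodsMixin


class DelayedMapping(DaskMethodsMixin):
    """Hypothetical dict-like holder of Delayed values (illustration only)."""

    def __init__(self, members):
        # Assumption: every value in `members` is a Delayed object.
        self._members = dict(members)

    def __dask_graph__(self):
        # Merge the members' graphs into one plain-dict task graph.
        graph = {}
        for member in self._members.values():
            graph.update(dict(member.__dask_graph__()))
        return graph

    def __dask_keys__(self):
        # One output key per member, in the same order as the names.
        return [member.key for member in self._members.values()]

    def __dask_tokenize__(self):
        # Identify the collection by the keys of its members.
        return tuple(self.__dask_keys__())

    # Scheduler used when the caller does not specify one.
    __dask_scheduler__ = staticmethod(dask.threaded.get)

    def __dask_postcompute__(self):
        names = list(self._members)

        def rebuild(results):
            # `results` arrives in the order given by __dask_keys__.
            return dict(zip(names, results))

        return rebuild, ()


collection = DelayedMapping({"a": delayed(sum)([2, 3]), "b": delayed(sum)([3, 4])})
(result,) = dask.compute(collection)
print(result)
```

Unlike the defaultdict subclass in the question, this sketch derives the keys from the members on every call instead of caching them, so __dask_keys__ does not depend on __dask_graph__ having been called first.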

Upvotes: 2
