alancalvitti
alancalvitti

Reputation: 476

Map a function by key path in nested dict including slices, wildcards and ragged hierarchies

This question is an extension based on here and here.

What is a good approach to mapping a function to a specified key path in nested dicts, including these path specification:

  1. List of keys at a given path position
  2. Key slices (assuming sorting)
  3. Wildcards (ie all keys at a path position)
  4. Handling ragged hierarchies by ignoring keys that don't appear at a given level

If it is makes is simpler, can assume that only dicts are nested, no lists of dicts, since the former can be obtained with dict(enumerate(...)).

However, the hierarchy can be ragged, eg:

data = {0: {'a': 1, 'b': 2},
 1: {'a': 10, 'c': 13},
 2: {'a': 20, 'b': {'d': 100, 'e': 101}, 'c': 23},
 3: {'a': 30, 'b': 31, 'c': {'d': 300}}}

Would like to be able to specify key path like this:

map_at(f, ['*',['b','c'],'d'])

To return:

{0: {'a': 1, 'b': 2},
     1: {'a': 10, 'c': 13},
     2: {'a': 20, 'b': {'d': f(100), 'e': 101}, 'c': 23},
     3: {'a': 30, 'b': 31, 'c': {'d': f(300)}}}

Here f is mapped to key paths [2,b,d] and [3,c,d].

Slicing would be specified as, eg [0:3,b] for instance.

I think the path spec is unambiguous, though could be generalized to, for example, match key path prefix (in which case, f would also be mapped at [0,b]` and other paths).

Can this be implemented via comprehension and recursion or does it require heavy lifting to catch KeyError etc?

Please do not suggest Pandas as an alternative.

Upvotes: 3

Views: 565

Answers (3)

Mulan
Mulan

Reputation: 135357

I think you might appreciate this refreshing generator implementation -

def select(sel = [], d = {}, res = []):

  # (base case: no selector)
  if not sel:                   
    yield (res, d)

  # (inductive: a selector) non-dict
  elif not isinstance(d, dict): 
    return

  # (inductive: a selector, a dict) wildcard selector
  elif sel[0] == '*':           
    for (k, v) in d.items():
      yield from select \
        ( sel[1:]
        , v
        , [*res, k]
        )

  # (inductive: a selector, a dict) list selector
  elif isinstance(sel[0], list):
    for s in sel[0]:
      yield from select \
        ( [s, *sel[1:]]
        , d
        , res
        )

  # (inductive: a selector, a dict) single selector
  elif sel[0] in d:             
    yield from select \
      ( sel[1:]
      , d[sel[0]]
      , [*res, sel[0]]
      )

  # (inductive: single selector not in dict) no match
  else:                         
    return

It works like this -

data = \
  { 0: { 'a': 1, 'b': 2 }
  , 1: { 'a': 10, 'c': 13 }
  , 2: { 'a': 20, 'b': { 'd': 100, 'e': 101 }, 'c': 23 }
  , 3: { 'a': 30, 'b': 31, 'c': { 'd': 300 } }
  }

for (path, v) in select(['*',['b','c'],'d'], data):
  print(path, v)

# [2, 'b', 'd'] 100
# [3, 'c', 'd'] 300

Because select returns an iterable, you can use conventional map function on it -

s = select(['*',['b','c'],'d'], data)

work = lambda r: f"path: {r[0]}, value: {r[1]}"

for x in map(work, s):
  print(x)

# path: [2, 'b', 'd'], value: 100
# path: [3, 'c', 'd'], value: 300

Upvotes: 0

jferard
jferard

Reputation: 8180

I'm not a big fan of pseudo-code, but in this kind of situation, you need to write down an algorithm. Here's my understanding of your requirements:

map_at(func, path_pattern, data):

  1. if path_pattern is not empty
    • if data is terminal, it's a failure : we did not match the full path_pattern ̀so there is no reason to apply the function. Just return data.
    • else, we have to explore every path in data. We consume the head of path_pattern if possible. That is return a dict data key -> map_at(func, new_path, data value) where new_path is the tail of the path_pattern if the key matches the head, else the `path_pattern itself.
  2. else, it's a success, because all the path_pattern was consumed:
    • if data is terminal, return func(data)
    • else, find the leaves and apply func: return return a dict data key -> map_at(func, [], data value)

Notes:

  • I assume that the pattern *-b-d matches the path 0-a-b-c-d-e;
  • it's an eager algorithm: the head of the path is always consumed when possible;
  • if the path is fully consumed, every terminal should be mapped;
  • it's a simple DFS, thus I guess it's possible to write an iterative version with a stack.

Here's the code:

def map_at(func, path_pattern, data):
    def matches(pattern, value):
        try:
            return pattern == '*' or value == pattern or value in pattern
        except TypeError: # EDIT: avoid "break" in the dict comprehension if pattern is not a list. 
            return False

    if path_pattern:
        head, *tail = path_pattern
        try: # try to consume head for each key of data
            return {k: map_at(func, tail if matches(head, k) else path_pattern, v) for k,v in data.items()}
        except AttributeError: # fail: terminal data but path_pattern was not consumed
            return data
    else: # success: path_pattern is empty.
        try: # not a leaf: map every leaf of every path
            return {k: map_at(func, [], v) for k,v in data.items()}
        except AttributeError: # a leaf: map it
            return func(data)

Note that tail if matches(head, k) else path_pattern means: consume head if possible. To use a range in the pattern, just use range(...).

As you can see, you never escape from case 2. : if the path_pattern is empty, you just have to map all leaves whatever happens. This is clearer in this version:

def map_all_leaves(func, data):
    """Apply func to all leaves"""
    try:
        return {k: map_all_leaves(func, v) for k,v in data.items()}
    except AttributeError:
        return func(data)

def map_at(func, path_pattern, data):
    def matches(pattern, value):
        try:
            return pattern == '*' or value == pattern or value in pattern
        except TypeError: # EDIT: avoid "break" in the dict comprehension if pattern is not a list. 
            return False

    if path_pattern:
        head, *tail = path_pattern
        try: # try to consume head for each key of data
            return {k: map_at(func, tail if matches(head, k) else  path_pattern, v) for k,v in data.items()}
        except AttributeError: # fail: terminal data but path_pattern is not consumed
            return data
    else:
        map_all_leaves(func, data)

EDIT

If you want to handle lists, you can try this:

def map_at(func, path_pattern, data):
    def matches(pattern, value):
        try:
            return pattern == '*' or value == pattern or value in pattern
        except TypeError: # EDIT: avoid "break" in the dict comprehension if pattern is not a list. 
            return False

    def get_items(data):
        try:
            return data.items()
        except AttributeError:
            try:
                return enumerate(data)
            except TypeError:
                raise

    if path_pattern:
        head, *tail = path_pattern
        try: # try to consume head for each key of data
            return {k: map_at(func, tail if matches(head, k) else path_pattern, v) for k,v in get_items(data)}
        except TypeError: # fail: terminal data but path_pattern was not consumed
            return data
    else: # success: path_pattern is empty.
        try: # not a leaf: map every leaf of every path
            return {k: map_at(func, [], v) for k,v in get_items(data)}
        except TypeError: # a leaf: map it
            return func(data)

The idea is simple: enumerate is the equivalent for a list of dict.items:

>>> list(enumerate(['a', 'b']))
[(0, 'a'), (1, 'b')]
>>> list({0:'a', 1:'b'}.items())
[(0, 'a'), (1, 'b')]

Hence, get_items is just a wrapper to return the dict items, the list items (index, value) or raise an error.

The flaw is that lists are converted to dicts in the process:

>>> data2 = [{'a': 1, 'b': 2}, {'a': 10, 'c': 13}, {'a': 20, 'b': {'d': 100, 'e': 101}, 'c': 23}, {'a': 30, 'b': 31, 'c': {'d': 300}}]
>>> map_at(type,['*',['b','c'],'d'],data2)
{0: {'a': 1, 'b': 2}, 1: {'a': 10, 'c': 13}, 2: {'a': 20, 'b': {'d': <class 'int'>, 'e': 101}, 'c': 23}, 3: {'a': 30, 'b': 31, 'c': {'d': <class 'int'>}}}

EDIT

Since you are looking for something like Xpath for JSON, you could try https://pypi.org/project/jsonpath/ or https://pypi.org/project/jsonpath-rw/. (I did not test those libs).

Upvotes: 1

Davis Herring
Davis Herring

Reputation: 40013

This is not very simple and less efficient, but it should work:

def map_at(f,kp,d): return map_at0(f,kp,d,0)
def slice_contains(s,i):  # no negative-index support
  a=s.start or 0
  return i>=a and (s.end is None or i<s.end) and\
    not (i-a)%(s.step or 1)
def map_at0(f,kp,d,i):
  if i==len(kp): return f(d)
  if not isinstance(d,dict): return d  # no such path here
  ret={}
  p=kp[i]
  if isinstance(p,str) and p!='*': p=p,
  for j,(k,v) in enumerate(sorted(d.items())):
    if p=='*' or (slice_contains(p,j) if isinstance(p,slice) else k in p):
      v=map_at0(f,kp,v,i+1)
    ret[k]=v
  return ret

Note that this copies every dictionary it expands (because it matches the key path, even if no further keys match and f is never applied) but returns unmatched subdictionaries by reference. Note also that '*' can be “quoted” by putting it in a list.

Upvotes: 0

Related Questions