Reputation: 12343
I am trying to achieve what I think should be very simple but I am unable to think through it and hence needed some help.
I have an RDD where I have :
key1, (val_id1,val11), (val_id2,val12),(val_id3,val13) ...
key2, (val_id5,val25), (val_id7,val27), (val_id2,val22) ...
...
I would like to for each key, all the combination of value pairs such that the second element in the value tuple is multiplied before output. for example:
The output produced by values for key1 above would be:
((val_id1,val_id2), val11 * val12) , ((val_id1,val_id3),val11 * val13) , ((val_id2,val_id3),val12 * val13) ...
I know there's an itertools.combinations module in the itertools package but I don't know how to quite incorporate it in the context of an RDD. Any help would be much appreciated.
Upvotes: 1
Views: 2579
Reputation: 901
I have made this algorithm, but with higher numbers looks like that doesn't work or its very slow, it will run in a cluster of big data(cloudera), so i think that i have to put the function into pyspark, please give a hand if you can.
import pandas as pd import itertools as itts
number_list = [10953, 10423, 10053]
def reducer(nums): def ranges(n): print(n) return range(n, -1, -1)
num_list = list(map(ranges, nums)) return list(itts.product(*num_list))
data=pd.DataFrame(reducer(number_list)) print(data)
Upvotes: 0
Reputation:
Assuming the original RDD is in text format. The following code has not been run on spark, but the solution should be something like this.
from itertools import combinations
import re
def clean(dirtyRecord):
"""
Accept a String value "key1, (val_id1,key11), (val_id2,key22), ..."
and convert it into record of the form
[key1, [(val_id1, key11), (val_id2, key12), ... ]]
"""
splitRecord = dirtyRecord.split(', ')
# The splitRecord[0] is the 'key'
splits = [re.search('\((\w+),(\w+)\)', tuples).groups() for tuples in splitRecord[1:]]
updateSplitsWithFloat = []
for item in splits:
updateSplitsWithFloat.append((item[0], float(item[1]))
splits = splitRecord[0] + updateSplitsWithFloat
return splits
def genCombinations(features):
"""
Accept a list [(val_id1, key11), (val_id2, key12), ... ]
and generate the output asked in question
"""
val_ids, vals = zip(*features)
val_ids = combinations(val_ids, repeat=2)
vals = map(lambda x: x[0] * x[1], combinations(vals, repeat=2))
return [(t0, t1) for t0, t1 in zip(val_ids, vals)]
# Begin processing the original data
valuesPerKeyRDD = (originalRawRDD
.map(lambda x: clean(x))
.map(lambda x: (x[0], genCombinations(x[1])))
.cache())
Upvotes: 2
Reputation: 1257
Here is the functions (it assumes inner sets are dictionaries, as they should be. but if you do not have that, you can always replace .keys() function to your own where you need to loop through inner tuple list and get back the keys)
def get_res(tup):
based_dict = tup[1]
k = tup[0]
generated_tupes = []
for comb in itertools.combinations(based_dict.keys(),2):
value = str(based_dict[comb[0]])+"**"+str(based_dict[comb[1]])
generated_tupes.append((comb,value))
return (k,generated_tupes)
You can test it without spark:
>>> based
[('k1', {'id2': 12, 'id3': 13, 'id1': 11}), ('k2', {'id4': 24, 'id5': 25})]
>>> transformed = map(get_res,based)
>>> transformed
[('k1', [(('id2', 'id3'), '12**13'), (('id2', 'id1'), '12**11'), (('id3', 'id1'), '13**11')]), ('k2', [(('id4', 'id5'), '24**25')])]
In your spark code, just call rdd.map(get_res)
Upvotes: 0