Reputation: 964
I'm trying to do a left join between a PCollection and a duplicate of itself, starting from data like this:
((colA, colB, colC, colD))
(a,b,e,f)
(a,b,g,h)
(a,b,i,j)
(c,d,k,l)
(c,d,m,n)
Doing a left join on colA and colB, the result would look like this:
(e,f, g,h)
(e,f, i,j)
(g,h, i,j)
(k,l, m,n)
I managed to solve it using the Apache Beam DataFrame API:
from apache_beam import dataframe
from apache_beam.dataframe.convert import to_dataframe, to_pcollection

df = to_dataframe(pcol)
with dataframe.allow_non_parallel_operations():
    res = df.merge(right=df, left_on=['colA', 'colB'], right_on=['colA', 'colB'])
pcoll = to_pcollection(res)
It was working fine, but when I had to process a PCollection with a much larger number of rows I got an out-of-memory error (which was expected).
Now I'm looking for an alternative to df.merge() that works directly on the PCollection, so I don't run into the memory error.
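For context, here is a minimal sketch of how the merge snippet above can sit in a complete pipeline. The Row NamedTuple and the inline test data are my own assumptions (to_dataframe needs a schema-aware PCollection, e.g. NamedTuple elements); this is not the exact pipeline from the question.

import typing
import apache_beam as beam
from apache_beam import dataframe
from apache_beam.dataframe.convert import to_dataframe, to_pcollection

# Hypothetical schema for the rows shown above.
class Row(typing.NamedTuple):
    colA: str
    colB: str
    colC: str
    colD: str

with beam.Pipeline() as p:
    pcol = p | beam.Create([
        Row('a', 'b', 'e', 'f'),
        Row('a', 'b', 'g', 'h'),
        Row('a', 'b', 'i', 'j'),
        Row('c', 'd', 'k', 'l'),
        Row('c', 'd', 'm', 'n'),
    ])
    df = to_dataframe(pcol)
    # The self-merge is declared as a non-parallel operation, as in the question.
    with dataframe.allow_non_parallel_operations():
        res = df.merge(right=df, left_on=['colA', 'colB'], right_on=['colA', 'colB'])
    merged = to_pcollection(res)
    merged | beam.Map(print)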
Upvotes: 0
Views: 406
Reputation: 964
If anyone is interested in this issue:
I thought of an alternative approach. First I grouped my records by key, like this:
((a,b),(e,f))
((a,b),(g,h))
((a,b),(i,j))
((c,d),(k,l))
((c,d),(m,n))
After that I combined them using GroupByKey.
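A short sketch of these first two steps (the keying lambda and transform labels are my own; it assumes pcol is the input PCollection of 4-field rows from the question):

import apache_beam as beam

# Key each record by (colA, colB) and keep (colC, colD) as the value,
# producing records like ((a, b), (e, f)).
keyed = pcol | 'KeyByJoinCols' >> beam.Map(lambda r: ((r[0], r[1]), (r[2], r[3])))

# Collect all values that share a key, e.g. ((a, b), [(e, f), (g, h), (i, j)]).
grouped = keyed | 'GroupByJoinCols' >> beam.GroupByKey()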
In the next transform I loop through all the possible combinations:
import apache_beam as beam

class combineLev(beam.DoFn):
    # this acts like df.merge: pair every two distinct values that share a key
    def process(self, element):
        (k, v) = element
        v_ = list(v)
        for i in range(len(v_)):
            for j in range(i, len(v_)):
                if v_[i][1] != v_[j][1]:
                    yield (v_[i], v_[j])
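Wiring the DoFn into the pipeline then looks roughly like this (continuing the sketch above; the labels are my own):

pairs = grouped | 'PairValues' >> beam.ParDo(combineLev())
# For the (a, b) group this yields ((e, f), (g, h)), ((e, f), (i, j)) and ((g, h), (i, j)),
# and for (c, d) it yields ((k, l), (m, n)), matching the expected output above.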
Upvotes: 0