Idhem

Reputation: 964

Apache Beam left join between 2 PCollections

I'm trying to do a left join between a PCollection and a copy of itself. The input looks like this:

(colA, colB, colC, colD)
(a,b,e,f)
(a,b,g,h)
(a,b,i,j)
(c,d,k,l)
(c,d,m,n)

After a left join on colA and colB, the result would look like this:

(e,f, g,h)
(e,f, i,j)
(g,h, i,j)

(k,l, m,n)
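
For reference, the pairing described above can be reproduced on the sample rows in plain Python. This is only a sketch of the intended semantics (every unordered pair of rows that share colA and colB), not Beam code, and the function name `pair_rows` is made up for illustration.

```python
from collections import defaultdict
from itertools import combinations

rows = [
    ("a", "b", "e", "f"),
    ("a", "b", "g", "h"),
    ("a", "b", "i", "j"),
    ("c", "d", "k", "l"),
    ("c", "d", "m", "n"),
]

def pair_rows(rows):
    """Pair up the (colC, colD) parts of rows that share (colA, colB)."""
    groups = defaultdict(list)
    for col_a, col_b, col_c, col_d in rows:
        groups[(col_a, col_b)].append((col_c, col_d))
    result = []
    for values in groups.values():
        # combinations() yields each unordered pair once, in input order
        for left, right in combinations(values, 2):
            result.append(left + right)
    return result

for row in pair_rows(rows):
    print(row)
```

On the sample data this prints the four rows listed above, one tuple per line.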

I managed to solve it using the Apache Beam DataFrame API:

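from apache_beam import dataframe
from apache_beam.dataframe.convert import to_dataframe, to_pcollection

df = to_dataframe(pcol)

# opt in to operations that Beam cannot parallelize
with dataframe.allow_non_parallel_operations():
    res = df.merge(right=df, left_on=['colA', 'colB'], right_on=['colA', 'colB'])
pcoll = to_pcollection(res)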

It was working fine, but when I have to process a PCollection with a large number of rows, I get an out-of-memory error (which was expected).

Now I'm looking for an alternative to df.merge() that works on PCollections directly, so I don't run into the memory error.

Upvotes: 0

Views: 406

Answers (1)

Idhem

Reputation: 964

In case anyone is interested in this issue:

I thought of an alternative approach. First I keyed my records by (colA, colB), like this:

((a,b),(e,f))
((a,b),(g,h))
((a,b),(i,j))
((c,d),(k,l))
((c,d),(m,n))
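
A minimal sketch of this keying step, assuming each input element is a 4-tuple `(colA, colB, colC, colD)`. The helper name `to_key_value` is hypothetical and not part of the original pipeline.

```python
def to_key_value(row):
    """Split a (colA, colB, colC, colD) row into ((colA, colB), (colC, colD))."""
    col_a, col_b, col_c, col_d = row
    return (col_a, col_b), (col_c, col_d)

# In a pipeline this would typically be applied with beam.Map, e.g.
#   keyed = pcol | "KeyByAB" >> beam.Map(to_key_value)
print(to_key_value(("a", "b", "e", "f")))
```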

After that I grouped them with GroupByKey.
In the next transformation I looped through all the possible combinations within each group:

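import apache_beam as beam

class combineLev(beam.DoFn):
    # Acts like df.merge: pairs up the values that share a key.
    def process(self, element):
        (k, v) = element
        v_ = list(v)  # materialize the grouped values for this key
        for i in range(len(v_)):
            # start at i + 1 so an element is never paired with itself
            for j in range(i + 1, len(v_)):
                # skip pairs whose second field is identical
                if v_[i][1] != v_[j][1]:
                    yield (v_[i], v_[j])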
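
Putting the steps together, the whole transform might look roughly like the sketch below. It assumes the input elements are 4-tuples and swaps the nested loops for `itertools.combinations`, which pairs values by position (so two distinct rows with an identical field are still paired, unlike the `!=` check above). The names `pairs_in_group` and `self_join` are hypothetical.

```python
from itertools import combinations

def pairs_in_group(element):
    """Yield every unordered pair of values that were grouped under one key."""
    _key, values = element
    # GroupByKey hands over an iterable; materialize it once per key
    yield from combinations(list(values), 2)

def self_join(rows):
    """Apply key -> GroupByKey -> pair expansion to a PCollection of 4-tuples."""
    # imported lazily so pairs_in_group can be tried out without Beam installed
    import apache_beam as beam
    return (
        rows
        | "KeyByAB" >> beam.Map(lambda r: ((r[0], r[1]), (r[2], r[3])))
        | "Group" >> beam.GroupByKey()
        | "Pairs" >> beam.FlatMap(pairs_in_group)
    )
```

Note that the values for a single key are still held in memory together inside `pairs_in_group`, so a key with a very large number of rows could still be a problem. The gain over the DataFrame version is that different keys can be processed independently.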

Upvotes: 0
