Arij SEDIRI

Reputation: 2158

Union RDDs after a loop PySpark

I'm using PySpark and I'm looking for a way to modify 4 RDDs that are stored in a list. Printing the contents of each RDD:

for r in repartionned_rdd:
    print(r.collect())

gives:

[(u'_guid_NCw7SuFnCh_mFW3SI3qTvBCbqXKD4mtsdJvWE7HNgNg=', (u'f', u'KSJakOd2|KtC9ZF9h'))]
[(u'_guid_OCs2au-sKnxzPE0uRPDP4hg1vvhgpzRAAYjNWRQpKbw=', (u'f', u'KxrylzuA|KpSXJwH2')), (u'_guid_txH15ULaeUDBC4Z_NlEOj2xoYBFa-08imqIBLfYsKps=', (u'f', u'bda54c71-cd1e-4eb7-856c-ba2e6def30c8|6e189e07-807e-41a2-a60a-b07d894a2905')), (u'_guid_ehCT6NyD9l3q3NV9ZroaWVEo3bnDt4tvbU_fMBrEn1g=', (u'm', u'537D69B4-743A-45B9-BED1-A25AA5926F13|2bb3e466-edc5-4302-b102-3bddb1f8c490|aa4760de-104c-4dc3-94c3-336427f89723')), (u'_guid_9F4Ph5GztLN9IlWNgZWKPMCcT4N3Je6-93iM_130F-c=', (u'f', u'KOQqBzhU|KrDt5GC4')), (u'_guid_nPlE_f-zoOHNYiXJSGXWoVryc1U4Bnfxkow3P0mDUFY=', (u'f', u'Kh3tIZR1|Khs0tRsh|K3geBqb_|KBrVNcDX|Jg2uDy8M|529816a3-ee43-4423-961f-8aedaf25d58c')), (u'331d8410d4924e72b0f0585e888c85ce', (u'f', u'1F37807A-CBEA-4B78-85D7-5A97B37B539E'))]
[(u'28b195c271f14a329235c262e7baecbf', (u'm', u'50c41480-a94e-4afa-a732-b6ed7a057239'))]
[(u'c65ac2064bc14116a363125392dcc6f7', (u'f', u'77e4b9b3-83b4-4553-b274-7a16f553cf05')), (u'171f92200d634d62bdc6685bdb7a94e3', (u'f', u'bdf53cb6-695d-4dde-b0c1-d1a34ebea6f7|a09e4074-c22e-48a1-9976-ee2151b5888c|K1Umlb5M|639B02B4-24AD-4069-99A2-C68E8C8F7F06|KjE3wXIr')), (u'_guid_wQZIzeFxciX9CIHUPeWOF2euOIC0jiOsXVXN98_zCh8=', (u'f', u'F0992237-2598-4B13-AA8A-C37D436B901C|C80D1A89-DD84-4734-838F-128F99EBDD20|KthpuVu0')), (u'_guid_ufOcKO48drwr50yJN26NriX5MLYONwmALxWcmly7oqQ=', (u'f', u'KlY10YxX|KyCVx_km'))]

My objective is to add a kind of new "column" to every RDD in this list. This column will contain a unique index for each RDD. My code:

for i, rdd in enumerate(repartionned_rdd):
    new_rdd = rdd.map(lambda x : x + (float(i), ))
    print(new_rdd.collect())

Which gives :

[(u'_guid_NCw7SuFnCh_mFW3SI3qTvBCbqXKD4mtsdJvWE7HNgNg=', 
 (u'f', u'KSJakOd2|KtC9ZF9h'), 0.0)]

[(u'_guid_OCs2au-sKnxzPE0uRPDP4hg1vvhgpzRAAYjNWRQpKbw=', 
 (u'f', u'KxrylzuA|KpSXJwH2'), 1.0), 
 (u'_guid_txH15ULaeUDBC4Z_NlEOj2xoYBFa-08imqIBLfYsKps=', 
 (u'f', u'bda54c71-cd1e-4eb7-856c-ba2e6def30c8|6e189e07-807e-41a2-a60a-b07d894a2905'), 1.0),
 (u'_guid_ehCT6NyD9l3q3NV9ZroaWVEo3bnDt4tvbU_fMBrEn1g=', 
 (u'm', u'537D69B4-743A-45B9-BED1-A25AA5926F13|2bb3e466-edc5-4302-b102-3bddb1f8c490|aa4760de-104c-4dc3-94c3-336427f89723'), 1.0), 
 (u'_guid_9F4Ph5GztLN9IlWNgZWKPMCcT4N3Je6-93iM_130F-c=', 
 (u'f', u'KOQqBzhU|KrDt5GC4'), 1.0), 
 (u'_guid_nPlE_f-zoOHNYiXJSGXWoVryc1U4Bnfxkow3P0mDUFY=', 
 (u'f', u'Kh3tIZR1|Khs0tRsh|K3geBqb_|KBrVNcDX|Jg2uDy8M|529816a3-ee43-4423-961f-8aedaf25d58c'), 1.0), 
 (u'331d8410d4924e72b0f0585e888c85ce', 
 (u'f', u'1F37807A-CBEA-4B78-85D7-5A97B37B539E'), 1.0)]

[(u'28b195c271f14a329235c262e7baecbf', 
 (u'm', u'50c41480-a94e-4afa-a732-b6ed7a057239'), 2.0)]

[(u'c65ac2064bc14116a363125392dcc6f7', 
 (u'f', u'77e4b9b3-83b4-4553-b274-7a16f553cf05'), 3.0), 
 (u'171f92200d634d62bdc6685bdb7a94e3', 
 (u'f', u'bdf53cb6-695d-4dde-b0c1-d1a34ebea6f7|a09e4074-c22e-48a1-9976-ee2151b5888c|K1Umlb5M|639B02B4-24AD-4069-99A2-C68E8C8F7F06|KjE3wXIr'), 3.0),
 (u'_guid_wQZIzeFxciX9CIHUPeWOF2euOIC0jiOsXVXN98_zCh8=', 
 (u'f', u'F0992237-2598-4B13-AA8A-C37D436B901C|C80D1A89-DD84-4734-838F-128F99EBDD20|KthpuVu0'), 3.0),
 (u'_guid_ufOcKO48drwr50yJN26NriX5MLYONwmALxWcmly7oqQ=', 
 (u'f', u'KlY10YxX|KyCVx_km'), 3.0)]

So every row in my new_rdd contains a new column, which is the RDD's index in the list (as shown in the code).

My objective now is to put all those new RDDs into one single RDD. I tried this:

all_rdds_list = []
for i, rdd in enumerate(repartionned_rdd):
    new_rdd = rdd.map(lambda x : x + (float(i), ))
    all_rdds_list.append(new_rdd)

But when I tried to display my RDDs, I got this:

for x in all_rdds_list:
    print(x.collect())

Result:

[(u'_guid_NCw7SuFnCh_mFW3SI3qTvBCbqXKD4mtsdJvWE7HNgNg=', 
 (u'f', u'KSJakOd2|KtC9ZF9h'), 3.0)]

[(u'_guid_OCs2au-sKnxzPE0uRPDP4hg1vvhgpzRAAYjNWRQpKbw=', 
 (u'f', u'KxrylzuA|KpSXJwH2'), 3.0), 
 (u'_guid_txH15ULaeUDBC4Z_NlEOj2xoYBFa-08imqIBLfYsKps=', 
 (u'f', u'bda54c71-cd1e-4eb7-856c-ba2e6def30c8|6e189e07-807e-41a2-a60a-b07d894a2905'), 3.0),
 (u'_guid_ehCT6NyD9l3q3NV9ZroaWVEo3bnDt4tvbU_fMBrEn1g=', 
 (u'm', u'537D69B4-743A-45B9-BED1-A25AA5926F13|2bb3e466-edc5-4302-b102-3bddb1f8c490|aa4760de-104c-4dc3-94c3-336427f89723'), 3.0), 
 (u'_guid_9F4Ph5GztLN9IlWNgZWKPMCcT4N3Je6-93iM_130F-c=', 
 (u'f', u'KOQqBzhU|KrDt5GC4'), 3.0), 
 (u'_guid_nPlE_f-zoOHNYiXJSGXWoVryc1U4Bnfxkow3P0mDUFY=', 
 (u'f', u'Kh3tIZR1|Khs0tRsh|K3geBqb_|KBrVNcDX|Jg2uDy8M|529816a3-ee43-4423-961f-8aedaf25d58c'), 3.0), 
 (u'331d8410d4924e72b0f0585e888c85ce', 
 (u'f', u'1F37807A-CBEA-4B78-85D7-5A97B37B539E'), 3.0)]

[(u'28b195c271f14a329235c262e7baecbf', 
 (u'm', u'50c41480-a94e-4afa-a732-b6ed7a057239'), 3.0)]

[(u'c65ac2064bc14116a363125392dcc6f7', 
 (u'f', u'77e4b9b3-83b4-4553-b274-7a16f553cf05'), 3.0), 
 (u'171f92200d634d62bdc6685bdb7a94e3', 
 (u'f', u'bdf53cb6-695d-4dde-b0c1-d1a34ebea6f7|a09e4074-c22e-48a1-9976-ee2151b5888c|K1Umlb5M|639B02B4-24AD-4069-99A2-C68E8C8F7F06|KjE3wXIr'), 3.0),
 (u'_guid_wQZIzeFxciX9CIHUPeWOF2euOIC0jiOsXVXN98_zCh8=', 
 (u'f', u'F0992237-2598-4B13-AA8A-C37D436B901C|C80D1A89-DD84-4734-838F-128F99EBDD20|KthpuVu0'), 3.0),
 (u'_guid_ufOcKO48drwr50yJN26NriX5MLYONwmALxWcmly7oqQ=', 
 (u'f', u'KlY10YxX|KyCVx_km'), 3.0)]

Any help would be appreciated, thanks!

Upvotes: 1

Views: 2524

Answers (2)

Arij SEDIRI

Reputation: 2158

The best way to do this:

def get_population_id(repartionned_rdd):
    # Start from an empty RDD and union each indexed RDD onto it.
    # `sc` is the active SparkContext.
    FullRDD = sc.emptyRDD()
    for i, rdd in enumerate(repartionned_rdd):
        # Append the RDD's position in the list to every row
        FullRDD = FullRDD.union(rdd.map(lambda x: x + (float(i),)))
    return FullRDD

Upvotes: 0

Pyrce

Reputation: 8571

Your approach has two problems. First, the lambda refers to the loop variable i, which changes before the lambda is actually evaluated. The map calls are transformations, so they only execute when you apply an action (such as collect). This is why you see the correct additional column when you collect inside the enumerate loop, but in your later example every mapping uses the last value of i.
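The first problem can be reproduced in plain Python, without Spark. The sketch below is only an illustration: the deferred call after each loop stands in for Spark's lazy evaluation, and the ("key",) tuple is a placeholder row, not data from the question.

```python
# Plain-Python illustration of late binding; no Spark involved.
# Each lambda looks up `i` when it is called, not when it is defined.
late = []
for i in range(4):
    late.append(lambda x: x + (float(i),))

# The calls happen after the loop, so all four functions see the final i (3).
late_results = [f(("key",)) for f in late]

# Binding i as a default argument freezes its value at definition time.
bound = []
for i in range(4):
    bound.append(lambda x, i=i: x + (float(i),))

bound_results = [f(("key",)) for f in bound]

print(late_results)   # every row ends in 3.0
print(bound_results)  # rows end in 0.0, 1.0, 2.0, 3.0
```

The default-argument form (i=i) is one common way to capture the current value of a loop variable, and it does not require running an action inside the loop.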

The second issue is that if you want to union RDDs together, you should use the union function rather than building a list of RDDs. If you actually want a list of RDDs, you can replace the union below with the list append you had before.

full_rdd = None
for i, rdd in enumerate(repartionned_rdd):
    new_rdd = rdd.map(lambda x : x + (float(i),))
    if full_rdd is None:
        full_rdd = new_rdd
    else:
        full_rdd = sc.union([full_rdd, new_rdd])
    # This will force the lazy evaluation to execute now before `i` changes
    full_rdd.count()
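As a possible alternative that avoids running count() on every iteration, the index can be bound when the lambda is created and all the RDDs can be combined with a single union call. This is a minimal sketch, not the code from either answer: tag_and_union is a hypothetical helper name, sc is assumed to be an active SparkContext, and rdds is assumed to be a non-empty list of RDDs whose rows are tuples.

```python
def tag_and_union(sc, rdds):
    """Append each RDD's list position to its rows, then union everything.

    Assumptions: `sc` is an active SparkContext and `rdds` is a non-empty
    list of RDDs whose rows are tuples. The function name is hypothetical.
    """
    tagged = [
        # i=i binds the current index when the lambda is created, so the
        # result does not depend on when Spark actually runs the map.
        rdd.map(lambda x, i=i: x + (float(i),))
        for i, rdd in enumerate(rdds)
    ]
    # SparkContext.union takes a list of RDDs and returns a single RDD.
    return sc.union(tagged)
```

With the names from the question, this would be called as full_rdd = tag_and_union(sc, repartionned_rdd).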

Upvotes: 1
