Reputation: 2158
I'm using PySpark and I'm looking for a way to modify 4 RDDs that are stored in a list. When I display the list, I get something like this:
So, given:
for r in repartionned_rdd:
    print(r.collect())
This gives:
[(u'_guid_NCw7SuFnCh_mFW3SI3qTvBCbqXKD4mtsdJvWE7HNgNg=', (u'f', u'KSJakOd2|KtC9ZF9h'))]
[(u'_guid_OCs2au-sKnxzPE0uRPDP4hg1vvhgpzRAAYjNWRQpKbw=', (u'f', u'KxrylzuA|KpSXJwH2')), (u'_guid_txH15ULaeUDBC4Z_NlEOj2xoYBFa-08imqIBLfYsKps=', (u'f', u'bda54c71-cd1e-4eb7-856c-ba2e6def30c8|6e189e07-807e-41a2-a60a-b07d894a2905')), (u'_guid_ehCT6NyD9l3q3NV9ZroaWVEo3bnDt4tvbU_fMBrEn1g=', (u'm', u'537D69B4-743A-45B9-BED1-A25AA5926F13|2bb3e466-edc5-4302-b102-3bddb1f8c490|aa4760de-104c-4dc3-94c3-336427f89723')), (u'_guid_9F4Ph5GztLN9IlWNgZWKPMCcT4N3Je6-93iM_130F-c=', (u'f', u'KOQqBzhU|KrDt5GC4')), (u'_guid_nPlE_f-zoOHNYiXJSGXWoVryc1U4Bnfxkow3P0mDUFY=', (u'f', u'Kh3tIZR1|Khs0tRsh|K3geBqb_|KBrVNcDX|Jg2uDy8M|529816a3-ee43-4423-961f-8aedaf25d58c')), (u'331d8410d4924e72b0f0585e888c85ce', (u'f', u'1F37807A-CBEA-4B78-85D7-5A97B37B539E'))]
[(u'28b195c271f14a329235c262e7baecbf', (u'm', u'50c41480-a94e-4afa-a732-b6ed7a057239'))]
[(u'c65ac2064bc14116a363125392dcc6f7', (u'f', u'77e4b9b3-83b4-4553-b274-7a16f553cf05')), (u'171f92200d634d62bdc6685bdb7a94e3', (u'f', u'bdf53cb6-695d-4dde-b0c1-d1a34ebea6f7|a09e4074-c22e-48a1-9976-ee2151b5888c|K1Umlb5M|639B02B4-24AD-4069-99A2-C68E8C8F7F06|KjE3wXIr')), (u'_guid_wQZIzeFxciX9CIHUPeWOF2euOIC0jiOsXVXN98_zCh8=', (u'f', u'F0992237-2598-4B13-AA8A-C37D436B901C|C80D1A89-DD84-4734-838F-128F99EBDD20|KthpuVu0')), (u'_guid_ufOcKO48drwr50yJN26NriX5MLYONwmALxWcmly7oqQ=', (u'f', u'KlY10YxX|KyCVx_km'))]
My objective is to add a new "column" to every RDD in this list. This column will contain a unique index for each RDD. My code:
for i, rdd in enumerate(repartionned_rdd):
    new_rdd = rdd.map(lambda x: x + (float(i),))
    print(new_rdd.collect())
Which gives:
[(u'_guid_NCw7SuFnCh_mFW3SI3qTvBCbqXKD4mtsdJvWE7HNgNg=',
(u'f', u'KSJakOd2|KtC9ZF9h'), 0.0)]
[(u'_guid_OCs2au-sKnxzPE0uRPDP4hg1vvhgpzRAAYjNWRQpKbw=',
(u'f', u'KxrylzuA|KpSXJwH2'), 1.0),
(u'_guid_txH15ULaeUDBC4Z_NlEOj2xoYBFa-08imqIBLfYsKps=',
(u'f', u'bda54c71-cd1e-4eb7-856c-ba2e6def30c8|6e189e07-807e-41a2-a60a-b07d894a2905'), 1.0),
(u'_guid_ehCT6NyD9l3q3NV9ZroaWVEo3bnDt4tvbU_fMBrEn1g=',
(u'm', u'537D69B4-743A-45B9-BED1-A25AA5926F13|2bb3e466-edc5-4302-b102-3bddb1f8c490|aa4760de-104c-4dc3-94c3-336427f89723'), 1.0),
(u'_guid_9F4Ph5GztLN9IlWNgZWKPMCcT4N3Je6-93iM_130F-c=',
(u'f', u'KOQqBzhU|KrDt5GC4'), 1.0),
(u'_guid_nPlE_f-zoOHNYiXJSGXWoVryc1U4Bnfxkow3P0mDUFY=',
(u'f', u'Kh3tIZR1|Khs0tRsh|K3geBqb_|KBrVNcDX|Jg2uDy8M|529816a3-ee43-4423-961f-8aedaf25d58c'), 1.0),
(u'331d8410d4924e72b0f0585e888c85ce',
(u'f', u'1F37807A-CBEA-4B78-85D7-5A97B37B539E'), 1.0)]
[(u'28b195c271f14a329235c262e7baecbf',
(u'm', u'50c41480-a94e-4afa-a732-b6ed7a057239'), 2.0)]
[(u'c65ac2064bc14116a363125392dcc6f7',
(u'f', u'77e4b9b3-83b4-4553-b274-7a16f553cf05'), 3.0),
(u'171f92200d634d62bdc6685bdb7a94e3',
(u'f', u'bdf53cb6-695d-4dde-b0c1-d1a34ebea6f7|a09e4074-c22e-48a1-9976-ee2151b5888c|K1Umlb5M|639B02B4-24AD-4069-99A2-C68E8C8F7F06|KjE3wXIr'), 3.0),
(u'_guid_wQZIzeFxciX9CIHUPeWOF2euOIC0jiOsXVXN98_zCh8=',
(u'f', u'F0992237-2598-4B13-AA8A-C37D436B901C|C80D1A89-DD84-4734-838F-128F99EBDD20|KthpuVu0'), 3.0),
(u'_guid_ufOcKO48drwr50yJN26NriX5MLYONwmALxWcmly7oqQ=',
(u'f', u'KlY10YxX|KyCVx_km'), 3.0)]
So every row in new_rdd now has an extra column containing the index of its RDD in the list, as intended by the code.
Now my objective is simply to combine all those new RDDs into a single RDD. I tried this:
all_rdds_list = []
for i, rdd in enumerate(repartionned_rdd):
    new_rdd = rdd.map(lambda x: x + (float(i),))
    all_rdds_list.append(new_rdd)
But when I displayed the RDDs, I got this:
for x in all_rdds_list:
    print(x.collect())
Result (every row now gets 3.0, the last index):
[(u'_guid_NCw7SuFnCh_mFW3SI3qTvBCbqXKD4mtsdJvWE7HNgNg=',
(u'f', u'KSJakOd2|KtC9ZF9h'), 3.0)]
[(u'_guid_OCs2au-sKnxzPE0uRPDP4hg1vvhgpzRAAYjNWRQpKbw=',
(u'f', u'KxrylzuA|KpSXJwH2'), 3.0),
(u'_guid_txH15ULaeUDBC4Z_NlEOj2xoYBFa-08imqIBLfYsKps=',
(u'f', u'bda54c71-cd1e-4eb7-856c-ba2e6def30c8|6e189e07-807e-41a2-a60a-b07d894a2905'), 3.0),
(u'_guid_ehCT6NyD9l3q3NV9ZroaWVEo3bnDt4tvbU_fMBrEn1g=',
(u'm', u'537D69B4-743A-45B9-BED1-A25AA5926F13|2bb3e466-edc5-4302-b102-3bddb1f8c490|aa4760de-104c-4dc3-94c3-336427f89723'), 3.0),
(u'_guid_9F4Ph5GztLN9IlWNgZWKPMCcT4N3Je6-93iM_130F-c=',
(u'f', u'KOQqBzhU|KrDt5GC4'), 3.0),
(u'_guid_nPlE_f-zoOHNYiXJSGXWoVryc1U4Bnfxkow3P0mDUFY=',
(u'f', u'Kh3tIZR1|Khs0tRsh|K3geBqb_|KBrVNcDX|Jg2uDy8M|529816a3-ee43-4423-961f-8aedaf25d58c'), 3.0),
(u'331d8410d4924e72b0f0585e888c85ce',
(u'f', u'1F37807A-CBEA-4B78-85D7-5A97B37B539E'), 3.0)]
[(u'28b195c271f14a329235c262e7baecbf',
(u'm', u'50c41480-a94e-4afa-a732-b6ed7a057239'), 3.0)]
[(u'c65ac2064bc14116a363125392dcc6f7',
(u'f', u'77e4b9b3-83b4-4553-b274-7a16f553cf05'), 3.0),
(u'171f92200d634d62bdc6685bdb7a94e3',
(u'f', u'bdf53cb6-695d-4dde-b0c1-d1a34ebea6f7|a09e4074-c22e-48a1-9976-ee2151b5888c|K1Umlb5M|639B02B4-24AD-4069-99A2-C68E8C8F7F06|KjE3wXIr'), 3.0),
(u'_guid_wQZIzeFxciX9CIHUPeWOF2euOIC0jiOsXVXN98_zCh8=',
(u'f', u'F0992237-2598-4B13-AA8A-C37D436B901C|C80D1A89-DD84-4734-838F-128F99EBDD20|KthpuVu0'), 3.0),
(u'_guid_ufOcKO48drwr50yJN26NriX5MLYONwmALxWcmly7oqQ=',
(u'f', u'KlY10YxX|KyCVx_km'), 3.0)]
Any help? Thanks!
Reputation: 2158
The best way to do this:
def get_population_id(repartionned_rdd):
    idx = range(len(repartionned_rdd))
    FullRDD = sc.emptyRDD()
    for (i, rdd) in zip(idx, repartionned_rdd):
        FullRDD = FullRDD.union(rdd.map(lambda x: x + (float(i),)))
    return FullRDD
Reputation: 8571
You have two problems in your approach. First, you use a variable that changes before the mapped function is evaluated. The map calls are transformations, so they only execute when you apply an action (like collect). This is why you see the correct additional column when you collect inside the enumerate loop, but in your later example every mapping picks up the last value of i.
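To see the first problem without Spark, here is a plain-Python sketch: the lambdas stand in for the functions passed to rdd.map(), and calling them after the loop stands in for collect() running after the loop. The names late_bound and early_bound are hypothetical, and the default-argument trick (i=i) in early_bound is a standard Python idiom for capturing the current value; it is not taken from the answers in this thread.

```python
def late_bound(n):
    """Create n functions that look up the loop variable i when called."""
    funcs = []
    for i in range(n):
        # i is read from the enclosing scope at call time, not at creation time
        funcs.append(lambda x: x + (float(i),))
    return funcs


def early_bound(n):
    """Create n functions that each keep the value i had when created."""
    funcs = []
    for i in range(n):
        # i=i evaluates i immediately and stores it as a default argument
        funcs.append(lambda x, i=i: x + (float(i),))
    return funcs


row = ("key", ("f", "value"))
# Calling the functions after the loop mimics calling collect() after the loop
print([f(row)[-1] for f in late_bound(4)])   # [3.0, 3.0, 3.0, 3.0]
print([f(row)[-1] for f in early_bound(4)])  # [0.0, 1.0, 2.0, 3.0]
```

In PySpark the equivalent change would be rdd.map(lambda x, i=i: x + (float(i),)). Because the default value is stored on the function object itself, it travels with the function when Spark serializes it, so the result no longer depends on when the map is evaluated.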
The second issue is that if you're trying to union RDDs together, you should use the union function rather than a list of RDDs. If you actually want a list of RDDs, you can replace the union below with the list append you had before.
full_rdd = None
for i, rdd in enumerate(repartionned_rdd):
    new_rdd = rdd.map(lambda x: x + (float(i),))
    if full_rdd is None:
        full_rdd = new_rdd
    else:
        full_rdd = sc.union([full_rdd, new_rdd])
    # This will force the lazy evaluation to execute now, before `i` changes
    full_rdd.count()
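A minimal sketch of the same idea without an action inside the loop, assuming sc is an active SparkContext and the input is the question's list of RDDs of tuples. It swaps in a default-argument binding (idx=idx) to freeze each index and makes a single SparkContext.union call over the whole list; tag_and_union is a hypothetical helper name, not part of the PySpark API.

```python
def tag_and_union(sc, rdds):
    """Append each RDD's position in `rdds` as a trailing float, then union.

    Assumes `sc` is a pyspark SparkContext and `rdds` is a list of RDDs
    whose elements are tuples (as in the question).
    """
    tagged = [
        # idx=idx stores the current index on each lambda, so the value is
        # fixed even though map() is only evaluated later by an action.
        rdd.map(lambda row, idx=idx: row + (float(idx),))
        for idx, rdd in enumerate(rdds)
    ]
    if not tagged:
        # Fall back to an empty RDD if the input list is empty
        return sc.emptyRDD()
    # SparkContext.union takes a list of RDDs and returns one combined RDD
    return sc.union(tagged)
```

Usage would then be full_rdd = tag_and_union(sc, repartionned_rdd), followed by whatever action you need (collect, count, and so on). Since each function carries its own index, no count() is needed to pin down i.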