Reputation: 1542
I have this piece of code which is very long.Can anyone suggest an idea as how to make it small .
I am trying to do it via for loop by taking the number of splits as length and calling the function but not sure how to pass each split .
def triggerexec(df_count,event_df):
if df_count <= 10000:
print("Input partioned into 2 splits.")
df_splits = event_df.randomSplit(([1.0,1.0]))
if df_splits[0].rdd.isEmpty():
print("No data in 1st split")
else:
print ("Input count for 1st split: " + str(df_splits[0].count()))
extract_and_push(df_splits[0])
if df_splits[1].rdd.isEmpty():
print("No data in 2nd split")
else:
print ("Input count for 2nd split: " + str(df_splits[1].count()))
extract_and_push(df_splits[1])
elif 10001 <= df_count <= 50000:
print ("Input partioned into 4 splits.")
df_splits = event_df.randomSplit(([1.0, 1.0, 1.0, 1.0]))
if df_splits[0].rdd.isEmpty():
print("No data in 1st split")
else:
print ("Processing 1st split")# + str(df_splits[0].count()))
extract_and_push(df_splits[0])
if df_splits[1].rdd.isEmpty():
print("No data in 2nd split")
else:
print ("Processing 2nd split ")# + str(df_splits[1].count()))
extract_and_push(df_splits[1])
if df_splits[2].rdd.isEmpty():
print("No data in 3rd split")
else:
print ("Processing 3rd split")# + str(df_splits[2].count()))
extract_and_push(df_splits[2])
if df_splits[3].rdd.isEmpty():
print("No data in 4th split")
else:
print ("Processing 4th split") # + str(df_splits[3].count()))
extract_and_push(df_splits[3])
Upvotes: 0
Views: 104
Reputation: 5286
The easiest solution is to take the repetetive tasks to another funcion and execute it. I made this function general for N splits so theres no need to call it inside the different if
s.
def splits(df_splits):
for i, split in enumerate(df_splits):
if split.rdd.isEmpty():
print("No data in split number " + str(i+1))
else:
print ("Input count for split number " + str(i+1) + ": " + str(split.count()))
extract_and_push(split)
def triggerexec(df_count,event_df):
if df_count <= 10000:
print("Input partioned into 2 splits.")
df_splits = event_df.randomSplit(([1.0,1.0]))
elif df_count <= 50000:
print ("Input partioned into 4 splits.")
df_splits = event_df.randomSplit(([1.0, 1.0, 1.0, 1.0]))
else:
# Either do something in every case posible, return, or throw an error, but make sure that the splits(df_splits) is not called id df_splits is not defined
return
splits(df_splits)
A little more complex solution would accept another argument as a parameter and split it in the outter function:
def split_this(event_df, n):
print("Imput partitioned into " + str(n) + " splits.")
df_splits = event_df.randomSplit(([1.0]*n))
for i, split in enumerate(df_splits):
if split.rdd.isEmpty():
print("No data in split number " + str(i+1))
else:
print ("Input count for split number " + str(i+1) + ": " + str(split.count()))
extract_and_push(split)
def triggerexec(df_count,event_df):
if df_count <= 10000:
splits = 2
elif df_count <= 50000:
splits = 4
else:
# Default case:
splits = 10
split_this(event_df, splits)
Credit to @Lex for the enumerate part and the default case.
Upvotes: 2
Reputation: 512
Python 3 Version:
def triggerexec(df_count,event_df):
if df_count <= 10000:
print("Input partioned into 2 splits.")
df_splits = event_df.randomSplit(([1.0,1.0]))
elif df_count <= 50000:
print("Input partioned into 4 splits.")
df_splits = event_df.randomSplit(([1.0, 1.0, 1.0, 1.0]))
else:
df_splits = []
for i, split in enumerate(df_splits):
if split.rdd.isEmpty():
print("No Data in split Nr. {:d}".format(i))
else:
print("Input count for split Nr. {:d}: {:s}".format(i, split.count()))
extract_and_push(split)
This is all in one function using enumerate for the index, which is more charming way IMO
Upvotes: 0