Reputation: 65
I have a 30-node Amazon EMR cluster. My Python code looks like this:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("App") \
    .config(conf=sparkConf) \
    .getOrCreate()

def fetchCatData(cat, tableName):
    df_gl = spark.sql("select * from {} where category = {}".format(tableName, cat))
    df_pandas = df_gl.select("*").toPandas()
    df_pandas.to_csv("/tmp/split/{}_{}.csv".format(tableName, cat))
catList = [14, 15, 63, 65, 74, 21, 23, 60, 79, 86, 107, 147, 196, 199, 200, 201, 229, 263, 265, 267, 328, 421, 468, 469,504]
tableList = ["Table1", "Table2", "Table3", "Table4", "Table5", "Table6", "Table7"]
def main(args):
    log4jLogger = spark._jvm.org.apache.log4j
    LOGGER = log4jLogger.LogManager.getLogger(__name__)
    for table in tableList:
        LOGGER.info("Starting Split for {}".format(table))
        dataLocation = "s3://test/APP/{}".format(table)
        df = spark.read.parquet(dataLocation)
        df = df.repartition("CATEGORY").cache()
        df.createOrReplaceTempView(table)
        for cat in catList:
            fetchCatData(cat, table)
I want to split each table into one CSV file per category. What is the best way to do this in the least amount of time?
Upvotes: 0
Views: 1056
Reputation: 85
I'm not sure why you want to convert to a pandas DataFrame; using the Spark DataFrame produced by your Spark SQL query, you can write directly to CSV.
However, if you want the CSV as a single file, you will need to repartition to 1, which will not use all the nodes. If you are not bothered about how many files it generates, you can repartition the DataFrame to include more partitions. Each partition is then processed by a node and written out until all partitions are complete.
Single file, not using all nodes (note: the .csv path will be a folder containing the actual CSV):
df_gl = spark.sql("select * from {} where category = {}".format(tableName, cat))
df_gl.repartition(1).write.mode("overwrite").csv("/tmp/split/{}_{}.csv".format(tableName, cat))
Parallel processing using multiple nodes, outputting multiple split files (note: the .csv path will be a folder containing the actual CSVs):
df_gl = spark.sql("select * from {} where category = {}".format(tableName, cat)).repartition(10)
df_gl.write.mode("overwrite").csv("/tmp/split/{}_{}.csv".format(tableName, cat))
Upvotes: 1