Nikolan Asad

Reputation: 178

How do I make pandas work with a Spark cluster?

The main problem is that pandas cannot handle large data manipulations; it runs out of memory on massive CSV files. I have now switched to PySpark 1.6 on Hadoop, and I have also tried dask.dataframe, but the problem still exists. Is there any way to make pandas work with a Hadoop or PySpark cluster? I want to keep using this pandas functionality:

import pandas as pd

# read the raw file and name the columns (the file has no header row)
df = pd.read_csv('text1.txt', names=['DATE', 'IMSI', 'WEBSITE', 'LINKUP', 'LINKDOWN', 'COUNT', 'CONNECTION'])
df.columns = df.columns.str.strip()   # str.strip() returns new labels, so assign them back
df.DATE = pd.to_datetime(df.DATE)

# aggregate per IMSI/WEBSITE pair
group = df.groupby(['IMSI', 'WEBSITE']).agg({'DATE': ['min', 'max', 'count'],
                                             'LINKUP': 'sum',
                                             'LINKDOWN': 'sum',
                                             'COUNT': 'max',
                                             'CONNECTION': 'sum'})
group.to_csv('finalinfo.txt', index=True, header=False)

Upvotes: 4

Views: 1217

Answers (1)

Tetlanesh

Reputation: 403

Read the data from HDFS with Spark, aggregate there, and send only the result back to pandas. The example below uses inferSchema to derive column names and types from the data, but you can provide your own schema if your file has no header row or if you don't like the types it inferred. inferSchema requires an extra pass over the data, so depending on the data size you may want to provide your own schema regardless:

from pyspark.sql import functions as f

# read the CSV from HDFS; inferSchema triggers an extra pass over the data to detect types
df = spark.read.csv('/hdfs/path/to/text1.txt', header=True, inferSchema=True, sep=';')

# equivalent of the pandas groupby/agg
df = df.groupBy('IMSI', 'WEBSITE').agg(f.min('DATE').alias('min of date'),
                                       f.max('DATE').alias('max of date'),
                                       f.count('DATE').alias('count of date'),
                                       f.sum('LINKUP').alias('sum of linkup'),
                                       f.sum('LINKDOWN').alias('sum of linkdown'),
                                       f.count('COUNT').alias('count of count'),
                                       f.sum('CONNECTION').alias('sum of connection'))

# bring only the (now small) aggregated result back to the driver as a pandas DataFrame
pandasDF = df.toPandas()
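
If the file has no header row, you can pass an explicit schema instead of inferSchema, which also skips the extra pass over the data. A minimal sketch, assuming the column names from the question; the types are guesses you should adjust to your data (if DATE isn't in a format Spark parses by default, read it as StringType and convert afterwards):

from pyspark.sql.types import (StructType, StructField, StringType,
                               TimestampType, LongType)

# column names taken from the question; the types here are assumptions
schema = StructType([
    StructField('DATE', TimestampType()),
    StructField('IMSI', StringType()),
    StructField('WEBSITE', StringType()),
    StructField('LINKUP', LongType()),
    StructField('LINKDOWN', LongType()),
    StructField('COUNT', LongType()),
    StructField('CONNECTION', LongType()),
])

df = spark.read.csv('/hdfs/path/to/text1.txt', schema=schema, sep=';')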

Alternatively, if the file is still too big for pandas, you can save it to CSV using Spark. Note that you have no control over the name of the output file: you only specify the directory that will be created to hold the output, and the file name will follow Spark's convention for temporary file naming:

df.coalesce(1).write.csv('/hdfs/path/to/output/directory', header=True)

coalesce(1) is there to get a single file as output, as Spark will otherwise create a number of files equal to the partitioning (200 by default, iirc). For this to work, the unpartitioned data has to fit in a single worker's memory. If it is still too big, don't use coalesce; Spark will save it as multiple files and you can then use HDFS getmerge to join the files afterwards.
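
For reference, a possible getmerge invocation (the paths are placeholders to adapt):

hdfs dfs -getmerge /hdfs/path/to/output/directory /local/path/merged.csv

Keep in mind that with header=True each part file contains its own header line, so after merging you may want to remove the repeated headers (or write without a header and add it yourself).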

Upvotes: 2
