Luis A.G.

Reputation: 1097

Save a Python data object to a file in Google Cloud Storage from a PySpark job running on Dataproc

I'm collecting metrics while running a PySpark job on Dataproc and I'm unable to persist them to Google Cloud Storage (using plain Python functions, not Spark).

The point is that I can save them during the execution, and I read and modify them successfully, but when the job ends there's nothing in my Google Cloud Storage folder.

Is it possible to persist Python objects this way, or is this only possible using PySpark libraries?

Edit: I've added a code snippet to clarify the question.

# Python
import pandas as pd

# Pyspark
from pyspark.sql import SparkSession

# Google storage filepath
filepath = 'gs://[PATH]/'

spark_session = SparkSession.builder.getOrCreate()

sdf = spark_session.createDataFrame([[1],[2],[3],[4],[5]], ['col'])
pdf = pd.DataFrame([1,2,3,4,5], columns=['col'])

# Save the pandas dataframe (THIS IS NOT PERFORMED IN MY BUCKET)
pdf.to_pickle(filepath + 'pickle.pkl')

# Save the spark dataframe (THIS IS PERFORMED IN MY BUCKET)
sdf.write.csv(filepath + 'spark_dataframe.csv')

# read pickle (THIS WORKS BUT ONLY DURING THIS JOB EXECUTION, 
# IT'S NOT ACCESSIBLE BY ME, maybe its in some temporal folder only)
df_read = pd.read_pickle(filepath + 'pickle.pkl')
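
For reference, recent pandas versions can resolve gs:// paths through the fsspec/gcsfs filesystem layer, so the pickle can be written straight to the bucket. A minimal sketch of that variant, assuming a recent pandas and the gcsfs package are installed on the cluster (e.g. via pip or an initialization action):

import pandas as pd

# Assumption: a recent pandas plus the gcsfs package, so that gs:// URLs
# are handled through fsspec instead of being treated as local paths.
filepath = 'gs://[PATH]/'

pdf = pd.DataFrame([1, 2, 3, 4, 5], columns=['col'])

# Write the pickle directly into the bucket
pdf.to_pickle(filepath + 'pickle.pkl')

# Read it back from the bucket
df_read = pd.read_pickle(filepath + 'pickle.pkl')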

Upvotes: 2

Views: 3199

Answers (1)

Guillem Xercavins

Reputation: 7058

Elaborating on my previous comments, I modified your example to copy Pickle objects to GCS:

# Python
import pandas as pd
from subprocess import call
from os.path import join

# Pyspark
from pyspark.sql import SparkSession

# Google storage filepath
filepath = 'gs://BUCKET_NAME/pickle/'
filename = 'pickle.pkl'

spark_session = SparkSession.builder.getOrCreate()

sdf = spark_session.createDataFrame([[1],[2],[3],[4],[5]], ['col'])
pdf = pd.DataFrame([1,2,3,4,5], columns=['col'])

# Save the pandas dataframe locally (one copy per upload method)
pdf.to_pickle('./gsutil/' + filename)
pdf.to_pickle('./distcp/' + filename)

# Option 1: copy the local pickle to the bucket with gsutil
call(["gsutil", "-m", "cp", "./gsutil/" + filename, join(filepath, filename)])

# Option 2: stage the pickle in HDFS, then copy it to the bucket with DistCp
call(["hadoop", "fs", "-put", "./distcp/", "/user/test/"])
call(["hadoop", "distcp", "/user/test/distcp/" + filename, join(filepath, "distcp/" + filename)])

Also, be sure to create the necessary folders (local and HDFS) and replace BUCKET_NAME with your actual bucket name beforehand for the example to work.
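
Another option, if shelling out is not desirable, is to upload the locally written pickle with the Cloud Storage Python client. A minimal sketch, assuming the google-cloud-storage package is available on the cluster and the job's service account can write to the bucket:

from google.cloud import storage

filename = 'pickle.pkl'

# Assumption: google-cloud-storage is installed and the job runs with
# credentials that are allowed to write to BUCKET_NAME.
client = storage.Client()
bucket = client.bucket('BUCKET_NAME')

# Upload ./gsutil/pickle.pkl to gs://BUCKET_NAME/pickle/pickle.pkl
blob = bucket.blob('pickle/' + filename)
blob.upload_from_filename('./gsutil/' + filename)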

Upvotes: 2
