Reputation: 1505
For those of you working in Foundry's environment: I'm trying to build a pipeline in "Code Repositories" to process a raw dataset (from an Excel file) into a clean one that I'll analyse in "Contour" later. To that end I used Python, except that the pipeline seems to use PySpark, so at some point I must convert the dataset I've cleaned with pandas into a PySpark one, and that's where I'm stuck.
I've looked at several posts on Stack Overflow about converting a pandas DataFrame to a PySpark DataFrame, but none of the solutions has worked so far. When I try to run the transform, there's always a data type that fails to convert, even though I forced a schema.
The Python code section has been tested successfully in Spyder (importing from and exporting to an Excel file) and gives the expected result. It only fails when I need to convert to PySpark.
@transform_pandas(
    Output("/MDM_OUT_OF_SERVICE_EVENTS_CLEAN"),
    OOS_raw=Input("/MDM_OUT_OF_SERVICE_EVENTS"),
)
def DA_transform(OOS_raw):
    ''' Code Section in Python '''
    mySchema = StructType([
        StructField(OOS_dup.columns[0], IntegerType(), True),
        StructField(OOS_dup.columns[1], StringType(), True),
        ...])
    OOS_out = sqlContext.createDataFrame(OOS_dup, schema=mySchema, verifySchema=False)
    return OOS_out
I got this error message at some point:
AttributeError: 'unicode' object has no attribute 'toordinal'.
According to this post: What is causing 'unicode' object has no attribute 'toordinal' in pyspark?, it's because PySpark fails to convert the data into DateType, but the data is datetime64[ns] in pandas. I've tried converting these columns to string and to integer, and that fails as well.
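The error message suggests that a column declared as DateType holds at least one string value, since Spark calls toordinal() on each value of a date column. A minimal diagnostic sketch, using only the standard library, that counts the Python types found in each column; `python_types_per_column` and `mixed_columns` are hypothetical helper names and the sample data is made up. It accepts anything with an `.items()` method that yields (column name, values), so a plain dict of lists or a pandas DataFrame should both work.

```python
from collections import Counter
import datetime


def python_types_per_column(table):
    """Count the Python type of every value, column by column.

    `table` is anything whose .items() yields (column name, values),
    for example a dict of lists or a pandas DataFrame.
    """
    return {
        name: Counter(type(value).__name__ for value in values)
        for name, values in table.items()
    }


def mixed_columns(table):
    """Return only the columns that hold more than one Python type."""
    return {
        name: dict(counts)
        for name, counts in python_types_per_column(table).items()
        if len(counts) > 1
    }


# Made-up sample shaped like the question's data (not the real dataset)
sample = {
    "OOS_ID": [1, 2, 3],
    "OOS_DATE": [datetime.date(2019, 1, 5), "2019-01-06", datetime.date(2019, 1, 7)],
    "EVENT_CODE": ["A1", None, "B2"],
}
report = mixed_columns(sample)
```

Any column that shows up in `report` with both a date-like type and `str` (or `unicode` on Python 2) is a likely candidate for the failure above.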
Here is a picture of the output dataset from Python:
Here are the data types returned by pandas once the dataset has been cleaned:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4972 entries, 0 to 4971
Data columns (total 51 columns):
OOS_ID 4972 non-null int64
OPERATOR_CODE 4972 non-null object
ATA_CAUSE 4972 non-null int64
EVENT_CODE 3122 non-null object
AC_MODEL 4972 non-null object
AC_SN 4972 non-null int64
OOS_DATE 4972 non-null datetime64[ns]
AIRPORT_CODE 4915 non-null object
RTS_DATE 4972 non-null datetime64[ns]
EVENT_TYPE 4972 non-null object
CORRECTIVE_ACTION 417 non-null object
DD_HOURS_OOS 4972 non-null float64
EVENT_DESCRIPTION 4972 non-null object
EVENT_CATEGORY 4972 non-null object
ATA_REPORTED 324 non-null float64
TOTAL_CAUSES 4875 non-null float64
EVENT_NUMBER 3117 non-null float64
RTS_TIME 4972 non-null object
OOS_TIME 4972 non-null object
PREV_REPORTED 4972 non-null object
FERRY_IND 4972 non-null object
REPAIR_STN_CODE 355 non-null object
MAINT_DOWN_TIME 4972 non-null float64
LOGBOOK_RECORD_IDENTIFIER 343 non-null object
RTS_IND 4972 non-null object
READY_FOR_USE 924 non-null object
DQ_COMMENTS 2 non-null object
REVIEWED 5 non-null object
DOES_NOT_MEET_SPECS 4 non-null object
CORRECTED 12 non-null object
EDITED_BY 4972 non-null object
EDIT_DATE 4972 non-null datetime64[ns]
OUTSTATION_INDICATOR 3801 non-null object
COMMENT_TEXT 11 non-null object
ATA_CAUSE_CHAPTER 4972 non-null int64
ATA_CAUSE_SECTION 4972 non-null int64
ATA_CAUSE_COMPONENT 770 non-null float64
PROCESSOR_COMMENTS 83 non-null object
PARTS_AVAIL_AT_STATION 4972 non-null object
PARTS_SHIPPED_AT_STATION 4972 non-null object
ENGINEER_AT_STATION 4972 non-null object
ENGINEER_SENT_AT_STATION 4972 non-null object
SOURCE_FILE 4972 non-null object
OOS_Month 4972 non-null float64
OOS_Hour 4972 non-null float64
OOS_Min 4972 non-null float64
RTS_Month 4972 non-null float64
RTS_Hour 4972 non-null float64
RTS_Min 4972 non-null float64
OOS_Timestamp 4972 non-null datetime64[ns]
RTS_Timestamp 4972 non-null datetime64[ns]
dtypes: datetime64[ns](5), float64(12), int64(5), object(29)
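The summary above shows several float64 columns with far fewer non-null values than rows (ATA_REPORTED, EVENT_NUMBER, ATA_CAUSE_COMPONENT), so they contain NaN, and the sparsely filled object columns presumably contain missing markers too. A minimal sketch, assuming the rows are available as a list of dicts, that maps float NaN to None before the data is handed to Spark; `clean_value` and `clean_rows` are hypothetical helper names and the rows are made up.

```python
import math


def clean_value(value):
    """Map float NaN to None; leave every other value untouched."""
    if isinstance(value, float) and math.isnan(value):
        return None
    return value


def clean_rows(rows):
    """Apply clean_value to every field of every row (a list of dicts)."""
    return [{key: clean_value(val) for key, val in row.items()} for row in rows]


# Made-up rows; with pandas they could come from df.to_dict("records")
rows = [
    {"OOS_ID": 1, "ATA_REPORTED": 21.0, "EVENT_CODE": "A1"},
    {"OOS_ID": 2, "ATA_REPORTED": float("nan"), "EVENT_CODE": float("nan")},
]
cleaned = clean_rows(rows)
```

This only handles float NaN. Missing timestamps (pandas NaT) would need their own check, although the datetime columns listed above have no missing values.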
Upvotes: 4
Views: 1575
Reputation: 1505
In case it helps some of you, I found this in the official Foundry documentation on how to properly transition between pandas and PySpark DataFrames.
OOS_dup is the pandas DataFrame I want to convert back to Spark.
import pandas as pd
from pyspark.sql import functions as F

# Extract the name of each column with its pandas data type
col = list(OOS_dup.columns)
col_type = [OOS_dup[c].dtype.name for c in col]
df_schema = pd.DataFrame({"field": col, "data_type": col_type})

# Convert to Spark before calling the helpers below: withColumn and F.when
# are Spark APIs, not pandas ones. Casting every column to string first is an
# assumption (it matches the "NaN" string check below); missing values then
# arrive in Spark as the strings "nan", "NaT" or "None".
OOS_dup = sqlContext.createDataFrame(OOS_dup.astype(str))

# Define a function to replace missing-value markers with null
def replace_missing(df, col_names):
    for col in col_names:
        df = df.withColumn(col, F.when(df[col].isin("NaN", "nan", "NaT", "None"), None).otherwise(df[col]))
    return df

# Replace missing values
OOS_dup = replace_missing(OOS_dup, col)

# Define a function to change column types to the proper type in Spark
def change_type(df, col_names, dtypes):
    # Pick the cast in Python: dtypes holds pandas dtype names, not Spark columns
    spark_types = {"float64": "double", "int64": "int", "datetime64[ns]": "date"}
    for col, dtype in zip(col_names, dtypes):
        df = df.withColumn(col, df[col].cast(spark_types.get(dtype, "string")))
    return df

# Cast each column to the proper data type
OOS_dup = change_type(OOS_dup, df_schema["field"], df_schema["data_type"])
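One design choice worth a second look: change_type casts int64 to int and datetime64[ns] to date, which narrows 64-bit integers to 32 bits and drops the time of day from columns such as OOS_Timestamp. A minimal sketch of an alternative mapping, written as plain Python so it can be checked without a Spark session; `PANDAS_TO_SPARK`, `spark_type_for` and `cast_plan` are hypothetical names, and the choice of "bigint" and "timestamp" is a suggestion rather than something the Foundry documentation prescribes.

```python
# Hypothetical mapping from pandas dtype names to Spark SQL type names
PANDAS_TO_SPARK = {
    "float64": "double",
    "int64": "bigint",            # 64-bit, same range as pandas int64
    "datetime64[ns]": "timestamp",  # keeps the time of day
    "bool": "boolean",
}


def spark_type_for(pandas_dtype_name, default="string"):
    """Return the Spark type name for a pandas dtype name."""
    return PANDAS_TO_SPARK.get(pandas_dtype_name, default)


def cast_plan(dtype_names_by_column):
    """Build (column, Spark type) pairs from a {column: dtype name} dict."""
    return [
        (column, spark_type_for(dtype_name))
        for column, dtype_name in dtype_names_by_column.items()
    ]


# A few columns taken from the dtype listing in the question
plan = cast_plan({
    "OOS_ID": "int64",
    "OPERATOR_CODE": "object",
    "DD_HOURS_OOS": "float64",
    "OOS_Timestamp": "datetime64[ns]",
})
```

Each pair in `plan` could then be applied in Spark with `df.withColumn(column, df[column].cast(spark_type))`.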
Upvotes: 0