Yoan B. M.Sc

Datatype error when converting a DataFrame from pandas to PySpark in Foundry

For those of you working in Foundry's environment: I'm trying to build a pipeline in "Code Repositories" that processes a raw dataset (from an Excel file) into a clean one that I'll analyse in "Contour" later. I wrote the cleaning in Python with pandas, but the pipeline seems to use PySpark, so at some point I must convert the dataset I've cleaned with pandas into a PySpark one, and that's where I'm stuck.

I've looked at several posts on Stack Overflow about converting a pandas DataFrame to a PySpark DataFrame, but none of the solutions has worked so far. When I run the transform, some datatype always fails to convert, even though I forced a schema.

The Python code section has been tested successfully in Spyder (importing from and exporting to an Excel file) and gives the expected result. It only fails when I need to convert to PySpark.

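from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from transforms.api import transform_pandas, Input, Output


@transform_pandas(
    Output("/MDM_OUT_OF_SERVICE_EVENTS_CLEAN"),
    OOS_raw=Input("/MDM_OUT_OF_SERVICE_EVENTS"),
)
def DA_transform(OOS_raw):

    ''' Code Section in Python '''
    # (pandas cleaning code that produces the pandas DataFrame OOS_dup)

    # Explicit schema: one StructField per column of OOS_dup
    mySchema = StructType([StructField(OOS_dup.columns[0], IntegerType(), True),
                           StructField(OOS_dup.columns[1], StringType(), True),
                           ...])

    # sqlContext is expected to be provided by the environment
    OOS_out = sqlContext.createDataFrame(OOS_dup, schema=mySchema, verifySchema=False)

    return OOS_out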

At some point I got this error message:

AttributeError: 'unicode' object has no attribute 'toordinal'.

According to this post: What is causing 'unicode' object has no attribute 'toordinal' in pyspark?

It's because PySpark fails to convert the data into DateType, but the data is datetime64[ns] in pandas. I've tried converting these columns to string and to integer, and that fails as well.
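The explanation in that post can be checked without Spark at all. As far as I can tell from PySpark's source, `DateType` converts each value to a day count by calling its `toordinal()` method, which `datetime.date` (and its subclass `datetime.datetime`) provides but a string does not. The sketch below is plain Python; `to_date_ordinal` is a hypothetical stand-in for that step, not a PySpark function.

```python
import datetime


def to_date_ordinal(value):
    # Hypothetical stand-in for what a DateType column needs from each value:
    # only date-like objects (datetime.date, datetime.datetime) have toordinal().
    return value.toordinal()


print(to_date_ordinal(datetime.date(2018, 1, 15)))  # works: prints an integer day count

try:
    to_date_ordinal("2018-01-15")  # a string, like a date column that became text
except AttributeError as exc:
    # Python 3 reports 'str' where Python 2 reported 'unicode'
    print(exc)
```

So whatever ends up in a column declared as `DateType` has to be a date/datetime object or `None`; a string in that position produces exactly this error.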

Here is a picture of the output dataset from Python: (image not included)

Here are the datatypes returned by pandas once the dataset has been cleaned:

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4972 entries, 0 to 4971
Data columns (total 51 columns):
OOS_ID                       4972 non-null int64
OPERATOR_CODE                4972 non-null object
ATA_CAUSE                    4972 non-null int64
EVENT_CODE                   3122 non-null object
AC_MODEL                     4972 non-null object
AC_SN                        4972 non-null int64
OOS_DATE                     4972 non-null datetime64[ns]
AIRPORT_CODE                 4915 non-null object
RTS_DATE                     4972 non-null datetime64[ns]
EVENT_TYPE                   4972 non-null object
CORRECTIVE_ACTION            417 non-null object
DD_HOURS_OOS                 4972 non-null float64
EVENT_DESCRIPTION            4972 non-null object
EVENT_CATEGORY               4972 non-null object
ATA_REPORTED                 324 non-null float64
TOTAL_CAUSES                 4875 non-null float64
EVENT_NUMBER                 3117 non-null float64
RTS_TIME                     4972 non-null object
OOS_TIME                     4972 non-null object
PREV_REPORTED                4972 non-null object
FERRY_IND                    4972 non-null object
REPAIR_STN_CODE              355 non-null object
MAINT_DOWN_TIME              4972 non-null float64
LOGBOOK_RECORD_IDENTIFIER    343 non-null object
RTS_IND                      4972 non-null object
READY_FOR_USE                924 non-null object
DQ_COMMENTS                  2 non-null object
REVIEWED                     5 non-null object
DOES_NOT_MEET_SPECS          4 non-null object
CORRECTED                    12 non-null object
EDITED_BY                    4972 non-null object
EDIT_DATE                    4972 non-null datetime64[ns]
OUTSTATION_INDICATOR         3801 non-null object
COMMENT_TEXT                 11 non-null object
ATA_CAUSE_CHAPTER            4972 non-null int64
ATA_CAUSE_SECTION            4972 non-null int64
ATA_CAUSE_COMPONENT          770 non-null float64
PROCESSOR_COMMENTS           83 non-null object
PARTS_AVAIL_AT_STATION       4972 non-null object
PARTS_SHIPPED_AT_STATION     4972 non-null object
ENGINEER_AT_STATION          4972 non-null object
ENGINEER_SENT_AT_STATION     4972 non-null object
SOURCE_FILE                  4972 non-null object
OOS_Month                    4972 non-null float64
OOS_Hour                     4972 non-null float64
OOS_Min                      4972 non-null float64
RTS_Month                    4972 non-null float64
RTS_Hour                     4972 non-null float64
RTS_Min                      4972 non-null float64
OOS_Timestamp                4972 non-null datetime64[ns]
RTS_Timestamp                4972 non-null datetime64[ns]
dtypes: datetime64[ns](5), float64(12), int64(5), object(29)
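Since the schema in the question is written by hand, deriving it from the dtype listing above may be less error-prone. This is a minimal sketch, assuming the schema is passed to `createDataFrame` as a DDL-formatted string (which `SparkSession.createDataFrame` accepts for its `schema` argument); the mapping `PANDAS_TO_SPARK` and the helper `ddl_schema` are hypothetical, not part of pandas, PySpark or Foundry. Here int64 maps to `bigint` (Spark's 64-bit type) and datetime64[ns] to `timestamp`, so the time of day in columns such as OOS_Timestamp is kept.

```python
# Hypothetical mapping from pandas dtype names to Spark SQL type names.
# Use "date" instead of "timestamp" if only the calendar day matters.
PANDAS_TO_SPARK = {
    "int64": "bigint",
    "float64": "double",
    "datetime64[ns]": "timestamp",
    "bool": "boolean",
}


def ddl_schema(dtypes):
    """Hypothetical helper: build a Spark DDL schema string from
    (column name, pandas dtype name) pairs.

    Anything not in the mapping (e.g. pandas 'object') falls back to string.
    Column names are wrapped in backticks so unusual names stay valid.
    """
    parts = []
    for name, dtype_name in dtypes:
        spark_type = PANDAS_TO_SPARK.get(dtype_name, "string")
        parts.append("`{}` {}".format(name, spark_type))
    return ", ".join(parts)


# Usage with a few of the columns listed above
example = [("OOS_ID", "int64"), ("OPERATOR_CODE", "object"),
           ("OOS_DATE", "datetime64[ns]"), ("DD_HOURS_OOS", "float64")]
print(ddl_schema(example))
# `OOS_ID` bigint, `OPERATOR_CODE` string, `OOS_DATE` timestamp, `DD_HOURS_OOS` double
```

With the real DataFrame, the pairs would come from `zip(OOS_dup.columns, [t.name for t in OOS_dup.dtypes])`.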


Answers (1)

Yoan B. M.Sc

In case it helps some of you, I found this in the official Foundry documentation on how to properly transition between pandas and PySpark DataFrames.

OOS_dup is the pandas DataFrame I want to convert back to Spark.

import pandas as pd
from pyspark.sql import functions as F

# OOS_dup is the cleaned pandas DataFrame; sqlContext is expected to be
# provided by the environment.

# Extract the name of each column with its data type in pandas
col = list(OOS_dup.columns)
col_type = [OOS_dup[c].dtype.name for c in col]

df_schema = pd.DataFrame({"field": col, "data_type": col_type})

# Convert to Spark first: withColumn() only exists on Spark DataFrames.
# Casting every column to string avoids Spark having to infer mixed types.
OOS_dup = sqlContext.createDataFrame(OOS_dup.astype(str))

# Define a function to replace missing cells with null
# (astype(str) renders NaN, NaT and None as "nan", "NaT" and "None")
def replace_missing(df, col_names):
    for c in col_names:
        df = df.withColumn(c, F.when(F.col(c).isin("nan", "NaN", "NaT", "None"), None).otherwise(F.col(c)))
    return df

# Replace missing values
OOS_dup = replace_missing(OOS_dup, col)

# Define a function to cast each column to the Spark type matching its own pandas dtype
def change_type(df, col_names, dtypes):
    for c, t in zip(col_names, dtypes):
        if t == "float64":
            spark_type = "double"
        elif t == "int64":
            spark_type = "int"
        elif t == "datetime64[ns]":
            spark_type = "date"
        else:
            spark_type = "string"
        df = df.withColumn(c, F.col(c).cast(spark_type))
    return df

# Cast each column to the proper data type
OOS_dup = change_type(OOS_dup, df_schema["field"], df_schema["data_type"])
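If you would rather keep `createDataFrame` with an explicit schema as in the question, another option is to hand Spark plain Python values instead of a pandas DataFrame. This is a sketch under my own assumptions (cells hold scalar values; not checked against Foundry specifically), and `to_spark_rows` is a hypothetical helper: it turns NaN/NaT/None into `None`, which Spark stores as null, pandas Timestamps into `datetime.datetime`, and numpy scalars into Python scalars.

```python
import numpy as np
import pandas as pd


def to_spark_rows(pdf):
    """Hypothetical helper: convert a pandas DataFrame into a list of tuples
    of plain Python values suitable for createDataFrame(rows, schema).

    Assumes every cell holds a scalar (pd.isna on a list would be ambiguous).
    """
    rows = []
    for record in pdf.itertuples(index=False, name=None):
        row = []
        for value in record:
            if pd.isna(value):
                # NaN, NaT and None all become None (null in Spark)
                row.append(None)
            elif isinstance(value, pd.Timestamp):
                row.append(value.to_pydatetime())
            elif isinstance(value, np.generic):
                row.append(value.item())
            else:
                row.append(value)
        rows.append(tuple(row))
    return rows


# Usage on a tiny frame shaped like a few of the question's columns
pdf = pd.DataFrame({
    "OOS_ID": [1, 2],
    "EVENT_CODE": ["A1", np.nan],
    "OOS_DATE": pd.to_datetime(["2018-01-15", None]),
    "ATA_REPORTED": [21.0, np.nan],
})
rows = to_spark_rows(pdf)
print(rows)
```

The rows could then be passed as `sqlContext.createDataFrame(to_spark_rows(OOS_dup), schema=mySchema)`, assuming `mySchema` lists the columns in the same order as the DataFrame. The helper does not cast anything, so each column still has to hold values that match the type declared for it.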
