M_Gh

Reputation: 1142

Checking 200 million records using PySpark takes a lot of time

I have an Oracle table with about 200 million records. I need to check these records and find any record in which at least one field has a problem. I have a list of fields that must not be NULL, and I have to find the records where any of these fields is NULL and save those records to Error_table for reporting. My code is as follows:

from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType

# Note: `gf` and `filling_df_error` come from elsewhere in the project (not shown).


def detect_errors(df, id, part, spark, repeated_id):
    list_repetitived_id = list(repeated_id.keys())
    entity_name = part.split('_')[0]
    # Create an expected schema for df error
    columns = StructType([StructField('date_time', StringType(), True),
                          StructField('entity_name', StringType(), True),
                          StructField('id', StringType(), True),
                          StructField('error_type', StringType(), True),
                          StructField('record', StringType(), True)])
    df_error = spark.createDataFrame(data=[], schema=columns)

    columns_partition = StructType([StructField('date_group', StringType(), True),
                                    StructField('ac_id', StringType(), True),
                                    StructField('partition_id', StringType(), True)])
    df_partitions = spark.createDataFrame(data=[], schema=columns_partition)
    # writing repetitived id in error table:
    if len(list_repetitived_id) > 0:
        records_repetitived_id = df.filter(col(id).isin(list_repetitived_id))
        # Adding this line for Partition State
        df_partitions = records_repetitived_id.select("date", "ac_id", "partition_id")\
            .withColumnRenamed('DATE', 'date_group').union(df_partitions)
        error_type = 'Repetitived_' + id
        df_error = filling_df_error(records_repetitived_id, id, entity_name, error_type).union(df_error)
    # Extract all columns of the table which must not be null
    list_fields = [i for i in df.columns if gf.config['company'][part][i]['check']]

    df_check = df.filter(~col(id).isin(list_repetitived_id))

    for i in list_fields:
        df_null = df_check.filter(col(i).isNull())
        # Collect the ids of the rows where this field is null to the driver
        list_id = list(df_null.select(id).toPandas()[id])
        df_check = df_check.filter(~col(id).isin(list_id))
        if df_null.first() is not None:
            df_partitions = df_null.select("date", "ac_id", "partition_id")\
                .withColumnRenamed('DATE', 'date_group').union(df_partitions)
            error_type = 'Null_Value' + '_' + i
            df_error = filling_df_error(df_null, id, entity_name, error_type).union(df_error)

    list_id = list(df_error.select('id').toPandas()['id'])
    df = df.filter(~col(id).isin(list_id))

    return df_error, df, df_partitions

In the above code, df_partitions is a Spark dataframe holding the state of each partition (finished or not). The code runs without any error, but it takes a lot of time. The DAG of the code:

[Image: Timeline and DAG]

Could you please advise me on how to improve the code so that it runs faster for 200 million records?

Any help is really appreciated.

Upvotes: 0

Views: 965

Answers (1)

samkart

Reputation: 6644

If the intention is to check whether any column from a list of columns is null, we can use when().otherwise() to flag the records and then create a new dataframe from the flagged records.

Here's an example.

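from functools import reduce
from pyspark.sql import functions as func

# assumes an active SparkSession named `spark`
# sample rows matching the table printed below
data_ls = [
    (1, 1, 1, 1, 1),
    (1, None, 1, None, None),
    (1, 0, None, None, 1),
]

data_sdf = spark.sparkContext.parallelize(data_ls). \
    toDF(['col1', 'col2', 'col3', 'col4', 'col5'])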

# +----+----+----+----+----+
# |col1|col2|col3|col4|col5|
# +----+----+----+----+----+
# |   1|   1|   1|   1|   1|
# |   1|null|   1|null|null|
# |   1|   0|null|null|   1|
# +----+----+----+----+----+

# list of columns that should not be null
cols_to_check = ['col1', 'col3', 'col5']

# condition to check if any of the columns in `cols_to_check` has null
when_null_condition = reduce(lambda x, y: x | y, [func.col(k).isNull() for k in cols_to_check])
# Column<'(((col1 IS NULL) OR (col3 IS NULL)) OR (col5 IS NULL))'>

data_sdf. \
    withColumn('null_flag',
               func.when(when_null_condition, func.lit(1)).
               otherwise(func.lit(0))
               ). \
    show()

# +----+----+----+----+----+---------+
# |col1|col2|col3|col4|col5|null_flag|
# +----+----+----+----+----+---------+
# |   1|   1|   1|   1|   1|        0|
# |   1|null|   1|null|null|        1|
# |   1|   0|null|null|   1|        1|
# +----+----+----+----+----+---------+

Filter on null_flag = 1 to create your error_table dataframe; the rows with null_flag = 0 are the clean records.
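As a rough sketch of that last step, the same check can also record which column was null, so the error dataframe carries an error type without looping over columns or collecting ids to the driver. The helper names `any_null_condition`, `null_error_case` and `split_errors` are made up for illustration. The sketch assumes the column names are plain identifiers (no quotes or backticks), that the list of columns is not empty, and that `df` is any Spark dataframe containing those columns.

```python
def any_null_condition(cols):
    """Build a SQL predicate that is true when at least one of `cols` is NULL."""
    return " OR ".join(f"`{c}` IS NULL" for c in cols)


def null_error_case(cols, prefix="Null_Value_"):
    """Build a SQL CASE expression naming the first NULL column.

    The expression evaluates to NULL when none of `cols` is NULL.
    Assumes `cols` is non-empty and holds plain column names.
    """
    branches = " ".join(f"WHEN `{c}` IS NULL THEN '{prefix}{c}'" for c in cols)
    return f"CASE {branches} END"


def split_errors(df, cols):
    """Return (error_df, clean_df) without collecting anything to the driver."""
    # Imported here so the string helpers above also work without PySpark.
    from pyspark.sql import functions as func

    tagged = df.withColumn("error_type", func.expr(null_error_case(cols)))
    error_df = tagged.filter(func.col("error_type").isNotNull())
    clean_df = tagged.filter(func.col("error_type").isNull()).drop("error_type")
    return error_df, clean_df


cols_to_check = ["col1", "col3", "col5"]
print(any_null_condition(cols_to_check))
print(null_error_case(cols_to_check))
```

With the sample data above, `split_errors(data_sdf, cols_to_check)` should put the second and third rows into `error_df`, tagged `Null_Value_col5` and `Null_Value_col3`. Like the loop in the question, this reports only the first null field per record. The string from `any_null_condition` can be passed directly to `df.filter(...)` if only the flag is needed.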

Upvotes: 1
