Reputation: 1142
I have an Oracle table with about 200 million records. I must check these records and find every record in which at least one field has a problem. I have a list of fields that must not be NULL, so I have to find the records where any of those fields is NULL and save them to an Error_table for reporting. My code is the following:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType

def detect_errors(df, id, part, spark, repeated_id):
    list_repetitived_id = list(repeated_id.keys())
    entity_name = part.split('_')[0]
    # Create an expected schema for df error
    columns = StructType([StructField('date_time', StringType(), True),
                          StructField('entity_name', StringType(), True),
                          StructField('id', StringType(), True),
                          StructField('error_type', StringType(), True),
                          StructField('record', StringType(), True)])
    df_error = spark.createDataFrame(data=[], schema=columns)
    columns_partition = StructType([StructField('date_group', StringType(), True),
                                    StructField('ac_id', StringType(), True),
                                    StructField('partition_id', StringType(), True)])
    df_partitions = spark.createDataFrame(data=[], schema=columns_partition)
    # writing repetitived id in error table:
    if len(list_repetitived_id) > 0:
        records_repetitived_id = df.filter(col(id).isin(list_repetitived_id))
        # Adding this line for Partition State
        df_partitions = records_repetitived_id.select("date", "ac_id", "partition_id")\
            .withColumnRenamed('DATE', 'date_group').union(df_partitions)
        error_type = 'Repetitived_' + id
        df_error = filling_df_error(records_repetitived_id, id, entity_name, error_type).union(df_error)
    # Extract all columns of the table which must not be null
    list_fields = [i for i in df.columns if gf.config['company'][part][i]['check']]
    df_check = df.filter(~col(id).isin(list_repetitived_id))
    for i in list_fields:
        df_null = df_check.filter(col(i).isNull())
        list_id = list(df_null.select(id).toPandas()[id])
        df_check = df_check.filter(~col(id).isin(list_id))
        if df_null.first() is not None:
            df_partitions = df_null.select("date", "ac_id", "partition_id")\
                .withColumnRenamed('DATE', 'date_group').union(df_partitions)
            error_type = 'Null_Value' + '_' + i
            df_error = filling_df_error(df_null, id, entity_name, error_type).union(df_error)
    list_id = list(df_error.select('id').toPandas()['id'])
    df = df.filter(~col(id).isin(list_id))
    return df_error, df, df_partitions
In the above code, df_partitions is a Spark DataFrame that holds the state of each partition (finished or not). The code runs without any error, but it takes a lot of time.
DAG of the code:
Would you please guide me on how to improve the code so it runs faster for 200 million records?
Any help is really appreciated.
Upvotes: 0
Views: 965
Reputation: 6644
If the intention is to check whether any column (from a list of columns) is null, we can use when().otherwise() to flag such records and create a new dataframe from the flagged records. Here's an example.
from functools import reduce
import pyspark.sql.functions as func

# sample data matching the table shown below
data_ls = [(1, 1, 1, 1, 1),
           (1, None, 1, None, None),
           (1, 0, None, None, 1)]

data_sdf = spark.sparkContext.parallelize(data_ls). \
    toDF(['col1', 'col2', 'col3', 'col4', 'col5'])

# +----+----+----+----+----+
# |col1|col2|col3|col4|col5|
# +----+----+----+----+----+
# |   1|   1|   1|   1|   1|
# |   1|null|   1|null|null|
# |   1|   0|null|null|   1|
# +----+----+----+----+----+
# list of columns that should not be null
cols_to_check = ['col1', 'col3', 'col5']
# condition to check if any of the columns in `cols_to_check` has null
when_null_condition = reduce(lambda x, y: x | y, [func.col(k).isNull() for k in cols_to_check])
# Column<'(((col1 IS NULL) OR (col3 IS NULL)) OR (col5 IS NULL))'>
data_sdf. \
    withColumn('null_flag',
               func.when(when_null_condition, func.lit(1)).
               otherwise(func.lit(0))
               ). \
    show()

# +----+----+----+----+----+---------+
# |col1|col2|col3|col4|col5|null_flag|
# +----+----+----+----+----+---------+
# |   1|   1|   1|   1|   1|        0|
# |   1|null|   1|null|null|        1|
# |   1|   0|null|null|   1|        1|
# +----+----+----+----+----+---------+
Use the null_flag = 1 filter to create your error_table dataframe.
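For that last step, here's a minimal sketch reusing data_sdf and when_null_condition from above (the error_sdf name is just illustrative):

# flag the records, then keep only the flagged ones as the error table
error_sdf = data_sdf. \
    withColumn('null_flag',
               func.when(when_null_condition, func.lit(1)).
               otherwise(func.lit(0))
               ). \
    filter(func.col('null_flag') == 1)

error_sdf.show()
# only the rows that contain a null in col1, col3 or col5 remain

Equivalently, you could filter on when_null_condition directly; the flag column is mainly useful if you also want to keep the clean records in the same dataframe.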
Upvotes: 1