Reputation: 318
I wrote the Spark logic below.
High level: the code loops through some data, pulls records back in batches, applies some logic to each batch, and appends the output to a table created at run time. The job completes successfully, but the table is empty.
Detailed: the code should create a Spark DataFrame with 3 names. For each name, it constructs a query using the name as a filter condition, applies some logic to the returned data, and stores the result in a new Spark DataFrame (output_spark_df). That DataFrame is then registered as a temp table, and spark.sql is used to insert the results into my_database.my_results, so my_database.my_results should have data loaded into it 3 times. Despite the job completing successfully, my_database.my_results remains empty.
Any guidance would be greatly appreciated.
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder \
        .appName('batch_job') \
        .config("spark.kryoserializer.buffer.max", "2047mb") \
        .config("spark.sql.broadcastTimeout", "-1") \
        .config("spark.sql.autoBroadcastJoinThreshold", "-1") \
        .getOrCreate()

    # Set up hive table to capture results
    # ------------------------------------
    spark.sql("DROP TABLE IF EXISTS my_database.my_results")
    spark.sql("CREATE TABLE IF NOT EXISTS my_database.my_results "
              "(field1 STRING, field2 INT) STORED AS PARQUET")

    names = spark.sql("select distinct name from my_database.my_input "
                      "where name IN ('mike','jane','ryan')")

    for n in names:
        input_spark_df = spark.sql(
            "select * from my_database.my_input where name = '{}'".format(n))

        # ...
        # <APPLY LOGIC>
        # ...
        output_spark_df = <logic applied>

        # Capture output and append to pre-created hive table
        # ---------------------------------------------------
        output_spark_df.registerTempTable("results")
        spark.sql("INSERT INTO TABLE my_database.my_results SELECT * FROM results")

    spark.stop()
Upvotes: 1
Views: 700
Reputation: 31550
names is still a DataFrame in your code. You are looping over the DataFrame object itself, not over the name values, so the queries built inside your for loop match no records.
To turn names into a Python list, convert the DataFrame to its underlying RDD, then flatMap and collect to create the list, and loop over that list instead.
Fix:
# create names list from your input table
names = spark.sql("select distinct name from my_database.my_input "
                  "where name IN ('mike','jane','ryan')").\
    rdd.\
    flatMap(lambda z: z).\
    collect()

# to print values in the list
for n in names:
    print(n)
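Equivalently, if you'd rather avoid the RDD conversion, you can collect the Row objects and pull the field out of each one; a minimal sketch using the same query from your job:

# collect() returns a list of Row objects; extract the name field from each
names = [row['name'] for row in
         spark.sql("select distinct name from my_database.my_input "
                   "where name IN ('mike','jane','ryan')").collect()]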
Example with sample data:
#sample data
spark.sql("select distinct id as id from default.i").show()
#+---+
#| id|
#+---+
#| 1|
#| 2|
#| 3|
#+---+
#creating a list
names = spark.sql("select distinct id as id from default.i").rdd.flatMap(lambda z: z).collect()
#looping over the list
for n in names:
spark.sql("select * from default.i where id = '{}'".format(n)).show()
#result
#+---+
#| id|
#+---+
#| 1|
#+---+
#
#+---+
#| id|
#+---+
#| 2|
#+---+
#
#+---+
#| id|
#+---+
#| 3|
#+---+
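Once names is a plain Python list, the rest of your job (registering the temp table and the INSERT INTO ... SELECT) works unchanged. As a side note, registerTempTable was deprecated in Spark 2.0; createOrReplaceTempView is the current equivalent.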
Upvotes: 1