OTM

Reputation: 318

How do I programmatically append records to a hive table using a loop sparksql?

I wrote the spark logic below.

High level: The code loops through some data, pulls some records back in batches, applies some logic to those records and appends the output to another table created at run time. The job completes successfully but the table is empty.

Detailed: The code should create a Spark DataFrame with 3 names. For each name, the code constructs a query using the name as a filter condition, applies some logic to the returned data, and stores the result in a new Spark DataFrame (output_spark_df). That DataFrame is then registered as a temp table, and spark.sql is used to insert the results into my_database.my_results, so my_database.my_results should have data loaded into it 3 times. Despite the job completing successfully, my_database.my_results remains empty.

Any guidance would be greatly appreciated.

if __name__ == "__main__":
    spark = (SparkSession.builder
             .appName('batch_job')
             .config("spark.kryoserializer.buffer.max", "2047mb")
             .config("spark.sql.broadcastTimeout", "-1")
             .config("spark.sql.autoBroadcastJoinThreshold", "-1")
             .getOrCreate())

    # Set up hive table to capture results
    #-------------------------------------
    spark.sql("DROP TABLE IF EXISTS my_database.my_results")
    spark.sql("CREATE TABLE IF NOT EXISTS my_database.my_results (field1 STRING, field2 INT) STORED AS PARQUET")

    names = spark.sql("select distinct name from my_database.my_input where name IN ('mike','jane','ryan')")

    for n in names:
        input_spark_df = spark.sql("select * from my_database.my_input where name = '{}'".format(n))
        # ...
        # <APPLY LOGIC>
        # ...
        output_spark_df = <logic applied>

        # Capture output and append to pre-created hive table
        #----------------------------------------------------
        output_spark_df.registerTempTable("results")
        spark.sql("INSERT INTO TABLE my_database.my_results  SELECT * FROM results")

    spark.stop()

Upvotes: 1

Views: 700

Answers (1)

notNull

Reputation: 31550

names is still a DataFrame in your code; looping over a DataFrame does not iterate its rows, so the queries inside your for loop never match any records.

To turn names into a Python list, drop down to the underlying RDD, flatMap the rows into scalar values, and collect them, then loop over that list.

Fix:

# create names list
names=spark.sql("select distinct id as id from default.i").\
      rdd.\
      flatMap(lambda z:z).\
      collect()

# to print values in the list

for n in names:
    print(n)

Example with sample data:

#sample data
spark.sql("select distinct id as id from default.i").show()
#+---+
#| id|
#+---+
#|  1|
#|  2|
#|  3|
#+---+

#creating a list
names=spark.sql("select distinct id as id from default.i").rdd.flatMap(lambda z:z).collect()

#looping over the list
for n in names:
    spark.sql("select * from default.i where id = '{}'".format(n)).show()

#result 
#+---+
#| id|
#+---+
#|  1|
#+---+
#
#+---+
#| id|
#+---+
#|  2|
#+---+
#
#+---+
#| id|
#+---+
#|  3|
#+---+

Upvotes: 1

Related Questions