stackoverflowuser2010

Reputation: 40869

Spark SQL: How to append new row to dataframe table (from another table)

I am using Spark SQL with dataframes. I have an input dataframe, and I would like to append (or insert) its rows to a larger dataframe that has more columns. How would I do that?

If this were SQL, I would use INSERT INTO OUTPUT SELECT ... FROM INPUT, but I don't know how to do that with Spark SQL.

For concreteness:

var input = sqlContext.createDataFrame(Seq(
        (10L, "Joe Doe", 34),
        (11L, "Jane Doe", 31),
        (12L, "Alice Jones", 25)
        )).toDF("id", "name", "age")

var output = sqlContext.createDataFrame(Seq(
        (0L, "Jack Smith", 41, "yes", 1459204800L),
        (1L, "Jane Jones", 22, "no", 1459294200L),
        (2L, "Alice Smith", 31, "", 1459595700L)
        )).toDF("id", "name", "age", "init", "ts")


scala> input.show()
+---+-----------+---+
| id|       name|age|
+---+-----------+---+
| 10|    Joe Doe| 34|
| 11|   Jane Doe| 31|
| 12|Alice Jones| 25|
+---+-----------+---+

scala> input.printSchema()
root
 |-- id: long (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)


scala> output.show()
+---+-----------+---+----+----------+
| id|       name|age|init|        ts|
+---+-----------+---+----+----------+
|  0| Jack Smith| 41| yes|1459204800|
|  1| Jane Jones| 22|  no|1459294200|
|  2|Alice Smith| 31|    |1459595700|
+---+-----------+---+----+----------+

scala> output.printSchema()
root
 |-- id: long (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
 |-- init: string (nullable = true)
 |-- ts: long (nullable = false)

I would like to append all the rows of input to the end of output. At the same time, I would like to set the init column of output to an empty string '' and the ts column to the current timestamp, e.g. 1461883875L.

Any help would be appreciated.

Upvotes: 14

Views: 116310

Answers (3)

Saikrishna Parshi

Reputation: 1

How to copy (duplicate) rows of data in DataFrames using Spark SQL

First, create a table from the existing data using Spark Scala:

spark.sql("CREATE TABLE first USING DELTA LOCATION 'path of input file'")

Now insert the data into the table, modifying the query however you want, e.g. spark.sql("insert into first select * from first limit 1"), or with a WHERE condition.

Now you will have duplicate data in the table 'first'.
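
A minimal end-to-end sketch of that sequence (the Delta location and the WHERE filter are placeholder assumptions, not from the original answer):

// Register the existing data as a Delta table (placeholder path).
spark.sql("CREATE TABLE first USING DELTA LOCATION '/tmp/input'")

// Select rows from the table and insert them back into it, duplicating them.
spark.sql("INSERT INTO first SELECT * FROM first WHERE age > 30")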

Upvotes: 0

Fabian

Reputation: 3408

I had a similar problem matching your SQL question:

I wanted to append a dataframe to an existing Hive table that is also larger (has more columns). To stay with your example: output is my existing table and input would be the dataframe. My solution simply uses SQL, and for the sake of completeness I want to provide it:

import org.apache.spark.sql.SaveMode

var input = spark.createDataFrame(Seq(
        (10L, "Joe Doe", 34),
        (11L, "Jane Doe", 31),
        (12L, "Alice Jones", 25)
        )).toDF("id", "name", "age")

//--> just for a running example: In my case the table already exists
var output = spark.createDataFrame(Seq(
        (0L, "Jack Smith", 41, "yes", 1459204800L),
        (1L, "Jane Jones", 22, "no", 1459294200L),
        (2L, "Alice Smith", 31, "", 1459595700L)
        )).toDF("id", "name", "age", "init", "ts")

output.write.mode(SaveMode.Overwrite).saveAsTable("appendTest");
//<--

input.createOrReplaceTempView("inputTable");

spark.sql("INSERT INTO TABLE appendTest SELECT id, name, age, null, null FROM inputTable");
val df = spark.sql("SELECT * FROM appendTest")
df.show()

which outputs:

+---+-----------+---+----+----------+
| id|       name|age|init|        ts|
+---+-----------+---+----+----------+
|  0| Jack Smith| 41| yes|1459204800|
|  1| Jane Jones| 22|  no|1459294200|
|  2|Alice Smith| 31|    |1459595700|
| 12|Alice Jones| 25|null|      null|
| 11|   Jane Doe| 31|null|      null|
| 10|    Joe Doe| 34|null|      null|
+---+-----------+---+----+----------+

If you have the problem that you don't know how many fields are missing, you could use a diff like

val missingFields = output.schema.toSet.diff(input.schema.toSet)

and then (in rough pseudocode)

val sqlQuery = "INSERT INTO TABLE appendTest SELECT " + commaSeparatedColumnNames + commaSeparatedNullsForEachMissingField + " FROM inputTable"
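
To make that concrete, here is a runnable sketch (assuming the appendTest table and inputTable view from above, and that input's column names match output's): walk the target schema in order and emit either the input column or a NULL.

// Build the SELECT list in the target table's column order: reuse the
// input column where it exists, otherwise fill the slot with NULL.
val selectList = output.schema.fields.map { f =>
  if (input.columns.contains(f.name)) f.name else s"null AS ${f.name}"
}.mkString(", ")

spark.sql(s"INSERT INTO TABLE appendTest SELECT $selectList FROM inputTable")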

Hope this helps people with similar problems in the future!

P.S.: In your special case (current timestamp + empty field for init) you could even use

spark.sql("INSERT INTO TABLE appendTest SELECT id, name, age, '' as init, current_timestamp as ts FROM inputTable");

which results in

+---+-----------+---+----+----------+
| id|       name|age|init|        ts|
+---+-----------+---+----+----------+
|  0| Jack Smith| 41| yes|1459204800|
|  1| Jane Jones| 22|  no|1459294200|
|  2|Alice Smith| 31|    |1459595700|
| 12|Alice Jones| 25|    |1521128513|
| 11|   Jane Doe| 31|    |1521128513|
| 10|    Joe Doe| 34|    |1521128513|
+---+-----------+---+----+----------+

Upvotes: 4

zero323

Reputation: 330063

Spark DataFrames are immutable, so it is not possible to append or insert rows in place. Instead you can just add the missing columns and use UNION ALL:

import org.apache.spark.sql.functions.{current_timestamp, lit}
import sqlContext.implicits._ // for the $"..." column syntax

output.unionAll(input.select($"*", lit(""), current_timestamp().cast("long")))
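
For what it's worth, a sketch of the same idea with explicitly named columns via withColumn (assuming Spark 2.x, where unionAll was renamed union):

import org.apache.spark.sql.functions.{current_timestamp, lit}

// Add the missing columns with explicit names, then union by position.
val padded = input
  .withColumn("init", lit(""))
  .withColumn("ts", current_timestamp().cast("long"))

output.union(padded).show()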

Upvotes: 28
