SunflowerParty

Reputation: 61

Not able to add column to Databricks dataframe using withColumn but schema reflects the additional column

I'm trying to add two extra columns to my Databricks PySpark DataFrame, but they don't show up when I select * from the resulting table.

for file in file_list:
    try:
        sql_query = create_sql_statement(file)
        df = spark.sql(sql_query) \
            .withColumn('type', F.lit('animal_type')) \
            .withColumn('timestamp', F.current_timestamp())
        df.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable(f'{database}.{table}')

    except Exception as e:
        print(e)

Example of create_sql_statement: 'CREATE TABLE database.TABLE_NAME AS SELECT FIELD1, FIELD2, FIELD3, FIELD4, type, timestamp FROM DATABASE.TABLENAME'
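For context, a minimal sketch of what such a helper might look like (the file-to-table-name mapping here is an assumption for illustration, not the actual code):

```python
def create_sql_statement(file):
    # Hypothetical sketch: derive the table name from the file name.
    # The real mapping used in the question's helper is not shown.
    table_name = file.rsplit("/", 1)[-1].split(".", 1)[0].upper()
    return (
        f"CREATE TABLE database.{table_name} AS "
        f"SELECT FIELD1, FIELD2, FIELD3, FIELD4, type, timestamp "
        f"FROM DATABASE.{table_name}"
    )
```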

When I run the code above, the for loop successfully creates the tables and I see the PySpark DataFrame result below, but the new columns never appear in my tables.

num_affected_rows:long
num_inserted_rows:long
type:string
timestamp:timestamp

I see one of two results:

  1. For some tables, when I select * I get 'query returned no results', even though the source database.table_name that create_sql_statement reads from definitely contains data.
  2. For other tables, select * returns the correct output, but without the columns added by the withColumn calls above.

Am I missing something in the syntax? This is a follow-up to my previous question 'SQL Error mismatched input 'sql_query' expecting {EOF} when using Create Table in Pyspark', which was solved.

Upvotes: 0

Views: 796

Answers (1)

BeRT2me

Reputation: 13251

A couple of points:

  • A CREATE TABLE ... AS SELECT statement does not return the resulting table. When you run spark.sql(sql_query), you get back only a small status DataFrame (which is exactly the num_affected_rows / num_inserted_rows schema you posted), so your withColumn calls are applied to that status result, never to the table itself.
  • You can't overwrite a table you're currently reading from. It's not clear whether that's what you're trying to do here.

What you likely mean to do is:

df = spark.sql(
    """
    SELECT FIELD1, FIELD2, FIELD3, FIELD4 
      FROM DATABASE.table_you_read_from
    """
)
df = (
    df.withColumn('type', F.lit('animal_type'))
      .withColumn('timestamp', F.current_timestamp())
)
(
    df.write
      .format("delta")
      .option("overwriteSchema", "true")
      .mode("overwrite")
      .saveAsTable('DATABASE.new_table')
)

Alternatively, in pure SQL this would be:

spark.sql("DROP TABLE IF EXISTS DATABASE.new_table")
spark.sql(
    """
    CREATE TABLE DATABASE.new_table AS
    SELECT FIELD1, FIELD2, FIELD3, FIELD4
         , 'animal_type' AS type
         , current_timestamp AS timestamp
      FROM DATABASE.table_you_read_from
    """
)

Upvotes: 1
