Soumen Ghosh

Reputation: 308

PySpark dataframe columns : Hive table columns

I have a Hive table as follows:

hive> describe stock_quote;
OK
tickerid                string                                      
tradeday                string                                      
tradetime               string                                      
openprice               string                                      
highprice               string                                      
lowprice                string                                      
closeprice              string                                      
volume                  string

The following Spark code reads CSV files and tries to insert the records into the Hive table:

from pyspark.sql import Row

sc = spark.sparkContext
# Read the CSV as plain text and split each line into fields
lines = sc.textFile('file:///<File Location>')
rows = lines.map(lambda line: line.split(','))
# Build a Row per record with named fields
rows_map = rows.map(lambda row: Row(TickerId=row[0], TradeDay=row[1], TradeTime=row[2], OpenPrice=row[3], HighPrice=row[4], LowPrice=row[5], ClosePrice=row[6], Volume=row[7]))
rows_df = spark.createDataFrame(rows_map)
rows_df.write.mode('append').insertInto('default.stock_quote')

The problem I am facing is that when I call show() on the dataframe, it prints the columns in alphabetical order, like the following:

|ClosePrice|HighPrice|LowPrice|OpenPrice|TickerId|TradeDay|TradeTime|Volume|

and in the table it inserts the value of ClosePrice (the 1st column in the DF) into the TickerId column (the 1st column in the Hive table), the value of HighPrice into the TradeDay column, and so on.

I tried calling select() on the dataframe, which didn't help. I then tried passing a list of column names, as follows:

rows_df = spark.createDataFrame(rows_map, ["TickerId", "TradeDay", "TradeTime", "OpenPrice", "HighPrice", "LowPrice", "ClosePrice", "Volume"])

The above changes the order of the column names, but the values stay in the same positions, which is even more incorrect.
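
To make the symptom concrete, here is a minimal reproduction of what I am seeing (as far as I can tell this is Spark 2.x behaviour, where Row(**kwargs) sorts its fields alphabetically):

from pyspark.sql import Row

# Row(**kwargs) stores its fields in alphabetical order, so ClosePrice
# comes out before TradeDay no matter how the arguments were written:
r = Row(TradeDay='2018-03-01', ClosePrice='9.5')
print(r)  # Row(ClosePrice='9.5', TradeDay='2018-03-01')

# Passing a name list to createDataFrame only relabels the columns by
# position, so the values stay where the alphabetical sort put them:
df = spark.createDataFrame([r], ['TradeDay', 'ClosePrice'])
df.show()
# +--------+----------+
# |TradeDay|ClosePrice|
# +--------+----------+
# |     9.5|2018-03-01|
# +--------+----------+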

Any help would really be appreciated.

Upvotes: 2

Views: 2666

Answers (3)

Ramesh Maharjan

Reputation: 41987

You should go with namedtuple instead of Row, because Row tries to sort the column names alphabetically. The sorted column names then no longer match the column order of the default.stock_quote table. Please check What is the Scala case class equivalent in PySpark? for more details.

So you should be doing

from collections import namedtuple

# A namedtuple preserves the declared field order, unlike Row(**kwargs)
table = namedtuple('table', ['TickerId', 'TradeDay', 'TradeTime', 'OpenPrice', 'HighPrice', 'LowPrice', 'ClosePrice', 'Volume'])
rows_map = rows.map(lambda row: table(row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7]))
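
With that, rebuilding the dataframe should give the intended column order (a sketch reusing the question's variables):

# The schema inferred from the namedtuple follows the declared field
# order, so the positional insertInto now lines up with the Hive table:
rows_df = spark.createDataFrame(rows_map)
rows_df.write.mode('append').insertInto('default.stock_quote')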

And as @user6910411 suggested, "a normal tuple would do as well":

rows_map = rows.map(lambda row : (row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7]))
rows_df = spark.createDataFrame(rows_map, ['TickerId', 'TradeDay', 'TradeTime', 'OpenPrice', 'HighPrice', 'LowPrice', 'ClosePrice', 'Volume'])

Now insertInto should work.

Upvotes: 2

philantrovert

Reputation: 10092

You can also use saveAsTable instead of insertInto.

From the docs:

Unlike insertInto, saveAsTable will use the column names to find the correct column positions
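
A minimal sketch of that approach, assuming rows_df's column names match the table's columns (Spark matches them case-insensitively by default):

# saveAsTable in append mode resolves columns by name instead of by
# position, so the dataframe's alphabetical column order no longer matters:
rows_df.write.mode('append').saveAsTable('default.stock_quote')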

Upvotes: 4

vvg

Reputation: 6385

How did it happen that the columns were sorted in alphabetical order? Is that the order in the CSV file?

Anyway, I'd do it in the following steps:

  • select the columns from your table
  • rearrange the dataframe based on the table's column order

# pyspark below
list_columns = spark.sql('select * from table').columns  # there might be a simpler way
dataframe.select(*list_columns)
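
Applied to the question's dataframe, the end-to-end flow would look roughly like this (insertInto is positional, so the reordered select is what fixes the alignment):

# Read the Hive table's column order, reorder the dataframe to match,
# then do the positional insert (column lookup is case-insensitive):
list_columns = spark.sql('select * from default.stock_quote').columns
rows_df.select(*list_columns).write.mode('append').insertInto('default.stock_quote')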

Upvotes: 1
