Reputation: 308
I have a Hive table defined as follows:
hive> describe stock_quote;
OK
tickerid string
tradeday string
tradetime string
openprice string
highprice string
lowprice string
closeprice string
volume string
The following Spark code reads CSV files and tries to insert the records into the Hive table:
from pyspark.sql import Row

sc = spark.sparkContext
lines = sc.textFile('file:///<File Location>')
rows = lines.map(lambda line: line.split(','))
rows_map = rows.map(lambda row: Row(TickerId = row[0], TradeDay = row[1], TradeTime = row[2], OpenPrice = row[3], HighPrice = row[4], LowPrice = row[5], ClosePrice = row[6], Volume = row[7]))
rows_df = spark.createDataFrame(rows_map)
rows_df.write.mode('append').insertInto('default.stock_quote')
The problem I am facing is that when I call show() on the DataFrame, it prints the columns in alphabetical order, like this:
|ClosePrice|HighPrice|LowPrice|OpenPrice|TickerId|TradeDay|TradeTime|Volume|
and in the table it inserts the value of ClosePrice (1st column in the DataFrame) into the TickerId column (1st column in the Hive table), the value of HighPrice into the TradeDay column, and so on.
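For example, a minimal sketch of what I am seeing (the values here are made up, just to illustrate):
from pyspark.sql import Row

# a Row built from keyword arguments comes back with its fields sorted alphabetically (Spark 2.x)
r = Row(TickerId='AAPL', OpenPrice='170.0', ClosePrice='172.0')
print(r)   # Row(ClosePrice='172.0', OpenPrice='170.0', TickerId='AAPL')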
I tried calling select() on the DataFrame; it didn't help. I also tried passing a list of column names, as follows:
rows_df = spark.createDataFrame(rows_map, ["TickerId", "TradeDay", "TradeTime", "OpenPrice", "HighPrice", "LowPrice", "ClosePrice", "Volume"])
This changes the order of the column names, but the values stay in the same positions, which is even more incorrect.
Any help would really be appreciated.
Upvotes: 2
Views: 2666
Reputation: 41987
You should go with namedtuple instead of Row, because Row sorts the column names alphabetically, so the resulting column order no longer matches the column order of the default.stock_quote table. Please check What is the Scala case class equivalent in PySpark? for more details.
So you should do:
from collections import namedtuple
table = namedtuple('table', ['TickerId', 'TradeDay', 'TradeTime', 'OpenPrice', 'HighPrice', 'LowPrice', 'ClosePrice', 'Volume'])
rows_map = rows.map(lambda row : table(row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7]))
And as @user6910411 suggested, "a normal tuple would do as well":
rows_map = rows.map(lambda row : (row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7]))
rows_df = spark.createDataFrame(rows_map, ['TickerId', 'TradeDay', 'TradeTime', 'OpenPrice', 'HighPrice', 'LowPrice', 'ClosePrice', 'Volume'])
Now the insertInto should work.
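For example (continuing with the rows_df built above), the original write call from the question should now line the values up with the right columns:
rows_df.write.mode('append').insertInto('default.stock_quote')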
Upvotes: 2
Reputation: 10092
You can also use saveAsTable instead of insertInto.
From the docs:
Unlike insertInto, saveAsTable will use the column names to find the correct column positions.
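A rough sketch for the table in the question (the append mode and hive format here are my assumptions; adjust to your setup):
# saveAsTable matches on column names, so the alphabetical DataFrame column order no longer matters
rows_df.write.mode('append').format('hive').saveAsTable('default.stock_quote')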
Upvotes: 4
Reputation: 6385
How did it end up sorted in alphabetical order? Is that the order in the CSV file?
Anyway, I'd do it in the following steps:
# pyspark below
list_columns = spark.sql('select * from default.stock_quote').columns  # there might be a simpler way
rows_df = rows_df.select(*list_columns)
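For example, one simpler way (if I'm not mistaken) is spark.table, and the reordered DataFrame can then be appended positionally:
# spark.table returns a DataFrame over the Hive table; .columns gives its column order
list_columns = spark.table('default.stock_quote').columns
rows_df.select(*list_columns).write.mode('append').insertInto('default.stock_quote')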
Upvotes: 1