Nathaniel

Reputation: 540

How do I get a PySpark DataFrame made using HiveContext in Spark 1.5.2?

Update: My errors are probably caused by how I installed Spark and/or Hive. Window functions work without any trouble in a hosted Databricks notebook, so I need to figure out how to set this up locally.

I have a Spark DataFrame that I need to use a Window function on.* I tried following the instructions over here, but I ran into some problems.

Setting up my environment:

import os
import sys
import datetime as dt

os.environ["SPARK_HOME"] = '/usr/bin/spark-1.5.2'
os.environ["PYTHONPATH"] = '/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip'
sys.path.append('/usr/bin/spark-1.5.2/python')
sys.path.append('/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip')

import pyspark
sc = pyspark.SparkContext()
hiveContext = pyspark.sql.HiveContext(sc)
sqlContext = pyspark.sql.SQLContext(sc)
from pyspark.sql import Row
from pyspark.sql.functions import struct
from pyspark.sql import DataFrame
from collections import OrderedDict

Setting up my data:

test_ts = {'adminDistrict': None,
 'city': None,
 'country': {'code': 'NA', 'name': 'UNKNOWN'},
 'data': [{'timestamp': '2005-08-25T00:00:00Z', 'value': 369.89},
  {'timestamp': '2005-08-26T00:00:00Z', 'value': 362.44},
  {'timestamp': '2005-08-29T00:00:00Z', 'value': 368.3},
  {'timestamp': '2005-08-30T00:00:00Z', 'value': 382.6},
  {'timestamp': '2005-08-31T00:00:00Z', 'value': 377.84},
  {'timestamp': '2005-09-01T00:00:00Z', 'value': 380.74},
  {'timestamp': '2005-09-02T00:00:00Z', 'value': 370.33},
  {'timestamp': '2005-09-05T00:00:00Z', 'value': 370.33},
  {'timestamp': '2005-09-06T00:00:00Z', 'value': 361.5},
  {'timestamp': '2005-09-07T00:00:00Z', 'value': 352.79},
  {'timestamp': '2005-09-08T00:00:00Z', 'value': 354.3},
  {'timestamp': '2005-09-09T00:00:00Z', 'value': 353.0},
  {'timestamp': '2005-09-12T00:00:00Z', 'value': 349.35},
  {'timestamp': '2005-09-13T00:00:00Z', 'value': 348.82},
  {'timestamp': '2005-09-14T00:00:00Z', 'value': 360.24},
  {'timestamp': '2005-09-15T00:00:00Z', 'value': 357.61},
  {'timestamp': '2005-09-16T00:00:00Z', 'value': 347.14},
  {'timestamp': '2005-09-19T00:00:00Z', 'value': 370.0},
  {'timestamp': '2005-09-20T00:00:00Z', 'value': 362.82},
  {'timestamp': '2005-09-21T00:00:00Z', 'value': 366.11},
  {'timestamp': '2005-09-22T00:00:00Z', 'value': 364.46},
  {'timestamp': '2005-09-23T00:00:00Z', 'value': 351.8},
  {'timestamp': '2005-09-26T00:00:00Z', 'value': 360.74},
  {'timestamp': '2005-09-27T00:00:00Z', 'value': 356.63},
  {'timestamp': '2005-09-28T00:00:00Z', 'value': 363.64},
  {'timestamp': '2005-09-29T00:00:00Z', 'value': 366.05}],
 'maxDate': '2015-12-28T00:00:00Z',
 'minDate': '2005-08-25T00:00:00Z',
 'name': 'S&P GSCI Crude Oil Spot',
 'offset': 0,
 'resolution': 'DAY',
 'sources': ['trf'],
 'subtype': 'Index',
 'type': 'Commodities',
 'uid': 'TRF_INDEX_Z39824_PI'}

A function to turn that JSON into a DataFrame:

def ts_to_df(ts):
    data = []
    for line in ts['data']:
        data.append((dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(), line['value']))
    return sc.parallelize(data).toDF(['Date', ts['name'].replace('&', '').replace(' ', '_')])

Getting a dataframe and taking a look at what's inside:

test_df = ts_to_df(test_ts)
test_df.show()

That shows me this:

+----------+----------------------+
|      Date|SP_GSCI_Crude_Oil_Spot|
+----------+----------------------+
|2005-08-25|                369.89|
|2005-08-26|                362.44|
|2005-08-29|                 368.3|
|2005-08-30|                 382.6|
|2005-08-31|                377.84|
|2005-09-01|                380.74|
|2005-09-02|                370.33|
|2005-09-05|                370.33|
|2005-09-06|                 361.5|
|2005-09-07|                352.79|
|2005-09-08|                 354.3|
|2005-09-09|                 353.0|
|2005-09-12|                349.35|
|2005-09-13|                348.82|
|2005-09-14|                360.24|
|2005-09-15|                357.61|
|2005-09-16|                347.14|
|2005-09-19|                 370.0|
|2005-09-20|                362.82|
|2005-09-21|                366.11|
+----------+----------------------+

And here is where I have no idea what I'm doing and everything starts to go wrong:

from pyspark.sql.functions import lag, col, lead
from pyspark.sql.window import Window

w = Window().partitionBy().orderBy(col('Date'))
test_df.select(lead(test_df.Date, count=1, default=None).over(w).alias("Next_Date")).show()

That gives me this error:

Py4JJavaError: An error occurred while calling o59.select. : org.apache.spark.sql.AnalysisException: Could not resolve window function 'lead'. Note that, using window functions currently requires a HiveContext;

So it looks like I need a HiveContext, right? Do I need to create my DataFrame with a HiveContext? Let me try creating the DataFrame explicitly with the HiveContext:

def ts_to_hive_df(ts):
    data = []
    for line in ts['data']:
        data.append({'Date':dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(),
                 ts['name'].replace('&', '').replace(' ', '_'):line['value']})
    temp_rdd = sc.parallelize(data).map(lambda x: Row(**x))
    return hiveContext.createDataFrame(temp_rdd)

test_df = ts_to_hive_df(test_ts)
test_df.show()

But that gives me this error:

TypeError: 'JavaPackage' object is not callable

So how do I use Window functions? Do I need to create the DataFrames using a HiveContext? If so, then how do I do that? Can someone tell me what I'm doing wrong?

*I need to know whether there are gaps in my data. For each row, ordered by the 'Date' column, I want to know what is on the next row; if days are missing or the data is bad, I want to fill that row with the previous day's data. If you know a better way to do that, let me know, but I would still like to know how to get these window functions working.
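The goal in the footnote (find missing days and reuse the previous day's data) can be prototyped in plain Python before moving it into Spark, where the "next date" for each row is what `lead(Date, 1)` over the date-ordered window returns. The sketch below is one possible approach, not the asker's method: the helper `fill_weekday_gaps` is hypothetical, and it assumes that weekends are expected gaps in a daily spot series and that a `None` value is what "bad data" looks like.

```python
import datetime as dt

ONE_DAY = dt.timedelta(days=1)


def fill_weekday_gaps(rows):
    """Return (date, value, filled) tuples in date order.

    Assumptions (not from the question): weekends are expected gaps, a
    missing weekday is filled with the previous row's value, and a value of
    None counts as bad data and is replaced by the previous value.
    `filled` is True for every row that was inserted or patched.
    """
    rows = sorted(rows, key=lambda r: r[0])
    out = []
    for i, (date, value) in enumerate(rows):
        if value is None and out:
            out.append((date, out[-1][1], True))  # carry last good value
        else:
            out.append((date, value, False))  # a leading None is kept as-is
        if i + 1 == len(rows):
            break
        next_date = rows[i + 1][0]  # what lead(Date, 1) gives for this row
        carry = out[-1][1]
        day = date + ONE_DAY
        while day < next_date:
            if day.weekday() < 5:  # Monday=0 ... Friday=4
                out.append((day, carry, True))
            day += ONE_DAY
    return out


rows = [(dt.date(2005, 8, 26), 362.44),  # Friday
        (dt.date(2005, 8, 29), 368.3),   # Monday: the weekend is not a gap
        (dt.date(2005, 8, 31), 377.84)]  # 2005-08-30 deliberately removed (hypothetical gap)

for row in fill_weekday_gaps(rows):
    print(row)
```

Exchange holidays are not modeled, so a holiday would show up as a filled gap.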

Upvotes: 6

Views: 3603

Answers (1)

data_steve

Reputation: 1648

This is an older question and may be moot, since you've probably moved on to newer versions of Spark. I'm running Spark 2.0 myself, so this may be cheating.

But FWIW, I see two possible issues. In the first example, I think .toDF() may be defaulting to the SQLContext, since you created both contexts. In the second, when you refactored, could the problem be that you are calling the HiveContext inside the function?

If I refactor your second function (ts_to_hive_df) so that the HiveContext is used outside the function, everything works:

def ts_to_df(ts):
    data = []
    for line in ts['data']:
        data.append({'Date':dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(),
                 ts['name'].replace('&', '').replace(' ', '_'):line['value']})
    return data

data = ts_to_df(test_ts)
test_rdd = sc.parallelize(data).map(lambda x: Row(**x))
test_df = hiveContext.createDataFrame(test_rdd)

from pyspark.sql.functions import lag, col, lead
from pyspark.sql.window import Window

w = Window().partitionBy().orderBy(col('Date'))
test_df.select(lead(test_df.Date, count=1, default=None).over(w).alias("Next_Date")).show()

I get the output:

+----------+
| Next_Date|
+----------+
|2005-08-26|
|2005-08-29|
|2005-08-30|
|2005-08-31|
|2005-09-01|
|2005-09-02|
.....
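What the `lead` call computes can be checked without Spark. The sketch below is a plain-Python model of `lead(col, count, default)` over a window ordered by that same column with no partitioning; the helper `lead_values` is hypothetical and not part of PySpark. Run on the first few dates from the question, it pairs each date with the next one, matching the `Next_Date` column above, and gives the last row the default `None`.

```python
import datetime as dt


def lead_values(values, offset=1, default=None):
    """Plain-Python model of lead(col, offset, default) over a window that is
    ordered by the same column and has no partitioning: each value is paired
    with the value `offset` positions later in sort order, or with `default`
    when no such row exists."""
    ordered = sorted(values)
    return [
        (v, ordered[i + offset] if i + offset < len(ordered) else default)
        for i, v in enumerate(ordered)
    ]


# First few dates from the question's data
dates = [dt.date(2005, 8, 25), dt.date(2005, 8, 26), dt.date(2005, 8, 29),
         dt.date(2005, 8, 30), dt.date(2005, 8, 31), dt.date(2005, 9, 1),
         dt.date(2005, 9, 2)]

for date, next_date in lead_values(dates):
    print(date, next_date)
```

Because `w` has no `partitionBy` columns, Spark evaluates the window over a single partition; that is fine for one small series but will not scale to large data.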

Upvotes: 1
