STORM

Reputation: 4341

How to convert a JSON result to Parquet?

I have the following code, which grabs some data from the Marketo system:

from marketorestpython.client import MarketoClient
munchkin_id = "xxx-xxx-xxx"
client_id = "00000000-0000-0000-0000-00000000000"
client_secret= "secret"
mc = MarketoClient(munchkin_id, client_id, client_secret)
mc.execute(method='get_multiple_leads_by_filter_type', filterType='email', filterValues=['[email protected]'], 
                  fields=['BG__c','email','company','createdAt'], batchSize=None)

This returns the following data:

[{'BG__c': 'ABC',
  'company': 'MCS',
  'createdAt': '2016-10-25T14:04:15Z',
  'id': 4,
  'email': '[email protected]'},
 {'BG__c': 'CDE',
  'company': 'MSC',
  'createdAt': '2018-03-28T16:41:06Z',
  'id': 10850879,
  'email': '[email protected]'}]

What I want to do is save the returned data to a Parquet file. But when I try this with the following code, I receive an error message.

from marketorestpython.client import MarketoClient
munchkin_id = "xxx-xxx-xxx"
client_id = "00000000-0000-0000-0000-00000000000"
client_secret= "secret"
mc = MarketoClient(munchkin_id, client_id, client_secret)
data = mc.execute(method='get_multiple_leads_by_filter_type', filterType='email', filterValues=['[email protected]'], 
                  fields=['BG__c','email','company','createdAt'], batchSize=None)

sqlContext.read.json(data)
data.write.parquet("adl://subscription.azuredatalakestore.net/folder1/Marketo/marketo_data")

java.lang.ClassCastException: java.util.HashMap cannot be cast to java.lang.String
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-1431708582476650> in <module>()
      7                       fields=['BG__c','email','company','createdAt'], batchSize=None)
      8 
----> 9 sqlContext.read.json(data)
     10 data.write.parquet("adl://subscription.azuredatalakestore.net/folder1/Marketo/marketo_data")

/databricks/spark/python/pyspark/sql/readwriter.py in json(self, path, schema, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZero, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, dateFormat, timestampFormat, multiLine, allowUnquotedControlChars, charset)
    261             path = [path]
    262         if type(path) == list:
--> 263             return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    264         elif isinstance(path, RDD):
    265             def func(iterator):

/databricks/spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161 

What am I doing wrong?

Upvotes: 4

Views: 12604

Answers (2)

plalanne

Reputation: 1030

You have the following data:

data = [{'BG__c': 'ABC',
       'company': 'MCS',
       'createdAt': '2016-10-25T14:04:15Z',
       'id': 4,
       'email': '[email protected]'},
       {'BG__c': 'CDE',
       'company': 'MSC',
       'createdAt': '2018-03-28T16:41:06Z',
       'id': 10850879,
       'email': '[email protected]'}]

To save it to a Parquet file, I would suggest creating a DataFrame first and then writing that DataFrame out as Parquet.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Field names must match the dictionary keys exactly (note "BG__c").
df = spark.createDataFrame(data,
                           schema = StructType([
                                    StructField("BG__c", StringType(), True),
                                    StructField("company", StringType(), True),
                                    StructField("createdAt", StringType(), True),
                                    StructField("email", StringType(), True),
                                    StructField("id", IntegerType(), True)]))

This would give the following types:

df.dtypes

[('BG__c', 'string'),
 ('company', 'string'),
 ('createdAt', 'string'),
 ('email', 'string'),
 ('id', 'int')]

You can then check the contents and save the DataFrame as a Parquet file:

df.show()
+-----+-------+--------------------+----------------+--------+
|BG__c|company|           createdAt|           email|      id|
+-----+-------+--------------------+----------------+--------+
|  ABC|    MCS|2016-10-25T14:04:15Z|[email protected]|       4|
|  CDE|    MSC|2018-03-28T16:41:06Z|[email protected]|10850879|
+-----+-------+--------------------+----------------+--------+

df.write.format('parquet').save(parquet_path_in_hdfs)

Here parquet_path_in_hdfs is the path and name of the desired Parquet file.
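
Since the records are plain dicts, the schema field names have to line up with the dictionary keys; as far as I know, a field name that matches no key ends up as a null column rather than raising an error. A minimal sketch of a check you could run before calling createDataFrame is shown here. The helper name missing_fields and the placeholder email addresses are assumptions for illustration, not part of PySpark or of the original data.

```python
def missing_fields(records, field_names):
    """Return the schema field names that are absent from at least one record."""
    missing = []
    for name in field_names:
        if any(name not in record for record in records):
            missing.append(name)
    return missing

# Same shape as the Marketo result; the email values are placeholders.
data = [{'BG__c': 'ABC', 'company': 'MCS',
         'createdAt': '2016-10-25T14:04:15Z', 'id': 4,
         'email': 'first@example.com'},
        {'BG__c': 'CDE', 'company': 'MSC',
         'createdAt': '2018-03-28T16:41:06Z', 'id': 10850879,
         'email': 'second@example.com'}]

good = missing_fields(data, ["BG__c", "company", "createdAt", "email", "id"])
bad = missing_fields(data, ["BC_g", "company"])  # "BC_g" is a deliberate typo
print(good, bad)
```

An empty list means every schema field is present in every record.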

Upvotes: 2

Rishi Saraf

Reputation: 1812

In the statement below you are calling write directly on data, which is the result returned by the Marketo client and not a DataFrame. You have to create a DataFrame first. You can convert JSON to a DataFrame using df = sqlContext.read.json("path/to/json/file"), and then call df.write.

data.write.parquet("adl://subscription.azuredatalakestore.net/folder1/Marketo/marketo_data")
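
Judging by the traceback in the question, read.json treats a Python list as a list of file paths, so a list of dicts fails with the HashMap-to-String cast error. read.json also accepts an RDD of JSON strings, so one option is to serialize each record first. This is a rough sketch under assumptions: spark is an existing SparkSession, and the helper names records_to_json_lines and write_records_as_parquet are made up for illustration.

```python
import json

def records_to_json_lines(records):
    """Serialize each dict to one JSON string (one document per record)."""
    return [json.dumps(record) for record in records]

def write_records_as_parquet(spark, records, path):
    """Hypothetical helper; `spark` is assumed to be an existing SparkSession."""
    lines = spark.sparkContext.parallelize(records_to_json_lines(records))
    df = spark.read.json(lines)  # schema is inferred from the JSON strings
    df.write.parquet(path)
    return df

# The serialization step can be checked without a cluster.
sample = [{'BG__c': 'ABC', 'company': 'MCS', 'id': 4}]
json_lines = records_to_json_lines(sample)
print(json_lines)
```

On a cluster you would then call write_records_as_parquet(spark, data, path) with the list returned by mc.execute and your own output path.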

Upvotes: 0
