Mayak

Reputation: 553

Loading avro files into pyspark dataframes from hdfs

I've built a small data pipeline that moves some fictional test data from a local directory (JSON format) to HDFS (Avro format). This seems to have worked correctly (Flume shows no errors), but it's possible that the error already lies here. The next step was to load an Avro file into a PySpark dataframe using the Databricks loader (the only Python library I could find for this). Let me explain how I did this, so you can see where I might have failed:

1) Creating Avro files out of JSON files by using Flume

My goal is to push JSON data from a local directory to HDFS, so I can analyse it with PySpark. For this I'm using Flume. Since JSON compresses poorly on HDFS, I'm also converting every file to Avro using the following flume.conf:

agent.sources.tail.type = exec
agent.sources.tail.shell = /bin/bash -c
agent.sources.tail.command = cat /home/user/Data/json/*
agent.sources.tail.batchsize = 10
agent.sources.tail.channels = MemChannel

agent.channels.MemChannel.type = memory
agent.channels.MemChannel.capacity = 100
agent.channels.MemChannel.transactionCapacity = 100


agent.sinks.HDFS.channel = MemChannel
agent.sinks.HDFS.type = hdfs
agent.sinks.HDFS.hdfs.fileType = DataStream
agent.sinks.HDFS.hdfs.fileSuffix=.avro
agent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/home/user/Data/hdfs/test_data
agent.sinks.HDFS.hdfs.batchSize = 100
agent.sinks.HDFS.hdfs.rollSize = 0
agent.sinks.HDFS.hdfs.rollCount = 100
agent.sinks.HDFS.serializer=avro_event
agent.sinks.HDFS.serializer.compressionCodec=snappy

This ran without any errors, so I assume Flume moved every file to HDFS as a correct Avro file.

2) Creating a dataframe by loading an Avro file

Now comes the part where I'm trying to read a single avro file as a dataframe within pyspark:

from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark import SparkContext
sc = SparkContext()
sqlContext = SQLContext(sc)
sqlContext.setConf("spark.sql.avro.compression.codec","snappy") 

# creates a dataframe by reading a single avro file 
df = sqlContext.read.format("com.databricks.spark.avro").load("hdfs://localhost:9000/home/user/Data/hdfs/test_data/FlumeData.1535723039267.avro")

This shows me the following (wrong) output:

df.show()
+-------+--------------------+
|headers|                body|
+-------+--------------------+
|     []|[7B 22 63 61 74 6...|
|     []|[7B 22 63 61 74 6...|
|     []|[7B 22 63 61 74 6...|
|     []|[7B 22 63 61 74 6...|
|     []|[7B 22 63 61 74 6...|
+-------+--------------------+
only showing top 5 rows
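
To see what those rows actually contain, the schema and the raw body can be inspected like this (a quick sketch, assuming the body column simply holds the raw bytes of each record):

# print the schema produced by the Avro reader
df.printSchema()

# cast the binary body column to a string to look at the raw record content
df.select(df.body.cast("string")).show(5, truncate=False)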

This is obviously not what I want, since the code above seems to read the Avro file like a plain text file, so there is no parsed structure. Before, I was creating a dataframe from the same data while it was still stored in the original JSON file:

# creates a dataframe by reading a single json file
df = sqlContext.read.json('hdfs://localhost:9000/home/user/Data/hdfs/test_data/FlumeData.1535702513118.json')

So this is what the desired (correct) output should look like:

df.show()
+---------------+--------------------+---+-------------------+-----------------+
|       category|             content| id|          timestamp|             user|
+---------------+--------------------+---+-------------------+-----------------+
|              A|Million near orde...|801|2018-08-30_16:49:53|      Molly Davis|
|              D|Determine company...|802|2018-08-30_16:49:53|       Ronnie Liu|
|              B|Among themselves ...|803|2018-08-30_16:49:53|       Lori Brown|
|              C|Through various d...|804|2018-08-30_16:49:53|   Judith Herrera|
|              C|Week toward so co...|805|2018-08-30_16:49:53|Teresa Cunningham|
+---------------+--------------------+---+-------------------+-----------------+
only showing top 5 rows

How can I achieve the same result for my converted avro file?

Upvotes: 1

Views: 5351

Answers (2)

Ray

Reputation: 133

For Spark <= 2.4, this is fully supported.
For Spark >= 2.4, and even Spark 3 or above:

  1. Download the spark-avro dependency; you can find the Java dependency on Maven. Make sure to pick the version matching your Spark and Scala versions, since a mismatched version will lead to failures.
  2. In Spark 3, use this method to create the Spark session and add your dependency:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]') \
    .appName('sample') \
    .config("spark.jars", "YOUR_JAR_PATH/spark-avro_2.12-3.2.1.jar") \
    .getOrCreate()

Then read your Avro data:

sample_df = spark.read.format("avro").load("YOUR_AVRO_DATA_PATH")

In fact, this method can always be used to add a third-party Java dependency.
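
As an alternative sketch (assuming the machine can reach Maven Central), the dependency can also be pulled in by its Maven coordinate via spark.jars.packages instead of pointing at a downloaded jar:

from pyspark.sql import SparkSession

# let Spark resolve spark-avro from Maven; the Scala/Spark versions in the
# coordinate (2.12 / 3.2.1 here) still have to match your installation
spark = SparkSession.builder.master('local[*]') \
    .appName('sample') \
    .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.2.1") \
    .getOrCreate()

sample_df = spark.read.format("avro").load("YOUR_AVRO_DATA_PATH")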

Upvotes: 0

Mohammed Sohail

Reputation: 1

df = spark.read.format("avro").load("<file-path>")
# cast the binary body column to a string and parse each record as JSON
json_df = df.select(df.body.cast("string")).rdd.map(lambda x: x[0])
data = spark.read.json(json_df)
data.show()
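
This works because the Avro files written by Flume's avro_event serializer wrap each event as headers plus a binary body, so the original JSON has to be pulled back out of the body column and re-parsed with spark.read.json.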

Upvotes: 0
