Reputation: 553
I've built a small data pipeline that moves some fictional test data from a local directory (JSON format) to HDFS (Avro format). This seemed to work correctly (Flume showed no errors), but the error might already lie here. The next step was to load an Avro file into a PySpark DataFrame using the Databricks loader (the only Python library I could find for this). Let me explain how I did this, so you can see where I might have failed:
My goal is to push JSON data from a local directory to HDFS so I can analyse it with PySpark. For this I'm using Flume. Since JSON compresses poorly on HDFS, I'm also converting every file to Avro using the following flume.conf:
agent.sources.tail.type = exec
agent.sources.tail.shell = /bin/bash -c
agent.sources.tail.command = cat /home/user/Data/json/*
agent.sources.tail.batchSize = 10
agent.sources.tail.channels = MemChannel
agent.channels.MemChannel.type = memory
agent.channels.MemChannel.capacity = 100
agent.channels.MemChannel.transactionCapacity = 100
agent.sinks.HDFS.channel = MemChannel
agent.sinks.HDFS.type = hdfs
agent.sinks.HDFS.hdfs.fileType = DataStream
agent.sinks.HDFS.hdfs.fileSuffix = .avro
agent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/home/user/Data/hdfs/test_data
agent.sinks.HDFS.hdfs.batchSize = 100
agent.sinks.HDFS.hdfs.rollSize = 0
agent.sinks.HDFS.hdfs.rollCount = 100
agent.sinks.HDFS.serializer = avro_event
agent.sinks.HDFS.serializer.compressionCodec = snappy
This ran without any errors, so I assume Flume moved every file to HDFS as a correct Avro file.
Now comes the part where I try to read a single Avro file as a DataFrame within PySpark:
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark import SparkContext
sc = SparkContext()
sqlContext = SQLContext(sc)
sqlContext.setConf("spark.sql.avro.compression.codec","snappy")
# creates a dataframe by reading a single avro file
df = sqlContext.read.format("com.databricks.spark.avro").load("hdfs://localhost:9000/home/user/Data/hdfs/test_data/FlumeData.1535723039267.avro")
This shows me the following (wrong) output:
df.show()
+-------+--------------------+
|headers| body|
+-------+--------------------+
| []|[7B 22 63 61 74 6...|
| []|[7B 22 63 61 74 6...|
| []|[7B 22 63 61 74 6...|
| []|[7B 22 63 61 74 6...|
| []|[7B 22 63 61 74 6...|
+-------+--------------------+
only showing top 5 rows
This is obviously not what I want, since the code above seems to read the Avro file like a plain text file: there is no parsed structure, only a body column that apparently holds the raw JSON bytes. Previously, I created a DataFrame from the same data while it was still stored in the original JSON file:
# creates a dataframe by reading a single json file
df = sqlContext.read.json('hdfs://localhost:9000/home/user/Data/hdfs/test_data/FlumeData.1535702513118.json')
So this is what the desired (correct) output should look like:
df.show()
+---------------+--------------------+---+-------------------+-----------------+
| category| content| id| timestamp| user|
+---------------+--------------------+---+-------------------+-----------------+
| A|Million near orde...|801|2018-08-30_16:49:53| Molly Davis|
| D|Determine company...|802|2018-08-30_16:49:53| Ronnie Liu|
| B|Among themselves ...|803|2018-08-30_16:49:53| Lori Brown|
| C|Through various d...|804|2018-08-30_16:49:53| Judith Herrera|
| C|Week toward so co...|805|2018-08-30_16:49:53|Teresa Cunningham|
+---------------+--------------------+---+-------------------+-----------------+
only showing top 5 rows
How can I achieve the same result for my converted avro file?
Upvotes: 1
Views: 5351
Reputation: 133
For Spark <= 2.4, the com.databricks.spark.avro reader used in the question is fully supported.
For Spark >= 2.4 (including Spark 3 and above), build the session with the spark-avro jar instead:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]')\
    .appName('sample')\
    .config("spark.jars", "YOUR_JAR_PATH/spark-avro_2.12-3.2.1.jar")\
    .getOrCreate()
and read your Avro data:
sample_df = spark.read.format("avro").load("YOUR_AVRO_DATA_PATH")
In fact, this method can always be used to add a third-party Java dependency.
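As a side note, you can also let Spark resolve the dependency from Maven instead of pointing at a local jar, via spark.jars.packages. This is only a sketch; the coordinates below assume Spark 3.2.1 built against Scala 2.12, so adjust them to your cluster:

from pyspark.sql import SparkSession

# spark.jars.packages downloads the artifact (and its transitive dependencies) from Maven Central
spark = SparkSession.builder.master('local[*]')\
    .appName('sample')\
    .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.2.1")\
    .getOrCreate()

sample_df = spark.read.format("avro").load("YOUR_AVRO_DATA_PATH")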
Upvotes: 0
Reputation: 1
# read the Avro file written by Flume (headers/body schema)
df = spark.read.format("avro").load("<file-path>")

# cast the raw body bytes to a string and turn them into an RDD of JSON lines
json_rdd = df.select(df.body.cast("string")).rdd.map(lambda x: x[0])

# let Spark infer the schema from the JSON strings
data = spark.read.json(json_rdd)
data.show()
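If you already know the structure of the JSON, a possible alternative is to stay in the DataFrame API with from_json instead of going through an RDD. This is just a sketch; the field names and types below are assumed from the desired output shown in the question, and df is the DataFrame loaded above:

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, LongType

# schema assumed from the sample output in the question
schema = StructType([
    StructField("category", StringType()),
    StructField("content", StringType()),
    StructField("id", LongType()),
    StructField("timestamp", StringType()),
    StructField("user", StringType()),
])

# parse the body bytes as JSON and flatten the struct into columns
parsed = df.select(from_json(col("body").cast("string"), schema).alias("j")).select("j.*")
parsed.show()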
Upvotes: 0