Reputation: 109
I am trying to read Avro messages from Kafka using PySpark 2.4.3. Following the Stack Overflow question linked below, I am able to convert data into Avro format with to_avro, and that code works as expected. However, from_avro does not work and fails with the error below. Are there any other modules that support reading Avro messages streamed from Kafka? This is a Cloudera distribution environment. Please advise.
Reference : Pyspark 2.4.0, read avro from kafka with read stream - Python
Environment Details :
Spark :
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.1.2.6.1.0-129
      /_/
Using Python version 3.6.1 (default, Jul 24 2019 04:52:09)
Pyspark :
pyspark 2.4.3
Spark_submit :
/usr/hdp/2.6.1.0-129/spark2/bin/pyspark --packages org.apache.spark:spark-avro_2.11:2.4.3 --conf spark.ui.port=4064
to_avro:
from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column

def from_avro(col, jsonFormatSchema):
    # Call the Scala from_avro function through the JVM gateway
    sc = SparkContext._active_spark_context
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").from_avro
    return Column(f(_to_java_column(col), jsonFormatSchema))

def to_avro(col):
    # Call the Scala to_avro function through the JVM gateway
    sc = SparkContext._active_spark_context
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").to_avro
    return Column(f(_to_java_column(col)))

from pyspark.sql.functions import col, struct
avro_type_struct = """
{
  "type": "record",
  "name": "struct",
  "fields": [
    {"name": "col1", "type": "long"},
    {"name": "col2", "type": "string"}
  ]
}"""
df = spark.range(10).select(struct(
    col("id"),
    col("id").cast("string").alias("id2")
).alias("struct"))
avro_struct_df = df.select(to_avro(col("struct")).alias("avro"))
avro_struct_df.show(3)
+----------+
|      avro|
+----------+
|[00 02 30]|
|[02 02 31]|
|[04 02 32]|
+----------+
only showing top 3 rows
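The three rows printed above can be decoded by hand, which is a quick way to confirm what to_avro produced. In Avro binary encoding, a long is written as a zigzag variable-length integer, a string is a long length followed by UTF-8 bytes, and a record is simply its fields in schema order. Below is a minimal Spark-free sketch under those rules; the helper names are illustrative and not part of any library.

```python
import io


def read_long(buf):
    """Read one Avro long: a zigzag-encoded, variable-length integer."""
    shift = 0
    raw = 0
    while True:
        chunk = buf.read(1)
        if not chunk:
            raise EOFError("unexpected end of Avro data")
        byte = chunk[0]
        raw |= (byte & 0x7F) << shift   # low 7 bits carry the payload
        if not (byte & 0x80):           # high bit clear marks the last byte
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1)      # undo the zigzag mapping


def read_string(buf):
    """Read one Avro string: a long length, then that many UTF-8 bytes."""
    length = read_long(buf)
    return buf.read(length).decode("utf-8")


def decode_record(payload):
    """Decode a record with fields (long, string), in schema order."""
    buf = io.BytesIO(payload)
    return read_long(buf), read_string(buf)


# The hex strings are the three rows shown in the output above.
rows = [decode_record(bytes.fromhex(h)) for h in ("000230", "020231", "040232")]
print(rows)  # [(0, '0'), (1, '1'), (2, '2')]
```

So `[00 02 30]` is the long 0 followed by the one-character string "0", which matches the first row of the source DataFrame.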
from_avro:
avro_struct_df.select(from_avro("avro", avro_type_struct)).show(3)
Error Message :
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/2.6.1.0-129/spark2/python/pyspark/sql/dataframe.py", line 993, in select
    jdf = self._jdf.select(self._jcols(*cols))
  File "/usr/hdp/2.6.1.0-129/spark2/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/hdp/2.6.1.0-129/spark2/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/hdp/2.6.1.0-129/spark2/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o61.select.
: java.lang.NoSuchMethodError: org.apache.avro.Schema.getLogicalType()Lorg/apache/avro/LogicalType;
    at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:66)
    at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
Upvotes: 1
Views: 2055
Reputation: 1119
Spark 2.4.0 supports the to_avro and from_avro functions, but only for Scala and Java. Your approach should therefore work, as long as you use matching versions of Spark and the spark-avro package.
An alternative that I prefer when consuming Kafka messages with Spark Structured Streaming is a UDF built on the fastavro Python library. fastavro is relatively fast because it uses a C extension. I have used it in production for months without any issues.
As shown in the code snippet below, the Kafka message payload is carried in the value column of kafka_df. For demonstration purposes, I use a simple Avro schema with two fields, col1 and col2. The deserialize_avro UDF returns a tuple with one element per field described in the Avro schema. The stream is then written to the console for debugging.
from pyspark.sql import SparkSession
import pyspark.sql.functions as psf
from pyspark.sql.types import *
import io
import fastavro

def deserialize_avro(serialized_msg):
    bytes_io = io.BytesIO(serialized_msg)
    bytes_io.seek(0)
    avro_schema = {
        "type": "record",
        "name": "struct",
        "fields": [
            {"name": "col1", "type": "long"},
            {"name": "col2", "type": "string"}
        ]
    }
    deserialized_msg = fastavro.schemaless_reader(bytes_io, avro_schema)
    return (
        deserialized_msg["col1"],
        deserialized_msg["col2"]
    )

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("consume kafka message") \
        .getOrCreate()
    kafka_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka01-broker:9092") \
        .option("subscribe", "topic_name") \
        .option("stopGracefullyOnShutdown", "true") \
        .load()
    df_schema = StructType([
        StructField("col1", LongType(), True),
        StructField("col2", StringType(), True)
    ])
    avro_deserialize_udf = psf.udf(deserialize_avro, returnType=df_schema)
    parsed_df = kafka_df.withColumn("avro", avro_deserialize_udf(psf.col("value"))).select("avro.*")
    query = parsed_df.writeStream.format("console").option("truncate", "true").start()
    query.awaitTermination()
Upvotes: 2
Reputation: 191671
Your Spark version is actually 2.1.1 (see the version banner in your question), so you cannot use the 2.4.3 version of the spark-avro package that ships with Spark.
You'll need to use the one from Databricks.
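A quick way to catch this kind of mismatch before launching is to compare the running Spark version with the version in the --packages coordinate. The sketch below assumes the built-in spark-avro module is versioned in lockstep with Spark, so only major.minor needs to agree; major_minor and avro_package_matches are hypothetical helper names, not part of PySpark.

```python
def major_minor(version):
    """Return (major, minor) from a version string such as '2.1.1.2.6.1.0-129'."""
    parts = version.split(".")
    return int(parts[0]), int(parts[1])


def avro_package_matches(spark_version, package_coordinate):
    """Check whether a group:artifact:version coordinate matches the Spark version.

    Assumption: the package is released together with Spark, so equal
    major.minor numbers are treated as compatible.
    """
    package_version = package_coordinate.rsplit(":", 1)[1]
    return major_minor(spark_version) == major_minor(package_version)


# Values taken from the question: the shell banner and the --packages argument.
runtime_version = "2.1.1.2.6.1.0-129"
coordinate = "org.apache.spark:spark-avro_2.11:2.4.3"
print(avro_package_matches(runtime_version, coordinate))  # False
```

In a live session, spark.version can be passed in place of the hard-coded runtime_version string.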
Are there any other modules that support reading avro messages streamed from Kafka?
You could use a plain Kafka Python library instead of Spark.
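As a rough sketch of that route, assuming the kafka-python and fastavro packages are installed and that messages are schemaless Avro records matching the schema from the question (no extra framing bytes): consume_avro is a hypothetical helper name, and the broker and topic names in the usage comment are placeholders.

```python
import io

# Schema from the question; the writer must have used the same one.
AVRO_SCHEMA = {
    "type": "record",
    "name": "struct",
    "fields": [
        {"name": "col1", "type": "long"},
        {"name": "col2", "type": "string"},
    ],
}


def consume_avro(topic, bootstrap_servers, schema=AVRO_SCHEMA):
    """Yield each Kafka message value decoded into a dict."""
    # Imported lazily so the module loads even when the optional
    # third-party packages are missing.
    import fastavro
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(topic, bootstrap_servers=bootstrap_servers)
    for record in consumer:
        # record.value holds the raw message bytes
        yield fastavro.schemaless_reader(io.BytesIO(record.value), schema)


# Usage (needs a reachable broker, so it is left commented out):
# for row in consume_avro("topic_name", "kafka01-broker:9092"):
#     print(row["col1"], row["col2"])
```

This avoids the Spark and spark-avro version coupling entirely, at the cost of handling parallelism and offsets yourself.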
Upvotes: 1