PySpark 2.4.3, read Avro format messages from Kafka - PySpark Structured Streaming

I am trying to read Avro messages from Kafka using PySpark 2.4.3. Based on the Stack Overflow question linked below, I am able to convert into Avro format (to_avro) and that code works as expected, but from_avro does not work and fails with the error below. Are there any other modules that support reading Avro messages streamed from Kafka? This is a Cloudera distribution environment. Please advise.

Reference : Pyspark 2.4.0, read avro from kafka with read stream - Python

Environment Details :

Spark :

(Spark shell banner) version 2.1.1.2.6.1.0-129

Using Python version 3.6.1 (default, Jul 24 2019 04:52:09)

PySpark:

pyspark 2.4.3

Launch command:

/usr/hdp/2.6.1.0-129/spark2/bin/pyspark --packages org.apache.spark:spark-avro_2.11:2.4.3 --conf spark.ui.port=4064
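For reference, one can confirm which Spark version the shell session actually runs, since the banner and the pip-installed package can disagree (a quick sketch; spark is the session object the pyspark shell provides):

print(spark.version)        # JVM-side Spark version actually launched
import pyspark
print(pyspark.__version__)  # Python pyspark package version on the path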

to_avro:

from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.functions import col, struct

def from_avro(col, jsonFormatSchema):
    # Decode an Avro binary column using the given JSON-format schema,
    # by calling the Scala from_avro function through the JVM gateway
    sc = SparkContext._active_spark_context
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").from_avro
    return Column(f(_to_java_column(col), jsonFormatSchema))


def to_avro(col):
    # Encode a column as Avro binary via the Scala to_avro function
    sc = SparkContext._active_spark_context
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").to_avro
    return Column(f(_to_java_column(col)))


avro_type_struct = """
{
  "type": "record",
  "name": "struct",
  "fields": [
    {"name": "col1", "type": "long"},
    {"name": "col2", "type": "string"}
  ]
}"""


df = spark.range(10).select(struct(
    col("id"),
    col("id").cast("string").alias("id2")
).alias("struct"))
avro_struct_df = df.select(to_avro(col("struct")).alias("avro"))
avro_struct_df.show(3)
+----------+
|      avro|
+----------+
|[00 02 30]|
|[02 02 31]|
|[04 02 32]|
+----------+
only showing top 3 rows

from_avro:

avro_struct_df.select(from_avro("avro", avro_type_struct)).show(3)

Error Message :

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/2.6.1.0-129/spark2/python/pyspark/sql/dataframe.py", line 993, in select
    jdf = self._jdf.select(self._jcols(*cols))
  File "/usr/hdp/2.6.1.0-129/spark2/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/hdp/2.6.1.0-129/spark2/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/hdp/2.6.1.0-129/spark2/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o61.select.
: java.lang.NoSuchMethodError: org.apache.avro.Schema.getLogicalType()Lorg/apache/avro/LogicalType;
        at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:66)
        at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)

Upvotes: 1

Views: 2055

Answers (2)

vanducng

Reputation: 1119

Spark 2.4.0 supports the to_avro and from_avro functions, but only for Scala and Java. Your approach should be fine as long as you use a matching Spark version and spark-avro package.
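For example, launching against a standalone Spark 2.4.3 installation so that the runtime and the package line up (the install path here is illustrative):

/path/to/spark-2.4.3/bin/pyspark --packages org.apache.spark:spark-avro_2.11:2.4.3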

An alternative that I prefer when using Spark Structured Streaming to consume Kafka messages is a UDF with the fastavro Python library. fastavro is relatively fast, as it uses a C extension. I have used it in production for months without any issues.

As shown in the code snippet below, the Kafka message payload is carried in the value column of kafka_df. For demonstration purposes, I use a simple Avro schema with two fields, col1 and col2. The deserialize_avro UDF returns a tuple with one element per field described in the Avro schema. The stream is then written to the console for debugging.

from pyspark.sql import SparkSession
import pyspark.sql.functions as psf
from pyspark.sql.types import *
import io
import fastavro

def deserialize_avro(serialized_msg):
    # Wrap the raw Kafka payload in a file-like object for fastavro
    bytes_io = io.BytesIO(serialized_msg)
    bytes_io.seek(0)
    avro_schema = {
                    "type": "record",
                    "name": "struct",
                    "fields": [
                      {"name": "col1", "type": "long"},
                      {"name": "col2", "type": "string"}
                    ]
                  }

    # Decode a single record written without an embedded schema;
    # the writer schema must be supplied explicitly
    deserialized_msg = fastavro.schemaless_reader(bytes_io, avro_schema)

    # Return one tuple element per field declared in the Avro schema
    return (deserialized_msg["col1"],
            deserialized_msg["col2"])

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("consume kafka message") \
        .getOrCreate()

    # Stream raw records from Kafka; the payload arrives in the "value" column
    kafka_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka01-broker:9092") \
        .option("subscribe", "topic_name") \
        .option("stopGracefullyOnShutdown", "true") \
        .load()

    # Struct type matching the tuple returned by deserialize_avro
    df_schema = StructType([
        StructField("col1", LongType(), True),
        StructField("col2", StringType(), True)
    ])

    avro_deserialize_udf = psf.udf(deserialize_avro, returnType=df_schema)
    parsed_df = kafka_df.withColumn("avro", avro_deserialize_udf(psf.col("value"))).select("avro.*")

    # Write the parsed stream to the console for debugging
    query = parsed_df.writeStream.format("console").option("truncate", "true").start()
    query.awaitTermination()
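To sanity-check deserialize_avro outside Spark, you can round-trip a record locally with fastavro's schemaless_writer (a minimal sketch; the sample values are made up):

import io
import fastavro

schema = {
    "type": "record",
    "name": "struct",
    "fields": [
        {"name": "col1", "type": "long"},
        {"name": "col2", "type": "string"}
    ]
}

# Encode a record the same way a schemaless Avro producer would
buf = io.BytesIO()
fastavro.schemaless_writer(buf, schema, {"col1": 1, "col2": "one"})

# Feed the raw bytes through the UDF's underlying function
print(deserialize_avro(buf.getvalue()))  # expected output: (1, 'one')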

Upvotes: 2

OneCricketeer

Reputation: 191671

Your Spark version is actually 2.1.1, so you cannot use the 2.4.3 version of the spark-avro package that ships with Spark.

You'll need to use the one from Databricks

Are there any other modules that support reading avro messages streamed from Kafka?

You could use a plain Kafka Python library instead of Spark.
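For example, a minimal consumer sketch with kafka-python plus fastavro (the broker address, topic, and schema are taken from the question and would need to match your setup):

import io
import fastavro
from kafka import KafkaConsumer  # pip install kafka-python

schema = {
    "type": "record",
    "name": "struct",
    "fields": [
        {"name": "col1", "type": "long"},
        {"name": "col2", "type": "string"}
    ]
}

consumer = KafkaConsumer("topic_name", bootstrap_servers="kafka01-broker:9092")
for msg in consumer:
    # msg.value holds the raw Avro bytes from the producer
    record = fastavro.schemaless_reader(io.BytesIO(msg.value), schema)
    print(record)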

Upvotes: 1
