Misaki Gome

Reputation: 27

no outputs from eventhub

I'm trying to read data from an Event Hub, but the result contains only null values.

I converted a dataframe to JSON to send to the Event Hub:

dfSource_pandas = df.toPandas()
type(dfSource_pandas)  # pandas.core.frame.DataFrame
payload = dfSource_pandas.to_json(orient="records")
print(payload)
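For context, `to_json(orient="records")` serializes the whole frame as one JSON array, not as one object per row; that shape matters for the schema used on the read side. A minimal sketch with made-up alert values:

```python
import json
import pandas as pd

# Hypothetical rows matching the question's alert schema.
dfSource_pandas = pd.DataFrame([
    {"alertId": "a-1", "score": 0.9, "severity": 2.0},
    {"alertId": "a-2", "score": 0.4, "severity": 1.0},
])

# orient="records" emits a single JSON *array* of objects.
payload = dfSource_pandas.to_json(orient="records")
print(payload)

# Parsing it back yields a Python list, not a single dict.
parsed = json.loads(payload)
print(type(parsed).__name__, len(parsed))
```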

and this is the schema of the data:

root
|-- alertId: string (nullable = true)
|-- score: double (nullable = true)
|-- severity: double (nullable = true)

I'm trying to read the data from the Event Hub using:

from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

schema = StructType([
    StructField("alertId", StringType(), True),
    StructField("score", DoubleType(), True),
    StructField("severity", DoubleType(), True),
])

df = spark.readStream.format("eventhubs").options(**ehConf).load()

# body arrives as binary, so cast it to string before parsing
stream_data_body = df.select(from_json(df.body.cast('string'), schema).alias('body')) \
    .select("body.*")

but then I'm getting null values with the correct schema.

Upvotes: 1

Views: 450

Answers (1)

Alex Ott

Reputation: 87204

It looks like you have a list of records in your payload, not individual records. In this case, you can try the following schema instead:

schema = StructType([
  StructField("alerts", ArrayType(
    StructType([
      StructField("alertId", StringType(), True),
      StructField("score", DoubleType(), True),
      StructField("severity", DoubleType(), True),
    ])))])
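For reference, this wrapped schema matches a body that is an object with an `alerts` key holding the array, rather than a bare array. A minimal sketch of that shape (the field values are made up):

```python
import json

# Body shape matched by the wrapped schema: {"alerts": [...]}.
body = json.dumps({"alerts": [
    {"alertId": "a-1", "score": 0.9, "severity": 2.0},
    {"alertId": "a-2", "score": 0.4, "severity": 1.0},
]})

decoded = json.loads(body)
print(decoded["alerts"][0]["alertId"])
```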

And then to get individual records, you need to explode your payload - something like this:

from pyspark.sql.functions import from_json, explode

stream_data = df.select(from_json(df.body.cast('string'),schema).alias('body')) \
  .select("body.*") \
  .select(explode("alerts").alias("alert")) \
  .select("alert.*")
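The `explode` step turns each array-valued row into one output row per array element. A rough plain-Python analogue of that flattening (not the Spark API itself; the values are made up):

```python
# One streamed message whose parsed body wraps the alert array.
rows = [{"alerts": [
    {"alertId": "a-1", "score": 0.9, "severity": 2.0},
    {"alertId": "a-2", "score": 0.4, "severity": 1.0},
]}]

# Analogue of select(explode("alerts")): one row per array element.
exploded = [alert for row in rows for alert in row["alerts"]]
print(len(exploded))
print(exploded[0]["alertId"])
```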

Upvotes: 1
