Reputation: 27
I'm trying to read data from Event Hubs, but the result only contains null values.
I converted a DataFrame to JSON to send to Event Hubs:
dfSource_pandas = df.toPandas()
type(dfSource_pandas)
payload = dfSource_pandas.to_json(orient="records")
print(payload)
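For reference, here is a minimal sketch of what `to_json(orient="records")` emits, using hypothetical rows that match the schema below (`alertId`, `score`, `severity` are from the question; the row values are made up):

```python
import pandas as pd

# Hypothetical rows mirroring the question's schema
df_pd = pd.DataFrame([
    {"alertId": "a1", "score": 0.5, "severity": 2.0},
    {"alertId": "a2", "score": 0.9, "severity": 3.0},
])

# orient="records" serialises the WHOLE frame as one JSON array:
# [{"alertId":"a1",...},{"alertId":"a2",...}]
payload = df_pd.to_json(orient="records")
print(payload)
```

Note that the payload is a single JSON array, not one JSON object per record.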
This is the schema of the data:
root
|-- alertId: string (nullable = true)
|-- score: double (nullable = true)
|-- severity: double (nullable = true)
I'm trying to read the data back from Event Hubs using:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, DoubleType, StructType, StructField, StringType, LongType, BooleanType
schema = StructType([
    StructField("alertId", StringType(), True),
    StructField("score", DoubleType(), True),
    StructField("severity", DoubleType(), True),
])
from pyspark.sql.functions import from_json, to_json
from pyspark.sql.types import *
df = spark.readStream.format("eventhubs").options(**ehConf).load()
stream_data_body = df.select(from_json(df.body.cast('string'), schema).alias('body')) \
    .select("body.*")
Then I'm getting null values, although the schema itself is correct.
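The null values are consistent with a shape mismatch: `from_json` returns null when the string does not match the given schema, and a payload built with `orient="records"` is a top-level JSON array rather than a single object. A small stdlib-only sketch (the payload string here is hypothetical, shaped like the question's data):

```python
import json

# Hypothetical body, shaped like to_json(orient="records") output
payload = '[{"alertId": "a1", "score": 0.5, "severity": 2.0}]'

parsed = json.loads(payload)
# The top-level value is a list of records, not a single object,
# so parsing it with a plain StructType schema cannot succeed.
print(type(parsed).__name__)  # list
print(parsed[0]["alertId"])
```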
Upvotes: 1
Views: 450
Reputation: 87204
It looks like you have a list of records in your payload, not individual records. In this case, you can try the following schema instead:
schema = StructType([
    StructField("alerts", ArrayType(
        StructType([
            StructField("alertId", StringType(), True),
            StructField("score", DoubleType(), True),
            StructField("severity", DoubleType(), True),
        ])))])
And then to get individual records, you need to explode your payload - something like this:
from pyspark.sql.functions import from_json, explode
stream_data = df.select(from_json(df.body.cast('string'),schema).alias('body')) \
.select("body.*") \
.select(explode("alerts").alias("alert")) \
.select("alert.*")
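To see what the `explode` step does without a running Spark cluster, here is a plain-Python equivalent, assuming a body wrapped in an `alerts` key as the schema above expects (the body string and values are hypothetical):

```python
import json

# Hypothetical event body matching the wrapped schema
body = ('{"alerts": ['
        '{"alertId": "a1", "score": 0.5, "severity": 2.0},'
        '{"alertId": "a2", "score": 0.9, "severity": 3.0}]}')

parsed = json.loads(body)
# explode("alerts") yields one row per array element;
# in plain Python that is just iterating over the list
rows = [alert for alert in parsed["alerts"]]
for r in rows:
    print(r["alertId"], r["score"], r["severity"])
```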
Upvotes: 1